Binance: Fix Orderbook Management (#506)

With the incorrect implementation currently, orderbooks have incorrect
 entries in them which never get removed. This results in an
 unsynchronised state, having issues such as bid prices greater than the
 lowest ask prices

The websocket depth updates must be subscribed to before getting a
 snapshot. Any updates from the websocket that have earlier update ids
 must be discarded.

https://binance-docs.github.io/apidocs/spot/en/#how-to-manage-a-local-order-book-correctly
Signed-off-by: David Ackroyd <daveo.ackroyd@gmail.com>
This commit is contained in:
David Ackroyd
2020-05-18 16:16:09 +10:00
committed by GitHub
parent 802d265d56
commit 0440f14179
5 changed files with 143 additions and 25 deletions

View File

@@ -713,31 +713,101 @@ func TestWsTradeUpdate(t *testing.T) {
}
func TestWsDepthUpdate(t *testing.T) {
t.Parallel()
pressXToJSON := []byte(`{"stream":"btcusdt@depth","data":{
seedLastUpdateID := int64(161)
book := OrderBook{
Asks: []OrderbookItem{
{Price: 6621.80000000, Quantity: 0.00198100},
{Price: 6622.14000000, Quantity: 4.00000000},
{Price: 6622.46000000, Quantity: 2.30000000},
{Price: 6622.47000000, Quantity: 1.18633300},
{Price: 6622.64000000, Quantity: 4.00000000},
{Price: 6622.73000000, Quantity: 0.02900000},
{Price: 6622.76000000, Quantity: 0.12557700},
{Price: 6622.81000000, Quantity: 2.08994200},
{Price: 6622.82000000, Quantity: 0.01500000},
{Price: 6623.17000000, Quantity: 0.16831300},
},
Bids: []OrderbookItem{
{Price: 6621.55000000, Quantity: 0.16356700},
{Price: 6621.45000000, Quantity: 0.16352600},
{Price: 6621.41000000, Quantity: 0.86091200},
{Price: 6621.25000000, Quantity: 0.16914100},
{Price: 6621.23000000, Quantity: 0.09193600},
{Price: 6621.22000000, Quantity: 0.00755100},
{Price: 6621.13000000, Quantity: 0.08432000},
{Price: 6621.03000000, Quantity: 0.00172000},
{Price: 6620.94000000, Quantity: 0.30506700},
{Price: 6620.93000000, Quantity: 0.00200000},
},
LastUpdateID: seedLastUpdateID,
}
update1 := []byte(`{"stream":"btcusdt@depth","data":{
"e": "depthUpdate",
"E": 123456789,
"E": 123456788,
"s": "BTCUSDT",
"U": 157,
"u": 160,
"b": [
[
"0.0024",
"10"
]
["6621.45", "0.3"]
],
"a": [
[
"0.0026",
"100"
]
["6622.46", "1.5"]
]
}}`)
err := b.wsHandleData(pressXToJSON)
if err.Error() != "Binance - UpdateLocalCache error: ob.Base could not be found for Exchange Binance CurrencyPair: BTC-USDT AssetType: spot" {
p := currency.NewPairWithDelimiter("BTC", "USDT", "-")
if err := b.SeedLocalCacheWithBook(p, &book); err != nil {
t.Error(err)
}
if err := b.wsHandleData(update1); err != nil {
t.Error(err)
}
ob := b.Websocket.Orderbook.GetOrderbook(p, asset.Spot)
if exp, got := seedLastUpdateID, ob.LastUpdateID; got != exp {
t.Fatalf("Unexpected Last update id of orderbook for old update. Exp: %d, got: %d", exp, got)
}
if exp, got := 2.3, ob.Asks[2].Amount; got != exp {
t.Fatalf("Ask altered by outdated update. Exp: %f, got %f", exp, got)
}
if exp, got := 0.163526, ob.Bids[1].Amount; got != exp {
t.Fatalf("Bid altered by outdated update. Exp: %f, got %f", exp, got)
}
update2 := []byte(`{"stream":"btcusdt@depth","data":{
"e": "depthUpdate",
"E": 123456789,
"s": "BTCUSDT",
"U": 161,
"u": 165,
"b": [
["6621.45", "0.163526"]
],
"a": [
["6622.46", "2.3"],
["6622.47", "1.9"]
]
}}`)
if err := b.wsHandleData(update2); err != nil {
t.Error(err)
}
ob = b.Websocket.Orderbook.GetOrderbook(p, asset.Spot)
if exp, got := int64(165), ob.LastUpdateID; got != exp {
t.Fatalf("Unexpected Last update id of orderbook for new update. Exp: %d, got: %d", exp, got)
}
if exp, got := 2.3, ob.Asks[2].Amount; got != exp {
t.Fatalf("Unexpected Ask amount. Exp: %f, got %f", exp, got)
}
if exp, got := 1.9, ob.Asks[3].Amount; got != exp {
t.Fatalf("Unexpected Ask amount. Exp: %f, got %f", exp, got)
}
if exp, got := 0.163526, ob.Bids[1].Amount; got != exp {
t.Fatalf("Unexpected Bid amount. Exp: %f, got %f", exp, got)
}
}
func TestWsBalanceUpdate(t *testing.T) {

View File

@@ -71,14 +71,6 @@ func (b *Binance) WsConnect() error {
listenKey
}
enabledPairs := b.GetEnabledPairs(asset.Spot)
for i := range enabledPairs {
err = b.SeedLocalCache(enabledPairs[i])
if err != nil {
return err
}
}
b.WebsocketConn.URL = wsurl
b.WebsocketConn.Verbose = b.Verbose
@@ -93,6 +85,15 @@ func (b *Binance) WsConnect() error {
MessageType: websocket.PongMessage,
Delay: pingDelay,
})
enabledPairs := b.GetEnabledPairs(asset.Spot)
for i := range enabledPairs {
err = b.SeedLocalCache(enabledPairs[i])
if err != nil {
return err
}
}
go b.wsReadData()
go b.KeepAuthKeyAlive()
return nil
@@ -402,8 +403,7 @@ func stringToOrderStatus(status string) (order.Status, error) {
// SeedLocalCache seeds depth data
func (b *Binance) SeedLocalCache(p currency.Pair) error {
var newOrderBook orderbook.Base
orderbookNew, err := b.GetOrderBook(
ob, err := b.GetOrderBook(
OrderBookDataRequestParams{
Symbol: b.FormatExchangeCurrency(p, asset.Spot).String(),
Limit: 1000,
@@ -412,6 +412,11 @@ func (b *Binance) SeedLocalCache(p currency.Pair) error {
return err
}
return b.SeedLocalCacheWithBook(p, &ob)
}
func (b *Binance) SeedLocalCacheWithBook(p currency.Pair, orderbookNew *OrderBook) error {
var newOrderBook orderbook.Base
for i := range orderbookNew.Bids {
newOrderBook.Bids = append(newOrderBook.Bids, orderbook.Item{
Amount: orderbookNew.Bids[i].Quantity,
@@ -428,12 +433,24 @@ func (b *Binance) SeedLocalCache(p currency.Pair) error {
newOrderBook.Pair = p
newOrderBook.AssetType = asset.Spot
newOrderBook.ExchangeName = b.Name
newOrderBook.LastUpdateID = orderbookNew.LastUpdateID
return b.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
}
// UpdateLocalCache updates and returns the most recent iteration of the orderbook
func (b *Binance) UpdateLocalCache(wsdp *WebsocketDepthStream) error {
currencyPair := currency.NewPairFromFormattedPairs(wsdp.Pair, b.GetEnabledPairs(asset.Spot),
b.GetPairFormat(asset.Spot, true))
currentBook := b.Websocket.Orderbook.GetOrderbook(currencyPair, asset.Spot)
// Drop any event where u is <= lastUpdateId in the snapshot.
// The first processed event should have U <= lastUpdateId+1 AND u >= lastUpdateId+1.
// While listening to the stream, each new event's U should be equal to the previous event's u+1.
if wsdp.LastUpdateID <= currentBook.LastUpdateID {
return nil
}
var updateBid, updateAsk []orderbook.Item
for i := range wsdp.UpdateBids {
p, err := strconv.ParseFloat(wsdp.UpdateBids[i][0].(string), 64)
@@ -460,8 +477,7 @@ func (b *Binance) UpdateLocalCache(wsdp *WebsocketDepthStream) error {
updateAsk = append(updateAsk, orderbook.Item{Price: p, Amount: a})
}
currencyPair := currency.NewPairFromFormattedPairs(wsdp.Pair, b.GetEnabledPairs(asset.Spot),
b.GetPairFormat(asset.Spot, true))
return b.Websocket.Orderbook.Update(&wsorderbook.WebsocketOrderbookUpdate{
Bids: updateBid,
Asks: updateAsk,

View File

@@ -62,6 +62,7 @@ type Base struct {
Bids []Item `json:"bids"`
Asks []Item `json:"asks"`
LastUpdated time.Time `json:"lastUpdated"`
LastUpdateID int64 `json:"lastUpdateId"`
AssetType asset.Item `json:"assetType"`
ExchangeName string `json:"exchangeName"`
}

View File

@@ -94,6 +94,8 @@ func (w *WebsocketOrderbookLocal) processBufferUpdate(o *orderbook.Base, u *Webs
}
func (w *WebsocketOrderbookLocal) processObUpdate(o *orderbook.Base, u *WebsocketOrderbookUpdate) {
o.LastUpdateID = u.UpdateID
if w.updateEntriesByID {
w.updateByIDAndAction(o, u)
} else {

View File

@@ -486,6 +486,35 @@ func TestOutOfOrderIDs(t *testing.T) {
}
}
func TestOrderbookLastUpdateID(t *testing.T) {
obl, _, _, err := createSnapshot()
if err != nil {
t.Fatal(err)
}
if exp := float64(1000); itemArray[0][0].Price != exp {
t.Errorf("expected sorted price to be %f, received: %v",
exp, itemArray[1][0].Price)
}
for i := range itemArray {
asks := itemArray[i]
err = obl.Update(&WebsocketOrderbookUpdate{
Asks: asks,
Pair: cp,
UpdateID: int64(i) + 1,
Asset: asset.Spot,
})
if err != nil {
t.Fatal(err)
}
}
ob := obl.GetOrderbook(cp, asset.Spot)
if exp := len(itemArray); ob.LastUpdateID != int64(exp) {
t.Errorf("expected last update id to be %d, received: %v", exp, ob.LastUpdateID)
}
}
// TestRunUpdateWithoutSnapshot logic test
func TestRunUpdateWithoutSnapshot(t *testing.T) {
var obl WebsocketOrderbookLocal