From 0440f14179ea3bda3431565cda53245946c038f3 Mon Sep 17 00:00:00 2001 From: David Ackroyd Date: Mon, 18 May 2020 16:16:09 +1000 Subject: [PATCH] Binance: Fix Orderbook Management (#506) With the incorrect implementation currently, orderbooks have incorrect entries in them which never get removed. This results in an unsynchronised state, having issues such as bid prices greater than the lowest ask prices The websocket depth updates must be subscribed to before getting a snapshot. Any updates from the websocket that have earlier update ids must be discarded. https://binance-docs.github.io/apidocs/spot/en/#how-to-manage-a-local-order-book-correctly Signed-off-by: David Ackroyd --- exchanges/binance/binance_test.go | 96 ++++++++++++++++--- exchanges/binance/binance_websocket.go | 40 +++++--- exchanges/orderbook/orderbook_types.go | 1 + .../websocket/wsorderbook/wsorderbook.go | 2 + .../websocket/wsorderbook/wsorderbook_test.go | 29 ++++++ 5 files changed, 143 insertions(+), 25 deletions(-) diff --git a/exchanges/binance/binance_test.go b/exchanges/binance/binance_test.go index 5ba80732..754a9694 100644 --- a/exchanges/binance/binance_test.go +++ b/exchanges/binance/binance_test.go @@ -713,31 +713,101 @@ func TestWsTradeUpdate(t *testing.T) { } func TestWsDepthUpdate(t *testing.T) { - t.Parallel() - pressXToJSON := []byte(`{"stream":"btcusdt@depth","data":{ + seedLastUpdateID := int64(161) + book := OrderBook{ + Asks: []OrderbookItem{ + {Price: 6621.80000000, Quantity: 0.00198100}, + {Price: 6622.14000000, Quantity: 4.00000000}, + {Price: 6622.46000000, Quantity: 2.30000000}, + {Price: 6622.47000000, Quantity: 1.18633300}, + {Price: 6622.64000000, Quantity: 4.00000000}, + {Price: 6622.73000000, Quantity: 0.02900000}, + {Price: 6622.76000000, Quantity: 0.12557700}, + {Price: 6622.81000000, Quantity: 2.08994200}, + {Price: 6622.82000000, Quantity: 0.01500000}, + {Price: 6623.17000000, Quantity: 0.16831300}, + }, + Bids: []OrderbookItem{ + {Price: 6621.55000000, Quantity: 0.16356700}, + {Price: 6621.45000000, Quantity: 0.16352600}, + {Price: 6621.41000000, Quantity: 0.86091200}, + {Price: 6621.25000000, Quantity: 0.16914100}, + {Price: 6621.23000000, Quantity: 0.09193600}, + {Price: 6621.22000000, Quantity: 0.00755100}, + {Price: 6621.13000000, Quantity: 0.08432000}, + {Price: 6621.03000000, Quantity: 0.00172000}, + {Price: 6620.94000000, Quantity: 0.30506700}, + {Price: 6620.93000000, Quantity: 0.00200000}, + }, + LastUpdateID: seedLastUpdateID, + } + + update1 := []byte(`{"stream":"btcusdt@depth","data":{ "e": "depthUpdate", - "E": 123456789, + "E": 123456788, "s": "BTCUSDT", "U": 157, "u": 160, "b": [ - [ - "0.0024", - "10" - ] + ["6621.45", "0.3"] ], "a": [ - [ - "0.0026", - "100" - ] + ["6622.46", "1.5"] ] }}`) - err := b.wsHandleData(pressXToJSON) - if err.Error() != "Binance - UpdateLocalCache error: ob.Base could not be found for Exchange Binance CurrencyPair: BTC-USDT AssetType: spot" { + p := currency.NewPairWithDelimiter("BTC", "USDT", "-") + if err := b.SeedLocalCacheWithBook(p, &book); err != nil { t.Error(err) } + + if err := b.wsHandleData(update1); err != nil { + t.Error(err) + } + + ob := b.Websocket.Orderbook.GetOrderbook(p, asset.Spot) + if exp, got := seedLastUpdateID, ob.LastUpdateID; got != exp { + t.Fatalf("Unexpected Last update id of orderbook for old update. Exp: %d, got: %d", exp, got) + } + if exp, got := 2.3, ob.Asks[2].Amount; got != exp { + t.Fatalf("Ask altered by outdated update. Exp: %f, got %f", exp, got) + } + if exp, got := 0.163526, ob.Bids[1].Amount; got != exp { + t.Fatalf("Bid altered by outdated update. Exp: %f, got %f", exp, got) + } + + update2 := []byte(`{"stream":"btcusdt@depth","data":{ + "e": "depthUpdate", + "E": 123456789, + "s": "BTCUSDT", + "U": 161, + "u": 165, + "b": [ + ["6621.45", "0.163526"] + ], + "a": [ + ["6622.46", "2.3"], + ["6622.47", "1.9"] + ] + }}`) + + if err := b.wsHandleData(update2); err != nil { + t.Error(err) + } + + ob = b.Websocket.Orderbook.GetOrderbook(p, asset.Spot) + if exp, got := int64(165), ob.LastUpdateID; got != exp { + t.Fatalf("Unexpected Last update id of orderbook for new update. Exp: %d, got: %d", exp, got) + } + if exp, got := 2.3, ob.Asks[2].Amount; got != exp { + t.Fatalf("Unexpected Ask amount. Exp: %f, got %f", exp, got) + } + if exp, got := 1.9, ob.Asks[3].Amount; got != exp { + t.Fatalf("Unexpected Ask amount. Exp: %f, got %f", exp, got) + } + if exp, got := 0.163526, ob.Bids[1].Amount; got != exp { + t.Fatalf("Unexpected Bid amount. Exp: %f, got %f", exp, got) + } } func TestWsBalanceUpdate(t *testing.T) { diff --git a/exchanges/binance/binance_websocket.go b/exchanges/binance/binance_websocket.go index 8df68f6f..6a97da42 100644 --- a/exchanges/binance/binance_websocket.go +++ b/exchanges/binance/binance_websocket.go @@ -71,14 +71,6 @@ func (b *Binance) WsConnect() error { listenKey } - enabledPairs := b.GetEnabledPairs(asset.Spot) - for i := range enabledPairs { - err = b.SeedLocalCache(enabledPairs[i]) - if err != nil { - return err - } - } - b.WebsocketConn.URL = wsurl b.WebsocketConn.Verbose = b.Verbose @@ -93,6 +85,15 @@ func (b *Binance) WsConnect() error { MessageType: websocket.PongMessage, Delay: pingDelay, }) + + enabledPairs := b.GetEnabledPairs(asset.Spot) + for i := range enabledPairs { + err = b.SeedLocalCache(enabledPairs[i]) + if err != nil { + return err + } + } + go b.wsReadData() go b.KeepAuthKeyAlive() return nil @@ -402,8 +403,7 @@ func stringToOrderStatus(status string) (order.Status, error) { // SeedLocalCache seeds depth data func (b *Binance) SeedLocalCache(p currency.Pair) error { - var newOrderBook orderbook.Base - orderbookNew, err := b.GetOrderBook( + ob, err := b.GetOrderBook( OrderBookDataRequestParams{ Symbol: b.FormatExchangeCurrency(p, asset.Spot).String(), Limit: 1000, @@ -412,6 +412,11 @@ func (b *Binance) SeedLocalCache(p currency.Pair) error { return err } + return b.SeedLocalCacheWithBook(p, &ob) +} + +func (b *Binance) SeedLocalCacheWithBook(p currency.Pair, orderbookNew *OrderBook) error { + var newOrderBook orderbook.Base for i := range orderbookNew.Bids { newOrderBook.Bids = append(newOrderBook.Bids, orderbook.Item{ Amount: orderbookNew.Bids[i].Quantity, @@ -428,12 +433,24 @@ func (b *Binance) SeedLocalCache(p currency.Pair) error { newOrderBook.Pair = p newOrderBook.AssetType = asset.Spot newOrderBook.ExchangeName = b.Name + newOrderBook.LastUpdateID = orderbookNew.LastUpdateID return b.Websocket.Orderbook.LoadSnapshot(&newOrderBook) } // UpdateLocalCache updates and returns the most recent iteration of the orderbook func (b *Binance) UpdateLocalCache(wsdp *WebsocketDepthStream) error { + currencyPair := currency.NewPairFromFormattedPairs(wsdp.Pair, b.GetEnabledPairs(asset.Spot), + b.GetPairFormat(asset.Spot, true)) + currentBook := b.Websocket.Orderbook.GetOrderbook(currencyPair, asset.Spot) + + // Drop any event where u is <= lastUpdateId in the snapshot. + // The first processed event should have U <= lastUpdateId+1 AND u >= lastUpdateId+1. + // While listening to the stream, each new event's U should be equal to the previous event's u+1. + if wsdp.LastUpdateID <= currentBook.LastUpdateID { + return nil + } + var updateBid, updateAsk []orderbook.Item for i := range wsdp.UpdateBids { p, err := strconv.ParseFloat(wsdp.UpdateBids[i][0].(string), 64) @@ -460,8 +477,7 @@ func (b *Binance) UpdateLocalCache(wsdp *WebsocketDepthStream) error { updateAsk = append(updateAsk, orderbook.Item{Price: p, Amount: a}) } - currencyPair := currency.NewPairFromFormattedPairs(wsdp.Pair, b.GetEnabledPairs(asset.Spot), - b.GetPairFormat(asset.Spot, true)) + return b.Websocket.Orderbook.Update(&wsorderbook.WebsocketOrderbookUpdate{ Bids: updateBid, Asks: updateAsk, diff --git a/exchanges/orderbook/orderbook_types.go b/exchanges/orderbook/orderbook_types.go index 3686adcc..4cd72a2d 100644 --- a/exchanges/orderbook/orderbook_types.go +++ b/exchanges/orderbook/orderbook_types.go @@ -62,6 +62,7 @@ type Base struct { Bids []Item `json:"bids"` Asks []Item `json:"asks"` LastUpdated time.Time `json:"lastUpdated"` + LastUpdateID int64 `json:"lastUpdateId"` AssetType asset.Item `json:"assetType"` ExchangeName string `json:"exchangeName"` } diff --git a/exchanges/websocket/wsorderbook/wsorderbook.go b/exchanges/websocket/wsorderbook/wsorderbook.go index d7689ae0..169eb0a2 100644 --- a/exchanges/websocket/wsorderbook/wsorderbook.go +++ b/exchanges/websocket/wsorderbook/wsorderbook.go @@ -94,6 +94,8 @@ func (w *WebsocketOrderbookLocal) processBufferUpdate(o *orderbook.Base, u *Webs } func (w *WebsocketOrderbookLocal) processObUpdate(o *orderbook.Base, u *WebsocketOrderbookUpdate) { + o.LastUpdateID = u.UpdateID + if w.updateEntriesByID { w.updateByIDAndAction(o, u) } else { diff --git a/exchanges/websocket/wsorderbook/wsorderbook_test.go b/exchanges/websocket/wsorderbook/wsorderbook_test.go index 95f6b7f2..0393712e 100644 --- a/exchanges/websocket/wsorderbook/wsorderbook_test.go +++ b/exchanges/websocket/wsorderbook/wsorderbook_test.go @@ -486,6 +486,35 @@ func TestOutOfOrderIDs(t *testing.T) { } } +func TestOrderbookLastUpdateID(t *testing.T) { + obl, _, _, err := createSnapshot() + if err != nil { + t.Fatal(err) + } + if exp := float64(1000); itemArray[0][0].Price != exp { + t.Errorf("expected sorted price to be %f, received: %v", + exp, itemArray[1][0].Price) + } + + for i := range itemArray { + asks := itemArray[i] + err = obl.Update(&WebsocketOrderbookUpdate{ + Asks: asks, + Pair: cp, + UpdateID: int64(i) + 1, + Asset: asset.Spot, + }) + if err != nil { + t.Fatal(err) + } + } + + ob := obl.GetOrderbook(cp, asset.Spot) + if exp := len(itemArray); ob.LastUpdateID != int64(exp) { + t.Errorf("expected last update id to be %d, received: %v", exp, ob.LastUpdateID) + } +} + // TestRunUpdateWithoutSnapshot logic test func TestRunUpdateWithoutSnapshot(t *testing.T) { var obl WebsocketOrderbookLocal