diff --git a/exchanges/binance/binance_test.go b/exchanges/binance/binance_test.go index 5ba80732..754a9694 100644 --- a/exchanges/binance/binance_test.go +++ b/exchanges/binance/binance_test.go @@ -713,31 +713,101 @@ func TestWsTradeUpdate(t *testing.T) { } func TestWsDepthUpdate(t *testing.T) { - t.Parallel() - pressXToJSON := []byte(`{"stream":"btcusdt@depth","data":{ + seedLastUpdateID := int64(161) + book := OrderBook{ + Asks: []OrderbookItem{ + {Price: 6621.80000000, Quantity: 0.00198100}, + {Price: 6622.14000000, Quantity: 4.00000000}, + {Price: 6622.46000000, Quantity: 2.30000000}, + {Price: 6622.47000000, Quantity: 1.18633300}, + {Price: 6622.64000000, Quantity: 4.00000000}, + {Price: 6622.73000000, Quantity: 0.02900000}, + {Price: 6622.76000000, Quantity: 0.12557700}, + {Price: 6622.81000000, Quantity: 2.08994200}, + {Price: 6622.82000000, Quantity: 0.01500000}, + {Price: 6623.17000000, Quantity: 0.16831300}, + }, + Bids: []OrderbookItem{ + {Price: 6621.55000000, Quantity: 0.16356700}, + {Price: 6621.45000000, Quantity: 0.16352600}, + {Price: 6621.41000000, Quantity: 0.86091200}, + {Price: 6621.25000000, Quantity: 0.16914100}, + {Price: 6621.23000000, Quantity: 0.09193600}, + {Price: 6621.22000000, Quantity: 0.00755100}, + {Price: 6621.13000000, Quantity: 0.08432000}, + {Price: 6621.03000000, Quantity: 0.00172000}, + {Price: 6620.94000000, Quantity: 0.30506700}, + {Price: 6620.93000000, Quantity: 0.00200000}, + }, + LastUpdateID: seedLastUpdateID, + } + + update1 := []byte(`{"stream":"btcusdt@depth","data":{ "e": "depthUpdate", - "E": 123456789, + "E": 123456788, "s": "BTCUSDT", "U": 157, "u": 160, "b": [ - [ - "0.0024", - "10" - ] + ["6621.45", "0.3"] ], "a": [ - [ - "0.0026", - "100" - ] + ["6622.46", "1.5"] ] }}`) - err := b.wsHandleData(pressXToJSON) - if err.Error() != "Binance - UpdateLocalCache error: ob.Base could not be found for Exchange Binance CurrencyPair: BTC-USDT AssetType: spot" { + p := currency.NewPairWithDelimiter("BTC", "USDT", "-") + if err := b.SeedLocalCacheWithBook(p, &book); err != nil { t.Error(err) } + + if err := b.wsHandleData(update1); err != nil { + t.Error(err) + } + + ob := b.Websocket.Orderbook.GetOrderbook(p, asset.Spot) + if exp, got := seedLastUpdateID, ob.LastUpdateID; got != exp { + t.Fatalf("Unexpected Last update id of orderbook for old update. Exp: %d, got: %d", exp, got) + } + if exp, got := 2.3, ob.Asks[2].Amount; got != exp { + t.Fatalf("Ask altered by outdated update. Exp: %f, got %f", exp, got) + } + if exp, got := 0.163526, ob.Bids[1].Amount; got != exp { + t.Fatalf("Bid altered by outdated update. Exp: %f, got %f", exp, got) + } + + update2 := []byte(`{"stream":"btcusdt@depth","data":{ + "e": "depthUpdate", + "E": 123456789, + "s": "BTCUSDT", + "U": 161, + "u": 165, + "b": [ + ["6621.45", "0.163526"] + ], + "a": [ + ["6622.46", "2.3"], + ["6622.47", "1.9"] + ] + }}`) + + if err := b.wsHandleData(update2); err != nil { + t.Error(err) + } + + ob = b.Websocket.Orderbook.GetOrderbook(p, asset.Spot) + if exp, got := int64(165), ob.LastUpdateID; got != exp { + t.Fatalf("Unexpected Last update id of orderbook for new update. Exp: %d, got: %d", exp, got) + } + if exp, got := 2.3, ob.Asks[2].Amount; got != exp { + t.Fatalf("Unexpected Ask amount. Exp: %f, got %f", exp, got) + } + if exp, got := 1.9, ob.Asks[3].Amount; got != exp { + t.Fatalf("Unexpected Ask amount. Exp: %f, got %f", exp, got) + } + if exp, got := 0.163526, ob.Bids[1].Amount; got != exp { + t.Fatalf("Unexpected Bid amount. Exp: %f, got %f", exp, got) + } } func TestWsBalanceUpdate(t *testing.T) { diff --git a/exchanges/binance/binance_websocket.go b/exchanges/binance/binance_websocket.go index 8df68f6f..6a97da42 100644 --- a/exchanges/binance/binance_websocket.go +++ b/exchanges/binance/binance_websocket.go @@ -71,14 +71,6 @@ func (b *Binance) WsConnect() error { listenKey } - enabledPairs := b.GetEnabledPairs(asset.Spot) - for i := range enabledPairs { - err = b.SeedLocalCache(enabledPairs[i]) - if err != nil { - return err - } - } - b.WebsocketConn.URL = wsurl b.WebsocketConn.Verbose = b.Verbose @@ -93,6 +85,15 @@ func (b *Binance) WsConnect() error { MessageType: websocket.PongMessage, Delay: pingDelay, }) + + enabledPairs := b.GetEnabledPairs(asset.Spot) + for i := range enabledPairs { + err = b.SeedLocalCache(enabledPairs[i]) + if err != nil { + return err + } + } + go b.wsReadData() go b.KeepAuthKeyAlive() return nil @@ -402,8 +403,7 @@ func stringToOrderStatus(status string) (order.Status, error) { // SeedLocalCache seeds depth data func (b *Binance) SeedLocalCache(p currency.Pair) error { - var newOrderBook orderbook.Base - orderbookNew, err := b.GetOrderBook( + ob, err := b.GetOrderBook( OrderBookDataRequestParams{ Symbol: b.FormatExchangeCurrency(p, asset.Spot).String(), Limit: 1000, @@ -412,6 +412,11 @@ func (b *Binance) SeedLocalCache(p currency.Pair) error { return err } + return b.SeedLocalCacheWithBook(p, &ob) +} + +func (b *Binance) SeedLocalCacheWithBook(p currency.Pair, orderbookNew *OrderBook) error { + var newOrderBook orderbook.Base for i := range orderbookNew.Bids { newOrderBook.Bids = append(newOrderBook.Bids, orderbook.Item{ Amount: orderbookNew.Bids[i].Quantity, @@ -428,12 +433,24 @@ func (b *Binance) SeedLocalCache(p currency.Pair) error { newOrderBook.Pair = p newOrderBook.AssetType = asset.Spot newOrderBook.ExchangeName = b.Name + newOrderBook.LastUpdateID = orderbookNew.LastUpdateID return b.Websocket.Orderbook.LoadSnapshot(&newOrderBook) } // UpdateLocalCache updates and returns the most recent iteration of the orderbook func (b *Binance) UpdateLocalCache(wsdp *WebsocketDepthStream) error { + currencyPair := currency.NewPairFromFormattedPairs(wsdp.Pair, b.GetEnabledPairs(asset.Spot), + b.GetPairFormat(asset.Spot, true)) + currentBook := b.Websocket.Orderbook.GetOrderbook(currencyPair, asset.Spot) + + // Drop any event where u is <= lastUpdateId in the snapshot. + // The first processed event should have U <= lastUpdateId+1 AND u >= lastUpdateId+1. + // While listening to the stream, each new event's U should be equal to the previous event's u+1. + if wsdp.LastUpdateID <= currentBook.LastUpdateID { + return nil + } + var updateBid, updateAsk []orderbook.Item for i := range wsdp.UpdateBids { p, err := strconv.ParseFloat(wsdp.UpdateBids[i][0].(string), 64) @@ -460,8 +477,7 @@ func (b *Binance) UpdateLocalCache(wsdp *WebsocketDepthStream) error { updateAsk = append(updateAsk, orderbook.Item{Price: p, Amount: a}) } - currencyPair := currency.NewPairFromFormattedPairs(wsdp.Pair, b.GetEnabledPairs(asset.Spot), - b.GetPairFormat(asset.Spot, true)) + return b.Websocket.Orderbook.Update(&wsorderbook.WebsocketOrderbookUpdate{ Bids: updateBid, Asks: updateAsk, diff --git a/exchanges/orderbook/orderbook_types.go b/exchanges/orderbook/orderbook_types.go index 3686adcc..4cd72a2d 100644 --- a/exchanges/orderbook/orderbook_types.go +++ b/exchanges/orderbook/orderbook_types.go @@ -62,6 +62,7 @@ type Base struct { Bids []Item `json:"bids"` Asks []Item `json:"asks"` LastUpdated time.Time `json:"lastUpdated"` + LastUpdateID int64 `json:"lastUpdateId"` AssetType asset.Item `json:"assetType"` ExchangeName string `json:"exchangeName"` } diff --git a/exchanges/websocket/wsorderbook/wsorderbook.go b/exchanges/websocket/wsorderbook/wsorderbook.go index d7689ae0..169eb0a2 100644 --- a/exchanges/websocket/wsorderbook/wsorderbook.go +++ b/exchanges/websocket/wsorderbook/wsorderbook.go @@ -94,6 +94,8 @@ func (w *WebsocketOrderbookLocal) processBufferUpdate(o *orderbook.Base, u *Webs } func (w *WebsocketOrderbookLocal) processObUpdate(o *orderbook.Base, u *WebsocketOrderbookUpdate) { + o.LastUpdateID = u.UpdateID + if w.updateEntriesByID { w.updateByIDAndAction(o, u) } else { diff --git a/exchanges/websocket/wsorderbook/wsorderbook_test.go b/exchanges/websocket/wsorderbook/wsorderbook_test.go index 95f6b7f2..0393712e 100644 --- a/exchanges/websocket/wsorderbook/wsorderbook_test.go +++ b/exchanges/websocket/wsorderbook/wsorderbook_test.go @@ -486,6 +486,35 @@ func TestOutOfOrderIDs(t *testing.T) { } } +func TestOrderbookLastUpdateID(t *testing.T) { + obl, _, _, err := createSnapshot() + if err != nil { + t.Fatal(err) + } + if exp := float64(1000); itemArray[0][0].Price != exp { + t.Errorf("expected sorted price to be %f, received: %v", + exp, itemArray[1][0].Price) + } + + for i := range itemArray { + asks := itemArray[i] + err = obl.Update(&WebsocketOrderbookUpdate{ + Asks: asks, + Pair: cp, + UpdateID: int64(i) + 1, + Asset: asset.Spot, + }) + if err != nil { + t.Fatal(err) + } + } + + ob := obl.GetOrderbook(cp, asset.Spot) + if exp := len(itemArray); ob.LastUpdateID != int64(exp) { + t.Errorf("expected last update id to be %d, received: %v", exp, ob.LastUpdateID) + } +} + // TestRunUpdateWithoutSnapshot logic test func TestRunUpdateWithoutSnapshot(t *testing.T) { var obl WebsocketOrderbookLocal