From 4ba2c710b5ffd70b52d6680c1feb2700adf85e10 Mon Sep 17 00:00:00 2001 From: TaltaM <68383301+TaltaM@users.noreply.github.com> Date: Wed, 18 Aug 2021 03:43:46 +0200 Subject: [PATCH] Bittrex: Enable ws orderbook sync recovery (resolves #746) (#747) * [FIX] Enable ws orderbook sync recovery by: - Testing if books have been cleared - Assigning options when loading snapshot * orderbooks: remove setlastupdate method and on select depth method that updates linked list, this reduced lock contention across code base and fixes buffer bug on applying buffered updates * WS - Introduce signaling for the need to fetch the orderbook * Address nits * Update error messages to include exchange name Co-authored-by: shazbert --- exchanges/bittrex/bittrex_types.go | 7 +- exchanges/bittrex/bittrex_ws_orderbook.go | 119 ++++++++++++++++------ exchanges/orderbook/depth.go | 36 ++++--- exchanges/orderbook/depth_test.go | 53 ++++------ exchanges/orderbook/orderbook.go | 3 +- exchanges/stream/buffer/buffer.go | 38 +++++-- exchanges/stream/buffer/buffer_test.go | 4 +- 7 files changed, 166 insertions(+), 94 deletions(-) diff --git a/exchanges/bittrex/bittrex_types.go b/exchanges/bittrex/bittrex_types.go index 5984f766..860909f8 100644 --- a/exchanges/bittrex/bittrex_types.go +++ b/exchanges/bittrex/bittrex_types.go @@ -284,9 +284,10 @@ type orderbookManager struct { } type update struct { - buffer chan *OrderbookUpdateMessage - fetchingBook bool - initialSync bool + buffer chan *OrderbookUpdateMessage + fetchingBook bool + initialSync bool + needsFetchingBook bool } // job defines a synchonisation job that tells a go routine to fetch an diff --git a/exchanges/bittrex/bittrex_ws_orderbook.go b/exchanges/bittrex/bittrex_ws_orderbook.go index ca4c2d99..353086f2 100644 --- a/exchanges/bittrex/bittrex_ws_orderbook.go +++ b/exchanges/bittrex/bittrex_ws_orderbook.go @@ -93,7 +93,7 @@ func (b *Bittrex) UpdateLocalOBBuffer(update *OrderbookUpdateMessage) (bool, err err = b.applyBufferUpdate(currencyPair) if err != nil { - b.flushAndCleanup(currencyPair) + log.Errorf(log.WebsocketMgr, "%s websocket UpdateLocalOBBuffer: Could not apply buffer update\n", b.Name) } return false, err @@ -136,23 +136,40 @@ func (b *Bittrex) SeedLocalCacheWithOrderBook(p currency.Pair, sequence int64, o // applyBufferUpdate applies the buffer to the orderbook or initiates a new // orderbook sync by the REST protocol which is off handed to go routine. func (b *Bittrex) applyBufferUpdate(pair currency.Pair) error { - fetching, err := b.obm.checkIsFetchingBook(pair) + fetching, needsFetching, err := b.obm.handleFetchingBook(pair) if err != nil { return err } if fetching { return nil } - - recent, err := b.Websocket.Orderbook.GetOrderbook(pair, asset.Spot) - if err != nil || (recent.Asks == nil && recent.Bids == nil) { + if needsFetching { if b.Verbose { - log.Debugf(log.WebsocketMgr, "Orderbook: Fetching via REST\n") + log.Debugf(log.WebsocketMgr, "%s Orderbook: Fetching via REST\n", b.Name) } return b.obm.fetchBookViaREST(pair) } + recent, err := b.Websocket.Orderbook.GetOrderbook(pair, asset.Spot) + if err != nil { + log.Errorf( + log.WebsocketMgr, + "%s error fetching recent orderbook when applying updates: %s\n", + b.Name, + err) + } - return b.obm.checkAndProcessUpdate(b.ProcessUpdateOB, pair, recent) + if recent != nil { + err = b.obm.checkAndProcessUpdate(b.ProcessUpdateOB, pair, recent) + if err != nil { + log.Errorf( + log.WebsocketMgr, + "%s error processing update - initiating new orderbook sync via REST: %s\n", + b.Name, + err) + b.obm.setNeedsFetchingBook(pair) + } + } + return nil } // SynchroniseWebsocketOrderbook synchronises full orderbook for currency pair @@ -192,12 +209,7 @@ func (b *Bittrex) processJob(p currency.Pair) error { // Immediately apply the buffer updates so we don't wait for a // new update to initiate this. - err = b.applyBufferUpdate(p) - if err != nil { - b.flushAndCleanup(p) - return err - } - return nil + return b.applyBufferUpdate(p) } // flushAndCleanup flushes orderbook and clean local cache @@ -239,9 +251,10 @@ func (o *orderbookManager) stageWsUpdate(u *OrderbookUpdateMessage, pair currenc state = &update{ // 100ms update assuming we might have up to a 10 second delay. // There could be a potential 100 updates for the currency. - buffer: make(chan *OrderbookUpdateMessage, maxWSUpdateBuffer), - fetchingBook: false, - initialSync: true, + buffer: make(chan *OrderbookUpdateMessage, maxWSUpdateBuffer), + fetchingBook: false, + initialSync: true, + needsFetchingBook: true, } m2[a] = state } @@ -258,21 +271,6 @@ func (o *orderbookManager) stageWsUpdate(u *OrderbookUpdateMessage, pair currenc } } -// checkIsFetchingBook checks status if the book is currently being via the REST -// protocol. -func (o *orderbookManager) checkIsFetchingBook(pair currency.Pair) (bool, error) { - o.Lock() - defer o.Unlock() - state, ok := o.state[pair.Base][pair.Quote][asset.Spot] - if !ok { - return false, - fmt.Errorf("check is fetching book cannot match currency pair %s asset type %s", - pair, - asset.Spot) - } - return state.fetchingBook, nil -} - // stopFetchingBook completes the book fetching. func (o *orderbookManager) stopFetchingBook(pair currency.Pair) error { o.Lock() @@ -292,6 +290,64 @@ func (o *orderbookManager) stopFetchingBook(pair currency.Pair) error { return nil } +// stopNeedsFetchingBook completes the book fetching initiation. +func (o *orderbookManager) stopNeedsFetchingBook(pair currency.Pair) error { + o.Lock() + defer o.Unlock() + state, ok := o.state[pair.Base][pair.Quote][asset.Spot] + if !ok { + return fmt.Errorf("could not match pair %s and asset type %s in hash table", + pair, + asset.Spot) + } + if !state.needsFetchingBook { + return fmt.Errorf("needs fetching book already set to false for %s %s", + pair, + asset.Spot) + } + state.needsFetchingBook = false + return nil +} + +// setNeedsFetchingBook completes the book fetching initiation. +func (o *orderbookManager) setNeedsFetchingBook(pair currency.Pair) error { + o.Lock() + defer o.Unlock() + state, ok := o.state[pair.Base][pair.Quote][asset.Spot] + if !ok { + return fmt.Errorf("could not match pair %s and asset type %s in hash table", + pair, + asset.Spot) + } + state.needsFetchingBook = true + return nil +} + +// handleFetchingBook checks if a full book is being fetched or needs to be +// fetched +func (o *orderbookManager) handleFetchingBook(pair currency.Pair) (fetching, needsFetching bool, err error) { + o.Lock() + defer o.Unlock() + state, ok := o.state[pair.Base][pair.Quote][asset.Spot] + if !ok { + return false, false, + fmt.Errorf("check is fetching book cannot match currency pair %s asset type %s", + pair, + asset.Spot) + } + + if state.fetchingBook { + return true, false, nil + } + + if state.needsFetchingBook { + state.needsFetchingBook = false + state.fetchingBook = true + return false, true, nil + } + return false, false, nil +} + // completeInitialSync sets if an asset type has completed its initial sync func (o *orderbookManager) completeInitialSync(pair currency.Pair) error { o.Lock() @@ -437,5 +493,6 @@ bufferEmpty: // disable rest orderbook synchronisation _ = o.stopFetchingBook(pair) _ = o.completeInitialSync(pair) + _ = o.stopNeedsFetchingBook(pair) return nil } diff --git a/exchanges/orderbook/depth.go b/exchanges/orderbook/depth.go index c87fdab5..2ca17395 100644 --- a/exchanges/orderbook/depth.go +++ b/exchanges/orderbook/depth.go @@ -94,8 +94,11 @@ func (d *Depth) TotalAskAmounts() (liquidity, value float64) { } // LoadSnapshot flushes the bids and asks with a snapshot -func (d *Depth) LoadSnapshot(bids, asks []Item) { +func (d *Depth) LoadSnapshot(bids, asks []Item, lastUpdateID int64, lastUpdated time.Time, updateByREST bool) { d.m.Lock() + d.lastUpdateID = lastUpdateID + d.lastUpdated = lastUpdated + d.restSnapshot = updateByREST d.bids.load(bids, d.stack) d.asks.load(asks, d.stack) d.alert() @@ -105,6 +108,8 @@ func (d *Depth) LoadSnapshot(bids, asks []Item) { // Flush flushes the bid and ask depths func (d *Depth) Flush() { d.m.Lock() + d.lastUpdateID = 0 + d.lastUpdated = time.Time{} d.bids.load(nil, d.stack) d.asks.load(nil, d.stack) d.alert() @@ -113,11 +118,13 @@ func (d *Depth) Flush() { // UpdateBidAskByPrice updates the bid and ask spread by supplied updates, this // will trim total length of depth level to a specified supplied number -func (d *Depth) UpdateBidAskByPrice(bidUpdts, askUpdts Items, maxDepth int) { +func (d *Depth) UpdateBidAskByPrice(bidUpdts, askUpdts Items, maxDepth int, lastUpdateID int64, lastUpdated time.Time) { if len(bidUpdts) == 0 && len(askUpdts) == 0 { return } d.m.Lock() + d.lastUpdateID = lastUpdateID + d.lastUpdated = lastUpdated tn := getNow() if len(bidUpdts) != 0 { d.bids.updateInsertByPrice(bidUpdts, d.stack, maxDepth, tn) @@ -130,7 +137,7 @@ func (d *Depth) UpdateBidAskByPrice(bidUpdts, askUpdts Items, maxDepth int) { } // UpdateBidAskByID amends details by ID -func (d *Depth) UpdateBidAskByID(bidUpdts, askUpdts Items) error { +func (d *Depth) UpdateBidAskByID(bidUpdts, askUpdts Items, lastUpdateID int64, lastUpdated time.Time) error { if len(bidUpdts) == 0 && len(askUpdts) == 0 { return nil } @@ -148,12 +155,14 @@ func (d *Depth) UpdateBidAskByID(bidUpdts, askUpdts Items) error { return err } } + d.lastUpdateID = lastUpdateID + d.lastUpdated = lastUpdated d.alert() return nil } // DeleteBidAskByID deletes a price level by ID -func (d *Depth) DeleteBidAskByID(bidUpdts, askUpdts Items, bypassErr bool) error { +func (d *Depth) DeleteBidAskByID(bidUpdts, askUpdts Items, bypassErr bool, lastUpdateID int64, lastUpdated time.Time) error { if len(bidUpdts) == 0 && len(askUpdts) == 0 { return nil } @@ -171,12 +180,14 @@ func (d *Depth) DeleteBidAskByID(bidUpdts, askUpdts Items, bypassErr bool) error return err } } + d.lastUpdateID = lastUpdateID + d.lastUpdated = lastUpdated d.alert() return nil } // InsertBidAskByID inserts new updates -func (d *Depth) InsertBidAskByID(bidUpdts, askUpdts Items) error { +func (d *Depth) InsertBidAskByID(bidUpdts, askUpdts Items, lastUpdateID int64, lastUpdated time.Time) error { if len(bidUpdts) == 0 && len(askUpdts) == 0 { return nil } @@ -194,12 +205,14 @@ func (d *Depth) InsertBidAskByID(bidUpdts, askUpdts Items) error { return err } } + d.lastUpdateID = lastUpdateID + d.lastUpdated = lastUpdated d.alert() return nil } // UpdateInsertByID updates or inserts by ID at current price level. -func (d *Depth) UpdateInsertByID(bidUpdts, askUpdts Items) error { +func (d *Depth) UpdateInsertByID(bidUpdts, askUpdts Items, lastUpdateID int64, lastUpdated time.Time) error { if len(bidUpdts) == 0 && len(askUpdts) == 0 { return nil } @@ -218,6 +231,8 @@ func (d *Depth) UpdateInsertByID(bidUpdts, askUpdts Items) error { } } d.alert() + d.lastUpdateID = lastUpdateID + d.lastUpdated = lastUpdated return nil } @@ -239,15 +254,6 @@ func (d *Depth) AssignOptions(b *Base) { d.m.Unlock() } -// SetLastUpdate sets details of last update information -func (d *Depth) SetLastUpdate(lastUpdate time.Time, lastUpdateID int64, updateByREST bool) { - d.m.Lock() - d.lastUpdated = lastUpdate - d.lastUpdateID = lastUpdateID - d.restSnapshot = updateByREST - d.m.Unlock() -} - // GetName returns name of exchange func (d *Depth) GetName() string { d.m.Lock() diff --git a/exchanges/orderbook/depth_test.go b/exchanges/orderbook/depth_test.go index ed49c23c..8cc30dd1 100644 --- a/exchanges/orderbook/depth_test.go +++ b/exchanges/orderbook/depth_test.go @@ -121,7 +121,7 @@ func TestTotalAmounts(t *testing.T) { func TestLoadSnapshot(t *testing.T) { d := newDepth(id) - d.LoadSnapshot(Items{{Price: 1337, Amount: 1}}, Items{{Price: 1337, Amount: 10}}) + d.LoadSnapshot(Items{{Price: 1337, Amount: 1}}, Items{{Price: 1337, Amount: 10}}, 0, time.Time{}, false) if d.Retrieve().Asks[0].Price != 1337 || d.Retrieve().Bids[0].Price != 1337 { t.Fatal("not set") } @@ -129,12 +129,12 @@ func TestLoadSnapshot(t *testing.T) { func TestFlush(t *testing.T) { d := newDepth(id) - d.LoadSnapshot(Items{{Price: 1337, Amount: 1}}, Items{{Price: 1337, Amount: 10}}) + d.LoadSnapshot(Items{{Price: 1337, Amount: 1}}, Items{{Price: 1337, Amount: 10}}, 0, time.Time{}, false) d.Flush() if len(d.Retrieve().Asks) != 0 || len(d.Retrieve().Bids) != 0 { t.Fatal("not flushed") } - d.LoadSnapshot(Items{{Price: 1337, Amount: 1}}, Items{{Price: 1337, Amount: 10}}) + d.LoadSnapshot(Items{{Price: 1337, Amount: 1}}, Items{{Price: 1337, Amount: 10}}, 0, time.Time{}, false) d.Flush() if len(d.Retrieve().Asks) != 0 || len(d.Retrieve().Bids) != 0 { t.Fatal("not flushed") @@ -143,12 +143,12 @@ func TestFlush(t *testing.T) { func TestUpdateBidAskByPrice(t *testing.T) { d := newDepth(id) - d.LoadSnapshot(Items{{Price: 1337, Amount: 1, ID: 1}}, Items{{Price: 1337, Amount: 10, ID: 2}}) - d.UpdateBidAskByPrice(Items{{Price: 1337, Amount: 2, ID: 1}}, Items{{Price: 1337, Amount: 2, ID: 2}}, 0) + d.LoadSnapshot(Items{{Price: 1337, Amount: 1, ID: 1}}, Items{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Time{}, false) + d.UpdateBidAskByPrice(Items{{Price: 1337, Amount: 2, ID: 1}}, Items{{Price: 1337, Amount: 2, ID: 2}}, 0, 0, time.Time{}) if d.Retrieve().Asks[0].Amount != 2 || d.Retrieve().Bids[0].Amount != 2 { t.Fatal("orderbook amounts not updated correctly") } - d.UpdateBidAskByPrice(Items{{Price: 1337, Amount: 0, ID: 1}}, Items{{Price: 1337, Amount: 0, ID: 2}}, 0) + d.UpdateBidAskByPrice(Items{{Price: 1337, Amount: 0, ID: 1}}, Items{{Price: 1337, Amount: 0, ID: 2}}, 0, 0, time.Time{}) if d.GetAskLength() != 0 || d.GetBidLength() != 0 { t.Fatal("orderbook amounts not updated correctly") } @@ -156,8 +156,8 @@ func TestUpdateBidAskByPrice(t *testing.T) { func TestDeleteBidAskByID(t *testing.T) { d := newDepth(id) - d.LoadSnapshot(Items{{Price: 1337, Amount: 1, ID: 1}}, Items{{Price: 1337, Amount: 10, ID: 2}}) - err := d.DeleteBidAskByID(Items{{Price: 1337, Amount: 2, ID: 1}}, Items{{Price: 1337, Amount: 2, ID: 2}}, false) + d.LoadSnapshot(Items{{Price: 1337, Amount: 1, ID: 1}}, Items{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Time{}, false) + err := d.DeleteBidAskByID(Items{{Price: 1337, Amount: 2, ID: 1}}, Items{{Price: 1337, Amount: 2, ID: 2}}, false, 0, time.Time{}) if err != nil { t.Fatal(err) } @@ -165,17 +165,17 @@ func TestDeleteBidAskByID(t *testing.T) { t.Fatal("items not deleted") } - err = d.DeleteBidAskByID(Items{{Price: 1337, Amount: 2, ID: 1}}, nil, false) + err = d.DeleteBidAskByID(Items{{Price: 1337, Amount: 2, ID: 1}}, nil, false, 0, time.Time{}) if !errors.Is(err, errIDCannotBeMatched) { t.Fatalf("error expected %v received %v", errIDCannotBeMatched, err) } - err = d.DeleteBidAskByID(nil, Items{{Price: 1337, Amount: 2, ID: 2}}, false) + err = d.DeleteBidAskByID(nil, Items{{Price: 1337, Amount: 2, ID: 2}}, false, 0, time.Time{}) if !errors.Is(err, errIDCannotBeMatched) { t.Fatalf("error expected %v received %v", errIDCannotBeMatched, err) } - err = d.DeleteBidAskByID(nil, Items{{Price: 1337, Amount: 2, ID: 2}}, true) + err = d.DeleteBidAskByID(nil, Items{{Price: 1337, Amount: 2, ID: 2}}, true, 0, time.Time{}) if !errors.Is(err, nil) { t.Fatalf("error expected %v received %v", nil, err) } @@ -183,8 +183,8 @@ func TestDeleteBidAskByID(t *testing.T) { func TestUpdateBidAskByID(t *testing.T) { d := newDepth(id) - d.LoadSnapshot(Items{{Price: 1337, Amount: 1, ID: 1}}, Items{{Price: 1337, Amount: 10, ID: 2}}) - err := d.UpdateBidAskByID(Items{{Price: 1337, Amount: 2, ID: 1}}, Items{{Price: 1337, Amount: 2, ID: 2}}) + d.LoadSnapshot(Items{{Price: 1337, Amount: 1, ID: 1}}, Items{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Time{}, false) + err := d.UpdateBidAskByID(Items{{Price: 1337, Amount: 2, ID: 1}}, Items{{Price: 1337, Amount: 2, ID: 2}}, 0, time.Time{}) if err != nil { t.Fatal(err) } @@ -193,12 +193,12 @@ func TestUpdateBidAskByID(t *testing.T) { } // random unmatching IDs - err = d.UpdateBidAskByID(Items{{Price: 1337, Amount: 2, ID: 666}}, nil) + err = d.UpdateBidAskByID(Items{{Price: 1337, Amount: 2, ID: 666}}, nil, 0, time.Time{}) if !errors.Is(err, errIDCannotBeMatched) { t.Fatalf("error expected %v received %v", errIDCannotBeMatched, err) } - err = d.UpdateBidAskByID(nil, Items{{Price: 1337, Amount: 2, ID: 69}}) + err = d.UpdateBidAskByID(nil, Items{{Price: 1337, Amount: 2, ID: 69}}, 0, time.Time{}) if !errors.Is(err, errIDCannotBeMatched) { t.Fatalf("error expected %v received %v", errIDCannotBeMatched, err) } @@ -206,8 +206,8 @@ func TestUpdateBidAskByID(t *testing.T) { func TestInsertBidAskByID(t *testing.T) { d := newDepth(id) - d.LoadSnapshot(Items{{Price: 1337, Amount: 1, ID: 1}}, Items{{Price: 1337, Amount: 10, ID: 2}}) - err := d.InsertBidAskByID(Items{{Price: 1338, Amount: 2, ID: 3}}, Items{{Price: 1336, Amount: 2, ID: 4}}) + d.LoadSnapshot(Items{{Price: 1337, Amount: 1, ID: 1}}, Items{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Time{}, false) + err := d.InsertBidAskByID(Items{{Price: 1338, Amount: 2, ID: 3}}, Items{{Price: 1336, Amount: 2, ID: 4}}, 0, time.Time{}) if err != nil { t.Fatal(err) } @@ -218,19 +218,19 @@ func TestInsertBidAskByID(t *testing.T) { func TestUpdateInsertByID(t *testing.T) { d := newDepth(id) - d.LoadSnapshot(Items{{Price: 1337, Amount: 1, ID: 1}}, Items{{Price: 1337, Amount: 10, ID: 2}}) + d.LoadSnapshot(Items{{Price: 1337, Amount: 1, ID: 1}}, Items{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Time{}, false) - err := d.UpdateInsertByID(Items{{Price: 1338, Amount: 0, ID: 3}}, Items{{Price: 1336, Amount: 2, ID: 4}}) + err := d.UpdateInsertByID(Items{{Price: 1338, Amount: 0, ID: 3}}, Items{{Price: 1336, Amount: 2, ID: 4}}, 0, time.Time{}) if !errors.Is(err, errAmountCannotBeLessOrEqualToZero) { t.Fatalf("expected: %v but received: %v", errAmountCannotBeLessOrEqualToZero, err) } - err = d.UpdateInsertByID(Items{{Price: 1338, Amount: 2, ID: 3}}, Items{{Price: 1336, Amount: 0, ID: 4}}) + err = d.UpdateInsertByID(Items{{Price: 1338, Amount: 2, ID: 3}}, Items{{Price: 1336, Amount: 0, ID: 4}}, 0, time.Time{}) if !errors.Is(err, errAmountCannotBeLessOrEqualToZero) { t.Fatalf("expected: %v but received: %v", errAmountCannotBeLessOrEqualToZero, err) } - err = d.UpdateInsertByID(Items{{Price: 1338, Amount: 2, ID: 3}}, Items{{Price: 1336, Amount: 2, ID: 4}}) + err = d.UpdateInsertByID(Items{{Price: 1338, Amount: 2, ID: 3}}, Items{{Price: 1336, Amount: 2, ID: 4}}, 0, time.Time{}) if err != nil { t.Fatal(err) } @@ -271,17 +271,6 @@ func TestAssignOptions(t *testing.T) { } } -func TestSetLastUpdate(t *testing.T) { - d := Depth{} - tn := time.Now() - d.SetLastUpdate(tn, 1337, true) - if d.lastUpdated != tn || - d.lastUpdateID != 1337 || - !d.restSnapshot { - t.Fatal("failed to set correctly") - } -} - func TestGetName(t *testing.T) { d := Depth{} d.exchange = "test" diff --git a/exchanges/orderbook/orderbook.go b/exchanges/orderbook/orderbook.go index c245df5d..0d3dcc4a 100644 --- a/exchanges/orderbook/orderbook.go +++ b/exchanges/orderbook/orderbook.go @@ -79,8 +79,7 @@ func (s *Service) Update(b *Base) error { book.AssignOptions(b) m3[b.Pair.Quote.Item] = book } - book.SetLastUpdate(b.LastUpdated, b.LastUpdateID, true) - book.LoadSnapshot(b.Bids, b.Asks) + book.LoadSnapshot(b.Bids, b.Asks, b.LastUpdateID, b.LastUpdated, true) s.Unlock() return s.Mux.Publish([]uuid.UUID{m1.ID}, book.Retrieve()) } diff --git a/exchanges/stream/buffer/buffer.go b/exchanges/stream/buffer/buffer.go index a85bfb5c..6c21085a 100644 --- a/exchanges/stream/buffer/buffer.go +++ b/exchanges/stream/buffer/buffer.go @@ -102,9 +102,6 @@ func (w *Orderbook) Update(u *Update) error { u.Asset) } - // Apply new update information - book.ob.SetLastUpdate(u.UpdateTime, u.UpdateID, false) - if w.bufferEnabled { processed, err := w.processBufferUpdate(book, u) if err != nil { @@ -193,7 +190,11 @@ func (w *Orderbook) processObUpdate(o *orderbookHolder, u *Update) error { // updateByPrice ammends amount if match occurs by price, deletes if amount is // zero or less and inserts if not found. func (o *orderbookHolder) updateByPrice(updts *Update) { - o.ob.UpdateBidAskByPrice(updts.Bids, updts.Asks, updts.MaxDepth) + o.ob.UpdateBidAskByPrice(updts.Bids, + updts.Asks, + updts.MaxDepth, + updts.UpdateID, + updts.UpdateTime) } // updateByIDAndAction will receive an action to execute against the orderbook @@ -201,15 +202,28 @@ func (o *orderbookHolder) updateByPrice(updts *Update) { func (o *orderbookHolder) updateByIDAndAction(updts *Update) error { switch updts.Action { case Amend: - return o.ob.UpdateBidAskByID(updts.Bids, updts.Asks) + return o.ob.UpdateBidAskByID(updts.Bids, + updts.Asks, + updts.UpdateID, + updts.UpdateTime) case Delete: // edge case for Bitfinex as their streaming endpoint duplicates deletes bypassErr := o.ob.GetName() == "Bitfinex" && o.ob.IsFundingRate() - return o.ob.DeleteBidAskByID(updts.Bids, updts.Asks, bypassErr) + return o.ob.DeleteBidAskByID(updts.Bids, + updts.Asks, + bypassErr, + updts.UpdateID, + updts.UpdateTime) case Insert: - return o.ob.InsertBidAskByID(updts.Bids, updts.Asks) + return o.ob.InsertBidAskByID(updts.Bids, + updts.Asks, + updts.UpdateID, + updts.UpdateTime) case UpdateInsert: - return o.ob.UpdateInsertByID(updts.Bids, updts.Asks) + return o.ob.UpdateInsertByID(updts.Bids, + updts.Asks, + updts.UpdateID, + updts.UpdateTime) default: return fmt.Errorf("invalid action [%s]", updts.Action) } @@ -253,7 +267,13 @@ func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error { return err } - holder.ob.LoadSnapshot(book.Bids, book.Asks) + holder.ob.LoadSnapshot( + book.Bids, + book.Asks, + book.LastUpdateID, + book.LastUpdated, + false, + ) if holder.ob.VerifyOrderbook { // This is used here so as to not retrieve // book if verification is off. diff --git a/exchanges/stream/buffer/buffer_test.go b/exchanges/stream/buffer/buffer_test.go index 6511c97e..16835415 100644 --- a/exchanges/stream/buffer/buffer_test.go +++ b/exchanges/stream/buffer/buffer_test.go @@ -802,7 +802,7 @@ func TestUpdateByIDAndAction(t *testing.T) { t.Fatal(err) } - book.LoadSnapshot(append(bids[:0:0], bids...), append(asks[:0:0], asks...)) + book.LoadSnapshot(append(bids[:0:0], bids...), append(asks[:0:0], asks...), 0, time.Time{}, true) err = book.Retrieve().Verify() if err != nil { @@ -924,7 +924,7 @@ func TestUpdateByIDAndAction(t *testing.T) { t.Fatal("did not adjust ask item placement and details") } - book.LoadSnapshot(append(bids[:0:0], bids...), append(bids[:0:0], bids...)) // nolint:gocritic + book.LoadSnapshot(append(bids[:0:0], bids...), append(bids[:0:0], bids...), 0, time.Time{}, true) // nolint:gocritic // Delete - not found err = holder.updateByIDAndAction(&Update{