From 91ff6c5c1253c40c1de9a0ce8f329ed23a16d2d7 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Thu, 15 Aug 2024 16:11:22 +1000 Subject: [PATCH] orderbook/gateio: add field `UpdatePushedAt` and `InsertedAt` for specific websocket updates (#1590) * Add in extra field for colocation monitoring * rm tags * populate through gateio orderbooks * populate through incremental updates * Add insert at field for orderbook depth * I AM A BAD SPELLLLLLA * add it in * c change --------- Co-authored-by: Ryan O'Hara-Reid --- engine/rpcserver_test.go | 6 +-- exchanges/gateio/gateio_types.go | 1 + exchanges/gateio/gateio_websocket.go | 31 ++++++++------- exchanges/gateio/gateio_ws_futures.go | 6 ++- exchanges/gateio/gateio_ws_option.go | 6 ++- exchanges/orderbook/depth.go | 8 +++- exchanges/orderbook/depth_test.go | 54 +++++++++++++------------- exchanges/orderbook/orderbook.go | 2 +- exchanges/orderbook/orderbook_types.go | 30 +++++++++++--- exchanges/orderbook/tranches_test.go | 14 +++---- exchanges/stream/buffer/buffer.go | 1 + exchanges/stream/buffer/buffer_test.go | 10 ++--- 12 files changed, 103 insertions(+), 66 deletions(-) diff --git a/engine/rpcserver_test.go b/engine/rpcserver_test.go index 44acfdc0..33a7aa0a 100644 --- a/engine/rpcserver_test.go +++ b/engine/rpcserver_test.go @@ -3486,7 +3486,7 @@ func TestGetOrderbookMovement(t *testing.T) { {Price: 13, Amount: 1}, {Price: 14, Amount: 1}, } - err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true) + err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true) if err != nil { t.Fatal(err) } @@ -3599,7 +3599,7 @@ func TestGetOrderbookAmountByNominal(t *testing.T) { {Price: 13, Amount: 1}, {Price: 14, Amount: 1}, } - err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true) + err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true) if err != nil { t.Fatal(err) } @@ -3705,7 +3705,7 @@ func TestGetOrderbookAmountByImpact(t *testing.T) { {Price: 13, Amount: 1}, {Price: 14, Amount: 1}, } - err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true) + err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true) if err != nil { t.Fatal(err) } diff --git a/exchanges/gateio/gateio_types.go b/exchanges/gateio/gateio_types.go index aebb62fc..f4c48fa3 100644 --- a/exchanges/gateio/gateio_types.go +++ b/exchanges/gateio/gateio_types.go @@ -2014,6 +2014,7 @@ type WsEventResponse struct { type WsResponse struct { ID int64 `json:"id"` Time Time `json:"time"` + TimeMs Time `json:"time_ms"` Channel string `json:"channel"` Event string `json:"event"` Result json.RawMessage `json:"result"` diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index a5b7b43f..1480510f 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -131,11 +131,11 @@ func (g *Gateio) wsHandleData(respRaw []byte) error { case spotCandlesticksChannel: return g.processCandlestick(push.Result) case spotOrderbookTickerChannel: - return g.processOrderbookTicker(push.Result) + return g.processOrderbookTicker(push.Result, push.TimeMs.Time()) case spotOrderbookUpdateChannel: - return g.processOrderbookUpdate(push.Result) + return g.processOrderbookUpdate(push.Result, push.TimeMs.Time()) case spotOrderbookChannel: - return g.processOrderbookSnapshot(push.Result) + return g.processOrderbookSnapshot(push.Result, push.TimeMs.Time()) case spotOrdersChannel: return g.processSpotOrders(respRaw) case spotUserTradesChannel: @@ -278,7 +278,7 @@ func (g *Gateio) processCandlestick(incoming []byte) error { return nil } -func (g *Gateio) processOrderbookTicker(incoming []byte) error { +func (g *Gateio) processOrderbookTicker(incoming []byte, updatePushedAt time.Time) error { var data WsOrderbookTickerData err := json.Unmarshal(incoming, &data) if err != nil { @@ -286,16 +286,17 @@ func (g *Gateio) processOrderbookTicker(incoming []byte) error { } return g.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{ - Exchange: g.Name, - Pair: data.CurrencyPair, - Asset: asset.Spot, - LastUpdated: data.UpdateTimeMS.Time(), - Bids: []orderbook.Tranche{{Price: data.BestBidPrice.Float64(), Amount: data.BestBidAmount.Float64()}}, - Asks: []orderbook.Tranche{{Price: data.BestAskPrice.Float64(), Amount: data.BestAskAmount.Float64()}}, + Exchange: g.Name, + Pair: data.CurrencyPair, + Asset: asset.Spot, + LastUpdated: data.UpdateTimeMS.Time(), + UpdatePushedAt: updatePushedAt, + Bids: []orderbook.Tranche{{Price: data.BestBidPrice.Float64(), Amount: data.BestBidAmount.Float64()}}, + Asks: []orderbook.Tranche{{Price: data.BestAskPrice.Float64(), Amount: data.BestAskAmount.Float64()}}, }) } -func (g *Gateio) processOrderbookUpdate(incoming []byte) error { +func (g *Gateio) processOrderbookUpdate(incoming []byte, updatePushedAt time.Time) error { var data WsOrderbookUpdate err := json.Unmarshal(incoming, &data) if err != nil { @@ -323,8 +324,9 @@ func (g *Gateio) processOrderbookUpdate(incoming []byte) error { fetchedCurrencyPairSnapshotOrderbook[data.CurrencyPair.String()] = true } updates := orderbook.Update{ - UpdateTime: data.UpdateTimeMs.Time(), - Pair: data.CurrencyPair, + UpdateTime: data.UpdateTimeMs.Time(), + UpdatePushedAt: updatePushedAt, + Pair: data.CurrencyPair, } updates.Asks = make([]orderbook.Tranche, len(data.Asks)) for x := range data.Asks { @@ -377,7 +379,7 @@ func (g *Gateio) processOrderbookUpdate(incoming []byte) error { return nil } -func (g *Gateio) processOrderbookSnapshot(incoming []byte) error { +func (g *Gateio) processOrderbookSnapshot(incoming []byte, updatePushedAt time.Time) error { var data WsOrderbookSnapshot err := json.Unmarshal(incoming, &data) if err != nil { @@ -389,6 +391,7 @@ func (g *Gateio) processOrderbookSnapshot(incoming []byte) error { Pair: data.CurrencyPair, Asset: asset.Spot, LastUpdated: data.UpdateTimeMs.Time(), + UpdatePushedAt: updatePushedAt, LastUpdateID: data.LastUpdateID, VerifyOrderbook: g.CanVerifyOrderbook, } diff --git a/exchanges/gateio/gateio_ws_futures.go b/exchanges/gateio/gateio_ws_futures.go index 8962212d..19a640d2 100644 --- a/exchanges/gateio/gateio_ws_futures.go +++ b/exchanges/gateio/gateio_ws_futures.go @@ -577,7 +577,7 @@ func (g *Gateio) processFuturesAndOptionsOrderbookUpdate(incoming []byte, assetT return g.Websocket.Orderbook.Update(&updates) } -func (g *Gateio) processFuturesOrderbookSnapshot(event string, incoming []byte, assetType asset.Item, pushTime time.Time) error { +func (g *Gateio) processFuturesOrderbookSnapshot(event string, incoming []byte, assetType asset.Item, updatePushedAt time.Time) error { if event == "all" { var data WsFuturesOrderbookSnapshot err := json.Unmarshal(incoming, &data) @@ -589,6 +589,7 @@ func (g *Gateio) processFuturesOrderbookSnapshot(event string, incoming []byte, Exchange: g.Name, Pair: data.Contract, LastUpdated: data.TimestampInMs.Time(), + UpdatePushedAt: updatePushedAt, VerifyOrderbook: g.CanVerifyOrderbook, } base.Asks = make([]orderbook.Tranche, len(data.Asks)) @@ -643,7 +644,8 @@ func (g *Gateio) processFuturesOrderbookSnapshot(event string, incoming []byte, Asset: assetType, Exchange: g.Name, Pair: currencyPair, - LastUpdated: pushTime, + LastUpdated: updatePushedAt, + UpdatePushedAt: updatePushedAt, VerifyOrderbook: g.CanVerifyOrderbook, }) if err != nil { diff --git a/exchanges/gateio/gateio_ws_option.go b/exchanges/gateio/gateio_ws_option.go index fe1384c8..c33a8493 100644 --- a/exchanges/gateio/gateio_ws_option.go +++ b/exchanges/gateio/gateio_ws_option.go @@ -554,7 +554,7 @@ func (g *Gateio) processOrderbookTickerPushData(incoming []byte) error { return nil } -func (g *Gateio) processOptionsOrderbookSnapshotPushData(event string, incoming []byte, pushTime time.Time) error { +func (g *Gateio) processOptionsOrderbookSnapshotPushData(event string, incoming []byte, updatePushedAt time.Time) error { if event == "all" { var data WsOptionsOrderbookSnapshot err := json.Unmarshal(incoming, &data) @@ -566,6 +566,7 @@ func (g *Gateio) processOptionsOrderbookSnapshotPushData(event string, incoming Exchange: g.Name, Pair: data.Contract, LastUpdated: data.Timestamp.Time(), + UpdatePushedAt: updatePushedAt, VerifyOrderbook: g.CanVerifyOrderbook, } base.Asks = make([]orderbook.Tranche, len(data.Asks)) @@ -618,7 +619,8 @@ func (g *Gateio) processOptionsOrderbookSnapshotPushData(event string, incoming Asset: asset.Options, Exchange: g.Name, Pair: currencyPair, - LastUpdated: pushTime, + LastUpdated: updatePushedAt, + UpdatePushedAt: updatePushedAt, VerifyOrderbook: g.CanVerifyOrderbook, }) if err != nil { diff --git a/exchanges/orderbook/depth.go b/exchanges/orderbook/depth.go index 13fd1cfc..a8e9ebda 100644 --- a/exchanges/orderbook/depth.go +++ b/exchanges/orderbook/depth.go @@ -76,6 +76,8 @@ func (d *Depth) Retrieve() (*Base, error) { Asset: d.asset, Pair: d.pair, LastUpdated: d.lastUpdated, + UpdatePushedAt: d.updatePushedAt, + InsertedAt: d.insertedAt, LastUpdateID: d.lastUpdateID, PriceDuplication: d.priceDuplication, IsFundingRate: d.isFundingRate, @@ -86,7 +88,7 @@ func (d *Depth) Retrieve() (*Base, error) { } // LoadSnapshot flushes the bids and asks with a snapshot -func (d *Depth) LoadSnapshot(bids, asks []Tranche, lastUpdateID int64, lastUpdated time.Time, updateByREST bool) error { +func (d *Depth) LoadSnapshot(bids, asks []Tranche, lastUpdateID int64, lastUpdated, updatePushedAt time.Time, updateByREST bool) error { d.m.Lock() defer d.m.Unlock() if lastUpdated.IsZero() { @@ -98,6 +100,8 @@ func (d *Depth) LoadSnapshot(bids, asks []Tranche, lastUpdateID int64, lastUpdat } d.lastUpdateID = lastUpdateID d.lastUpdated = lastUpdated + d.updatePushedAt = updatePushedAt + d.insertedAt = time.Now() d.restSnapshot = updateByREST d.bidTranches.load(bids) d.askTranches.load(asks) @@ -373,6 +377,8 @@ func (d *Depth) TotalAskAmounts() (liquidity, value float64, err error) { func (d *Depth) updateAndAlert(update *Update) { d.lastUpdateID = update.UpdateID d.lastUpdated = update.UpdateTime + d.updatePushedAt = update.UpdatePushedAt + d.insertedAt = time.Now() d.Alert() } diff --git a/exchanges/orderbook/depth_test.go b/exchanges/orderbook/depth_test.go index 95f6d65e..dafad8fd 100644 --- a/exchanges/orderbook/depth_test.go +++ b/exchanges/orderbook/depth_test.go @@ -28,7 +28,7 @@ func TestGetLength(t *testing.T) { _, err = d.GetAskLength() assert.ErrorIs(t, err, ErrOrderbookInvalid, "GetAskLength should error with invalid depth") - err = d.LoadSnapshot([]Tranche{{Price: 1337}}, nil, 0, time.Now(), true) + err = d.LoadSnapshot([]Tranche{{Price: 1337}}, nil, 0, time.Now(), time.Now(), true) assert.NoError(t, err, "LoadSnapshot should not error") askLen, err := d.GetAskLength() @@ -48,7 +48,7 @@ func TestGetLength(t *testing.T) { _, err = d.GetBidLength() assert.ErrorIs(t, err, ErrOrderbookInvalid, "GetBidLength should error with invalid depth") - err = d.LoadSnapshot(nil, []Tranche{{Price: 1337}}, 0, time.Now(), true) + err = d.LoadSnapshot(nil, []Tranche{{Price: 1337}}, 0, time.Now(), time.Now(), true) assert.NoError(t, err, "LoadSnapshot should not error") bidLen, err := d.GetBidLength() @@ -72,6 +72,8 @@ func TestRetrieve(t *testing.T) { pair: currency.NewPair(currency.THETA, currency.USD), asset: asset.DownsideProfitContract, lastUpdated: time.Now(), + updatePushedAt: time.Now(), + insertedAt: time.Now(), lastUpdateID: 1337, priceDuplication: true, isFundingRate: true, @@ -142,10 +144,10 @@ func TestTotalAmounts(t *testing.T) { func TestLoadSnapshot(t *testing.T) { t.Parallel() d := NewDepth(id) - err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1}}, Tranches{{Price: 1337, Amount: 10}}, 0, time.Time{}, false) + err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1}}, Tranches{{Price: 1337, Amount: 10}}, 0, time.Time{}, time.Now(), false) assert.ErrorIs(t, err, errLastUpdatedNotSet, "LoadSnapshot should error correctly") - err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 2}}, Tranches{{Price: 1338, Amount: 10}}, 0, time.Now(), false) + err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 2}}, Tranches{{Price: 1338, Amount: 10}}, 0, time.Now(), time.Now(), false) assert.NoError(t, err, "LoadSnapshot should not error") ob, err := d.Retrieve() @@ -164,7 +166,7 @@ func TestInvalidate(t *testing.T) { d.pair = currency.NewPair(currency.BTC, currency.WABI) d.asset = asset.Spot - err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1}}, Tranches{{Price: 1337, Amount: 10}}, 0, time.Now(), false) + err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1}}, Tranches{{Price: 1337, Amount: 10}}, 0, time.Now(), time.Now(), false) assert.NoError(t, err, "LoadSnapshot should not error") ob, err := d.Retrieve() @@ -192,7 +194,7 @@ func TestInvalidate(t *testing.T) { func TestUpdateBidAskByPrice(t *testing.T) { t.Parallel() d := NewDepth(id) - err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1338, Amount: 10, ID: 2}}, 0, time.Now(), false) + err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1338, Amount: 10, ID: 2}}, 0, time.Now(), time.Now(), false) assert.NoError(t, err, "LoadSnapshot should not error") err = d.UpdateBidAskByPrice(&Update{}) @@ -236,7 +238,7 @@ func TestUpdateBidAskByPrice(t *testing.T) { func TestDeleteBidAskByID(t *testing.T) { t.Parallel() d := NewDepth(id) - err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), false) + err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), time.Now(), false) assert.NoError(t, err, "LoadSnapshot should not error") updates := &Update{ @@ -281,7 +283,7 @@ func TestDeleteBidAskByID(t *testing.T) { func TestUpdateBidAskByID(t *testing.T) { t.Parallel() d := NewDepth(id) - err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), false) + err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), time.Now(), false) assert.NoError(t, err, "LoadSnapshot should not error") updates := &Update{ @@ -320,7 +322,7 @@ func TestUpdateBidAskByID(t *testing.T) { func TestInsertBidAskByID(t *testing.T) { t.Parallel() d := NewDepth(id) - err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), false) + err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), time.Now(), false) assert.NoError(t, err, "LoadSnapshot should not error") updates := &Update{ @@ -334,7 +336,7 @@ func TestInsertBidAskByID(t *testing.T) { err = d.InsertBidAskByID(updates) assert.ErrorIs(t, err, errCollisionDetected, "InsertBidAskByID should error correctly on collision") - err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), false) + err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), time.Now(), false) assert.NoError(t, err, "LoadSnapshot should not error") updates = &Update{ @@ -345,7 +347,7 @@ func TestInsertBidAskByID(t *testing.T) { err = d.InsertBidAskByID(updates) assert.ErrorIs(t, err, errCollisionDetected, "InsertBidAskByID should error correctly on collision") - err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), false) + err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), time.Now(), false) assert.NoError(t, err, "LoadSnapshot should not error") updates = &Update{ @@ -365,7 +367,7 @@ func TestInsertBidAskByID(t *testing.T) { func TestUpdateInsertByID(t *testing.T) { t.Parallel() d := NewDepth(id) - err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), false) + err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), time.Now(), false) assert.NoError(t, err, "LoadSnapshot should not error") updates := &Update{ @@ -383,7 +385,7 @@ func TestUpdateInsertByID(t *testing.T) { _, err = d.Retrieve() assert.ErrorIs(t, err, ErrOrderbookInvalid, "Retrieve should error correctly") - err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), false) + err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), time.Now(), false) assert.NoError(t, err, "LoadSnapshot should not error") updates = &Update{ @@ -398,7 +400,7 @@ func TestUpdateInsertByID(t *testing.T) { _, err = d.Retrieve() assert.ErrorIs(t, err, ErrOrderbookInvalid, "Retrieve should error correctly") - err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), false) + err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), time.Now(), false) assert.NoError(t, err, "LoadSnapshot should not error") updates = &Update{ @@ -519,7 +521,7 @@ func TestGetMidPrice_Depth(t *testing.T) { _, err = depth.GetMidPrice() assert.ErrorIs(t, err, errNoLiquidity, "GetMidPrice should error correctly") - err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true) + err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true) assert.NoError(t, err, "LoadSnapshot should not error") mid, err := depth.GetMidPrice() @@ -533,13 +535,13 @@ func TestGetMidPriceNoLock_Depth(t *testing.T) { _, err := depth.getMidPriceNoLock() assert.ErrorIs(t, err, errNoLiquidity, "getMidPriceNoLock should error correctly") - err = depth.LoadSnapshot(bid, nil, 0, time.Now(), true) + err = depth.LoadSnapshot(bid, nil, 0, time.Now(), time.Now(), true) assert.NoError(t, err, "LoadSnapshot should not error") _, err = depth.getMidPriceNoLock() assert.ErrorIs(t, err, errNoLiquidity, "getMidPriceNoLock should error correctly") - err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true) + err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true) assert.NoError(t, err, "LoadSnapshot should not error") mid, err := depth.getMidPriceNoLock() @@ -562,7 +564,7 @@ func TestGetBestBidASk_Depth(t *testing.T) { _, err = depth.GetBestAsk() assert.ErrorIs(t, err, errNoLiquidity, "GetBestAsk should error correctly") - err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true) + err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true) assert.NoError(t, err, "LoadSnapshot should not error") mid, err := depth.GetBestBid() @@ -584,13 +586,13 @@ func TestGetSpreadAmount(t *testing.T) { _, err = depth.GetSpreadAmount() assert.ErrorIs(t, err, errNoLiquidity, "GetSpreadAmount should error correctly") - err = depth.LoadSnapshot(nil, ask, 0, time.Now(), true) + err = depth.LoadSnapshot(nil, ask, 0, time.Now(), time.Now(), true) assert.NoError(t, err, "LoadSnapshot should not error") _, err = depth.GetSpreadAmount() assert.ErrorIs(t, err, errNoLiquidity, "GetSpreadAmount should error correctly") - err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true) + err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true) assert.NoError(t, err, "LoadSnapshot should not error") spread, err := depth.GetSpreadAmount() @@ -608,13 +610,13 @@ func TestGetSpreadPercentage(t *testing.T) { _, err = depth.GetSpreadPercentage() assert.ErrorIs(t, err, errNoLiquidity, "GetSpreadPercentage should error correctly") - err = depth.LoadSnapshot(nil, ask, 0, time.Now(), true) + err = depth.LoadSnapshot(nil, ask, 0, time.Now(), time.Now(), true) assert.NoError(t, err, "LoadSnapshot should not error") _, err = depth.GetSpreadPercentage() assert.ErrorIs(t, err, errNoLiquidity, "GetSpreadPercentage should error correctly") - err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true) + err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true) assert.NoError(t, err, "LoadSnapshot should not error") spread, err := depth.GetSpreadPercentage() @@ -632,13 +634,13 @@ func TestGetImbalance_Depth(t *testing.T) { _, err = depth.GetImbalance() assert.ErrorIs(t, err, errNoLiquidity, "GetImbalance should error correctly") - err = depth.LoadSnapshot(nil, ask, 0, time.Now(), true) + err = depth.LoadSnapshot(nil, ask, 0, time.Now(), time.Now(), true) assert.NoError(t, err, "LoadSnapshot should not error") _, err = depth.GetImbalance() assert.ErrorIs(t, err, errNoLiquidity, "GetImbalance should error correctly") - err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true) + err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true) assert.NoError(t, err, "LoadSnapshot should not error") imbalance, err := depth.GetImbalance() @@ -661,7 +663,7 @@ func TestGetTranches(t *testing.T) { assert.Empty(t, askT, "Ask tranche should be empty") assert.Empty(t, bidT, "Bid tranche should be empty") - err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true) + err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true) assert.NoError(t, err, "LoadSnapshot should not error") askT, bidT, err = depth.GetTranches(0) @@ -728,7 +730,7 @@ func TestMovementMethods(t *testing.T) { _, err = callMethod(depth, methodName, tt.tests[0].inputs) assert.ErrorIs(t, err, errNoLiquidity, "should error correctly with no liquidity") - err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true) + err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true) assert.NoError(t, err, "LoadSnapshot should not error") for i, subT := range tt.tests { diff --git a/exchanges/orderbook/orderbook.go b/exchanges/orderbook/orderbook.go index 877c25d1..540180be 100644 --- a/exchanges/orderbook/orderbook.go +++ b/exchanges/orderbook/orderbook.go @@ -71,7 +71,7 @@ func (s *Service) Update(b *Base) error { book.AssignOptions(b) m1.m[mapKey] = book } - err := book.LoadSnapshot(b.Bids, b.Asks, b.LastUpdateID, b.LastUpdated, true) + err := book.LoadSnapshot(b.Bids, b.Asks, b.LastUpdateID, b.LastUpdated, b.UpdatePushedAt, true) s.mu.Unlock() if err != nil { return err diff --git a/exchanges/orderbook/orderbook_types.go b/exchanges/orderbook/orderbook_types.go index 6ed7bd3d..c40ddf6d 100644 --- a/exchanges/orderbook/orderbook_types.go +++ b/exchanges/orderbook/orderbook_types.go @@ -86,7 +86,24 @@ type Base struct { Pair currency.Pair Asset asset.Item - LastUpdated time.Time + // LastUpdated is the time when a change occurred on the exchange books. + // Note: This does not necessarily indicate the change is out of sync with + // the exchange. It represents the last known update time from the exchange, + // which could be stale if there have been no recent changes. + LastUpdated time.Time + + // UpdatePushedAt is the time the exchange pushed this update. This helps + // determine factors like distance from exchange (latency) and routing + // time, which can affect the time it takes for an update to reach the user + // from the exchange. + UpdatePushedAt time.Time + + // InsertedAt is the time the update was inserted into the orderbook + // management system. This field is used to calculate round-trip times and + // processing delays, e.g., InsertedAt.Sub(UpdatePushedAt) represents the + // total processing time including network latency. + InsertedAt time.Time + LastUpdateID int64 // PriceDuplication defines whether an orderbook can contain duplicate // prices in a payload @@ -95,7 +112,7 @@ type Base struct { // VerifyOrderbook allows for a toggle between orderbook verification set by // user configuration, this allows for a potential processing boost but // a potential for orderbook integrity being deminished. - VerifyOrderbook bool `json:"-"` + VerifyOrderbook bool // RestSnapshot defines if the depth was applied via the REST protocol thus // an update cannot be applied via websocket mechanics and a resubscription // would need to take place to maintain book integrity @@ -117,6 +134,8 @@ type options struct { pair currency.Pair asset asset.Item lastUpdated time.Time + updatePushedAt time.Time + insertedAt time.Time lastUpdateID int64 priceDuplication bool isFundingRate bool @@ -145,9 +164,10 @@ const ( // Update and things and stuff type Update struct { - UpdateID int64 // Used when no time is provided - UpdateTime time.Time - Asset asset.Item + UpdateID int64 // Used when no time is provided + UpdateTime time.Time + UpdatePushedAt time.Time + Asset asset.Item Action Bids []Tranche Asks []Tranche diff --git a/exchanges/orderbook/tranches_test.go b/exchanges/orderbook/tranches_test.go index ee75b9ad..110c374e 100644 --- a/exchanges/orderbook/tranches_test.go +++ b/exchanges/orderbook/tranches_test.go @@ -1257,7 +1257,7 @@ func TestGetMovementByBaseAmount(t *testing.T) { t.Run(tt.Name, func(t *testing.T) { t.Parallel() depth := NewDepth(id) - err := depth.LoadSnapshot(tt.BidLiquidity, nil, 0, time.Now(), true) + err := depth.LoadSnapshot(tt.BidLiquidity, nil, 0, time.Now(), time.Now(), true) if err != nil { t.Fatal(err) } @@ -1392,7 +1392,7 @@ func TestGetBaseAmountFromNominalSlippage(t *testing.T) { t.Run(tt.Name, func(t *testing.T) { t.Parallel() depth := NewDepth(id) - err := depth.LoadSnapshot(tt.BidLiquidity, nil, 0, time.Now(), true) + err := depth.LoadSnapshot(tt.BidLiquidity, nil, 0, time.Now(), time.Now(), true) assert.NoError(t, err, "LoadSnapshot should not error") base, err := depth.bidTranches.hitBidsByNominalSlippage(tt.NominalSlippage, tt.ReferencePrice) @@ -1500,7 +1500,7 @@ func TestGetBaseAmountFromImpact(t *testing.T) { t.Run(tt.Name, func(t *testing.T) { t.Parallel() depth := NewDepth(id) - err := depth.LoadSnapshot(tt.BidLiquidity, nil, 0, time.Now(), true) + err := depth.LoadSnapshot(tt.BidLiquidity, nil, 0, time.Now(), time.Now(), true) if err != nil { t.Fatal(err) } @@ -1586,7 +1586,7 @@ func TestGetMovementByQuoteAmount(t *testing.T) { t.Run(tt.Name, func(t *testing.T) { t.Parallel() depth := NewDepth(id) - err := depth.LoadSnapshot(nil, tt.AskLiquidity, 0, time.Now(), true) + err := depth.LoadSnapshot(nil, tt.AskLiquidity, 0, time.Now(), time.Now(), true) if err != nil { t.Fatal(err) } @@ -1719,7 +1719,7 @@ func TestGetQuoteAmountFromNominalSlippage(t *testing.T) { t.Run(tt.Name, func(t *testing.T) { t.Parallel() depth := NewDepth(id) - err := depth.LoadSnapshot(nil, tt.AskLiquidity, 0, time.Now(), true) + err := depth.LoadSnapshot(nil, tt.AskLiquidity, 0, time.Now(), time.Now(), true) assert.NoError(t, err, "LoadSnapshot should not error") quote, err := depth.askTranches.liftAsksByNominalSlippage(tt.NominalSlippage, tt.ReferencePrice) @@ -1808,7 +1808,7 @@ func TestGetQuoteAmountFromImpact(t *testing.T) { t.Run(tt.Name, func(t *testing.T) { t.Parallel() depth := NewDepth(id) - err := depth.LoadSnapshot(nil, tt.AskLiquidity, 0, time.Now(), true) + err := depth.LoadSnapshot(nil, tt.AskLiquidity, 0, time.Now(), time.Now(), true) assert.NoError(t, err, "LoadSnapshot should not error") quote, err := depth.askTranches.liftAsksByImpactSlippage(tt.ImpactSlippage, tt.ReferencePrice) @@ -1830,7 +1830,7 @@ func TestGetHeadPrice(t *testing.T) { if _, err := depth.askTranches.getHeadPriceNoLock(); !errors.Is(err, errNoLiquidity) { t.Fatalf("received: '%v' but expected: '%v'", err, errNoLiquidity) } - err := depth.LoadSnapshot(bid, ask, 0, time.Now(), true) + err := depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true) if err != nil { t.Fatalf("failed to load snapshot: %s", err) } diff --git a/exchanges/stream/buffer/buffer.go b/exchanges/stream/buffer/buffer.go index c2fb1ce3..d2ce8a9c 100644 --- a/exchanges/stream/buffer/buffer.go +++ b/exchanges/stream/buffer/buffer.go @@ -337,6 +337,7 @@ func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error { book.Asks, book.LastUpdateID, book.LastUpdated, + book.UpdatePushedAt, false) if err != nil { return err diff --git a/exchanges/stream/buffer/buffer_test.go b/exchanges/stream/buffer/buffer_test.go index a7739dc8..d65fc1b6 100644 --- a/exchanges/stream/buffer/buffer_test.go +++ b/exchanges/stream/buffer/buffer_test.go @@ -933,7 +933,7 @@ func TestUpdateByIDAndAction(t *testing.T) { t.Fatal(err) } - err = book.LoadSnapshot(append(bids[:0:0], bids...), append(asks[:0:0], asks...), 0, time.Now(), true) + err = book.LoadSnapshot(append(bids[:0:0], bids...), append(asks[:0:0], asks...), 0, time.Now(), time.Now(), true) if err != nil { t.Fatal(err) } @@ -968,7 +968,7 @@ func TestUpdateByIDAndAction(t *testing.T) { t.Fatalf("received: '%v' but expected: '%v'", err, errAmendFailure) } - err = book.LoadSnapshot(append(bids[:0:0], bids...), append(asks[:0:0], asks...), 0, time.Now(), true) + err = book.LoadSnapshot(append(bids[:0:0], bids...), append(asks[:0:0], asks...), 0, time.Now(), time.Now(), true) if err != nil { t.Fatal(err) } @@ -1079,7 +1079,7 @@ func TestUpdateByIDAndAction(t *testing.T) { t.Fatal("did not adjust ask item placement and details") } - err = book.LoadSnapshot(append(bids[:0:0], bids...), append(asks[:0:0], asks...), 0, time.Now(), true) //nolint:gocritic + err = book.LoadSnapshot(append(bids[:0:0], bids...), append(asks[:0:0], asks...), 0, time.Now(), time.Now(), true) //nolint:gocritic if err != nil { t.Fatal(err) } @@ -1098,7 +1098,7 @@ func TestUpdateByIDAndAction(t *testing.T) { t.Fatalf("received: '%v' but expected: '%v'", err, errDeleteFailure) } - err = book.LoadSnapshot(append(bids[:0:0], bids...), append(asks[:0:0], asks...), 0, time.Now(), true) //nolint:gocritic + err = book.LoadSnapshot(append(bids[:0:0], bids...), append(asks[:0:0], asks...), 0, time.Now(), time.Now(), true) //nolint:gocritic if err != nil { t.Fatal(err) } @@ -1134,7 +1134,7 @@ func TestUpdateByIDAndAction(t *testing.T) { t.Fatalf("received: '%v' but expected: '%v'", err, errAmendFailure) } - err = book.LoadSnapshot(bids, bids, 0, time.Now(), true) + err = book.LoadSnapshot(bids, bids, 0, time.Now(), time.Now(), true) if err != nil { t.Fatal(err) }