orderbook/gateio: add field UpdatePushedAt and InsertedAt for specific websocket updates (#1590)

* Add in extra field for colocation monitoring

* rm tags

* populate through gateio orderbooks

* populate through incremental updates

* Add insert at field for orderbook depth

* I AM A BAD SPELLLLLLA

* add it in

* c change

---------

Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
This commit is contained in:
Ryan O'Hara-Reid
2024-08-15 16:11:22 +10:00
committed by GitHub
parent b602d54bbe
commit 91ff6c5c12
12 changed files with 103 additions and 66 deletions

View File

@@ -76,6 +76,8 @@ func (d *Depth) Retrieve() (*Base, error) {
Asset: d.asset,
Pair: d.pair,
LastUpdated: d.lastUpdated,
UpdatePushedAt: d.updatePushedAt,
InsertedAt: d.insertedAt,
LastUpdateID: d.lastUpdateID,
PriceDuplication: d.priceDuplication,
IsFundingRate: d.isFundingRate,
@@ -86,7 +88,7 @@ func (d *Depth) Retrieve() (*Base, error) {
}
// LoadSnapshot flushes the bids and asks with a snapshot
func (d *Depth) LoadSnapshot(bids, asks []Tranche, lastUpdateID int64, lastUpdated time.Time, updateByREST bool) error {
func (d *Depth) LoadSnapshot(bids, asks []Tranche, lastUpdateID int64, lastUpdated, updatePushedAt time.Time, updateByREST bool) error {
d.m.Lock()
defer d.m.Unlock()
if lastUpdated.IsZero() {
@@ -98,6 +100,8 @@ func (d *Depth) LoadSnapshot(bids, asks []Tranche, lastUpdateID int64, lastUpdat
}
d.lastUpdateID = lastUpdateID
d.lastUpdated = lastUpdated
d.updatePushedAt = updatePushedAt
d.insertedAt = time.Now()
d.restSnapshot = updateByREST
d.bidTranches.load(bids)
d.askTranches.load(asks)
@@ -373,6 +377,8 @@ func (d *Depth) TotalAskAmounts() (liquidity, value float64, err error) {
func (d *Depth) updateAndAlert(update *Update) {
d.lastUpdateID = update.UpdateID
d.lastUpdated = update.UpdateTime
d.updatePushedAt = update.UpdatePushedAt
d.insertedAt = time.Now()
d.Alert()
}

View File

@@ -28,7 +28,7 @@ func TestGetLength(t *testing.T) {
_, err = d.GetAskLength()
assert.ErrorIs(t, err, ErrOrderbookInvalid, "GetAskLength should error with invalid depth")
err = d.LoadSnapshot([]Tranche{{Price: 1337}}, nil, 0, time.Now(), true)
err = d.LoadSnapshot([]Tranche{{Price: 1337}}, nil, 0, time.Now(), time.Now(), true)
assert.NoError(t, err, "LoadSnapshot should not error")
askLen, err := d.GetAskLength()
@@ -48,7 +48,7 @@ func TestGetLength(t *testing.T) {
_, err = d.GetBidLength()
assert.ErrorIs(t, err, ErrOrderbookInvalid, "GetBidLength should error with invalid depth")
err = d.LoadSnapshot(nil, []Tranche{{Price: 1337}}, 0, time.Now(), true)
err = d.LoadSnapshot(nil, []Tranche{{Price: 1337}}, 0, time.Now(), time.Now(), true)
assert.NoError(t, err, "LoadSnapshot should not error")
bidLen, err := d.GetBidLength()
@@ -72,6 +72,8 @@ func TestRetrieve(t *testing.T) {
pair: currency.NewPair(currency.THETA, currency.USD),
asset: asset.DownsideProfitContract,
lastUpdated: time.Now(),
updatePushedAt: time.Now(),
insertedAt: time.Now(),
lastUpdateID: 1337,
priceDuplication: true,
isFundingRate: true,
@@ -142,10 +144,10 @@ func TestTotalAmounts(t *testing.T) {
func TestLoadSnapshot(t *testing.T) {
t.Parallel()
d := NewDepth(id)
err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1}}, Tranches{{Price: 1337, Amount: 10}}, 0, time.Time{}, false)
err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1}}, Tranches{{Price: 1337, Amount: 10}}, 0, time.Time{}, time.Now(), false)
assert.ErrorIs(t, err, errLastUpdatedNotSet, "LoadSnapshot should error correctly")
err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 2}}, Tranches{{Price: 1338, Amount: 10}}, 0, time.Now(), false)
err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 2}}, Tranches{{Price: 1338, Amount: 10}}, 0, time.Now(), time.Now(), false)
assert.NoError(t, err, "LoadSnapshot should not error")
ob, err := d.Retrieve()
@@ -164,7 +166,7 @@ func TestInvalidate(t *testing.T) {
d.pair = currency.NewPair(currency.BTC, currency.WABI)
d.asset = asset.Spot
err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1}}, Tranches{{Price: 1337, Amount: 10}}, 0, time.Now(), false)
err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1}}, Tranches{{Price: 1337, Amount: 10}}, 0, time.Now(), time.Now(), false)
assert.NoError(t, err, "LoadSnapshot should not error")
ob, err := d.Retrieve()
@@ -192,7 +194,7 @@ func TestInvalidate(t *testing.T) {
func TestUpdateBidAskByPrice(t *testing.T) {
t.Parallel()
d := NewDepth(id)
err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1338, Amount: 10, ID: 2}}, 0, time.Now(), false)
err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1338, Amount: 10, ID: 2}}, 0, time.Now(), time.Now(), false)
assert.NoError(t, err, "LoadSnapshot should not error")
err = d.UpdateBidAskByPrice(&Update{})
@@ -236,7 +238,7 @@ func TestUpdateBidAskByPrice(t *testing.T) {
func TestDeleteBidAskByID(t *testing.T) {
t.Parallel()
d := NewDepth(id)
err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), false)
err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), time.Now(), false)
assert.NoError(t, err, "LoadSnapshot should not error")
updates := &Update{
@@ -281,7 +283,7 @@ func TestDeleteBidAskByID(t *testing.T) {
func TestUpdateBidAskByID(t *testing.T) {
t.Parallel()
d := NewDepth(id)
err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), false)
err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), time.Now(), false)
assert.NoError(t, err, "LoadSnapshot should not error")
updates := &Update{
@@ -320,7 +322,7 @@ func TestUpdateBidAskByID(t *testing.T) {
func TestInsertBidAskByID(t *testing.T) {
t.Parallel()
d := NewDepth(id)
err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), false)
err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), time.Now(), false)
assert.NoError(t, err, "LoadSnapshot should not error")
updates := &Update{
@@ -334,7 +336,7 @@ func TestInsertBidAskByID(t *testing.T) {
err = d.InsertBidAskByID(updates)
assert.ErrorIs(t, err, errCollisionDetected, "InsertBidAskByID should error correctly on collision")
err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), false)
err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), time.Now(), false)
assert.NoError(t, err, "LoadSnapshot should not error")
updates = &Update{
@@ -345,7 +347,7 @@ func TestInsertBidAskByID(t *testing.T) {
err = d.InsertBidAskByID(updates)
assert.ErrorIs(t, err, errCollisionDetected, "InsertBidAskByID should error correctly on collision")
err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), false)
err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), time.Now(), false)
assert.NoError(t, err, "LoadSnapshot should not error")
updates = &Update{
@@ -365,7 +367,7 @@ func TestInsertBidAskByID(t *testing.T) {
func TestUpdateInsertByID(t *testing.T) {
t.Parallel()
d := NewDepth(id)
err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), false)
err := d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), time.Now(), false)
assert.NoError(t, err, "LoadSnapshot should not error")
updates := &Update{
@@ -383,7 +385,7 @@ func TestUpdateInsertByID(t *testing.T) {
_, err = d.Retrieve()
assert.ErrorIs(t, err, ErrOrderbookInvalid, "Retrieve should error correctly")
err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), false)
err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), time.Now(), false)
assert.NoError(t, err, "LoadSnapshot should not error")
updates = &Update{
@@ -398,7 +400,7 @@ func TestUpdateInsertByID(t *testing.T) {
_, err = d.Retrieve()
assert.ErrorIs(t, err, ErrOrderbookInvalid, "Retrieve should error correctly")
err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), false)
err = d.LoadSnapshot(Tranches{{Price: 1337, Amount: 1, ID: 1}}, Tranches{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Now(), time.Now(), false)
assert.NoError(t, err, "LoadSnapshot should not error")
updates = &Update{
@@ -519,7 +521,7 @@ func TestGetMidPrice_Depth(t *testing.T) {
_, err = depth.GetMidPrice()
assert.ErrorIs(t, err, errNoLiquidity, "GetMidPrice should error correctly")
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true)
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true)
assert.NoError(t, err, "LoadSnapshot should not error")
mid, err := depth.GetMidPrice()
@@ -533,13 +535,13 @@ func TestGetMidPriceNoLock_Depth(t *testing.T) {
_, err := depth.getMidPriceNoLock()
assert.ErrorIs(t, err, errNoLiquidity, "getMidPriceNoLock should error correctly")
err = depth.LoadSnapshot(bid, nil, 0, time.Now(), true)
err = depth.LoadSnapshot(bid, nil, 0, time.Now(), time.Now(), true)
assert.NoError(t, err, "LoadSnapshot should not error")
_, err = depth.getMidPriceNoLock()
assert.ErrorIs(t, err, errNoLiquidity, "getMidPriceNoLock should error correctly")
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true)
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true)
assert.NoError(t, err, "LoadSnapshot should not error")
mid, err := depth.getMidPriceNoLock()
@@ -562,7 +564,7 @@ func TestGetBestBidASk_Depth(t *testing.T) {
_, err = depth.GetBestAsk()
assert.ErrorIs(t, err, errNoLiquidity, "GetBestAsk should error correctly")
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true)
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true)
assert.NoError(t, err, "LoadSnapshot should not error")
mid, err := depth.GetBestBid()
@@ -584,13 +586,13 @@ func TestGetSpreadAmount(t *testing.T) {
_, err = depth.GetSpreadAmount()
assert.ErrorIs(t, err, errNoLiquidity, "GetSpreadAmount should error correctly")
err = depth.LoadSnapshot(nil, ask, 0, time.Now(), true)
err = depth.LoadSnapshot(nil, ask, 0, time.Now(), time.Now(), true)
assert.NoError(t, err, "LoadSnapshot should not error")
_, err = depth.GetSpreadAmount()
assert.ErrorIs(t, err, errNoLiquidity, "GetSpreadAmount should error correctly")
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true)
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true)
assert.NoError(t, err, "LoadSnapshot should not error")
spread, err := depth.GetSpreadAmount()
@@ -608,13 +610,13 @@ func TestGetSpreadPercentage(t *testing.T) {
_, err = depth.GetSpreadPercentage()
assert.ErrorIs(t, err, errNoLiquidity, "GetSpreadPercentage should error correctly")
err = depth.LoadSnapshot(nil, ask, 0, time.Now(), true)
err = depth.LoadSnapshot(nil, ask, 0, time.Now(), time.Now(), true)
assert.NoError(t, err, "LoadSnapshot should not error")
_, err = depth.GetSpreadPercentage()
assert.ErrorIs(t, err, errNoLiquidity, "GetSpreadPercentage should error correctly")
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true)
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true)
assert.NoError(t, err, "LoadSnapshot should not error")
spread, err := depth.GetSpreadPercentage()
@@ -632,13 +634,13 @@ func TestGetImbalance_Depth(t *testing.T) {
_, err = depth.GetImbalance()
assert.ErrorIs(t, err, errNoLiquidity, "GetImbalance should error correctly")
err = depth.LoadSnapshot(nil, ask, 0, time.Now(), true)
err = depth.LoadSnapshot(nil, ask, 0, time.Now(), time.Now(), true)
assert.NoError(t, err, "LoadSnapshot should not error")
_, err = depth.GetImbalance()
assert.ErrorIs(t, err, errNoLiquidity, "GetImbalance should error correctly")
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true)
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true)
assert.NoError(t, err, "LoadSnapshot should not error")
imbalance, err := depth.GetImbalance()
@@ -661,7 +663,7 @@ func TestGetTranches(t *testing.T) {
assert.Empty(t, askT, "Ask tranche should be empty")
assert.Empty(t, bidT, "Bid tranche should be empty")
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true)
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true)
assert.NoError(t, err, "LoadSnapshot should not error")
askT, bidT, err = depth.GetTranches(0)
@@ -728,7 +730,7 @@ func TestMovementMethods(t *testing.T) {
_, err = callMethod(depth, methodName, tt.tests[0].inputs)
assert.ErrorIs(t, err, errNoLiquidity, "should error correctly with no liquidity")
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true)
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true)
assert.NoError(t, err, "LoadSnapshot should not error")
for i, subT := range tt.tests {

View File

@@ -71,7 +71,7 @@ func (s *Service) Update(b *Base) error {
book.AssignOptions(b)
m1.m[mapKey] = book
}
err := book.LoadSnapshot(b.Bids, b.Asks, b.LastUpdateID, b.LastUpdated, true)
err := book.LoadSnapshot(b.Bids, b.Asks, b.LastUpdateID, b.LastUpdated, b.UpdatePushedAt, true)
s.mu.Unlock()
if err != nil {
return err

View File

@@ -86,7 +86,24 @@ type Base struct {
Pair currency.Pair
Asset asset.Item
LastUpdated time.Time
// LastUpdated is the time when a change occurred on the exchange books.
// Note: This does not necessarily indicate the change is out of sync with
// the exchange. It represents the last known update time from the exchange,
// which could be stale if there have been no recent changes.
LastUpdated time.Time
// UpdatePushedAt is the time the exchange pushed this update. This helps
// determine factors like distance from exchange (latency) and routing
// time, which can affect the time it takes for an update to reach the user
// from the exchange.
UpdatePushedAt time.Time
// InsertedAt is the time the update was inserted into the orderbook
// management system. This field is used to calculate round-trip times and
// processing delays, e.g., InsertedAt.Sub(UpdatePushedAt) represents the
// total processing time including network latency.
InsertedAt time.Time
LastUpdateID int64
// PriceDuplication defines whether an orderbook can contain duplicate
// prices in a payload
@@ -95,7 +112,7 @@ type Base struct {
// VerifyOrderbook allows for a toggle between orderbook verification set by
// user configuration, this allows for a potential processing boost but
// a potential for orderbook integrity being deminished.
VerifyOrderbook bool `json:"-"`
VerifyOrderbook bool
// RestSnapshot defines if the depth was applied via the REST protocol thus
// an update cannot be applied via websocket mechanics and a resubscription
// would need to take place to maintain book integrity
@@ -117,6 +134,8 @@ type options struct {
pair currency.Pair
asset asset.Item
lastUpdated time.Time
updatePushedAt time.Time
insertedAt time.Time
lastUpdateID int64
priceDuplication bool
isFundingRate bool
@@ -145,9 +164,10 @@ const (
// Update and things and stuff
type Update struct {
UpdateID int64 // Used when no time is provided
UpdateTime time.Time
Asset asset.Item
UpdateID int64 // Used when no time is provided
UpdateTime time.Time
UpdatePushedAt time.Time
Asset asset.Item
Action
Bids []Tranche
Asks []Tranche

View File

@@ -1257,7 +1257,7 @@ func TestGetMovementByBaseAmount(t *testing.T) {
t.Run(tt.Name, func(t *testing.T) {
t.Parallel()
depth := NewDepth(id)
err := depth.LoadSnapshot(tt.BidLiquidity, nil, 0, time.Now(), true)
err := depth.LoadSnapshot(tt.BidLiquidity, nil, 0, time.Now(), time.Now(), true)
if err != nil {
t.Fatal(err)
}
@@ -1392,7 +1392,7 @@ func TestGetBaseAmountFromNominalSlippage(t *testing.T) {
t.Run(tt.Name, func(t *testing.T) {
t.Parallel()
depth := NewDepth(id)
err := depth.LoadSnapshot(tt.BidLiquidity, nil, 0, time.Now(), true)
err := depth.LoadSnapshot(tt.BidLiquidity, nil, 0, time.Now(), time.Now(), true)
assert.NoError(t, err, "LoadSnapshot should not error")
base, err := depth.bidTranches.hitBidsByNominalSlippage(tt.NominalSlippage, tt.ReferencePrice)
@@ -1500,7 +1500,7 @@ func TestGetBaseAmountFromImpact(t *testing.T) {
t.Run(tt.Name, func(t *testing.T) {
t.Parallel()
depth := NewDepth(id)
err := depth.LoadSnapshot(tt.BidLiquidity, nil, 0, time.Now(), true)
err := depth.LoadSnapshot(tt.BidLiquidity, nil, 0, time.Now(), time.Now(), true)
if err != nil {
t.Fatal(err)
}
@@ -1586,7 +1586,7 @@ func TestGetMovementByQuoteAmount(t *testing.T) {
t.Run(tt.Name, func(t *testing.T) {
t.Parallel()
depth := NewDepth(id)
err := depth.LoadSnapshot(nil, tt.AskLiquidity, 0, time.Now(), true)
err := depth.LoadSnapshot(nil, tt.AskLiquidity, 0, time.Now(), time.Now(), true)
if err != nil {
t.Fatal(err)
}
@@ -1719,7 +1719,7 @@ func TestGetQuoteAmountFromNominalSlippage(t *testing.T) {
t.Run(tt.Name, func(t *testing.T) {
t.Parallel()
depth := NewDepth(id)
err := depth.LoadSnapshot(nil, tt.AskLiquidity, 0, time.Now(), true)
err := depth.LoadSnapshot(nil, tt.AskLiquidity, 0, time.Now(), time.Now(), true)
assert.NoError(t, err, "LoadSnapshot should not error")
quote, err := depth.askTranches.liftAsksByNominalSlippage(tt.NominalSlippage, tt.ReferencePrice)
@@ -1808,7 +1808,7 @@ func TestGetQuoteAmountFromImpact(t *testing.T) {
t.Run(tt.Name, func(t *testing.T) {
t.Parallel()
depth := NewDepth(id)
err := depth.LoadSnapshot(nil, tt.AskLiquidity, 0, time.Now(), true)
err := depth.LoadSnapshot(nil, tt.AskLiquidity, 0, time.Now(), time.Now(), true)
assert.NoError(t, err, "LoadSnapshot should not error")
quote, err := depth.askTranches.liftAsksByImpactSlippage(tt.ImpactSlippage, tt.ReferencePrice)
@@ -1830,7 +1830,7 @@ func TestGetHeadPrice(t *testing.T) {
if _, err := depth.askTranches.getHeadPriceNoLock(); !errors.Is(err, errNoLiquidity) {
t.Fatalf("received: '%v' but expected: '%v'", err, errNoLiquidity)
}
err := depth.LoadSnapshot(bid, ask, 0, time.Now(), true)
err := depth.LoadSnapshot(bid, ask, 0, time.Now(), time.Now(), true)
if err != nil {
t.Fatalf("failed to load snapshot: %s", err)
}