exchanges: Rename UpdatePushedAt field to LastPushed and use field in gateio REST books (#1917)

* Set update pushed at time and general clean

* after merge fix

* gk: nits

* Update exchanges/gateio/gateio_types.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/gateio/gateio_test.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* boss: nits

* sneaky boss attack: nits

---------

Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>
This commit is contained in:
Ryan O'Hara-Reid
2025-06-13 17:24:43 +10:00
committed by GitHub
parent ef8cb7b1e7
commit 062ee2a77e
14 changed files with 121 additions and 118 deletions

View File

@@ -301,7 +301,7 @@ func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error {
holder.updateID = book.LastUpdateID
err = holder.ob.LoadSnapshot(book.Bids, book.Asks, book.LastUpdateID, book.LastUpdated, book.UpdatePushedAt, false)
err = holder.ob.LoadSnapshot(book.Bids, book.Asks, book.LastUpdateID, book.LastUpdated, book.LastPushed, false)
if err != nil {
return err
}

View File

@@ -721,24 +721,24 @@ func (by *Bybit) wsProcessOrderbook(assetType asset.Item, resp *WebsocketRespons
if resp.Type == "snapshot" {
return by.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{
Pair: cp,
Exchange: by.Name,
Asset: assetType,
LastUpdated: resp.OrderbookLastUpdated.Time(),
LastUpdateID: result.UpdateID,
UpdatePushedAt: resp.PushTimestamp.Time(),
Asks: asks,
Bids: bids,
Pair: cp,
Exchange: by.Name,
Asset: assetType,
LastUpdated: resp.OrderbookLastUpdated.Time(),
LastUpdateID: result.UpdateID,
LastPushed: resp.PushTimestamp.Time(),
Asks: asks,
Bids: bids,
})
}
return by.Websocket.Orderbook.Update(&orderbook.Update{
Pair: cp,
Asks: asks,
Bids: bids,
Asset: assetType,
UpdateID: result.UpdateID,
UpdateTime: resp.OrderbookLastUpdated.Time(),
UpdatePushedAt: resp.PushTimestamp.Time(),
Pair: cp,
Asks: asks,
Bids: bids,
Asset: assetType,
UpdateID: result.UpdateID,
UpdateTime: resp.OrderbookLastUpdated.Time(),
LastPushed: resp.PushTimestamp.Time(),
})
}

View File

@@ -410,11 +410,10 @@ func (g *Gateio) GetOrderbook(ctx context.Context, pairString, interval string,
}
params.Set("with_id", strconv.FormatBool(withOrderbookID))
var response *OrderbookData
err := g.SendHTTPRequest(ctx, exchange.RestSpot, publicOrderbookSpotEPL, common.EncodeURLValues(gateioSpotOrderbook, params), &response)
if err != nil {
if err := g.SendHTTPRequest(ctx, exchange.RestSpot, publicOrderbookSpotEPL, common.EncodeURLValues(gateioSpotOrderbook, params), &response); err != nil {
return nil, err
}
return response.MakeOrderbook()
return response.MakeOrderbook(), nil
}
// GetMarketTrades retrieve market trades

View File

@@ -1802,8 +1802,16 @@ func TestUpdateTickers(t *testing.T) {
func TestUpdateOrderbook(t *testing.T) {
t.Parallel()
for _, a := range g.GetAssetTypes(false) {
_, err := g.UpdateOrderbook(t.Context(), getPair(t, a), a)
assert.NoErrorf(t, err, "UpdateOrderbook should not error for %s", a)
pair := getPair(t, a)
t.Run(a.String()+" "+pair.String(), func(t *testing.T) {
t.Parallel()
o, err := g.UpdateOrderbook(t.Context(), pair, a)
require.NoError(t, err)
if a != asset.Options { // Options orderbooks can be empty
assert.NotEmpty(t, o.Bids)
assert.NotEmpty(t, o.Asks)
}
})
}
}

View File

@@ -526,30 +526,25 @@ type OrderbookData struct {
Bids [][2]types.Number `json:"bids"`
}
// MakeOrderbook parse Orderbook asks/bids Price and Amount and create an Orderbook Instance with asks and bids data in []OrderbookItem.
func (a *OrderbookData) MakeOrderbook() (*Orderbook, error) {
ob := &Orderbook{
ID: a.ID,
Current: a.Current,
Update: a.Update,
Asks: make([]OrderbookItem, len(a.Asks)),
Bids: make([]OrderbookItem, len(a.Bids)),
}
// MakeOrderbook converts OrderbookData into an Orderbook
func (a *OrderbookData) MakeOrderbook() *Orderbook {
asks := make([]OrderbookItem, len(a.Asks))
for x := range a.Asks {
ob.Asks[x].Price = a.Asks[x][0]
ob.Asks[x].Amount = a.Asks[x][1].Float64()
asks[x].Price = a.Asks[x][0]
asks[x].Amount = a.Asks[x][1]
}
bids := make([]OrderbookItem, len(a.Bids))
for x := range a.Bids {
ob.Bids[x].Price = a.Bids[x][0]
ob.Bids[x].Amount = a.Bids[x][1].Float64()
bids[x].Price = a.Bids[x][0]
bids[x].Amount = a.Bids[x][1]
}
return ob, nil
return &Orderbook{ID: a.ID, Current: a.Current, Update: a.Update, Asks: asks, Bids: bids}
}
// OrderbookItem stores an orderbook item
type OrderbookItem struct {
Price types.Number `json:"p"`
Amount float64 `json:"s"`
Amount types.Number `json:"s"`
}
// Orderbook stores the orderbook data

View File

@@ -353,23 +353,23 @@ func (g *Gateio) processCandlestick(incoming []byte) error {
return nil
}
func (g *Gateio) processOrderbookTicker(incoming []byte, updatePushedAt time.Time) error {
func (g *Gateio) processOrderbookTicker(incoming []byte, lastPushed time.Time) error {
var data WsOrderbookTickerData
if err := json.Unmarshal(incoming, &data); err != nil {
return err
}
return g.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{
Exchange: g.Name,
Pair: data.Pair,
Asset: asset.Spot,
LastUpdated: data.UpdateTime.Time(),
UpdatePushedAt: updatePushedAt,
Bids: []orderbook.Tranche{{Price: data.BestBidPrice.Float64(), Amount: data.BestBidAmount.Float64()}},
Asks: []orderbook.Tranche{{Price: data.BestAskPrice.Float64(), Amount: data.BestAskAmount.Float64()}},
Exchange: g.Name,
Pair: data.Pair,
Asset: asset.Spot,
LastUpdated: data.UpdateTime.Time(),
LastPushed: lastPushed,
Bids: []orderbook.Tranche{{Price: data.BestBidPrice.Float64(), Amount: data.BestBidAmount.Float64()}},
Asks: []orderbook.Tranche{{Price: data.BestAskPrice.Float64(), Amount: data.BestAskAmount.Float64()}},
})
}
func (g *Gateio) processOrderbookUpdate(ctx context.Context, incoming []byte, updatePushedAt time.Time) error {
func (g *Gateio) processOrderbookUpdate(ctx context.Context, incoming []byte, lastPushed time.Time) error {
var data WsOrderbookUpdate
if err := json.Unmarshal(incoming, &data); err != nil {
return err
@@ -385,18 +385,18 @@ func (g *Gateio) processOrderbookUpdate(ctx context.Context, incoming []byte, up
bids[x].Amount = data.Bids[x][1].Float64()
}
return g.wsOBUpdateMgr.ProcessOrderbookUpdate(ctx, g, data.FirstUpdateID, &orderbook.Update{
UpdateID: data.LastUpdateID,
UpdateTime: data.UpdateTime.Time(),
UpdatePushedAt: updatePushedAt,
Pair: data.Pair,
Asset: asset.Spot,
Asks: asks,
Bids: bids,
AllowEmpty: true,
UpdateID: data.LastUpdateID,
UpdateTime: data.UpdateTime.Time(),
LastPushed: lastPushed,
Pair: data.Pair,
Asset: asset.Spot,
Asks: asks,
Bids: bids,
AllowEmpty: true,
})
}
func (g *Gateio) processOrderbookSnapshot(incoming []byte, updatePushedAt time.Time) error {
func (g *Gateio) processOrderbookSnapshot(incoming []byte, lastPushed time.Time) error {
var data WsOrderbookSnapshot
if err := json.Unmarshal(incoming, &data); err != nil {
return err
@@ -416,13 +416,13 @@ func (g *Gateio) processOrderbookSnapshot(incoming []byte, updatePushedAt time.T
for _, a := range standardMarginAssetTypes {
if enabled, _ := g.CurrencyPairs.IsPairEnabled(data.CurrencyPair, a); enabled {
if err := g.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{
Exchange: g.Name,
Pair: data.CurrencyPair,
Asset: a,
LastUpdated: data.UpdateTime.Time(),
UpdatePushedAt: updatePushedAt,
Bids: bids,
Asks: asks,
Exchange: g.Name,
Pair: data.CurrencyPair,
Asset: a,
LastUpdated: data.UpdateTime.Time(),
LastPushed: lastPushed,
Bids: bids,
Asks: asks,
}); err != nil {
return err
}

View File

@@ -423,18 +423,18 @@ func (g *Gateio) processFuturesOrderbookUpdate(ctx context.Context, incoming []b
}
return g.wsOBUpdateMgr.ProcessOrderbookUpdate(ctx, g, data.FirstUpdatedID, &orderbook.Update{
UpdateID: data.LastUpdatedID,
UpdateTime: data.Timestamp.Time(),
UpdatePushedAt: pushTime,
Pair: data.ContractName,
Asset: a,
Asks: asks,
Bids: bids,
AllowEmpty: true,
UpdateID: data.LastUpdatedID,
UpdateTime: data.Timestamp.Time(),
LastPushed: pushTime,
Pair: data.ContractName,
Asset: a,
Asks: asks,
Bids: bids,
AllowEmpty: true,
})
}
func (g *Gateio) processFuturesOrderbookSnapshot(event string, incoming []byte, assetType asset.Item, updatePushedAt time.Time) error {
func (g *Gateio) processFuturesOrderbookSnapshot(event string, incoming []byte, assetType asset.Item, lastPushed time.Time) error {
if event == "all" {
var data WsFuturesOrderbookSnapshot
err := json.Unmarshal(incoming, &data)
@@ -446,7 +446,7 @@ func (g *Gateio) processFuturesOrderbookSnapshot(event string, incoming []byte,
Exchange: g.Name,
Pair: data.Contract,
LastUpdated: data.Timestamp.Time(),
UpdatePushedAt: updatePushedAt,
LastPushed: lastPushed,
VerifyOrderbook: g.CanVerifyOrderbook,
}
base.Asks = make([]orderbook.Tranche, len(data.Asks))
@@ -501,8 +501,8 @@ func (g *Gateio) processFuturesOrderbookSnapshot(event string, incoming []byte,
Asset: assetType,
Exchange: g.Name,
Pair: currencyPair,
LastUpdated: updatePushedAt,
UpdatePushedAt: updatePushedAt,
LastUpdated: lastPushed,
LastPushed: lastPushed,
VerifyOrderbook: g.CanVerifyOrderbook,
})
if err != nil {

View File

@@ -514,18 +514,18 @@ func (g *Gateio) processOptionsOrderbookUpdate(ctx context.Context, incoming []b
bids[x].Amount = data.Bids[x].Size
}
return g.wsOBUpdateMgr.ProcessOrderbookUpdate(ctx, g, data.FirstUpdatedID, &orderbook.Update{
UpdateID: data.LastUpdatedID,
UpdateTime: data.Timestamp.Time(),
UpdatePushedAt: pushTime,
Pair: data.ContractName,
Asset: a,
Asks: asks,
Bids: bids,
AllowEmpty: true,
UpdateID: data.LastUpdatedID,
UpdateTime: data.Timestamp.Time(),
LastPushed: pushTime,
Pair: data.ContractName,
Asset: a,
Asks: asks,
Bids: bids,
AllowEmpty: true,
})
}
func (g *Gateio) processOptionsOrderbookSnapshotPushData(event string, incoming []byte, updatePushedAt time.Time) error {
func (g *Gateio) processOptionsOrderbookSnapshotPushData(event string, incoming []byte, lastPushed time.Time) error {
if event == "all" {
var data WsOptionsOrderbookSnapshot
err := json.Unmarshal(incoming, &data)
@@ -537,7 +537,7 @@ func (g *Gateio) processOptionsOrderbookSnapshotPushData(event string, incoming
Exchange: g.Name,
Pair: data.Contract,
LastUpdated: data.Timestamp.Time(),
UpdatePushedAt: updatePushedAt,
LastPushed: lastPushed,
VerifyOrderbook: g.CanVerifyOrderbook,
}
base.Asks = make([]orderbook.Tranche, len(data.Asks))
@@ -590,8 +590,8 @@ func (g *Gateio) processOptionsOrderbookSnapshotPushData(event string, incoming
Asset: asset.Options,
Exchange: g.Name,
Pair: currencyPair,
LastUpdated: updatePushedAt,
UpdatePushedAt: updatePushedAt,
LastUpdated: lastPushed,
LastPushed: lastPushed,
VerifyOrderbook: g.CanVerifyOrderbook,
})
if err != nil {

View File

@@ -695,18 +695,19 @@ func (g *Gateio) UpdateOrderbookWithLimit(ctx context.Context, p currency.Pair,
Pair: p.Upper(),
LastUpdateID: o.ID,
LastUpdated: o.Update.Time(),
LastPushed: o.Current.Time(),
}
book.Bids = make(orderbook.Tranches, len(o.Bids))
for x := range o.Bids {
book.Bids[x] = orderbook.Tranche{
Amount: o.Bids[x].Amount,
Amount: o.Bids[x].Amount.Float64(),
Price: o.Bids[x].Price.Float64(),
}
}
book.Asks = make(orderbook.Tranches, len(o.Asks))
for x := range o.Asks {
book.Asks[x] = orderbook.Tranche{
Amount: o.Asks[x].Amount,
Amount: o.Asks[x].Amount.Float64(),
Price: o.Asks[x].Price.Float64(),
}
}

View File

@@ -24,14 +24,14 @@ func TestProcessOrderbookUpdate(t *testing.T) {
pair := currency.NewPair(currency.BABY, currency.BABYDOGE)
err = g.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{
Exchange: g.Name,
Pair: pair,
Asset: asset.USDTMarginedFutures,
Bids: []orderbook.Tranche{{Price: 1, Amount: 1}},
Asks: []orderbook.Tranche{{Price: 1, Amount: 1}},
LastUpdated: time.Now(),
UpdatePushedAt: time.Now(),
LastUpdateID: 1336,
Exchange: g.Name,
Pair: pair,
Asset: asset.USDTMarginedFutures,
Bids: []orderbook.Tranche{{Price: 1, Amount: 1}},
Asks: []orderbook.Tranche{{Price: 1, Amount: 1}},
LastUpdated: time.Now(),
LastPushed: time.Now(),
LastUpdateID: 1336,
})
require.NoError(t, err)
@@ -146,14 +146,14 @@ func TestApplyPendingUpdates(t *testing.T) {
m := newWsOBUpdateManager(defaultWSSnapshotSyncDelay)
pair := currency.NewPair(currency.LTC, currency.USDT)
err := g.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{
Exchange: g.Name,
Pair: pair,
Asset: asset.USDTMarginedFutures,
Bids: []orderbook.Tranche{{Price: 1, Amount: 1}},
Asks: []orderbook.Tranche{{Price: 1, Amount: 1}},
LastUpdated: time.Now(),
UpdatePushedAt: time.Now(),
LastUpdateID: 1335,
Exchange: g.Name,
Pair: pair,
Asset: asset.USDTMarginedFutures,
Bids: []orderbook.Tranche{{Price: 1, Amount: 1}},
Asks: []orderbook.Tranche{{Price: 1, Amount: 1}},
LastUpdated: time.Now(),
LastPushed: time.Now(),
LastUpdateID: 1335,
})
require.NoError(t, err)

View File

@@ -78,7 +78,7 @@ func (d *Depth) Retrieve() (*Base, error) {
Asset: d.asset,
Pair: d.pair,
LastUpdated: d.lastUpdated,
UpdatePushedAt: d.updatePushedAt,
LastPushed: d.lastPushed,
InsertedAt: d.insertedAt,
LastUpdateID: d.lastUpdateID,
PriceDuplication: d.priceDuplication,
@@ -92,7 +92,7 @@ func (d *Depth) Retrieve() (*Base, error) {
}
// LoadSnapshot flushes the bids and asks with a snapshot
func (d *Depth) LoadSnapshot(bids, asks []Tranche, lastUpdateID int64, lastUpdated, updatePushedAt time.Time, updateByREST bool) error {
func (d *Depth) LoadSnapshot(bids, asks []Tranche, lastUpdateID int64, lastUpdated, lastPushed time.Time, updateByREST bool) error {
d.m.Lock()
defer d.m.Unlock()
if lastUpdated.IsZero() {
@@ -104,7 +104,7 @@ func (d *Depth) LoadSnapshot(bids, asks []Tranche, lastUpdateID int64, lastUpdat
}
d.lastUpdateID = lastUpdateID
d.lastUpdated = lastUpdated
d.updatePushedAt = updatePushedAt
d.lastPushed = lastPushed
d.insertedAt = time.Now()
d.restSnapshot = updateByREST
d.bidTranches.load(bids)
@@ -387,7 +387,7 @@ func (d *Depth) TotalAskAmounts() (liquidity, value float64, err error) {
func (d *Depth) updateAndAlert(update *Update) {
d.lastUpdateID = update.UpdateID
d.lastUpdated = update.UpdateTime
d.updatePushedAt = update.UpdatePushedAt
d.lastPushed = update.LastPushed
d.insertedAt = time.Now()
d.Alert()
}

View File

@@ -74,7 +74,7 @@ func TestRetrieve(t *testing.T) {
pair: currency.NewPair(currency.THETA, currency.USD),
asset: asset.DownsideProfitContract,
lastUpdated: time.Now(),
updatePushedAt: time.Now(),
lastPushed: time.Now(),
insertedAt: time.Now(),
lastUpdateID: 1337,
priceDuplication: true,
@@ -102,7 +102,7 @@ func TestRetrieve(t *testing.T) {
assert.Equal(t, currency.NewPair(currency.THETA, currency.USD), ob.Pair, "Should have correct Pair")
assert.Equal(t, asset.DownsideProfitContract, ob.Asset, "Should have correct Asset")
assert.Equal(t, d.options.lastUpdated, ob.LastUpdated, "Should have correct LastUpdated")
assert.Equal(t, d.options.updatePushedAt, ob.UpdatePushedAt, "Should have correct UpdatePushedAt")
assert.Equal(t, d.options.lastPushed, ob.LastPushed, "Should have correct LastPushed")
assert.Equal(t, d.options.insertedAt, ob.InsertedAt, "Should have correct InsertedAt")
assert.EqualValues(t, 1337, ob.LastUpdateID, "Should have correct LastUpdateID")
assert.True(t, ob.PriceDuplication, "Should have correct PriceDuplication")

View File

@@ -52,7 +52,7 @@ func (s *store) Update(b *Base) error {
return err
}
}
if err := book.Depth.LoadSnapshot(b.Bids, b.Asks, b.LastUpdateID, b.LastUpdated, b.UpdatePushedAt, true); err != nil {
if err := book.Depth.LoadSnapshot(b.Bids, b.Asks, b.LastUpdateID, b.LastUpdated, b.LastPushed, true); err != nil {
return err
}
return s.signalMux.Publish(book.Depth, book.RouterID)

View File

@@ -94,15 +94,15 @@ type Base struct {
// which could be stale if there have been no recent changes.
LastUpdated time.Time
// UpdatePushedAt is the time the exchange pushed this update. This helps
// LastPushed is the time the exchange pushed this update. This helps
// determine factors like distance from exchange (latency) and routing
// time, which can affect the time it takes for an update to reach the user
// from the exchange.
UpdatePushedAt time.Time
LastPushed time.Time
// InsertedAt is the time the update was inserted into the orderbook
// management system. This field is used to calculate round-trip times and
// processing delays, e.g., InsertedAt.Sub(UpdatePushedAt) represents the
// processing delays, e.g., InsertedAt.Sub(LastPushed) represents the
// total processing time including network latency.
InsertedAt time.Time
@@ -136,7 +136,7 @@ type options struct {
pair currency.Pair
asset asset.Item
lastUpdated time.Time
updatePushedAt time.Time
lastPushed time.Time
insertedAt time.Time
lastUpdateID int64
priceDuplication bool
@@ -166,10 +166,10 @@ const (
// Update and things and stuff
type Update struct {
UpdateID int64
UpdateTime time.Time
UpdatePushedAt time.Time
Asset asset.Item
UpdateID int64
UpdateTime time.Time
LastPushed time.Time
Asset asset.Item
Action
Bids []Tranche
Asks []Tranche