GateIO: Update websocket orderbook manager (#1989)

* gateio: websocket ob manager fix (cherry-pick me)

* fix(gateio): update websocket orderbook manager to support delay and deadline parameters
    feat(subscriptions): mv ChannelKey type for subscription management from gateio to subscriptions
    test(gateio): enhance tests for orderbook update manager and subscription keys

* ai: nits

* linter: fix

* Fix asset typo and add in case test

* cranktakular: nits

* cranktakular: nits after merge master

* bump time delay for cache

* fix bug where on error it never initiates another orderbook fetch

* lint: fix

* Update exchanges/gateio/ws_ob_update_manager.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* glorious: nits

* linter: fix

* Update exchanges/gateio/gateio_wrapper_test.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/gateio/gateio_wrapper.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/gateio/gateio_wrapper_test.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/gateio/ws_ob_update_manager.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/gateio/ws_ob_update_manager.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/gateio/ws_ob_update_manager.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/gateio/ws_ob_update_manager_test.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* gk: nits

* Update exchanges/gateio/gateio_wrapper.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/gateio/ws_ob_update_manager.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* gk: nits

* bossking: nits

* Update exchanges/gateio/ws_ob_update_manager_test.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* gk: nits

* apply patch

* rm error state as this was on the same thread as cacheStateQueuing and had the potential to drop messages, add tests.

* linter: fix

* mock live request

* misc: fix

* Update exchanges/gateio/ws_ob_update_manager.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/gateio/ws_ob_update_manager.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/gateio/ws_ob_update_manager.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/gateio/ws_ob_update_manager.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/gateio/ws_ob_update_manager.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* gk: nits

* field name from mtx -> m

* lint: fix

* race: check fix

* thrasher-: patch adams

---------

Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
Co-authored-by: Scott <gloriousCode@users.noreply.github.com>
Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>
This commit is contained in:
Ryan O'Hara-Reid
2025-11-27 12:21:17 +11:00
committed by GitHub
parent 719e6bebfe
commit cf54764cb7
11 changed files with 757 additions and 251 deletions

View File

@@ -167,6 +167,8 @@ var (
errInvalidTextPrefix = errors.New("invalid text value, requires prefix `t-`")
errSingleAssetRequired = errors.New("single asset type required")
errTooManyCurrencyCodes = errors.New("too many currency codes supplied")
errFetchingOrderbook = errors.New("error fetching orderbook")
errNoSpotInstrument = errors.New("no spot instrument available")
)
// validTimesInForce holds a list of supported time-in-force values and corresponding string representations.

View File

@@ -1775,6 +1775,8 @@ func TestUpdateTickers(t *testing.T) {
func TestUpdateOrderbook(t *testing.T) {
t.Parallel()
_, err := e.UpdateOrderbook(t.Context(), currency.EMPTYPAIR, 1336)
require.ErrorIs(t, err, currency.ErrCurrencyPairEmpty)
for _, a := range e.GetAssetTypes(false) {
pair := getPair(t, a)
t.Run(a.String()+" "+pair.String(), func(t *testing.T) {

View File

@@ -983,3 +983,29 @@ func getWSPingHandler(channel string) (websocket.PingHandler, error) {
MessageType: gws.TextMessage,
}, nil
}
// TODO: When subscription config is added for all assets update limits to use sub.Levels
func (e *Exchange) extractOrderbookLimit(a asset.Item) (uint64, error) {
switch a {
case asset.Spot:
sub := e.Websocket.GetSubscription(spotOrderbookUpdateKey)
if sub == nil {
return 0, fmt.Errorf("%w for %q", subscription.ErrNotFound, spotOrderbookUpdateKey)
}
// There is no way to set levels when we subscribe for this specific channel
// Extract limit from interval e.g. 20ms == 20 limit book and 100ms == 100 limit book.
lim := uint64(sub.Interval.Duration().Milliseconds()) //nolint:gosec // No overflow risk
if lim != 20 && lim != 100 {
return 0, fmt.Errorf("%w: %d. Valid limits are 20 and 100", errInvalidOrderbookUpdateInterval, lim)
}
return lim, nil
case asset.USDTMarginedFutures, asset.CoinMarginedFutures:
return futuresOrderbookUpdateLimit, nil
case asset.DeliveryFutures:
return deliveryFuturesUpdateLimit, nil
case asset.Options:
return optionOrderbookUpdateLimit, nil
default:
return 0, fmt.Errorf("%w: %q", asset.ErrNotSupported, a)
}
}

View File

@@ -15,6 +15,9 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchange/accounts"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
)
func TestGetWSPingHandler(t *testing.T) {
@@ -209,3 +212,42 @@ func checkAccountChange(ctx context.Context, t *testing.T, exch *Exchange, tc *w
assert.Equal(t, bal.Free, stored.Free, "free balance should equal with accounts stored value")
}
}
func TestExtractOrderbookLimit(t *testing.T) {
t.Parallel()
e := new(Exchange)
require.NoError(t, testexch.Setup(e), "Setup must not error")
_, err := e.extractOrderbookLimit(1337)
require.ErrorIs(t, err, asset.ErrNotSupported)
_, err = e.extractOrderbookLimit(asset.Spot)
require.ErrorIs(t, err, subscription.ErrNotFound)
err = e.Websocket.AddSubscriptions(nil, &subscription.Subscription{Channel: subscription.OrderbookChannel, Interval: kline.Interval(time.Millisecond * 420)})
require.NoError(t, err)
_, err = e.extractOrderbookLimit(asset.Spot)
require.ErrorIs(t, err, errInvalidOrderbookUpdateInterval)
err = e.Websocket.RemoveSubscriptions(nil, &subscription.Subscription{Channel: subscription.OrderbookChannel, Interval: kline.Interval(time.Millisecond * 420)})
require.NoError(t, err)
// Add dummy subscription so that it can be matched and a limit/level can be extracted for initial orderbook sync spot.
err = e.Websocket.AddSubscriptions(nil, &subscription.Subscription{Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds})
require.NoError(t, err)
for _, tc := range []struct {
asset asset.Item
exp uint64
}{
{asset: asset.Spot, exp: 100},
{asset: asset.USDTMarginedFutures, exp: futuresOrderbookUpdateLimit},
{asset: asset.CoinMarginedFutures, exp: futuresOrderbookUpdateLimit},
{asset: asset.DeliveryFutures, exp: deliveryFuturesUpdateLimit},
{asset: asset.Options, exp: optionOrderbookUpdateLimit},
} {
limit, err := e.extractOrderbookLimit(tc.asset)
require.NoError(t, err)
require.Equal(t, tc.exp, limit)
}
}

View File

@@ -180,7 +180,7 @@ func (e *Exchange) SetDefaults() {
e.WebsocketResponseMaxLimit = exchange.DefaultWebsocketResponseMaxLimit
e.WebsocketResponseCheckTimeout = exchange.DefaultWebsocketResponseCheckTimeout
e.WebsocketOrderbookBufferLimit = exchange.DefaultWebsocketOrderbookBufferLimit
e.wsOBUpdateMgr = newWsOBUpdateManager(defaultWSSnapshotSyncDelay)
e.wsOBUpdateMgr = newWsOBUpdateManager(defaultWsOrderbookUpdateTimeDelay, defaultWSOrderbookUpdateDeadline)
}
// Setup sets user configuration
@@ -619,21 +619,31 @@ func (e *Exchange) UpdateOrderbook(ctx context.Context, p currency.Pair, a asset
// UpdateOrderbookWithLimit updates and returns the orderbook for a currency pair with a set orderbook size limit
func (e *Exchange) UpdateOrderbookWithLimit(ctx context.Context, p currency.Pair, a asset.Item, limit uint64) (*orderbook.Book, error) {
book, err := e.fetchOrderbook(ctx, p, a, limit)
if err != nil {
return nil, err
}
if err := book.Process(); err != nil {
return nil, err
}
return orderbook.Get(e.Name, book.Pair, a)
}
func (e *Exchange) fetchOrderbook(ctx context.Context, p currency.Pair, a asset.Item, limit uint64) (*orderbook.Book, error) {
p, err := e.FormatExchangeCurrency(p, a)
if err != nil {
return nil, err
}
var o *Orderbook
switch a {
case asset.Spot, asset.Margin, asset.CrossMargin:
var available bool
available, err = e.checkInstrumentAvailabilityInSpot(p)
if err != nil {
case asset.Margin, asset.CrossMargin:
if available, err := e.checkInstrumentAvailabilityInSpot(p); err != nil {
return nil, err
} else if !available {
return nil, fmt.Errorf("%w: %w for %q %q", errFetchingOrderbook, errNoSpotInstrument, a, p)
}
if a != asset.Spot && !available {
return nil, fmt.Errorf("%v instrument %v does not have orderbook data", a, p)
}
fallthrough
case asset.Spot:
o, err = e.GetOrderbook(ctx, p.String(), "", limit, true)
case asset.CoinMarginedFutures, asset.USDTMarginedFutures:
var settle currency.Code
@@ -653,7 +663,7 @@ func (e *Exchange) UpdateOrderbookWithLimit(ctx context.Context, p currency.Pair
return nil, err
}
ob := &orderbook.Book{
return &orderbook.Book{
Exchange: e.Name,
Asset: a,
ValidateOrderbook: e.ValidateOrderbook,
@@ -663,13 +673,7 @@ func (e *Exchange) UpdateOrderbookWithLimit(ctx context.Context, p currency.Pair
LastPushed: o.Current.Time(),
Bids: o.Bids.Levels(),
Asks: o.Asks.Levels(),
}
if err := ob.Process(); err != nil {
return nil, err
}
return orderbook.Get(e.Name, p, a)
}, nil
}
// UpdateAccountBalances retrieves currency balances

View File

@@ -1,6 +1,7 @@
package gateio
import (
"fmt"
"testing"
"github.com/gofrs/uuid"
@@ -10,6 +11,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
)
func TestCancelAllOrders(t *testing.T) {
@@ -89,3 +91,63 @@ func BenchmarkMessageID(b *testing.B) {
_ = e.MessageID()
}
}
func TestFetchOrderbook(t *testing.T) {
t.Parallel()
testexch.UpdatePairsOnce(t, e)
availSpot, err := e.GetAvailablePairs(asset.Spot)
require.NoError(t, err, "GetAvailablePairs must not error")
availMargin, err := e.GetAvailablePairs(asset.Margin)
require.NoError(t, err, "GetAvailablePairs must not error")
marginPairNotInSpot, err := availMargin.Remove(availSpot...).GetRandomPair()
require.NoError(t, err, "GetRandomPair must not error")
availOptions, err := e.GetAvailablePairs(asset.Options)
require.NoError(t, err, "GetAvailablePairs must not error")
optionsPair, err := availOptions.GetRandomPair()
require.NoError(t, err, "GetRandomPair must not error")
availDelivery, err := e.GetAvailablePairs(asset.DeliveryFutures)
require.NoError(t, err, "GetAvailablePairs must not error")
deliveryPair, err := availDelivery.GetRandomPair()
require.NoError(t, err, "GetRandomPair must not error")
for _, tc := range []struct {
pair currency.Pair
a asset.Item
err error
}{
{pair: currency.EMPTYPAIR, a: asset.Spot, err: currency.ErrCurrencyPairEmpty},
{pair: marginPairNotInSpot, a: asset.Margin, err: errNoSpotInstrument},
{pair: marginPairNotInSpot, a: asset.Binary, err: asset.ErrNotSupported},
{pair: currency.NewBTCUSDT(), a: asset.Spot},
{pair: currency.NewBTCUSDT(), a: asset.USDTMarginedFutures},
{pair: deliveryPair, a: asset.DeliveryFutures},
{pair: optionsPair, a: asset.Options},
} {
t.Run(fmt.Sprintf("%s-%s: expected err:%v", tc.pair, tc.a, tc.err), func(t *testing.T) {
t.Parallel()
got, err := e.fetchOrderbook(t.Context(), tc.pair, tc.a, 1)
if tc.err != nil {
require.ErrorIs(t, err, tc.err)
return
}
require.NoError(t, err)
assert.Equal(t, e.Name, got.Exchange, "Exchange name should be correct")
assert.True(t, tc.pair.Equal(got.Pair), "Pair should be correct")
assert.Equal(t, tc.a, got.Asset, "Asset should be correct")
assert.LessOrEqual(t, len(got.Asks), 1, "Asks count should not exceed limit, but may be empty especially for options")
assert.LessOrEqual(t, len(got.Bids), 1, "Bids count should not exceed limit, but may be empty especially for options")
assert.NotZero(t, got.LastUpdated, "Last updated timestamp should be set")
assert.NotZero(t, got.LastUpdateID, "Last update ID should be set")
assert.NotZero(t, got.LastPushed, "Last pushed timestamp should be set")
assert.LessOrEqual(t, got.LastUpdated, got.LastPushed, "Last updated timestamp should be before last pushed timestamp")
})
}
}

View File

@@ -1,5 +1,63 @@
{
"routes": {
"/api/v4/spot/order_book": {
"GET": [
{
"data": {
"asks": [
[
"112095.9",
"0.509926"
],
[
"112096",
"0.0439"
],
[
"112097",
"0.0439"
],
[
"112097.9",
"0.044111"
],
[
"112098",
"0.0439"
]
],
"bids": [
[
"112095.8",
"2.255276"
],
[
"112095.7",
"1.490851"
],
[
"112095",
"0.0439"
],
[
"112094.2",
"0.08921"
],
[
"112092.3",
"1.186865"
]
],
"current": 1759104757920,
"id": 27596272445,
"update": 1759104757919
},
"queryString": "currency_pair=BTC_USDT\u0026limit=20\u0026with_id=true",
"bodyParams": "",
"headers": {}
}
]
},
"/futures/usdt/risk_limit_table": {
"GET": [
{

View File

@@ -12,195 +12,254 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/log"
)
const defaultWSSnapshotSyncDelay = 2 * time.Second
var (
errOrderbookSnapshotOutdated = errors.New("orderbook snapshot is outdated")
errPendingUpdatesNotApplied = errors.New("pending updates not applied")
errInvalidOrderbookUpdateInterval = errors.New("invalid orderbook update interval")
var errOrderbookSnapshotOutdated = errors.New("orderbook snapshot is outdated")
defaultWSOrderbookUpdateDeadline = time.Minute * 2
defaultWsOrderbookUpdateTimeDelay = time.Second * 2
spotOrderbookUpdateKey = subscription.MustChannelKey(subscription.OrderbookChannel)
errUnhandledCacheState = errors.New("unhandled cache state")
)
type wsOBUpdateManager struct {
lookup map[key.PairAsset]*updateCache
snapshotSyncDelay time.Duration
mtx sync.RWMutex
lookup map[key.PairAsset]*updateCache
deadline time.Duration
delay time.Duration
m sync.RWMutex
}
type updateCache struct {
updates []pendingUpdate
updating bool
mtx sync.Mutex
updates []pendingUpdate
ch chan int64
m sync.Mutex
state cacheState
}
type cacheState uint32
const (
cacheStateUninitialised cacheState = iota
cacheStateInitialised
cacheStateQueuing
cacheStateSynced
)
type pendingUpdate struct {
update *orderbook.Update
firstUpdateID int64
}
func newWsOBUpdateManager(snapshotSyncDelay time.Duration) *wsOBUpdateManager {
return &wsOBUpdateManager{lookup: make(map[key.PairAsset]*updateCache), snapshotSyncDelay: snapshotSyncDelay}
func newWsOBUpdateManager(delay, deadline time.Duration) *wsOBUpdateManager {
return &wsOBUpdateManager{lookup: make(map[key.PairAsset]*updateCache), deadline: deadline, delay: delay}
}
// ProcessOrderbookUpdate processes an orderbook update by syncing snapshot, caching updates and applying them
func (m *wsOBUpdateManager) ProcessOrderbookUpdate(ctx context.Context, g *Exchange, firstUpdateID int64, update *orderbook.Update) error {
cache := m.LoadCache(update.Pair, update.Asset)
cache.mtx.Lock()
defer cache.mtx.Unlock()
if cache.updating {
cache.updates = append(cache.updates, pendingUpdate{update: update, firstUpdateID: firstUpdateID})
return nil
}
lastUpdateID, err := g.Websocket.Orderbook.LastUpdateID(update.Pair, update.Asset)
if err != nil && !errors.Is(err, orderbook.ErrDepthNotFound) {
return err
}
if lastUpdateID+1 >= firstUpdateID {
return applyOrderbookUpdate(g, update)
}
// Orderbook is behind notifications, therefore Invalidate store
if err := g.Websocket.Orderbook.InvalidateOrderbook(update.Pair, update.Asset); err != nil && !errors.Is(err, orderbook.ErrDepthNotFound) {
return err
}
cache.updating = true
cache.updates = append(cache.updates, pendingUpdate{update: update, firstUpdateID: firstUpdateID})
go func() {
select {
case <-ctx.Done():
return
case <-time.After(m.snapshotSyncDelay):
if err := cache.SyncOrderbook(ctx, g, update.Pair, update.Asset); err != nil {
g.Websocket.DataHandler <- fmt.Errorf("failed to sync orderbook for %v %v: %w", update.Pair, update.Asset, err)
}
}
}()
return nil
}
// LoadCache loads the cache for the given pair and asset. If the cache does not exist, it creates a new one.
func (m *wsOBUpdateManager) LoadCache(p currency.Pair, a asset.Item) *updateCache {
m.mtx.RLock()
cache, ok := m.lookup[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}]
m.mtx.RUnlock()
if !ok {
m.mtx.Lock()
cache = &updateCache{}
m.lookup[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}] = cache
m.mtx.Unlock()
}
return cache
}
// SyncOrderbook fetches and synchronises an orderbook snapshot to the limit size so that pending updates can be
// applied to the orderbook.
func (c *updateCache) SyncOrderbook(ctx context.Context, g *Exchange, pair currency.Pair, a asset.Item) error {
// TODO: When subscription config is added for all assets update limits to use sub.Levels
var limit uint64
switch a {
case asset.Spot:
sub := g.Websocket.GetSubscription(spotOrderbookUpdateKey)
if sub == nil {
return fmt.Errorf("no subscription found for %q", spotOrderbookUpdateKey)
}
// There is no way to set levels when we subscribe for this specific subscription case.
// Extract limit from interval e.g. 20ms == 20 limit book and 100ms == 100 limit book.
limit = uint64(sub.Interval.Duration().Milliseconds()) //nolint:gosec // No overflow risk
case asset.USDTMarginedFutures, asset.USDCMarginedFutures:
limit = futuresOrderbookUpdateLimit
case asset.DeliveryFutures:
limit = deliveryFuturesUpdateLimit
case asset.Options:
limit = optionOrderbookUpdateLimit
}
book, err := g.UpdateOrderbookWithLimit(ctx, pair, a, limit)
c.mtx.Lock() // lock here to prevent ws handle data interference with REST request above
defer func() {
c.updates = nil
c.updating = false
c.mtx.Unlock()
}()
func (m *wsOBUpdateManager) ProcessOrderbookUpdate(ctx context.Context, e *Exchange, firstUpdateID int64, update *orderbook.Update) error {
cache, err := m.LoadCache(update.Pair, update.Asset)
if err != nil {
return err
}
if a != asset.Spot {
if err := g.Websocket.Orderbook.LoadSnapshot(book); err != nil {
return err
cache.m.Lock()
defer cache.m.Unlock()
switch cache.state {
case cacheStateSynced:
return m.applyUpdate(ctx, e, cache, firstUpdateID, update)
case cacheStateInitialised:
m.initialiseOrderbookCache(ctx, e, firstUpdateID, update, cache)
case cacheStateQueuing:
cache.updates = append(cache.updates, pendingUpdate{update: update, firstUpdateID: firstUpdateID})
select {
case cache.ch <- update.UpdateID: // Notify SyncOrderbook of most recent update ID for inspection
default:
}
} else {
// Spot, Margin, and Cross Margin books are all classified as spot
for i := range standardMarginAssetTypes {
if enabled, _ := g.IsPairEnabled(pair, standardMarginAssetTypes[i]); !enabled {
continue
}
book.Asset = standardMarginAssetTypes[i]
if err := g.Websocket.Orderbook.LoadSnapshot(book); err != nil {
return err
default:
return fmt.Errorf("%w: %d for %v %v", errUnhandledCacheState, cache.state, update.Pair, update.Asset)
}
return nil
}
// applyUpdate verifies and applies an orderbook update
// Invalidates the cache on error
// Does not benefit from concurrent lock protection
func (m *wsOBUpdateManager) applyUpdate(ctx context.Context, e *Exchange, cache *updateCache, firstUpdateID int64, update *orderbook.Update) error {
lastUpdateID, err := e.Websocket.Orderbook.LastUpdateID(update.Pair, update.Asset)
if err != nil {
log.Errorf(log.ExchangeSys, "%s websocket orderbook manager: failed to sync orderbook for %v %v: %v", e.Name, update.Pair, update.Asset, err)
return m.invalidateCache(ctx, e, firstUpdateID, update, cache)
}
if lastUpdateID+1 != firstUpdateID {
if e.Verbose { // disconnection will pollute logs
log.Warnf(log.ExchangeSys, "%s websocket orderbook manager: failed to sync orderbook for %v %v: desync detected", e.Name, update.Pair, update.Asset)
}
return m.invalidateCache(ctx, e, firstUpdateID, update, cache)
}
if err := e.Websocket.Orderbook.Update(update); err != nil {
log.Errorf(log.ExchangeSys, "%s websocket orderbook manager: failed to sync orderbook for %v %v: %v", e.Name, update.Pair, update.Asset, err)
return m.invalidateCache(ctx, e, firstUpdateID, update, cache)
}
return nil
}
// invalidateCache invalidates the existing orderbook, clears the update queue and reinitialises the orderbook cache
// assumes lock already active on cache
func (m *wsOBUpdateManager) invalidateCache(ctx context.Context, e *Exchange, firstUpdateID int64, update *orderbook.Update, cache *updateCache) error {
if err := e.Websocket.Orderbook.InvalidateOrderbook(update.Pair, update.Asset); err != nil {
return err
}
m.initialiseOrderbookCache(ctx, e, firstUpdateID, update, cache)
return nil
}
// initialiseOrderbookCache sets the cache state to queuing, appends the update to the cache and spawns a goroutine
// to fetch and synchronise the orderbook snapshot
// assumes lock already active on cache
func (m *wsOBUpdateManager) initialiseOrderbookCache(ctx context.Context, e *Exchange, firstUpdateID int64, update *orderbook.Update, cache *updateCache) {
cache.state = cacheStateQueuing
cache.updates = append(cache.updates, pendingUpdate{update: update, firstUpdateID: firstUpdateID})
go func() {
if err := cache.SyncOrderbook(ctx, e, update.Pair, update.Asset, m.delay, m.deadline); err != nil {
log.Errorf(log.ExchangeSys, "%s websocket orderbook manager: failed to sync orderbook for %v %v: %v", e.Name, update.Pair, update.Asset, err)
}
}()
}
// LoadCache loads the cache for the given pair and asset. If the cache does not exist, it creates a new one.
func (m *wsOBUpdateManager) LoadCache(p currency.Pair, a asset.Item) (*updateCache, error) {
if p.IsEmpty() {
return nil, currency.ErrCurrencyPairEmpty
}
if !a.IsValid() {
return nil, fmt.Errorf("%w: %q", asset.ErrInvalidAsset, a)
}
m.m.RLock()
cache, ok := m.lookup[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}]
m.m.RUnlock()
if !ok {
cache = &updateCache{ch: make(chan int64), state: cacheStateInitialised}
m.m.Lock()
m.lookup[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}] = cache
m.m.Unlock()
}
return cache, nil
}
// SyncOrderbook fetches and synchronises an orderbook snapshot to the limit size so that pending updates can be
// applied to the orderbook.
func (c *updateCache) SyncOrderbook(ctx context.Context, e *Exchange, pair currency.Pair, a asset.Item, delay, deadline time.Duration) error {
limit, err := e.extractOrderbookLimit(a)
if err != nil {
c.clearWithLock()
return err
}
// REST requests can be behind websocket updates by a large margin, so we wait here to allow the cache to fill with
// updates before we fetch the orderbook snapshot.
select {
case <-ctx.Done():
c.clearWithLock()
return ctx.Err()
case <-time.After(delay):
}
// Setting deadline to error out instead of waiting for rate limiter delay which excessively builds a backlog of
// pending updates.
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(deadline))
defer cancel()
book, err := e.fetchOrderbook(ctx, pair, a, limit)
if err != nil {
c.clearWithLock()
return err
}
if err := c.waitForUpdate(ctx, book.LastUpdateID+1); err != nil {
c.clearWithLock()
return err
}
c.m.Lock() // Lock here to prevent ws handle data interference with REST request above.
defer func() {
c.clearNoLock()
c.m.Unlock()
}()
if err := e.Websocket.Orderbook.LoadSnapshot(book); err != nil {
return err
}
return c.applyPendingUpdates(e)
}
// waitForUpdate waits for an update with an ID >= nextUpdateID
func (c *updateCache) waitForUpdate(ctx context.Context, nextUpdateID int64) error {
c.m.Lock()
updateListLastUpdateID := c.updates[len(c.updates)-1].update.UpdateID
c.m.Unlock()
if updateListLastUpdateID >= nextUpdateID {
return nil
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case recentPendingUpdateID := <-c.ch:
if recentPendingUpdateID >= nextUpdateID {
return nil
}
}
}
return c.applyPendingUpdates(g, a)
}
// ApplyPendingUpdates applies all pending updates to the orderbook
func (c *updateCache) applyPendingUpdates(g *Exchange, a asset.Item) error {
// applyPendingUpdates applies all pending updates to the orderbook
// assumes lock already active on cache
func (c *updateCache) applyPendingUpdates(e *Exchange) error {
var updated bool
for _, data := range c.updates {
lastUpdateID, err := g.Websocket.Orderbook.LastUpdateID(data.update.Pair, a)
bookLastUpdateID, err := e.Websocket.Orderbook.LastUpdateID(data.update.Pair, data.update.Asset)
if err != nil {
return err
}
nextID := lastUpdateID + 1
if data.firstUpdateID > nextID {
return errOrderbookSnapshotOutdated
}
if data.update.UpdateID < nextID {
continue // skip updates that are behind the current orderbook
}
if err := applyOrderbookUpdate(g, data.update); err != nil {
return err
}
}
return nil
}
// applyOrderbookUpdate applies an orderbook update to the orderbook
func applyOrderbookUpdate(g *Exchange, update *orderbook.Update) error {
if update.Asset != asset.Spot {
return g.Websocket.Orderbook.Update(update)
}
nextUpdateID := bookLastUpdateID + 1 // From docs: `baseId+1`
for i := range standardMarginAssetTypes {
if enabled, _ := g.IsPairEnabled(update.Pair, standardMarginAssetTypes[i]); !enabled {
// From docs: Dump all notifications which satisfy `u` < `baseId+1`
if data.update.UpdateID < nextUpdateID {
continue
}
update.Asset = standardMarginAssetTypes[i]
if err := g.Websocket.Orderbook.Update(update); err != nil {
pendingFirstUpdateID := data.firstUpdateID // `U`
// From docs: `baseID+1` < first notification `U` current base order book falls behind notifications
if nextUpdateID < pendingFirstUpdateID {
return errOrderbookSnapshotOutdated
}
if err := e.Websocket.Orderbook.Update(data.update); err != nil {
return err
}
updated = true
}
if !updated {
return errPendingUpdatesNotApplied
}
c.state = cacheStateSynced
return nil
}
var spotOrderbookUpdateKey = channelKey{&subscription.Subscription{Channel: subscription.OrderbookChannel}}
var _ subscription.MatchableKey = channelKey{}
type channelKey struct {
*subscription.Subscription
func (c *updateCache) clearWithLock() {
c.m.Lock()
defer c.m.Unlock()
c.clearNoLock()
}
func (k channelKey) Match(eachKey subscription.MatchableKey) bool {
return k.Subscription.Channel == eachKey.GetSubscription().Channel
}
func (k channelKey) GetSubscription() *subscription.Subscription {
return k.Subscription
func (c *updateCache) clearNoLock() {
c.updates = nil
}

View File

@@ -1,6 +1,9 @@
package gateio
import (
"context"
"math"
"sync"
"testing"
"time"
@@ -17,11 +20,17 @@ import (
func TestProcessOrderbookUpdate(t *testing.T) {
t.Parallel()
m := newWsOBUpdateManager(0)
m := newWsOBUpdateManager(time.Millisecond*200, time.Millisecond*200)
err := m.ProcessOrderbookUpdate(t.Context(), e, 1337, &orderbook.Update{})
assert.ErrorIs(t, err, currency.ErrCurrencyPairEmpty)
pair := currency.NewPair(currency.BABY, currency.BABYDOGE)
cache, err := m.LoadCache(pair, asset.USDTMarginedFutures)
require.NoError(t, err)
cache.m.Lock()
assert.Equal(t, cacheStateInitialised, cache.state)
cache.m.Unlock()
err = e.Websocket.Orderbook.LoadSnapshot(&orderbook.Book{
Exchange: e.Name,
Pair: pair,
@@ -42,59 +51,103 @@ func TestProcessOrderbookUpdate(t *testing.T) {
UpdateTime: time.Now(),
})
require.NoError(t, err)
// Test orderbook snapshot is behind update
err = m.ProcessOrderbookUpdate(t.Context(), e, 1340, &orderbook.Update{
UpdateID: 1341,
cache.m.Lock()
assert.Equal(t, cacheStateQueuing, cache.state, "state should be queuing after first update")
cache.m.Unlock()
var wg1, wg2 sync.WaitGroup
wg1.Add(1)
wg2.Go(func() {
wg1.Done()
updatedID := <-cache.ch
assert.Equal(t, int64(1339), updatedID, "should ensure update was queued")
})
wg1.Wait()
err = m.ProcessOrderbookUpdate(t.Context(), e, 1337, &orderbook.Update{
UpdateID: 1339,
Pair: pair,
Asset: asset.USDTMarginedFutures,
AllowEmpty: true,
UpdateTime: time.Now(),
})
wg2.Wait()
require.NoError(t, err)
cache.m.Lock()
assert.Equal(t, cacheStateQueuing, cache.state)
cache.m.Unlock()
cache := m.LoadCache(pair, asset.USDTMarginedFutures)
assert.Eventually(t, func() bool {
cache.m.Lock()
defer cache.m.Unlock()
return cache.state == cacheStateQueuing
}, time.Second, time.Millisecond*10, "sync should eventually fail as BABYBABYDOGE is not a supported pair an error state and forces everything to queue")
cache.mtx.Lock()
assert.Len(t, cache.updates, 1)
assert.True(t, cache.updating)
cache.mtx.Unlock()
// Test orderbook snapshot is behind update
err = m.ProcessOrderbookUpdate(t.Context(), e, 1342, &orderbook.Update{
UpdateID: 1343,
err = m.ProcessOrderbookUpdate(t.Context(), e, 1337, &orderbook.Update{
UpdateID: 1338,
Pair: pair,
Asset: asset.USDTMarginedFutures,
AllowEmpty: true,
UpdateTime: time.Now(),
})
require.NoError(t, err)
require.NoError(t, err, "ProcessOrderbookUpdate must not error as an error state is recovered")
cache.mtx.Lock()
assert.Len(t, cache.updates, 2)
assert.True(t, cache.updating)
cache.mtx.Unlock()
err = e.Websocket.Orderbook.LoadSnapshot(&orderbook.Book{
Exchange: e.Name,
Pair: pair,
Asset: asset.USDTMarginedFutures,
Bids: []orderbook.Level{{Price: 1, Amount: 1}},
Asks: []orderbook.Level{{Price: 1, Amount: 1}},
LastUpdated: time.Now(),
LastPushed: time.Now(),
LastUpdateID: 1336,
})
require.NoError(t, err, "LoadSnapshot must not error while ensuring successful processing")
cache.m.Lock()
cache.state = cacheStateSynced
cache.m.Unlock()
err = m.ProcessOrderbookUpdate(t.Context(), e, 1337, &orderbook.Update{
UpdateID: 1338,
Pair: pair,
Asset: asset.USDTMarginedFutures,
AllowEmpty: true,
UpdateTime: time.Now(),
})
require.NoError(t, err, "ProcessOrderbookUpdate must not error while ensuring successful processing")
time.Sleep(time.Millisecond * 2) // Allow sync delay to pass
cache.mtx.Lock()
assert.Empty(t, cache.updates)
assert.False(t, cache.updating)
cache.mtx.Unlock()
cache.m.Lock()
cache.state = 100
cache.m.Unlock()
err = m.ProcessOrderbookUpdate(t.Context(), e, 1337, &orderbook.Update{
UpdateID: 1339,
Pair: pair,
Asset: asset.USDTMarginedFutures,
AllowEmpty: true,
UpdateTime: time.Now(),
})
require.ErrorIs(t, err, errUnhandledCacheState, "ProcessOrderbookUpdate must error due to unhandled state")
}
func TestLoadCache(t *testing.T) {
t.Parallel()
m := newWsOBUpdateManager(0)
pair := currency.NewPair(currency.BABY, currency.BABYDOGE)
cache := m.LoadCache(pair, asset.USDTMarginedFutures)
m := newWsOBUpdateManager(0, 0)
_, err := m.LoadCache(currency.EMPTYPAIR, 1336)
require.ErrorIs(t, err, currency.ErrCurrencyPairEmpty)
_, err = m.LoadCache(currency.NewBTCUSDT(), 1336)
require.ErrorIs(t, err, asset.ErrInvalidAsset)
cache, err := m.LoadCache(currency.NewBTCUSDT(), asset.USDTMarginedFutures)
require.NoError(t, err, "LoadCache must not error")
assert.NotNil(t, cache)
assert.Len(t, m.lookup, 1)
cache.m.Lock()
assert.Equal(t, cacheStateInitialised, cache.state, "state should be initialised after first load")
cache.m.Unlock()
// Test cache is reused
cache2 := m.LoadCache(pair, asset.USDTMarginedFutures)
assert.Equal(t, cache, cache2)
cache2, err := m.LoadCache(currency.NewBTCUSDT(), asset.USDTMarginedFutures)
require.NoError(t, err, "LoadCache must not error")
assert.Equal(t, cache, cache2, "should be the same cache instance")
}
func TestSyncOrderbook(t *testing.T) {
@@ -102,37 +155,67 @@ func TestSyncOrderbook(t *testing.T) {
e := new(Exchange)
require.NoError(t, testexch.Setup(e), "Setup must not error")
require.NoError(t, e.UpdateTradablePairs(t.Context()))
cache := &updateCache{}
pair := currency.NewPair(currency.ETH, currency.USDT)
err := cache.SyncOrderbook(t.Context(), e, pair, asset.Spot, 0, defaultWSOrderbookUpdateDeadline)
require.ErrorIs(t, err, subscription.ErrNotFound)
// Add dummy subscription so that it can be matched and a limit/level can be extracted for initial orderbook sync spot.
err := e.Websocket.AddSubscriptions(nil, &subscription.Subscription{Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds})
err = e.Websocket.AddSubscriptions(nil, &subscription.Subscription{Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds})
require.NoError(t, err)
m := newWsOBUpdateManager(defaultWSSnapshotSyncDelay)
ctxCancel, cancel := context.WithCancel(t.Context())
cancel()
err = cache.SyncOrderbook(ctxCancel, e, pair, asset.Spot, 0, defaultWSOrderbookUpdateDeadline)
require.ErrorIs(t, err, context.Canceled)
for _, a := range []asset.Item{asset.Spot, asset.USDTMarginedFutures} {
pair := currency.NewPair(currency.ETH, currency.USDT)
err := e.CurrencyPairs.EnablePair(a, pair)
require.NoError(t, err)
cache := m.LoadCache(pair, a)
cache.updates = []pendingUpdate{{update: &orderbook.Update{Pair: pair, Asset: asset.Spot}}}
err = cache.SyncOrderbook(t.Context(), e, pair, asset.Spot, 0, 0)
require.ErrorIs(t, err, context.DeadlineExceeded)
cache.updates = []pendingUpdate{{update: &orderbook.Update{Pair: pair, Asset: a}}}
cache.updating = true
err = cache.SyncOrderbook(t.Context(), e, pair, a)
require.NoError(t, err)
require.False(t, cache.updating)
require.Empty(t, cache.updates)
cache.updates = []pendingUpdate{{update: &orderbook.Update{Pair: pair, Asset: asset.Spot}}}
err = cache.SyncOrderbook(t.Context(), e, pair, asset.Spot, 0, time.Second)
require.ErrorContains(t, err, context.DeadlineExceeded.Error())
expectedLimit := 20
if a == asset.Spot {
expectedLimit = 100
}
err = e.Base.SetPairs([]currency.Pair{pair}, asset.Spot, true)
require.NoError(t, err)
cache.updates = []pendingUpdate{{update: &orderbook.Update{Pair: pair, Asset: asset.Spot, UpdateID: math.MaxInt64}}}
err = cache.SyncOrderbook(t.Context(), e, pair, asset.Spot, 0, time.Second)
require.ErrorIs(t, err, orderbook.ErrOrderbookInvalid)
b, err := e.Websocket.Orderbook.GetOrderbook(pair, a)
require.NoError(t, err)
require.Len(t, b.Bids, expectedLimit)
require.Len(t, b.Asks, expectedLimit)
err = e.Base.SetPairs([]currency.Pair{pair}, asset.USDTMarginedFutures, true)
require.NoError(t, err)
cache.updates = []pendingUpdate{{update: &orderbook.Update{Pair: pair, Asset: asset.USDTMarginedFutures, UpdateID: math.MaxInt64}}}
err = cache.SyncOrderbook(t.Context(), e, pair, asset.USDTMarginedFutures, 0, time.Second)
require.ErrorIs(t, err, orderbook.ErrOrderbookInvalid)
}
func TestWaitForUpdate(t *testing.T) {
t.Parallel()
cache := &updateCache{
updates: []pendingUpdate{
{update: &orderbook.Update{Pair: currency.NewBTCUSD(), Asset: asset.Spot, UpdateID: 1337, AllowEmpty: true, UpdateTime: time.Now()}},
},
}
err := cache.waitForUpdate(t.Context(), 1337)
require.NoError(t, err)
ctx, cancel := context.WithDeadline(t.Context(), time.Now())
defer cancel()
err = cache.waitForUpdate(ctx, 1338)
require.ErrorIs(t, err, context.DeadlineExceeded)
cache.ch = make(chan int64, 1) // Reset channel to avoid deadlock
var wg sync.WaitGroup
wg.Go(func() {
err = cache.waitForUpdate(t.Context(), 1338)
})
cache.ch <- 1338
wg.Wait()
assert.NoError(t, err)
}
func TestApplyPendingUpdates(t *testing.T) {
@@ -140,11 +223,12 @@ func TestApplyPendingUpdates(t *testing.T) {
e := new(Exchange)
require.NoError(t, testexch.Setup(e), "Setup must not error")
require.NoError(t, e.UpdateTradablePairs(t.Context()))
m := newWsOBUpdateManager(defaultWSSnapshotSyncDelay)
pair := currency.NewPair(currency.LTC, currency.USDT)
err := e.Websocket.Orderbook.LoadSnapshot(&orderbook.Book{
cache := &updateCache{updates: []pendingUpdate{{update: &orderbook.Update{Pair: pair, Asset: asset.USDTMarginedFutures}}}}
err := cache.applyPendingUpdates(e)
require.ErrorIs(t, err, orderbook.ErrDepthNotFound)
dummy := &orderbook.Book{
Exchange: e.Name,
Pair: pair,
Asset: asset.USDTMarginedFutures,
@@ -153,52 +237,169 @@ func TestApplyPendingUpdates(t *testing.T) {
LastUpdated: time.Now(),
LastPushed: time.Now(),
LastUpdateID: 1335,
})
require.NoError(t, err)
cache := m.LoadCache(pair, asset.USDTMarginedFutures)
update := &orderbook.Update{
UpdateID: 1339,
Pair: pair,
Asset: asset.USDTMarginedFutures,
AllowEmpty: true,
UpdateTime: time.Now(),
}
cache.updates = []pendingUpdate{{update: update, firstUpdateID: 1337}}
err = cache.applyPendingUpdates(e, asset.USDTMarginedFutures)
err = e.Websocket.Orderbook.LoadSnapshot(dummy)
require.NoError(t, err)
err = cache.applyPendingUpdates(e)
require.ErrorIs(t, err, errPendingUpdatesNotApplied)
cache.updates[0].firstUpdateID = 1337
cache.updates[0].update.UpdateID = 1338
err = cache.applyPendingUpdates(e)
require.ErrorIs(t, err, errOrderbookSnapshotOutdated)
cache.updates[0].firstUpdateID = 1336
err = cache.applyPendingUpdates(e, asset.USDTMarginedFutures)
cache.updates[0].update.UpdateID = 1338
err = cache.applyPendingUpdates(e)
require.ErrorIs(t, err, orderbook.ErrOrderbookInvalid)
err = e.Websocket.Orderbook.LoadSnapshot(dummy)
require.NoError(t, err)
cache.updates[0].update.AllowEmpty = true
cache.updates[0].update.UpdateTime = time.Now()
err = cache.applyPendingUpdates(e)
require.NoError(t, err)
cache.updates[0].firstUpdateID = 1339
cache.updates[0].update.UpdateID = 1342
cache.updates = append(cache.updates, pendingUpdate{
firstUpdateID: 1344,
update: &orderbook.Update{Pair: pair, Asset: asset.USDTMarginedFutures, UpdateID: 1345, AllowEmpty: true, UpdateTime: time.Now()},
})
err = cache.applyPendingUpdates(e)
require.ErrorIs(t, err, errOrderbookSnapshotOutdated)
}
func TestApplyOrderbookUpdate(t *testing.T) {
func TestClearWithLock(t *testing.T) {
t.Parallel()
cache := &updateCache{updates: []pendingUpdate{{update: &orderbook.Update{}}}}
cache.clearWithLock()
require.Empty(t, cache.updates)
}
func TestClearNoLock(t *testing.T) {
t.Parallel()
cache := &updateCache{updates: []pendingUpdate{{update: &orderbook.Update{}}}}
cache.clearNoLock()
require.Empty(t, cache.updates)
}
func TestApplyUpdate(t *testing.T) {
t.Parallel()
e := new(Exchange)
require.NoError(t, testexch.Setup(e), "Setup must not error")
require.NoError(t, e.UpdateTradablePairs(t.Context()))
err := testexch.Setup(e)
require.NoError(t, err, "Setup must not error")
e.Name = "ApplyUpdateTest"
pair := currency.NewBTCUSDT()
m := newWsOBUpdateManager(0, 0)
cache, err := m.LoadCache(currency.NewBTCUSDT(), asset.USDTMarginedFutures)
require.NoError(t, err, "LoadCache must not error")
update := &orderbook.Update{
Pair: pair,
Asset: asset.USDTMarginedFutures,
AllowEmpty: true,
UpdateTime: time.Now(),
cache.m.Lock()
err = m.applyUpdate(t.Context(), e, cache, 1, &orderbook.Update{
Pair: currency.NewBTCUSDT(),
Asset: asset.USDTMarginedFutures,
})
cache.m.Unlock()
require.ErrorIs(t, err, orderbook.ErrDepthNotFound, "applyUpdate must error when not initialised")
snapshot := &orderbook.Book{
Exchange: e.Name,
Pair: currency.NewBTCUSDT(),
Asset: asset.USDTMarginedFutures,
Bids: []orderbook.Level{{Price: 1, Amount: 1}},
Asks: []orderbook.Level{{Price: 1, Amount: 1}},
LastUpdated: time.Now(),
LastPushed: time.Now(),
LastUpdateID: 1336,
}
err := applyOrderbookUpdate(e, update)
require.ErrorIs(t, err, orderbook.ErrDepthNotFound)
update.Asset = asset.Spot
err = applyOrderbookUpdate(e, update)
require.ErrorIs(t, err, orderbook.ErrDepthNotFound)
update.Pair = currency.NewPair(currency.BABY, currency.BABYDOGE)
err = applyOrderbookUpdate(e, update)
err = e.Websocket.Orderbook.LoadSnapshot(snapshot)
require.NoError(t, err)
cache.m.Lock()
err = m.applyUpdate(t.Context(), e, cache, 1, &orderbook.Update{
UpdateID: 1338,
Pair: currency.NewBTCUSDT(),
Asset: asset.USDTMarginedFutures,
})
cache.m.Unlock()
require.NoError(t, err, "applyUpdate must not error when desynced")
_, err = e.Websocket.Orderbook.LastUpdateID(currency.NewBTCUSDT(), asset.USDTMarginedFutures)
require.ErrorIs(t, err, orderbook.ErrOrderbookInvalid, "LastUpdateID must error after invalidateCache is called")
err = e.Websocket.Orderbook.LoadSnapshot(snapshot)
require.NoError(t, err)
cache.m.Lock()
err = m.applyUpdate(t.Context(), e, cache, 1337, &orderbook.Update{
UpdateID: 1339,
Pair: currency.NewBTCUSDT(),
Asset: asset.USDTMarginedFutures,
})
cache.m.Unlock()
require.NoError(t, err, "applyUpdate must not error when in sync but update failed to apply")
_, err = e.Websocket.Orderbook.LastUpdateID(currency.NewBTCUSDT(), asset.USDTMarginedFutures)
require.ErrorIs(t, err, orderbook.ErrOrderbookInvalid, "LastUpdateID must error after invalidateCache is called")
err = e.Websocket.Orderbook.LoadSnapshot(snapshot)
require.NoError(t, err)
cache.m.Lock()
err = m.applyUpdate(t.Context(), e, cache, 1337, &orderbook.Update{
UpdateID: 1338,
Pair: currency.NewBTCUSDT(),
Asset: asset.USDTMarginedFutures,
AllowEmpty: true,
})
cache.m.Unlock()
require.NoError(t, err, "applyUpdate must not error when in sync and update applied")
}
func TestOBManagerProcessOrderbookUpdateHTTPMocked(t *testing.T) {
t.Parallel()
e := new(Exchange)
require.NoError(t, testexch.Setup(e), "Setup must not error")
e.Name = "ManagerHTTPMocked"
err := testexch.MockHTTPInstance(e, "/api/v4/")
require.NoError(t, err, "MockHTTPInstance must not error")
// Add dummy subscription so that it can be matched and a limit/level can be extracted for initial orderbook sync spot.
err = e.Websocket.AddSubscriptions(nil, &subscription.Subscription{Channel: subscription.OrderbookChannel, Interval: kline.TwentyMilliseconds})
require.NoError(t, err)
m := newWsOBUpdateManager(0, defaultWSOrderbookUpdateDeadline)
err = m.ProcessOrderbookUpdate(t.Context(), e, 27596272446, &orderbook.Update{
UpdateID: 27596272447,
Pair: currency.NewBTCUSDT(),
Asset: asset.Spot,
AllowEmpty: true,
UpdateTime: time.Now(),
})
require.NoError(t, err, "ProcessOrderbookUpdate must not error")
// Wait for the background sync goroutine to complete and orderbook to be synced
require.Eventually(t, func() bool {
_, err := e.Websocket.Orderbook.LastUpdateID(currency.NewBTCUSDT(), asset.Spot)
return err == nil
}, time.Second*5, time.Millisecond*50, "orderbook must eventually be synced")
err = m.ProcessOrderbookUpdate(t.Context(), e, 27596272448, &orderbook.Update{
UpdateID: 27596272449,
Pair: currency.NewBTCUSDT(),
Asset: asset.Spot,
AllowEmpty: true,
UpdateTime: time.Now(),
})
require.NoError(t, err, "ProcessOrderbookUpdate must not error on synced orderbook")
id, err := e.Websocket.Orderbook.LastUpdateID(currency.NewBTCUSDT(), asset.Spot)
require.NoError(t, err, "LastUpdateID must not error")
assert.Equal(t, int64(27596272449), id, "LastUpdateID should be updated to orderbook.Update.UpdateID")
}

View File

@@ -122,3 +122,29 @@ func (k IgnoringAssetKey) Match(eachKey MatchableKey) bool {
eachSub.Levels == k.Levels &&
eachSub.Interval == k.Interval
}
// ChannelKey is a key type for finding a single subscription by its channel, this will match first found.
// For use with exchange websocket method GetSubscription.
type ChannelKey struct {
*Subscription
}
var _ MatchableKey = ChannelKey{} // Enforce ChannelKey must implement MatchableKey
// MustChannelKey is a helper function to create a ChannelKey from a subscription channel
func MustChannelKey(channel string) ChannelKey {
if channel == "" {
panic("channel must not be empty")
}
return ChannelKey{Subscription: &Subscription{Channel: channel}}
}
// Match implements MatchableKey
func (k ChannelKey) Match(eachKey MatchableKey) bool {
return k.Subscription.Channel == eachKey.GetSubscription().Channel
}
// GetSubscription returns the underlying subscription
func (k ChannelKey) GetSubscription() *Subscription {
return k.Subscription
}

View File

@@ -150,3 +150,27 @@ func TestGetSubscription(t *testing.T) {
assert.Same(t, s, ExactKey{s}.GetSubscription(), "ExactKey.GetSubscription Must return a pointer to the subscription")
assert.Same(t, s, IgnoringPairsKey{s}.GetSubscription(), "IgnorePairKeys.GetSubscription Must return a pointer to the subscription")
}
func TestMustChannelKey(t *testing.T) {
t.Parallel()
require.Panics(t, func() { MustChannelKey("") }, "no channel string must panic")
key := MustChannelKey(TickerChannel)
assert.Equal(t, TickerChannel, key.Subscription.Channel)
}
func TestChannelKeyMatch(t *testing.T) {
t.Parallel()
key := ChannelKey{&Subscription{Channel: TickerChannel}}
try := &DummyKey{&Subscription{Channel: OrderbookChannel}, t}
require.Panics(t, func() { key.Match(nil) }, "Match on a nil must panic")
require.False(t, key.Match(try), "Match must reject a different channel")
try.Channel = TickerChannel
assert.True(t, key.Match(try), "Match should accept an identical channel")
}
func TestChannelKeyGetSubscription(t *testing.T) {
t.Parallel()
key := ChannelKey{&Subscription{Channel: TickerChannel}}
assert.Same(t, key.Subscription, key.GetSubscription(), "GetSubscription should return the underlying subscription")
}