From b2817595730a734a893cd6e1153691457cb80829 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Fri, 23 May 2025 17:29:39 +1000 Subject: [PATCH] gateio: Fix websocket orderbook incremental updates (#1863) * gateio: Add websocket orderbook update manager * RM println * glorious: nit * Adds delivery futures update processing as well * Change to const value for delivery * Drop check out of order, can reinstate if required. * Adds in validation methods to ensure config changes are correct when expanding templates and return errors with correct info if not. * fix some things and add in todo when this gets updated * fix spelling * linter: fix * gk: initial nits * gk: nits shift to template only verification with funcmap, rm interface for single sub checking. * rm unused error * linter: fix * update to const frequency * gk: wrap with panic and single invocation in template, change name * gk: nits to check across stored subs with incoming subs * linter: fix * updates names, makes things slightly more efficient and adds tests * linter: fix * gk: sexc patch v2 * glorious: nits * gk: nits * Update exchanges/subscription/template.go Co-authored-by: Gareth Kirwan * gk: nits * linter: make peace with linter regulations * glorious: Add TODO for future template integration * glorious: commentary nits * fix name * give me a break, have a kit kat * revert whoops * update wording on comment * revert secondary call to expand templates and update tests * misc lint: fix * Add spot orderbook update interval for 20ms, expand tests, piggy back limit/level off loaded subscription. Thanks to @thrasher- * linter/spell: fix * ai nits: drop go routine on mtx RUnlock * Update exchanges/gateio/ws_ob_update_manager.go Co-authored-by: Gareth Kirwan * gk: revert to 100ms from 20ms waiting for config upgrade patch * test: fix * cranktakular: nits * strings quoted in fmt call * thrasher-: nits * Update exchanges/gateio/gateio_test.go Co-authored-by: Gareth Kirwan * Update exchanges/gateio/gateio_test.go Co-authored-by: Gareth Kirwan * gk: nits --------- Co-authored-by: Ryan O'Hara-Reid Co-authored-by: Gareth Kirwan --- exchange/websocket/buffer/buffer.go | 41 +++- exchange/websocket/buffer/buffer_test.go | 53 +++- exchanges/gateio/gateio.go | 49 ++-- exchanges/gateio/gateio_test.go | 225 ++++++++++++++++- exchanges/gateio/gateio_types.go | 40 ++- exchanges/gateio/gateio_websocket.go | 228 +++++++++++++----- .../gateio_websocket_delivery_futures.go | 20 +- exchanges/gateio/gateio_websocket_futures.go | 71 +++--- exchanges/gateio/gateio_websocket_option.go | 43 +++- .../gateio_websocket_request_spot_test.go | 4 +- exchanges/gateio/gateio_wrapper.go | 14 +- exchanges/gateio/ws_ob_update_manager.go | 207 ++++++++++++++++ exchanges/gateio/ws_ob_update_manager_test.go | 205 ++++++++++++++++ exchanges/kline/kline.go | 6 + exchanges/kline/kline_test.go | 156 +++--------- exchanges/kline/kline_types.go | 71 +++--- exchanges/orderbook/orderbook_types.go | 4 +- exchanges/subscription/fixtures_test.go | 30 +++ exchanges/subscription/list.go | 1 + exchanges/subscription/subscription.go | 24 +- exchanges/subscription/template.go | 18 +- exchanges/subscription/template_test.go | 8 + 22 files changed, 1145 insertions(+), 373 deletions(-) create mode 100644 exchanges/gateio/ws_ob_update_manager.go create mode 100644 exchanges/gateio/ws_ob_update_manager_test.go diff --git a/exchange/websocket/buffer/buffer.go b/exchange/websocket/buffer/buffer.go index c5f6a7cd..e2e1ac79 100644 --- a/exchange/websocket/buffer/buffer.go +++ b/exchange/websocket/buffer/buffer.go @@ -15,6 +15,11 @@ import ( const packageError = "websocket orderbook buffer error: %w" +// Public err vars +var ( + ErrDepthNotFound = errors.New("orderbook depth not found") +) + var ( errExchangeConfigNil = errors.New("exchange config is nil") errBufferConfigNil = errors.New("buffer config is nil") @@ -22,7 +27,6 @@ var ( errIssueBufferEnabledButNoLimit = errors.New("buffer enabled but no limit set") errUpdateIsNil = errors.New("update is nil") errUpdateNoTargets = errors.New("update bid/ask targets cannot be nil") - errDepthNotFound = errors.New("orderbook depth not found") errRESTOverwrite = errors.New("orderbook has been overwritten by REST protocol") errInvalidAction = errors.New("invalid action") errAmendFailure = errors.New("orderbook amend update failure") @@ -70,7 +74,7 @@ func (w *Orderbook) validate(u *orderbook.Update) error { if u == nil { return fmt.Errorf(packageError, errUpdateIsNil) } - if len(u.Bids) == 0 && len(u.Asks) == 0 { + if len(u.Bids) == 0 && len(u.Asks) == 0 && !u.AllowEmpty { return fmt.Errorf(packageError, errUpdateNoTargets) } return nil @@ -87,7 +91,7 @@ func (w *Orderbook) Update(u *orderbook.Update) error { book, ok := w.ob[key.PairAsset{Base: u.Pair.Base.Item, Quote: u.Pair.Quote.Item, Asset: u.Asset}] if !ok { return fmt.Errorf("%w for Exchange %s CurrencyPair: %s AssetType: %s", - errDepthNotFound, + ErrDepthNotFound, w.exchangeName, u.Pair, u.Asset) @@ -309,15 +313,38 @@ func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error { // GetOrderbook returns an orderbook copy as orderbook.Base func (w *Orderbook) GetOrderbook(p currency.Pair, a asset.Item) (*orderbook.Base, error) { + if p.IsEmpty() { + return nil, currency.ErrCurrencyPairEmpty + } + if !a.IsValid() { + return nil, asset.ErrInvalidAsset + } w.mtx.Lock() defer w.mtx.Unlock() book, ok := w.ob[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}] if !ok { - return nil, fmt.Errorf("%s %s %s %w", w.exchangeName, p, a, errDepthNotFound) + return nil, fmt.Errorf("%s %w: %s.%s", w.exchangeName, ErrDepthNotFound, a, p) } return book.ob.Retrieve() } +// LastUpdateID returns the last update ID of the orderbook +func (w *Orderbook) LastUpdateID(p currency.Pair, a asset.Item) (int64, error) { + if p.IsEmpty() { + return 0, currency.ErrCurrencyPairEmpty + } + if !a.IsValid() { + return 0, asset.ErrInvalidAsset + } + w.mtx.Lock() + defer w.mtx.Unlock() + book, ok := w.ob[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}] + if !ok { + return 0, fmt.Errorf("%s %w: %s.%s", w.exchangeName, ErrDepthNotFound, a, p) + } + return book.ob.LastUpdateID() +} + // FlushBuffer flushes w.ob data to be garbage collected and refreshed when a // connection is lost and reconnected func (w *Orderbook) FlushBuffer() { @@ -332,11 +359,7 @@ func (w *Orderbook) FlushOrderbook(p currency.Pair, a asset.Item) error { defer w.mtx.Unlock() book, ok := w.ob[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}] if !ok { - return fmt.Errorf("cannot flush orderbook %s %s %s %w", - w.exchangeName, - p, - a, - errDepthNotFound) + return fmt.Errorf("cannot flush orderbook %s %s %s %w", w.exchangeName, p, a, ErrDepthNotFound) } // error not needed in this return _ = book.ob.Invalidate(errOrderbookFlushed) diff --git a/exchange/websocket/buffer/buffer_test.go b/exchange/websocket/buffer/buffer_test.go index da4df937..1ed73966 100644 --- a/exchange/websocket/buffer/buffer_test.go +++ b/exchange/websocket/buffer/buffer_test.go @@ -50,6 +50,7 @@ func createSnapshot(pair currency.Pair, bookVerifiy ...bool) (holder *Orderbook, PriceDuplication: true, LastUpdated: time.Now(), VerifyOrderbook: len(bookVerifiy) > 0 && bookVerifiy[0], + LastUpdateID: 69420, } newBook := make(map[key.PairAsset]*orderbookHolder) @@ -462,7 +463,7 @@ func TestOrderbookLastUpdateID(t *testing.T) { err = holder.Update(&orderbook.Update{ Asks: asks, Pair: cp, - UpdateID: int64(i) + 1, + UpdateID: int64(i) + 1 + 69420, Asset: asset.Spot, UpdateTime: time.Now(), }) @@ -480,7 +481,7 @@ func TestOrderbookLastUpdateID(t *testing.T) { ob, err := holder.GetOrderbook(cp, asset.Spot) require.NoError(t, err) - assert.Equal(t, int64(len(itemArray)), ob.LastUpdateID) + assert.Equal(t, int64(len(itemArray)+69420), ob.LastUpdateID) } // TestRunUpdateWithoutSnapshot logic test @@ -500,7 +501,7 @@ func TestRunUpdateWithoutSnapshot(t *testing.T) { UpdateTime: time.Now(), Asset: asset.Spot, }) - require.ErrorIs(t, err, errDepthNotFound) + require.ErrorIs(t, err, ErrDepthNotFound) } // TestRunUpdateWithoutAnyUpdates logic test @@ -723,6 +724,12 @@ func TestGetOrderbook(t *testing.T) { holder, _, _, err := createSnapshot(cp) require.NoError(t, err) + _, err = holder.GetOrderbook(currency.EMPTYPAIR, asset.Spot) + require.ErrorIs(t, err, currency.ErrCurrencyPairEmpty) + + _, err = holder.GetOrderbook(cp, 0) + require.ErrorIs(t, err, asset.ErrInvalidAsset) + ob, err := holder.GetOrderbook(cp, asset.Spot) require.NoError(t, err) @@ -736,15 +743,35 @@ func TestGetOrderbook(t *testing.T) { bidLen, err := bufferOb.ob.GetBidLength() require.NoError(t, err) - if askLen != len(ob.Asks) || - bidLen != len(ob.Bids) || - b.Asset != ob.Asset || - b.Exchange != ob.Exchange || - b.LastUpdateID != ob.LastUpdateID || - b.PriceDuplication != ob.PriceDuplication || - b.Pair != ob.Pair { - t.Fatal("data on both books should be the same") - } + assert.Equal(t, askLen, len(ob.Asks), "ask length mismatch") + assert.Equal(t, bidLen, len(ob.Bids), "bid length mismatch") + assert.Equal(t, b.Asset, ob.Asset, "asset mismatch") + assert.Equal(t, b.Exchange, ob.Exchange, "exchange name mismatch") + assert.Equal(t, b.LastUpdateID, ob.LastUpdateID, "last update ID mismatch") + assert.Equal(t, b.PriceDuplication, ob.PriceDuplication, "price duplication mismatch") + assert.Equal(t, b.Pair, ob.Pair, "pair mismatch") +} + +func TestLastUpdateID(t *testing.T) { + t.Parallel() + cp, err := getExclusivePair() + require.NoError(t, err) + + holder, _, _, err := createSnapshot(cp) + require.NoError(t, err) + + _, err = holder.LastUpdateID(currency.EMPTYPAIR, asset.Spot) + require.ErrorIs(t, err, currency.ErrCurrencyPairEmpty) + + _, err = holder.LastUpdateID(cp, 0) + require.ErrorIs(t, err, asset.ErrInvalidAsset) + + _, err = holder.LastUpdateID(cp, asset.FutureCombo) + require.ErrorIs(t, err, ErrDepthNotFound) + + ob, err := holder.LastUpdateID(cp, asset.Spot) + require.NoError(t, err) + require.Equal(t, int64(69420), ob) } func TestSetup(t *testing.T) { @@ -982,7 +1009,7 @@ func TestFlushOrderbook(t *testing.T) { } _, err = w.GetOrderbook(cp, asset.Spot) - require.ErrorIs(t, err, errDepthNotFound) + require.ErrorIs(t, err, ErrDepthNotFound) require.NoError(t, w.LoadSnapshot(&snapShot1)) require.NoError(t, w.FlushOrderbook(cp, asset.Spot)) diff --git a/exchanges/gateio/gateio.go b/exchanges/gateio/gateio.go index f84087a6..aab65620 100644 --- a/exchanges/gateio/gateio.go +++ b/exchanges/gateio/gateio.go @@ -189,6 +189,7 @@ func timeInForceFromString(tif string) (order.TimeInForce, error) { type Gateio struct { Counter common.Counter // Must be first due to alignment requirements exchange.Base + wsOBUpdateMgr *wsOBUpdateManager } // ***************************************** SubAccounts ******************************** @@ -334,43 +335,23 @@ func (g *Gateio) GetTicker(ctx context.Context, currencyPair, timezone string) ( return nil, fmt.Errorf("no ticker data found for currency pair %v", currencyPair) } -// GetIntervalString returns a string representation of the interval according to the Gateio exchange representation. -func (g *Gateio) GetIntervalString(interval kline.Interval) (string, error) { +// getIntervalString returns a string representation of the interval according to the Gateio exchange representation +func getIntervalString(interval kline.Interval) (string, error) { switch interval { - case kline.HundredMilliseconds: - return "100ms", nil case kline.ThousandMilliseconds: return "1000ms", nil - case kline.TenSecond: - return "10s", nil - case kline.ThirtySecond: - return "30s", nil - case kline.OneMin: - return "1m", nil - case kline.FiveMin: - return "5m", nil - case kline.FifteenMin: - return "15m", nil - case kline.ThirtyMin: - return "30m", nil - case kline.OneHour: - return "1h", nil - case kline.TwoHour: - return "2h", nil - case kline.FourHour: - return "4h", nil - case kline.EightHour: - return "8h", nil - case kline.TwelveHour: - return "12h", nil case kline.OneDay: return "1d", nil case kline.SevenDay: return "7d", nil case kline.OneMonth: return "30d", nil + case kline.TenMilliseconds, kline.TwentyMilliseconds, kline.HundredMilliseconds, kline.TwoHundredAndFiftyMilliseconds, + kline.TenSecond, kline.ThirtySecond, kline.OneMin, kline.FiveMin, kline.FifteenMin, kline.ThirtyMin, + kline.OneHour, kline.TwoHour, kline.FourHour, kline.EightHour, kline.TwelveHour: + return interval.Short(), nil default: - return "", kline.ErrUnsupportedInterval + return "", fmt.Errorf("%q: %w", interval.String(), kline.ErrUnsupportedInterval) } } @@ -478,7 +459,7 @@ func (g *Gateio) GetCandlesticks(ctx context.Context, currencyPair currency.Pair var err error if interval.Duration().Microseconds() != 0 { var intervalString string - intervalString, err = g.GetIntervalString(interval) + intervalString, err = getIntervalString(interval) if err != nil { return nil, err } @@ -1963,7 +1944,7 @@ func (g *Gateio) GetFuturesCandlesticks(ctx context.Context, settle currency.Cod params.Set("limit", strconv.FormatUint(limit, 10)) } if interval.Duration().Microseconds() != 0 { - intervalString, err := g.GetIntervalString(interval) + intervalString, err := getIntervalString(interval) if err != nil { return nil, err } @@ -1993,7 +1974,7 @@ func (g *Gateio) PremiumIndexKLine(ctx context.Context, settleCurrency currency. if limit > 0 { params.Set("limit", strconv.FormatInt(limit, 10)) } - intervalString, err := g.GetIntervalString(interval) + intervalString, err := getIntervalString(interval) if err != nil { return nil, err } @@ -2059,7 +2040,7 @@ func (g *Gateio) GetFutureStats(ctx context.Context, settle currency.Code, contr params.Set("from", strconv.FormatInt(from.Unix(), 10)) } if int64(interval) != 0 { - intervalString, err := g.GetIntervalString(interval) + intervalString, err := getIntervalString(interval) if err != nil { return nil, err } @@ -2735,7 +2716,7 @@ func (g *Gateio) GetDeliveryFuturesCandlesticks(ctx context.Context, settle curr params.Set("limit", strconv.FormatUint(limit, 10)) } if int64(interval) != 0 { - intervalString, err := g.GetIntervalString(interval) + intervalString, err := getIntervalString(interval) if err != nil { return nil, err } @@ -3502,7 +3483,7 @@ func (g *Gateio) GetOptionFuturesCandlesticks(ctx context.Context, contract curr if !to.IsZero() { params.Set("to", strconv.FormatInt(to.Unix(), 10)) } - intervalString, err := g.GetIntervalString(interval) + intervalString, err := getIntervalString(interval) if err != nil { return nil, err } @@ -3528,7 +3509,7 @@ func (g *Gateio) GetOptionFuturesMarkPriceCandlesticks(ctx context.Context, unde params.Set("to", strconv.FormatInt(to.Unix(), 10)) } if int64(interval) != 0 { - intervalString, err := g.GetIntervalString(interval) + intervalString, err := getIntervalString(interval) if err != nil { return nil, err } diff --git a/exchanges/gateio/gateio_test.go b/exchanges/gateio/gateio_test.go index 1e0d8cdc..e19e410c 100644 --- a/exchanges/gateio/gateio_test.go +++ b/exchanges/gateio/gateio_test.go @@ -8,6 +8,7 @@ import ( "log" "os" "slices" + "strconv" "strings" "sync" "testing" @@ -2260,9 +2261,6 @@ func TestGenerateSubscriptionsSpot(t *testing.T) { require.NoError(t, testexch.Setup(g), "Test instance Setup must not error") g.Websocket.SetCanUseAuthenticatedEndpoints(true) - g.Features.Subscriptions = append(g.Features.Subscriptions, &subscription.Subscription{ - Enabled: true, Channel: spotOrderbookChannel, Asset: asset.Spot, Interval: kline.ThousandMilliseconds, Levels: 5, - }) subs, err := g.generateSubscriptionsSpot() require.NoError(t, err, "generateSubscriptions must not error") exp := subscription.List{} @@ -3345,3 +3343,224 @@ func TestTimeInForceString(t *testing.T) { assert.Equal(t, valid.String, timeInForceString(valid.TimeInForce)) } } + +func TestIsSingleOrderbookChannel(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + channel string + expected bool + }{ + {channel: spotOrderbookUpdateChannel, expected: true}, + {channel: spotOrderbookChannel, expected: true}, + {channel: spotOrderbookTickerChannel, expected: true}, + {channel: futuresOrderbookChannel, expected: true}, + {channel: futuresOrderbookTickerChannel, expected: true}, + {channel: futuresOrderbookUpdateChannel, expected: true}, + {channel: optionsOrderbookChannel, expected: true}, + {channel: optionsOrderbookTickerChannel, expected: true}, + {channel: optionsOrderbookUpdateChannel, expected: true}, + {channel: spotTickerChannel, expected: false}, + {channel: "sad", expected: false}, + } { + assert.Equal(t, tc.expected, isSingleOrderbookChannel(tc.channel)) + } +} + +func TestValidateSubscriptions(t *testing.T) { + t.Parallel() + require.NoError(t, g.ValidateSubscriptions(nil)) + require.NoError(t, g.ValidateSubscriptions([]*subscription.Subscription{{Channel: spotTickerChannel, Pairs: []currency.Pair{currency.NewBTCUSDT()}}})) + require.NoError(t, g.ValidateSubscriptions([]*subscription.Subscription{ + {Channel: spotTickerChannel, Pairs: []currency.Pair{currency.NewBTCUSD()}}, + {Channel: spotOrderbookUpdateChannel, Pairs: []currency.Pair{currency.NewBTCUSD()}}, + })) + require.NoError(t, g.ValidateSubscriptions([]*subscription.Subscription{ + {Channel: spotTickerChannel, Pairs: []currency.Pair{currency.NewBTCUSD()}}, + {Channel: spotOrderbookUpdateChannel, Pairs: []currency.Pair{currency.NewBTCUSD(), currency.NewBTCUSDT()}}, + })) + require.NoError(t, g.ValidateSubscriptions([]*subscription.Subscription{ + {Channel: spotTickerChannel, Pairs: []currency.Pair{currency.NewBTCUSD()}}, + {Channel: spotOrderbookUpdateChannel, Pairs: []currency.Pair{currency.NewBTCUSD()}}, + {Channel: spotOrderbookUpdateChannel, Pairs: []currency.Pair{currency.NewBTCUSDT()}}, + })) + require.ErrorIs(t, g.ValidateSubscriptions([]*subscription.Subscription{ + {Channel: spotTickerChannel, Pairs: []currency.Pair{currency.NewBTCUSD()}}, + {Channel: spotOrderbookUpdateChannel, Pairs: []currency.Pair{currency.NewBTCUSD()}}, + {Channel: spotOrderbookChannel, Pairs: []currency.Pair{currency.NewBTCUSD()}}, + }), subscription.ErrExclusiveSubscription) +} + +func TestCandlesChannelIntervals(t *testing.T) { + t.Parallel() + s := &subscription.Subscription{Channel: subscription.CandlesChannel, Asset: asset.Spot, Interval: 0} + _, err := candlesChannelInterval(s) + require.ErrorIs(t, err, kline.ErrUnsupportedInterval, "candlestickChannelInterval must error correctly with a 0 interval") + s.Interval = kline.ThousandMilliseconds + i, err := candlesChannelInterval(s) + require.NoError(t, err) + assert.Equal(t, "1000ms", i) +} + +func TestOrderbookChannelIntervals(t *testing.T) { + t.Parallel() + + s := &subscription.Subscription{Channel: futuresOrderbookUpdateChannel, Interval: kline.TwentyMilliseconds, Levels: 100} + _, err := orderbookChannelInterval(s, asset.Futures) + require.ErrorIs(t, err, subscription.ErrInvalidInterval) + require.ErrorContains(t, err, "20ms only valid with Levels 20") + s.Levels = 20 + i, err := orderbookChannelInterval(s, asset.Futures) + require.NoError(t, err) + assert.Equal(t, "20ms", i) + + for s, exp := range map[*subscription.Subscription]error{ + {Asset: asset.Binary, Channel: "unknown_channel", Interval: kline.OneYear}: nil, + {Asset: asset.Spot, Channel: spotOrderbookTickerChannel, Interval: kline.OneDay}: subscription.ErrInvalidInterval, + {Asset: asset.Spot, Channel: spotOrderbookTickerChannel, Interval: 0}: nil, + {Asset: asset.Spot, Channel: spotOrderbookChannel, Interval: kline.OneDay}: subscription.ErrInvalidInterval, + {Asset: asset.Spot, Channel: spotOrderbookChannel, Interval: kline.HundredMilliseconds}: nil, + {Asset: asset.Spot, Channel: spotOrderbookChannel, Interval: kline.ThousandMilliseconds}: nil, + {Asset: asset.Spot, Channel: spotOrderbookUpdateChannel, Interval: kline.OneDay}: subscription.ErrInvalidInterval, + {Asset: asset.Spot, Channel: spotOrderbookUpdateChannel, Interval: kline.HundredMilliseconds}: nil, + {Asset: asset.Futures, Channel: futuresOrderbookTickerChannel, Interval: kline.TenMilliseconds}: subscription.ErrInvalidInterval, + {Asset: asset.Futures, Channel: futuresOrderbookTickerChannel, Interval: 0}: nil, + {Asset: asset.Futures, Channel: futuresOrderbookChannel, Interval: kline.TenMilliseconds}: subscription.ErrInvalidInterval, + {Asset: asset.Futures, Channel: futuresOrderbookChannel, Interval: 0}: nil, + {Asset: asset.Futures, Channel: futuresOrderbookUpdateChannel, Interval: kline.OneDay}: subscription.ErrInvalidInterval, + {Asset: asset.Futures, Channel: futuresOrderbookUpdateChannel, Interval: kline.HundredMilliseconds}: nil, + {Asset: asset.DeliveryFutures, Channel: futuresOrderbookTickerChannel, Interval: kline.TenMilliseconds}: subscription.ErrInvalidInterval, + {Asset: asset.DeliveryFutures, Channel: futuresOrderbookTickerChannel, Interval: 0}: nil, + {Asset: asset.DeliveryFutures, Channel: futuresOrderbookChannel, Interval: kline.TenMilliseconds}: subscription.ErrInvalidInterval, + {Asset: asset.DeliveryFutures, Channel: futuresOrderbookChannel, Interval: 0}: nil, + {Asset: asset.DeliveryFutures, Channel: futuresOrderbookUpdateChannel, Interval: kline.OneDay}: subscription.ErrInvalidInterval, + {Asset: asset.DeliveryFutures, Channel: futuresOrderbookUpdateChannel, Interval: kline.HundredMilliseconds}: nil, + {Asset: asset.DeliveryFutures, Channel: futuresOrderbookUpdateChannel, Interval: kline.ThousandMilliseconds}: nil, + + {Asset: asset.Options, Channel: optionsOrderbookTickerChannel, Interval: kline.TenMilliseconds}: subscription.ErrInvalidInterval, + {Asset: asset.Options, Channel: optionsOrderbookTickerChannel, Interval: 0}: nil, + {Asset: asset.Options, Channel: optionsOrderbookChannel, Interval: kline.TwoHundredAndFiftyMilliseconds}: subscription.ErrInvalidInterval, + {Asset: asset.Options, Channel: optionsOrderbookChannel, Interval: 0}: nil, + {Asset: asset.Options, Channel: optionsOrderbookUpdateChannel, Interval: kline.OneDay}: subscription.ErrInvalidInterval, + {Asset: asset.Options, Channel: optionsOrderbookUpdateChannel, Interval: kline.HundredMilliseconds}: nil, + {Asset: asset.Options, Channel: optionsOrderbookUpdateChannel, Interval: kline.ThousandMilliseconds}: nil, + } { + t.Run(s.Asset.String()+"/"+s.Channel+"/"+s.Interval.Short(), func(t *testing.T) { + t.Parallel() + i, err := orderbookChannelInterval(s, s.Asset) + if exp != nil { + require.ErrorIs(t, err, exp) + } else { + switch { + case s.Channel == "unknown_channel": + assert.Empty(t, i, "orderbookChannelInterval should return empty for unknown channels") + case strings.HasSuffix(s.Channel, "_ticker"): + assert.Empty(t, i) + case s.Interval == 0: + assert.Equal(t, "0", i) + default: + exp, err2 := getIntervalString(s.Interval) + require.NoError(t, err2, "getIntervalString must not error for validating expected value") + require.Equal(t, exp, i) + } + } + }) + } +} + +func TestChannelLevels(t *testing.T) { + t.Parallel() + + for s, exp := range map[*subscription.Subscription]error{ + {Channel: "unknown_channel", Asset: asset.Binary}: nil, + {Channel: spotOrderbookTickerChannel, Asset: asset.Spot}: nil, + {Channel: spotOrderbookTickerChannel, Asset: asset.Spot, Levels: 1}: subscription.ErrInvalidLevel, + {Channel: spotOrderbookUpdateChannel, Asset: asset.Spot}: nil, + {Channel: spotOrderbookUpdateChannel, Asset: asset.Spot, Levels: 100}: subscription.ErrInvalidLevel, + {Channel: spotOrderbookChannel, Asset: asset.Spot}: subscription.ErrInvalidLevel, + {Channel: spotOrderbookChannel, Asset: asset.Spot, Levels: 5}: nil, + {Channel: spotOrderbookChannel, Asset: asset.Spot, Levels: 10}: nil, + {Channel: spotOrderbookChannel, Asset: asset.Spot, Levels: 20}: nil, + {Channel: spotOrderbookChannel, Asset: asset.Spot, Levels: 50}: nil, + {Channel: spotOrderbookChannel, Asset: asset.Spot, Levels: 100}: nil, + {Channel: futuresOrderbookChannel, Asset: asset.Futures}: subscription.ErrInvalidLevel, + {Channel: futuresOrderbookChannel, Asset: asset.Futures, Levels: 1}: nil, + {Channel: futuresOrderbookChannel, Asset: asset.Futures, Levels: 5}: nil, + {Channel: futuresOrderbookChannel, Asset: asset.Futures, Levels: 10}: nil, + {Channel: futuresOrderbookChannel, Asset: asset.Futures, Levels: 20}: nil, + {Channel: futuresOrderbookChannel, Asset: asset.Futures, Levels: 50}: nil, + {Channel: futuresOrderbookChannel, Asset: asset.Futures, Levels: 100}: nil, + {Channel: futuresOrderbookTickerChannel, Asset: asset.Futures}: nil, + {Channel: futuresOrderbookTickerChannel, Asset: asset.Futures, Levels: 1}: subscription.ErrInvalidLevel, + {Channel: futuresOrderbookUpdateChannel, Asset: asset.Futures}: subscription.ErrInvalidLevel, + {Channel: futuresOrderbookUpdateChannel, Asset: asset.Futures, Levels: 20}: nil, + {Channel: futuresOrderbookUpdateChannel, Asset: asset.Futures, Levels: 50}: nil, + {Channel: futuresOrderbookUpdateChannel, Asset: asset.DeliveryFutures}: subscription.ErrInvalidLevel, + {Channel: futuresOrderbookUpdateChannel, Asset: asset.DeliveryFutures, Levels: 5}: nil, + {Channel: futuresOrderbookUpdateChannel, Asset: asset.DeliveryFutures, Levels: 10}: nil, + {Channel: futuresOrderbookUpdateChannel, Asset: asset.DeliveryFutures, Levels: 20}: nil, + {Channel: futuresOrderbookUpdateChannel, Asset: asset.DeliveryFutures, Levels: 50}: nil, + {Channel: futuresOrderbookUpdateChannel, Asset: asset.DeliveryFutures, Levels: 100}: nil, + {Channel: optionsOrderbookTickerChannel, Asset: asset.Options}: nil, + {Channel: optionsOrderbookTickerChannel, Asset: asset.Options, Levels: 1}: subscription.ErrInvalidLevel, + {Channel: optionsOrderbookUpdateChannel, Asset: asset.Options}: subscription.ErrInvalidLevel, + {Channel: optionsOrderbookUpdateChannel, Asset: asset.Options, Levels: 5}: nil, + {Channel: optionsOrderbookUpdateChannel, Asset: asset.Options, Levels: 10}: nil, + {Channel: optionsOrderbookUpdateChannel, Asset: asset.Options, Levels: 20}: nil, + {Channel: optionsOrderbookUpdateChannel, Asset: asset.Options, Levels: 50}: nil, + {Channel: optionsOrderbookChannel, Asset: asset.Options}: subscription.ErrInvalidLevel, + {Channel: optionsOrderbookChannel, Asset: asset.Options, Levels: 5}: nil, + {Channel: optionsOrderbookChannel, Asset: asset.Options, Levels: 10}: nil, + {Channel: optionsOrderbookChannel, Asset: asset.Options, Levels: 20}: nil, + {Channel: optionsOrderbookChannel, Asset: asset.Options, Levels: 50}: nil, + } { + t.Run(s.Asset.String()+"/"+s.Channel+"/"+strconv.Itoa(s.Levels), func(t *testing.T) { + t.Parallel() + l, err := channelLevels(s, s.Asset) + switch { + case exp != nil: + require.ErrorIs(t, err, exp) + case s.Levels == 0: + assert.Empty(t, l) + default: + require.NoError(t, err) + require.NotEmpty(t, l) + } + }) + } +} + +func TestGetIntervalString(t *testing.T) { + t.Parallel() + for k, exp := range map[kline.Interval]string{ + kline.TenMilliseconds: "10ms", + kline.TwentyMilliseconds: "20ms", + kline.HundredMilliseconds: "100ms", + kline.TwoHundredAndFiftyMilliseconds: "250ms", + kline.ThousandMilliseconds: "1000ms", + kline.TenSecond: "10s", + kline.ThirtySecond: "30s", + kline.OneMin: "1m", + kline.FiveMin: "5m", + kline.FifteenMin: "15m", + kline.ThirtyMin: "30m", + kline.OneHour: "1h", + kline.TwoHour: "2h", + kline.FourHour: "4h", + kline.EightHour: "8h", + kline.TwelveHour: "12h", + kline.OneDay: "1d", + kline.SevenDay: "7d", + kline.OneMonth: "30d", + } { + t.Run(exp, func(t *testing.T) { + t.Parallel() + s, err := getIntervalString(k) + require.NoError(t, err) + assert.Equal(t, exp, s) + }) + } + _, err := getIntervalString(0) + assert.ErrorIs(t, err, kline.ErrUnsupportedInterval, "0 should be an invalid interval") + _, err = getIntervalString(kline.FiveDay) + assert.ErrorIs(t, err, kline.ErrUnsupportedInterval, "Any other random interval should also be invalid") +} diff --git a/exchanges/gateio/gateio_types.go b/exchanges/gateio/gateio_types.go index d4cac557..a1399aac 100644 --- a/exchanges/gateio/gateio_types.go +++ b/exchanges/gateio/gateio_types.go @@ -2050,7 +2050,7 @@ type WsCandlesticks struct { type WsOrderbookTickerData struct { UpdateTime types.Time `json:"t"` UpdateOrderID int64 `json:"u"` - CurrencyPair currency.Pair `json:"s"` + Pair currency.Pair `json:"s"` BestBidPrice types.Number `json:"b"` BestBidAmount types.Number `json:"B"` BestAskPrice types.Number `json:"a"` @@ -2059,12 +2059,12 @@ type WsOrderbookTickerData struct { // WsOrderbookUpdate represents websocket orderbook update push data type WsOrderbookUpdate struct { - UpdateTime types.Time `json:"t"` - CurrencyPair currency.Pair `json:"s"` - FirstOrderbookUpdatedID int64 `json:"U"` // First update order book id in this event since last update - LastOrderbookUpdatedID int64 `json:"u"` - Bids [][2]types.Number `json:"b"` - Asks [][2]types.Number `json:"a"` + UpdateTime types.Time `json:"t"` + Pair currency.Pair `json:"s"` + FirstUpdateID int64 `json:"U"` // First update order book id in this event since last update + LastUpdateID int64 `json:"u"` + Bids [][2]types.Number `json:"b"` + Asks [][2]types.Number `json:"a"` } // WsOrderbookSnapshot represents a websocket orderbook snapshot push data @@ -2225,14 +2225,14 @@ type WsFuturesAndOptionsOrderbookUpdate struct { ContractName currency.Pair `json:"s"` FirstUpdatedID int64 `json:"U"` LastUpdatedID int64 `json:"u"` - Bids []struct { - Price types.Number `json:"p"` - Size float64 `json:"s"` - } `json:"b"` - Asks []struct { - Price types.Number `json:"p"` - Size float64 `json:"s"` - } `json:"a"` + Bids []Tranche `json:"b"` + Asks []Tranche `json:"a"` +} + +// Tranche represents a tranche of orderbook data +type Tranche struct { + Price types.Number `json:"p"` + Size float64 `json:"s"` } // WsFuturesOrderbookSnapshot represents a futures orderbook snapshot push data @@ -2240,14 +2240,8 @@ type WsFuturesOrderbookSnapshot struct { Timestamp types.Time `json:"t"` Contract currency.Pair `json:"contract"` OrderbookID int64 `json:"id"` - Asks []struct { - Price types.Number `json:"p"` - Size float64 `json:"s"` - } `json:"asks"` - Bids []struct { - Price types.Number `json:"p"` - Size float64 `json:"s"` - } `json:"bids"` + Asks []Tranche `json:"asks"` + Bids []Tranche `json:"bids"` } // WsFuturesOrderbookUpdateEvent represents futures orderbook push data with the event 'update' diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index 43e6518f..8e740010 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "net/http" + "slices" "strconv" "strings" "text/template" @@ -17,6 +18,7 @@ import ( "github.com/buger/jsonparser" gws "github.com/gorilla/websocket" "github.com/thrasher-corp/gocryptotrader/common" + "github.com/thrasher-corp/gocryptotrader/common/key" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/encoding/json" "github.com/thrasher-corp/gocryptotrader/exchange/websocket" @@ -59,14 +61,14 @@ var defaultSubscriptions = subscription.List{ {Enabled: true, Channel: subscription.TickerChannel, Asset: asset.Spot}, {Enabled: true, Channel: subscription.CandlesChannel, Asset: asset.Spot, Interval: kline.FiveMin}, {Enabled: true, Channel: subscription.OrderbookChannel, Asset: asset.Spot, Interval: kline.HundredMilliseconds}, + {Enabled: false, Channel: spotOrderbookTickerChannel, Asset: asset.Spot, Interval: kline.TenMilliseconds, Levels: 1}, + {Enabled: false, Channel: spotOrderbookChannel, Asset: asset.Spot, Interval: kline.HundredMilliseconds, Levels: 100}, {Enabled: true, Channel: spotBalancesChannel, Asset: asset.Spot, Authenticated: true}, {Enabled: true, Channel: crossMarginBalanceChannel, Asset: asset.CrossMargin, Authenticated: true}, {Enabled: true, Channel: marginBalancesChannel, Asset: asset.Margin, Authenticated: true}, {Enabled: false, Channel: subscription.AllTradesChannel, Asset: asset.Spot}, } -var fetchedCurrencyPairSnapshotOrderbook = make(map[string]bool) - var subscriptionNames = map[string]string{ subscription.TickerChannel: spotTickerChannel, subscription.OrderbookChannel: spotOrderbookUpdateChannel, @@ -187,7 +189,7 @@ func (g *Gateio) WsHandleSpotData(ctx context.Context, respRaw []byte) error { case spotOrderbookTickerChannel: return g.processOrderbookTicker(push.Result, push.Time) case spotOrderbookUpdateChannel: - return g.processOrderbookUpdate(push.Result, push.Time) + return g.processOrderbookUpdate(ctx, push.Result, push.Time) case spotOrderbookChannel: return g.processOrderbookSnapshot(push.Result, push.Time) case spotOrdersChannel: @@ -358,7 +360,7 @@ func (g *Gateio) processOrderbookTicker(incoming []byte, updatePushedAt time.Tim } return g.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{ Exchange: g.Name, - Pair: data.CurrencyPair, + Pair: data.Pair, Asset: asset.Spot, LastUpdated: data.UpdateTime.Time(), UpdatePushedAt: updatePushedAt, @@ -367,41 +369,11 @@ func (g *Gateio) processOrderbookTicker(incoming []byte, updatePushedAt time.Tim }) } -func (g *Gateio) processOrderbookUpdate(incoming []byte, updatePushedAt time.Time) error { +func (g *Gateio) processOrderbookUpdate(ctx context.Context, incoming []byte, updatePushedAt time.Time) error { var data WsOrderbookUpdate if err := json.Unmarshal(incoming, &data); err != nil { return err } - - if len(data.Asks) == 0 && len(data.Bids) == 0 { - return nil - } - - enabledAssets := make([]asset.Item, 0, len(standardMarginAssetTypes)) - for _, a := range standardMarginAssetTypes { - if enabled, _ := g.CurrencyPairs.IsPairEnabled(data.CurrencyPair, a); enabled { - enabledAssets = append(enabledAssets, a) - } - } - - sPair := data.CurrencyPair.String() - if !fetchedCurrencyPairSnapshotOrderbook[sPair] { - orderbooks, err := g.UpdateOrderbook(context.Background(), data.CurrencyPair, asset.Spot) // currency pair orderbook data for Spot, Margin, and Cross Margin is same - if err != nil { - return err - } - // TODO: handle orderbook update synchronisation - for _, a := range enabledAssets { - assetOrderbook := *orderbooks - assetOrderbook.Asset = a - err = g.Websocket.Orderbook.LoadSnapshot(&assetOrderbook) - if err != nil { - return err - } - } - fetchedCurrencyPairSnapshotOrderbook[sPair] = true - } - asks := make([]orderbook.Tranche, len(data.Asks)) for x := range data.Asks { asks[x].Price = data.Asks[x][0].Float64() @@ -412,21 +384,16 @@ func (g *Gateio) processOrderbookUpdate(incoming []byte, updatePushedAt time.Tim bids[x].Price = data.Bids[x][0].Float64() bids[x].Amount = data.Bids[x][1].Float64() } - - for _, a := range enabledAssets { - if err := g.Websocket.Orderbook.Update(&orderbook.Update{ - UpdateTime: data.UpdateTime.Time(), - UpdatePushedAt: updatePushedAt, - Pair: data.CurrencyPair, - Asset: a, - Asks: asks, - Bids: bids, - }); err != nil { - return err - } - } - - return nil + return g.wsOBUpdateMgr.ProcessUpdate(ctx, g, data.FirstUpdateID, &orderbook.Update{ + UpdateID: data.LastUpdateID, + UpdateTime: data.UpdateTime.Time(), + UpdatePushedAt: updatePushedAt, + Pair: data.Pair, + Asset: asset.Spot, + Asks: asks, + Bids: bids, + AllowEmpty: true, + }) } func (g *Gateio) processOrderbookSnapshot(incoming []byte, updatePushedAt time.Time) error { @@ -682,9 +649,10 @@ func (g *Gateio) GetSubscriptionTemplate(_ *subscription.Subscription) (*templat Funcs(template.FuncMap{ "channelName": channelName, "singleSymbolChannel": singleSymbolChannel, - "interval": g.GetIntervalString, - }). - Parse(subTplText) + "orderbookInterval": orderbookChannelInterval, + "candlesInterval": candlesChannelInterval, + "levels": channelLevels, + }).Parse(subTplText) } // manageSubs sends a websocket message to subscribe or unsubscribe from a list of channel @@ -782,20 +750,166 @@ func singleSymbolChannel(name string) bool { return false } +// ValidateSubscriptions implements the subscription.ListValidator interface. +// It ensures that, for each orderbook pair asset, only one type of subscription (e.g., best bid/ask, orderbook update, or orderbook snapshot) +// is active at a time. Multiple concurrent subscriptions for the same asset are disallowed to prevent orderbook data corruption. +func (g *Gateio) ValidateSubscriptions(l subscription.List) error { + orderbookGuard := map[key.PairAsset]string{} + for _, s := range l { + n := channelName(s) + if !isSingleOrderbookChannel(n) { + continue + } + for _, p := range s.Pairs { + k := key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: s.Asset} + existingChanName, ok := orderbookGuard[k] + if !ok { + orderbookGuard[k] = n + continue + } + if existingChanName != n { + return fmt.Errorf("%w for %q %q between %q and %q, please enable only one type", subscription.ErrExclusiveSubscription, k.Pair(), k.Asset, existingChanName, n) + } + } + } + return nil +} + +// isSingleOrderbookChannel checks if the specified channel represents a single orderbook subscription. +// It returns true for channels like orderbook updates, snapshots, or tickers, as multiple subscriptions +// for the same pair asset could corrupt the stored orderbook data. +func isSingleOrderbookChannel(name string) bool { + switch name { + case spotOrderbookUpdateChannel, + spotOrderbookChannel, + spotOrderbookTickerChannel, + futuresOrderbookChannel, + futuresOrderbookTickerChannel, + futuresOrderbookUpdateChannel, + optionsOrderbookChannel, + optionsOrderbookTickerChannel, + optionsOrderbookUpdateChannel: + return true + } + return false +} + +var channelIntervalsMap = map[asset.Item]map[string][]kline.Interval{ + asset.Spot: { + spotOrderbookTickerChannel: {}, + spotOrderbookChannel: {kline.HundredMilliseconds, kline.ThousandMilliseconds}, + spotOrderbookUpdateChannel: {kline.TwentyMilliseconds, kline.HundredMilliseconds}, + }, + asset.Futures: { + futuresOrderbookTickerChannel: {}, + futuresOrderbookChannel: {0}, + futuresOrderbookUpdateChannel: {kline.TwentyMilliseconds, kline.HundredMilliseconds}, + }, + asset.DeliveryFutures: { + futuresOrderbookTickerChannel: {}, + futuresOrderbookChannel: {0}, + futuresOrderbookUpdateChannel: {kline.HundredMilliseconds, kline.ThousandMilliseconds}, + }, + asset.Options: { + optionsOrderbookTickerChannel: {}, + optionsOrderbookChannel: {0}, + optionsOrderbookUpdateChannel: {kline.HundredMilliseconds, kline.ThousandMilliseconds}, + }, +} + +func candlesChannelInterval(s *subscription.Subscription) (string, error) { + if s.Channel == subscription.CandlesChannel { + return getIntervalString(s.Interval) + } + return "", nil +} + +func orderbookChannelInterval(s *subscription.Subscription, a asset.Item) (string, error) { + cName := channelName(s) + + assetChannels, ok := channelIntervalsMap[a] + if !ok { + return "", nil + } + + switch intervals, ok := assetChannels[cName]; { + case !ok: + return "", nil + case len(intervals) == 0: + if s.Interval != 0 { + return "", fmt.Errorf("%w for %s: %q; interval not supported for channel", subscription.ErrInvalidInterval, cName, s.Interval) + } + return "", nil + case !slices.Contains(intervals, s.Interval): + return "", fmt.Errorf("%w for %s: %q; supported: %q", subscription.ErrInvalidInterval, cName, s.Interval, intervals) + case cName == futuresOrderbookUpdateChannel && s.Interval == kline.TwentyMilliseconds && s.Levels != 20: + return "", fmt.Errorf("%w for %q: 20ms only valid with Levels 20", subscription.ErrInvalidInterval, cName) + case s.Interval == 0: + return "0", nil // Do not move this into getIntervalString, it's only valid for ws subs + } + + return getIntervalString(s.Interval) +} + +var channelLevelsMap = map[asset.Item]map[string][]int{ + asset.Spot: { + spotOrderbookTickerChannel: {}, + spotOrderbookUpdateChannel: {}, + spotOrderbookChannel: {1, 5, 10, 20, 50, 100}, + }, + asset.Futures: { + futuresOrderbookChannel: {1, 5, 10, 20, 50, 100}, + futuresOrderbookTickerChannel: {}, + futuresOrderbookUpdateChannel: {20, 50, 100}, + }, + asset.DeliveryFutures: { + futuresOrderbookChannel: {1, 5, 10, 20, 50, 100}, + futuresOrderbookTickerChannel: {}, + futuresOrderbookUpdateChannel: {5, 10, 20, 50, 100}, + }, + asset.Options: { + optionsOrderbookTickerChannel: {}, + optionsOrderbookUpdateChannel: {5, 10, 20, 50}, + optionsOrderbookChannel: {5, 10, 20, 50}, + }, +} + +func channelLevels(s *subscription.Subscription, a asset.Item) (string, error) { + cName := channelName(s) + assetChannels, ok := channelLevelsMap[a] + if !ok { + return "", nil + } + + switch levels, ok := assetChannels[cName]; { + case !ok: + return "", nil + case len(levels) == 0: + if s.Levels != 0 { + return "", fmt.Errorf("%w for %s: `%d`; levels not supported for channel", subscription.ErrInvalidLevel, cName, s.Levels) + } + return "", nil + case !slices.Contains(levels, s.Levels): + return "", fmt.Errorf("%w for %s: %d; supported: %v", subscription.ErrInvalidLevel, cName, s.Levels, levels) + } + + return strconv.Itoa(s.Levels), nil +} + const subTplText = ` {{- with $name := channelName $.S }} {{- range $asset, $pairs := $.AssetPairs }} {{- if singleSymbolChannel $name }} {{- range $i, $p := $pairs -}} - {{- if eq $name "spot.candlesticks" }}{{ interval $.S.Interval -}} , {{- end }} + {{- with $i := candlesInterval $.S }}{{ $i -}} , {{- end }} {{- $p }} - {{- if eq "spot.order_book" $name -}} , {{- $.S.Levels }}{{ end }} - {{- if hasPrefix "spot.order_book" $name -}} , {{- interval $.S.Interval }}{{ end }} + {{- with $l := levels $.S $asset -}} , {{- $l }}{{ end }} + {{- with $i := orderbookInterval $.S $asset -}} , {{- $i }}{{- end }} {{- $.PairSeparator }} {{- end }} {{- $.AssetSeparator }} {{- else }} - {{- $pairs.Join }} + {{- $pairs.Join }} {{- end }} {{- end }} {{- end }} diff --git a/exchanges/gateio/gateio_websocket_delivery_futures.go b/exchanges/gateio/gateio_websocket_delivery_futures.go index 1d20ae88..33227d29 100644 --- a/exchanges/gateio/gateio_websocket_delivery_futures.go +++ b/exchanges/gateio/gateio_websocket_delivery_futures.go @@ -25,25 +25,23 @@ const ( // delivery testnet urls deliveryTestNetBTCTradingURL = "wss://fx-ws-testnet.gateio.ws/v4/ws/delivery/btc" //nolint:unused // Can be used for testing deliveryTestNetUSDTTradingURL = "wss://fx-ws-testnet.gateio.ws/v4/ws/delivery/usdt" //nolint:unused // Can be used for testing + + deliveryFuturesUpdateLimit uint64 = 100 ) var defaultDeliveryFuturesSubscriptions = []string{ futuresTickersChannel, futuresTradesChannel, - futuresOrderbookChannel, + futuresOrderbookUpdateChannel, futuresCandlesticksChannel, } -var fetchedFuturesCurrencyPairSnapshotOrderbook = make(map[string]bool) - // WsDeliveryFuturesConnect initiates a websocket connection for delivery futures account func (g *Gateio) WsDeliveryFuturesConnect(ctx context.Context, conn websocket.Connection) error { - err := g.CurrencyPairs.IsAssetEnabled(asset.DeliveryFutures) - if err != nil { + if err := g.CurrencyPairs.IsAssetEnabled(asset.DeliveryFutures); err != nil { return err } - err = conn.DialContext(ctx, &gws.Dialer{}, http.Header{}) - if err != nil { + if err := conn.DialContext(ctx, &gws.Dialer{}, http.Header{}); err != nil { return err } pingMessage, err := json.Marshal(WsInput{ @@ -64,6 +62,7 @@ func (g *Gateio) WsDeliveryFuturesConnect(ctx context.Context, conn websocket.Co } // GenerateDeliveryFuturesDefaultSubscriptions returns delivery futures default subscriptions params. +// TODO: Update to use the new subscription template system func (g *Gateio) GenerateDeliveryFuturesDefaultSubscriptions() (subscription.List, error) { _, err := g.GetCredentials(context.Background()) if err != nil { @@ -92,6 +91,9 @@ func (g *Gateio) GenerateDeliveryFuturesDefaultSubscriptions() (subscription.Lis params["interval"] = "0" case futuresCandlesticksChannel: params["interval"] = kline.FiveMin + case futuresOrderbookUpdateChannel: + params["frequency"] = kline.HundredMilliseconds + params["level"] = strconv.FormatUint(deliveryFuturesUpdateLimit, 10) } fPair, err := g.FormatExchangeCurrency(pairs[j], asset.DeliveryFutures) if err != nil { @@ -165,7 +167,7 @@ func (g *Gateio) generateDeliveryFuturesPayload(ctx context.Context, conn websoc frequency, okay := channelsToSubscribe[i].Params["frequency"].(kline.Interval) if okay { var frequencyString string - frequencyString, err = g.GetIntervalString(frequency) + frequencyString, err = getIntervalString(frequency) if err != nil { return nil, err } @@ -188,7 +190,7 @@ func (g *Gateio) generateDeliveryFuturesPayload(ctx context.Context, conn websoc interval, okay := channelsToSubscribe[i].Params["interval"].(kline.Interval) if okay { var intervalString string - intervalString, err = g.GetIntervalString(interval) + intervalString, err = getIntervalString(interval) if err != nil { return nil, err } diff --git a/exchanges/gateio/gateio_websocket_futures.go b/exchanges/gateio/gateio_websocket_futures.go index a76d1fa9..cae6a626 100644 --- a/exchanges/gateio/gateio_websocket_futures.go +++ b/exchanges/gateio/gateio_websocket_futures.go @@ -45,12 +45,13 @@ const ( futuresReduceRiskLimitsChannel = "futures.reduce_risk_limits" futuresPositionsChannel = "futures.positions" futuresAutoOrdersChannel = "futures.autoorders" + + futuresOrderbookUpdateLimit uint64 = 20 ) var defaultFuturesSubscriptions = []string{ futuresTickersChannel, futuresTradesChannel, - futuresOrderbookChannel, futuresOrderbookUpdateChannel, futuresCandlesticksChannel, } @@ -85,6 +86,7 @@ func (g *Gateio) WsFuturesConnect(ctx context.Context, conn websocket.Connection } // GenerateFuturesDefaultSubscriptions returns default subscriptions information. +// TODO: Update to use the new subscription template system func (g *Gateio) GenerateFuturesDefaultSubscriptions(a asset.Item) (subscription.List, error) { channelsToSubscribe := defaultFuturesSubscriptions if g.Websocket.CanUseAuthenticatedEndpoints() { @@ -110,8 +112,9 @@ func (g *Gateio) GenerateFuturesDefaultSubscriptions(a asset.Item) (subscription case futuresCandlesticksChannel: params["interval"] = kline.FiveMin case futuresOrderbookUpdateChannel: - params["frequency"] = kline.ThousandMilliseconds - params["level"] = "100" + // This is the fastest frequency available for futures orderbook updates 20 levels every 20ms + params["frequency"] = kline.TwentyMilliseconds + params["level"] = strconv.FormatUint(futuresOrderbookUpdateLimit, 10) } fPair, err := g.FormatExchangeCurrency(pairs[j], a) if err != nil { @@ -163,7 +166,7 @@ func (g *Gateio) WsHandleFuturesData(ctx context.Context, respRaw []byte, a asse case futuresOrderbookTickerChannel: return g.processFuturesOrderbookTicker(push.Result) case futuresOrderbookUpdateChannel: - return g.processFuturesAndOptionsOrderbookUpdate(push.Result, a) + return g.processFuturesOrderbookUpdate(ctx, push.Result, a, push.Time) case futuresCandlesticksChannel: return g.processFuturesCandlesticks(respRaw, a) case futuresOrdersChannel: @@ -247,7 +250,7 @@ func (g *Gateio) generateFuturesPayload(ctx context.Context, conn websocket.Conn frequency, okay := channelsToSubscribe[i].Params["frequency"].(kline.Interval) if okay { var frequencyString string - frequencyString, err = g.GetIntervalString(frequency) + frequencyString, err = getIntervalString(frequency) if err != nil { return nil, err } @@ -270,7 +273,7 @@ func (g *Gateio) generateFuturesPayload(ctx context.Context, conn websocket.Conn interval, okay := channelsToSubscribe[i].Params["interval"].(kline.Interval) if okay { var intervalString string - intervalString, err = g.GetIntervalString(interval) + intervalString, err = getIntervalString(interval) if err != nil { return nil, err } @@ -403,50 +406,32 @@ func (g *Gateio) processFuturesOrderbookTicker(incoming []byte) error { return nil } -func (g *Gateio) processFuturesAndOptionsOrderbookUpdate(incoming []byte, assetType asset.Item) error { +func (g *Gateio) processFuturesOrderbookUpdate(ctx context.Context, incoming []byte, a asset.Item, pushTime time.Time) error { var data WsFuturesAndOptionsOrderbookUpdate - err := json.Unmarshal(incoming, &data) - if err != nil { + if err := json.Unmarshal(incoming, &data); err != nil { return err } - if (assetType == asset.Options && !fetchedOptionsCurrencyPairSnapshotOrderbook[data.ContractName.String()]) || - (assetType != asset.Options && !fetchedFuturesCurrencyPairSnapshotOrderbook[data.ContractName.String()]) { - orderbooks, err := g.UpdateOrderbook(context.Background(), data.ContractName, assetType) - if err != nil { - return err - } - if orderbooks.LastUpdateID < data.FirstUpdatedID || orderbooks.LastUpdateID > data.LastUpdatedID { - return nil - } - err = g.Websocket.Orderbook.LoadSnapshot(orderbooks) - if err != nil { - return err - } - if assetType == asset.Options { - fetchedOptionsCurrencyPairSnapshotOrderbook[data.ContractName.String()] = true - } else { - fetchedFuturesCurrencyPairSnapshotOrderbook[data.ContractName.String()] = true - } - } - updates := orderbook.Update{ - UpdateTime: data.Timestamp.Time(), - Pair: data.ContractName, - Asset: assetType, - } - updates.Asks = make([]orderbook.Tranche, len(data.Asks)) + asks := make([]orderbook.Tranche, len(data.Asks)) for x := range data.Asks { - updates.Asks[x].Amount = data.Asks[x].Size - updates.Asks[x].Price = data.Asks[x].Price.Float64() + asks[x].Price = data.Asks[x].Price.Float64() + asks[x].Amount = data.Asks[x].Size } - updates.Bids = make([]orderbook.Tranche, len(data.Bids)) + bids := make([]orderbook.Tranche, len(data.Bids)) for x := range data.Bids { - updates.Bids[x].Amount = data.Bids[x].Size - updates.Bids[x].Price = data.Bids[x].Price.Float64() + bids[x].Price = data.Bids[x].Price.Float64() + bids[x].Amount = data.Bids[x].Size } - if len(updates.Asks) == 0 && len(updates.Bids) == 0 { - return errors.New("malformed orderbook data") - } - return g.Websocket.Orderbook.Update(&updates) + + return g.wsOBUpdateMgr.ProcessUpdate(ctx, g, data.FirstUpdatedID, &orderbook.Update{ + UpdateID: data.LastUpdatedID, + UpdateTime: data.Timestamp.Time(), + UpdatePushedAt: pushTime, + Pair: data.ContractName, + Asset: a, + Asks: asks, + Bids: bids, + AllowEmpty: true, + }) } func (g *Gateio) processFuturesOrderbookSnapshot(event string, incoming []byte, assetType asset.Item, updatePushedAt time.Time) error { diff --git a/exchanges/gateio/gateio_websocket_option.go b/exchanges/gateio/gateio_websocket_option.go index 3d49a5e5..6a3a2d0b 100644 --- a/exchanges/gateio/gateio_websocket_option.go +++ b/exchanges/gateio/gateio_websocket_option.go @@ -51,6 +51,8 @@ const ( optionsPositionCloseChannel = "options.position_closes" optionsBalancesChannel = "options.balances" optionsPositionsChannel = "options.positions" + + optionOrderbookUpdateLimit uint64 = 50 ) var defaultOptionsSubscriptions = []string{ @@ -60,12 +62,9 @@ var defaultOptionsSubscriptions = []string{ optionsUnderlyingTradesChannel, optionsContractCandlesticksChannel, optionsUnderlyingCandlesticksChannel, - optionsOrderbookChannel, optionsOrderbookUpdateChannel, } -var fetchedOptionsCurrencyPairSnapshotOrderbook = make(map[string]bool) - // WsOptionsConnect initiates a websocket connection to options websocket endpoints. func (g *Gateio) WsOptionsConnect(ctx context.Context, conn websocket.Connection) error { err := g.CurrencyPairs.IsAssetEnabled(asset.Options) @@ -94,6 +93,7 @@ func (g *Gateio) WsOptionsConnect(ctx context.Context, conn websocket.Connection } // GenerateOptionsDefaultSubscriptions generates list of channel subscriptions for options asset type. +// TODO: Update to use the new subscription template system func (g *Gateio) GenerateOptionsDefaultSubscriptions() (subscription.List, error) { channelsToSubscribe := defaultOptionsSubscriptions var userID int64 @@ -140,8 +140,8 @@ getEnabledPairs: case optionsContractCandlesticksChannel, optionsUnderlyingCandlesticksChannel: params["interval"] = kline.FiveMin case optionsOrderbookUpdateChannel: - params["interval"] = kline.ThousandMilliseconds - params["level"] = "20" + params["interval"] = kline.HundredMilliseconds + params["level"] = strconv.FormatUint(optionOrderbookUpdateLimit, 10) case optionsOrdersChannel, optionsUserTradesChannel, optionsLiquidatesChannel, @@ -247,7 +247,7 @@ func (g *Gateio) generateOptionsPayload(ctx context.Context, conn websocket.Conn if !ok { return nil, fmt.Errorf("%w, missing options orderbook interval", orderbook.ErrOrderbookInvalid) } - intervalString, err = g.GetIntervalString(interval) + intervalString, err = getIntervalString(interval) if err != nil { return nil, err } @@ -262,7 +262,7 @@ func (g *Gateio) generateOptionsPayload(ctx context.Context, conn websocket.Conn if !ok { return nil, errors.New("missing options underlying candlesticks interval") } - intervalString, err = g.GetIntervalString(interval) + intervalString, err = getIntervalString(interval) if err != nil { return nil, err } @@ -327,7 +327,7 @@ func (g *Gateio) WsHandleOptionsData(ctx context.Context, respRaw []byte) error case optionsOrderbookTickerChannel: return g.processOrderbookTickerPushData(respRaw) case optionsOrderbookUpdateChannel: - return g.processFuturesAndOptionsOrderbookUpdate(push.Result, asset.Options) + return g.processOptionsOrderbookUpdate(ctx, push.Result, asset.Options, push.Time) case optionsOrdersChannel: return g.processOptionsOrderPushData(respRaw) case optionsUserTradesChannel: @@ -498,6 +498,33 @@ func (g *Gateio) processOrderbookTickerPushData(incoming []byte) error { return nil } +func (g *Gateio) processOptionsOrderbookUpdate(ctx context.Context, incoming []byte, a asset.Item, pushTime time.Time) error { + var data WsFuturesAndOptionsOrderbookUpdate + if err := json.Unmarshal(incoming, &data); err != nil { + return err + } + asks := make([]orderbook.Tranche, len(data.Asks)) + for x := range data.Asks { + asks[x].Price = data.Asks[x].Price.Float64() + asks[x].Amount = data.Asks[x].Size + } + bids := make([]orderbook.Tranche, len(data.Bids)) + for x := range data.Bids { + bids[x].Price = data.Bids[x].Price.Float64() + bids[x].Amount = data.Bids[x].Size + } + return g.wsOBUpdateMgr.ProcessUpdate(ctx, g, data.FirstUpdatedID, &orderbook.Update{ + UpdateID: data.LastUpdatedID, + UpdateTime: data.Timestamp.Time(), + UpdatePushedAt: pushTime, + Pair: data.ContractName, + Asset: a, + Asks: asks, + Bids: bids, + AllowEmpty: true, + }) +} + func (g *Gateio) processOptionsOrderbookSnapshotPushData(event string, incoming []byte, updatePushedAt time.Time) error { if event == "all" { var data WsOptionsOrderbookSnapshot diff --git a/exchanges/gateio/gateio_websocket_request_spot_test.go b/exchanges/gateio/gateio_websocket_request_spot_test.go index 4f50bdd4..492579da 100644 --- a/exchanges/gateio/gateio_websocket_request_spot_test.go +++ b/exchanges/gateio/gateio_websocket_request_spot_test.go @@ -185,7 +185,7 @@ func TestWebsocketSpotGetOrderStatus(t *testing.T) { sharedtestvalues.SkipTestIfCredentialsUnset(t, g, canManipulateRealOrders) testexch.UpdatePairsOnce(t, g) - g := newExchangeWithWebsocket(t, asset.Spot) + g := newExchangeWithWebsocket(t, asset.Spot) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes got, err := g.WebsocketSpotGetOrderStatus(t.Context(), "644999650452", BTCUSDT, "") require.NoError(t, err) @@ -199,7 +199,7 @@ func newExchangeWithWebsocket(t *testing.T, a asset.Item) *Gateio { if apiKey == "" || apiSecret == "" { t.Skip() } - g := new(Gateio) + g := new(Gateio) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes require.NoError(t, testexch.Setup(g), "Test instance Setup must not error") testexch.UpdatePairsOnce(t, g) g.API.AuthenticatedSupport = true diff --git a/exchanges/gateio/gateio_wrapper.go b/exchanges/gateio/gateio_wrapper.go index 55641953..8cab6c05 100644 --- a/exchanges/gateio/gateio_wrapper.go +++ b/exchanges/gateio/gateio_wrapper.go @@ -177,6 +177,7 @@ func (g *Gateio) SetDefaults() { g.WebsocketResponseMaxLimit = exchange.DefaultWebsocketResponseMaxLimit g.WebsocketResponseCheckTimeout = exchange.DefaultWebsocketResponseCheckTimeout g.WebsocketOrderbookBufferLimit = exchange.DefaultWebsocketOrderbookBufferLimit + g.wsOBUpdateMgr = newWsOBUpdateManager(defaultWSSnapshotSyncDelay) } // Setup sets user configuration @@ -649,6 +650,11 @@ func (g *Gateio) UpdateTickers(ctx context.Context, a asset.Item) error { // UpdateOrderbook updates and returns the orderbook for a currency pair func (g *Gateio) UpdateOrderbook(ctx context.Context, p currency.Pair, a asset.Item) (*orderbook.Base, error) { + return g.UpdateOrderbookWithLimit(ctx, p, a, 0) +} + +// UpdateOrderbookWithLimit updates and returns the orderbook for a currency pair with a set orderbook size limit +func (g *Gateio) UpdateOrderbookWithLimit(ctx context.Context, p currency.Pair, a asset.Item, limit uint64) (*orderbook.Base, error) { p, err := g.FormatExchangeCurrency(p, a) if err != nil { return nil, err @@ -664,18 +670,18 @@ func (g *Gateio) UpdateOrderbook(ctx context.Context, p currency.Pair, a asset.I if a != asset.Spot && !available { return nil, fmt.Errorf("%v instrument %v does not have orderbook data", a, p) } - o, err = g.GetOrderbook(ctx, p.String(), "", 0, true) + o, err = g.GetOrderbook(ctx, p.String(), "", limit, true) case asset.CoinMarginedFutures, asset.USDTMarginedFutures: var settle currency.Code settle, err = getSettlementCurrency(p, a) if err != nil { return nil, err } - o, err = g.GetFuturesOrderbook(ctx, settle, p.String(), "", 0, true) + o, err = g.GetFuturesOrderbook(ctx, settle, p.String(), "", limit, true) case asset.DeliveryFutures: - o, err = g.GetDeliveryOrderbook(ctx, currency.USDT, "", p, 0, true) + o, err = g.GetDeliveryOrderbook(ctx, currency.USDT, "", p, limit, true) case asset.Options: - o, err = g.GetOptionsOrderbook(ctx, p, "", 0, true) + o, err = g.GetOptionsOrderbook(ctx, p, "", limit, true) default: return nil, fmt.Errorf("%w %v", asset.ErrNotSupported, a) } diff --git a/exchanges/gateio/ws_ob_update_manager.go b/exchanges/gateio/ws_ob_update_manager.go new file mode 100644 index 00000000..2e7cead2 --- /dev/null +++ b/exchanges/gateio/ws_ob_update_manager.go @@ -0,0 +1,207 @@ +package gateio + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/thrasher-corp/gocryptotrader/common/key" + "github.com/thrasher-corp/gocryptotrader/currency" + "github.com/thrasher-corp/gocryptotrader/exchange/websocket/buffer" + "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" +) + +const defaultWSSnapshotSyncDelay = 2 * time.Second + +var errOrderbookSnapshotOutdated = errors.New("orderbook snapshot is outdated") + +type wsOBUpdateManager struct { + lookup map[key.PairAsset]*updateCache + snapshotSyncDelay time.Duration + mtx sync.RWMutex +} + +type updateCache struct { + updates []pendingUpdate + updating bool + mtx sync.Mutex +} + +type pendingUpdate struct { + update *orderbook.Update + firstUpdateID int64 +} + +func newWsOBUpdateManager(snapshotSyncDelay time.Duration) *wsOBUpdateManager { + return &wsOBUpdateManager{lookup: make(map[key.PairAsset]*updateCache), snapshotSyncDelay: snapshotSyncDelay} +} + +// ProcessUpdate processes an orderbook update by syncing snapshot, caching updates and applying them +func (m *wsOBUpdateManager) ProcessUpdate(ctx context.Context, g *Gateio, firstUpdateID int64, update *orderbook.Update) error { + cache := m.LoadCache(update.Pair, update.Asset) + cache.mtx.Lock() + defer cache.mtx.Unlock() + + if cache.updating { + cache.updates = append(cache.updates, pendingUpdate{update: update, firstUpdateID: firstUpdateID}) + return nil + } + + lastUpdateID, err := g.Websocket.Orderbook.LastUpdateID(update.Pair, update.Asset) + if err != nil && !errors.Is(err, buffer.ErrDepthNotFound) { + return err + } + + if lastUpdateID+1 >= firstUpdateID { + return applyOrderbookUpdate(g, update) + } + + // Orderbook is behind notifications, flush store to prevent trading on stale data + if err := g.Websocket.Orderbook.FlushOrderbook(update.Pair, update.Asset); err != nil && !errors.Is(err, buffer.ErrDepthNotFound) { + return err + } + + cache.updating = true + cache.updates = append(cache.updates, pendingUpdate{update: update, firstUpdateID: firstUpdateID}) + + go func() { + select { + case <-ctx.Done(): + return + case <-time.After(m.snapshotSyncDelay): + if err := cache.SyncOrderbook(ctx, g, update.Pair, update.Asset); err != nil { + g.Websocket.DataHandler <- fmt.Errorf("failed to sync orderbook for %v %v: %w", update.Pair, update.Asset, err) + } + } + }() + + return nil +} + +// LoadCache loads the cache for the given pair and asset. If the cache does not exist, it creates a new one. +func (m *wsOBUpdateManager) LoadCache(p currency.Pair, a asset.Item) *updateCache { + m.mtx.RLock() + cache, ok := m.lookup[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}] + m.mtx.RUnlock() + if !ok { + m.mtx.Lock() + cache = &updateCache{} + m.lookup[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}] = cache + m.mtx.Unlock() + } + return cache +} + +// SyncOrderbook fetches and synchronises an orderbook snapshot to the limit size so that pending updates can be +// applied to the orderbook. +func (c *updateCache) SyncOrderbook(ctx context.Context, g *Gateio, pair currency.Pair, a asset.Item) error { + // TODO: When subscription config is added for all assets update limits to use sub.Levels + var limit uint64 + switch a { + case asset.Spot: + sub := g.Websocket.GetSubscription(spotOrderbookUpdateKey) + if sub == nil { + return fmt.Errorf("no subscription found for %q", spotOrderbookUpdateKey) + } + // There is no way to set levels when we subscribe for this specific subscription case. + // Extract limit from interval e.g. 20ms == 20 limit book and 100ms == 100 limit book. + limit = uint64(sub.Interval.Duration().Milliseconds()) //nolint:gosec // No overflow risk + case asset.USDTMarginedFutures, asset.USDCMarginedFutures: + limit = futuresOrderbookUpdateLimit + case asset.DeliveryFutures: + limit = deliveryFuturesUpdateLimit + case asset.Options: + limit = optionOrderbookUpdateLimit + } + + book, err := g.UpdateOrderbookWithLimit(ctx, pair, a, limit) + + c.mtx.Lock() // lock here to prevent ws handle data interference with REST request above + defer func() { + c.updates = nil + c.updating = false + c.mtx.Unlock() + }() + + if err != nil { + return err + } + + if a != asset.Spot { + if err := g.Websocket.Orderbook.LoadSnapshot(book); err != nil { + return err + } + } else { + // Spot, Margin, and Cross Margin books are all classified as spot + for i := range standardMarginAssetTypes { + if enabled, _ := g.IsPairEnabled(pair, standardMarginAssetTypes[i]); !enabled { + continue + } + book.Asset = standardMarginAssetTypes[i] + if err := g.Websocket.Orderbook.LoadSnapshot(book); err != nil { + return err + } + } + } + return c.applyPendingUpdates(g, a) +} + +// ApplyPendingUpdates applies all pending updates to the orderbook +func (c *updateCache) applyPendingUpdates(g *Gateio, a asset.Item) error { + for _, data := range c.updates { + lastUpdateID, err := g.Websocket.Orderbook.LastUpdateID(data.update.Pair, a) + if err != nil { + return err + } + nextID := lastUpdateID + 1 + if data.firstUpdateID > nextID { + return errOrderbookSnapshotOutdated + } + if data.update.UpdateID < nextID { + continue // skip updates that are behind the current orderbook + } + if err := applyOrderbookUpdate(g, data.update); err != nil { + return err + } + } + return nil +} + +// applyOrderbookUpdate applies an orderbook update to the orderbook +func applyOrderbookUpdate(g *Gateio, update *orderbook.Update) error { + if update.Asset != asset.Spot { + return g.Websocket.Orderbook.Update(update) + } + + for i := range standardMarginAssetTypes { + if enabled, _ := g.IsPairEnabled(update.Pair, standardMarginAssetTypes[i]); !enabled { + continue + } + update.Asset = standardMarginAssetTypes[i] + if err := g.Websocket.Orderbook.Update(update); err != nil { + return err + } + } + + return nil +} + +var spotOrderbookUpdateKey = channelKey{&subscription.Subscription{Channel: subscription.OrderbookChannel}} + +var _ subscription.MatchableKey = channelKey{} + +type channelKey struct { + *subscription.Subscription +} + +func (k channelKey) Match(eachKey subscription.MatchableKey) bool { + return k.Subscription.Channel == eachKey.GetSubscription().Channel +} + +func (k channelKey) GetSubscription() *subscription.Subscription { + return k.Subscription +} diff --git a/exchanges/gateio/ws_ob_update_manager_test.go b/exchanges/gateio/ws_ob_update_manager_test.go new file mode 100644 index 00000000..ee7b2721 --- /dev/null +++ b/exchanges/gateio/ws_ob_update_manager_test.go @@ -0,0 +1,205 @@ +package gateio + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thrasher-corp/gocryptotrader/currency" + "github.com/thrasher-corp/gocryptotrader/exchange/websocket/buffer" + "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/kline" + "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" + testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" +) + +func TestProcessUpdate(t *testing.T) { + t.Parallel() + + m := newWsOBUpdateManager(0) + err := m.ProcessUpdate(t.Context(), g, 1337, &orderbook.Update{}) + assert.ErrorIs(t, err, currency.ErrCurrencyPairEmpty) + + pair := currency.NewPair(currency.BABY, currency.BABYDOGE) + err = g.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{ + Exchange: g.Name, + Pair: pair, + Asset: asset.USDTMarginedFutures, + Bids: []orderbook.Tranche{{Price: 1, Amount: 1}}, + Asks: []orderbook.Tranche{{Price: 1, Amount: 1}}, + LastUpdated: time.Now(), + UpdatePushedAt: time.Now(), + LastUpdateID: 1336, + }) + require.NoError(t, err) + + err = m.ProcessUpdate(t.Context(), g, 1337, &orderbook.Update{ + UpdateID: 1338, + Pair: pair, + Asset: asset.USDTMarginedFutures, + AllowEmpty: true, + UpdateTime: time.Now(), + }) + require.NoError(t, err) + + // Test orderbook snapshot is behind update + err = m.ProcessUpdate(t.Context(), g, 1340, &orderbook.Update{ + UpdateID: 1341, + Pair: pair, + Asset: asset.USDTMarginedFutures, + AllowEmpty: true, + UpdateTime: time.Now(), + }) + require.NoError(t, err) + + cache := m.LoadCache(pair, asset.USDTMarginedFutures) + + cache.mtx.Lock() + assert.Len(t, cache.updates, 1) + assert.True(t, cache.updating) + cache.mtx.Unlock() + + // Test orderbook snapshot is behind update + err = m.ProcessUpdate(t.Context(), g, 1342, &orderbook.Update{ + UpdateID: 1343, + Pair: pair, + Asset: asset.USDTMarginedFutures, + AllowEmpty: true, + UpdateTime: time.Now(), + }) + require.NoError(t, err) + + cache.mtx.Lock() + assert.Len(t, cache.updates, 2) + assert.True(t, cache.updating) + cache.mtx.Unlock() + + time.Sleep(time.Millisecond * 2) // Allow sync delay to pass + + cache.mtx.Lock() + assert.Empty(t, cache.updates) + assert.False(t, cache.updating) + cache.mtx.Unlock() +} + +func TestLoadCache(t *testing.T) { + t.Parallel() + + m := newWsOBUpdateManager(0) + pair := currency.NewPair(currency.BABY, currency.BABYDOGE) + cache := m.LoadCache(pair, asset.USDTMarginedFutures) + assert.NotNil(t, cache) + assert.Len(t, m.lookup, 1) + + // Test cache is reused + cache2 := m.LoadCache(pair, asset.USDTMarginedFutures) + assert.Equal(t, cache, cache2) +} + +func TestSyncOrderbook(t *testing.T) { + t.Parallel() + + g := new(Gateio) + require.NoError(t, testexch.Setup(g), "Setup must not error") + require.NoError(t, g.UpdateTradablePairs(t.Context(), false)) + + // Add dummy subscription so that it can be matched and a limit/level can be extracted for initial orderbook sync spot. + err := g.Websocket.AddSubscriptions(nil, &subscription.Subscription{Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds}) + require.NoError(t, err) + + m := newWsOBUpdateManager(defaultWSSnapshotSyncDelay) + + for _, a := range []asset.Item{asset.Spot, asset.USDTMarginedFutures} { + pair := currency.NewPair(currency.ETH, currency.USDT) + err := g.CurrencyPairs.EnablePair(a, pair) + require.NoError(t, err) + cache := m.LoadCache(pair, a) + + cache.updates = []pendingUpdate{{update: &orderbook.Update{Pair: pair, Asset: a}}} + cache.updating = true + err = cache.SyncOrderbook(t.Context(), g, pair, a) + require.NoError(t, err) + require.False(t, cache.updating) + require.Empty(t, cache.updates) + + expectedLimit := 20 + if a == asset.Spot { + expectedLimit = 100 + } + + b, err := g.Websocket.Orderbook.GetOrderbook(pair, a) + require.NoError(t, err) + require.Len(t, b.Bids, expectedLimit) + require.Len(t, b.Asks, expectedLimit) + } +} + +func TestApplyPendingUpdates(t *testing.T) { + t.Parallel() + + g := new(Gateio) + require.NoError(t, testexch.Setup(g), "Setup must not error") + require.NoError(t, g.UpdateTradablePairs(t.Context(), false)) + + m := newWsOBUpdateManager(defaultWSSnapshotSyncDelay) + pair := currency.NewPair(currency.LTC, currency.USDT) + err := g.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{ + Exchange: g.Name, + Pair: pair, + Asset: asset.USDTMarginedFutures, + Bids: []orderbook.Tranche{{Price: 1, Amount: 1}}, + Asks: []orderbook.Tranche{{Price: 1, Amount: 1}}, + LastUpdated: time.Now(), + UpdatePushedAt: time.Now(), + LastUpdateID: 1335, + }) + require.NoError(t, err) + + cache := m.LoadCache(pair, asset.USDTMarginedFutures) + + update := &orderbook.Update{ + UpdateID: 1339, + Pair: pair, + Asset: asset.USDTMarginedFutures, + AllowEmpty: true, + UpdateTime: time.Now(), + } + + cache.updates = []pendingUpdate{{update: update, firstUpdateID: 1337}} + err = cache.applyPendingUpdates(g, asset.USDTMarginedFutures) + require.ErrorIs(t, err, errOrderbookSnapshotOutdated) + + cache.updates[0].firstUpdateID = 1336 + err = cache.applyPendingUpdates(g, asset.USDTMarginedFutures) + require.NoError(t, err) +} + +func TestApplyOrderbookUpdate(t *testing.T) { + t.Parallel() + + g := new(Gateio) + require.NoError(t, testexch.Setup(g), "Setup must not error") + require.NoError(t, g.UpdateTradablePairs(t.Context(), false)) + + pair := currency.NewBTCUSDT() + + update := &orderbook.Update{ + Pair: pair, + Asset: asset.USDTMarginedFutures, + AllowEmpty: true, + UpdateTime: time.Now(), + } + + err := applyOrderbookUpdate(g, update) + require.ErrorIs(t, err, buffer.ErrDepthNotFound) + + update.Asset = asset.Spot + err = applyOrderbookUpdate(g, update) + require.ErrorIs(t, err, buffer.ErrDepthNotFound) + + update.Pair = currency.NewPair(currency.BABY, currency.BABYDOGE) + err = applyOrderbookUpdate(g, update) + require.NoError(t, err) +} diff --git a/exchanges/kline/kline.go b/exchanges/kline/kline.go index 7b445029..320aa279 100644 --- a/exchanges/kline/kline.go +++ b/exchanges/kline/kline.go @@ -288,8 +288,14 @@ func durationToWord(in Interval) string { switch in { case Raw: return "raw" + case TenMilliseconds: + return "tenmillisec" + case TwentyMilliseconds: + return "twentymillisec" case HundredMilliseconds: return "hundredmillisec" + case TwoHundredAndFiftyMilliseconds: + return "twohundredfiftymillisec" case ThousandMilliseconds: return "thousandmillisec" case TenSecond: diff --git a/exchanges/kline/kline_test.go b/exchanges/kline/kline_test.go index 18f808d7..b758431d 100644 --- a/exchanges/kline/kline_test.go +++ b/exchanges/kline/kline_test.go @@ -6,7 +6,6 @@ import ( "math/rand" "os" "path/filepath" - "strings" "testing" "time" @@ -155,129 +154,40 @@ func TestDurationToWord(t *testing.T) { name string interval Interval }{ - { - "raw", - Raw, - }, - { - "hundredmillisec", - HundredMilliseconds, - }, - { - "thousandmillisec", - ThousandMilliseconds, - }, - { - "tensec", - TenSecond, - }, - { - "FifteenSecond", - FifteenSecond, - }, - { - "OneMin", - OneMin, - }, - { - "ThreeMin", - ThreeMin, - }, - { - "FiveMin", - FiveMin, - }, - { - "TenMin", - TenMin, - }, - { - "FifteenMin", - FifteenMin, - }, - { - "ThirtyMin", - ThirtyMin, - }, - { - "OneHour", - OneHour, - }, - { - "TwoHour", - TwoHour, - }, - { - "FourHour", - FourHour, - }, - { - "SixHour", - SixHour, - }, - { - "EightHour", - OneHour * 8, - }, - { - "TwelveHour", - TwelveHour, - }, - { - "OneDay", - OneDay, - }, - { - "ThreeDay", - ThreeDay, - }, - { - "FiveDay", - FiveDay, - }, - { - "FifteenDay", - FifteenDay, - }, - { - "OneWeek", - OneWeek, - }, - { - "TwoWeek", - TwoWeek, - }, - { - "OneMonth", - OneMonth, - }, - { - "ThreeMonth", - ThreeMonth, - }, - { - "SixMonth", - SixMonth, - }, - { - "OneYear", - OneYear, - }, - { - "notfound", - Interval(time.Hour * 1337), - }, + {"raw", Raw}, + {"tenmillisec", TenMilliseconds}, + {"twentymillisec", TwentyMilliseconds}, + {"hundredmillisec", HundredMilliseconds}, + {"twohundredfiftymillisec", TwoHundredAndFiftyMilliseconds}, + {"thousandmillisec", ThousandMilliseconds}, + {"tensec", TenSecond}, + {"fifteensecond", FifteenSecond}, + {"onemin", OneMin}, + {"threemin", ThreeMin}, + {"fivemin", FiveMin}, + {"tenmin", TenMin}, + {"fifteenmin", FifteenMin}, + {"thirtymin", ThirtyMin}, + {"onehour", OneHour}, + {"twohour", TwoHour}, + {"fourhour", FourHour}, + {"sixhour", SixHour}, + {"eighthour", OneHour * 8}, + {"twelvehour", TwelveHour}, + {"oneday", OneDay}, + {"threeday", ThreeDay}, + {"fiveday", FiveDay}, + {"fifteenday", FifteenDay}, + {"oneweek", OneWeek}, + {"twoweek", TwoWeek}, + {"onemonth", OneMonth}, + {"threemonth", ThreeMonth}, + {"sixmonth", SixMonth}, + {"oneyear", OneYear}, + {"notfound", Interval(time.Hour * 1337)}, } - for x := range testCases { - test := testCases[x] - t.Run(test.name, func(t *testing.T) { - t.Parallel() - t.Helper() - v := durationToWord(test.interval) - if !strings.EqualFold(v, test.name) { - t.Fatalf("%v: received %v expected %v", test.name, v, test.name) - } - }) + for _, tc := range testCases { + require.Equal(t, tc.name, durationToWord(tc.interval)) } } diff --git a/exchanges/kline/kline_types.go b/exchanges/kline/kline_types.go index a062ce97..bb0132a7 100644 --- a/exchanges/kline/kline_types.go +++ b/exchanges/kline/kline_types.go @@ -11,40 +11,43 @@ import ( // Consts here define basic time intervals const ( - Raw = Interval(-1) - HundredMilliseconds = Interval(100 * time.Millisecond) - ThousandMilliseconds = 10 * HundredMilliseconds - TenSecond = Interval(10 * time.Second) - FifteenSecond = Interval(15 * time.Second) - ThirtySecond = 2 * FifteenSecond - OneMin = Interval(time.Minute) - ThreeMin = 3 * OneMin - FiveMin = 5 * OneMin - TenMin = 10 * OneMin - FifteenMin = 15 * OneMin - ThirtyMin = 30 * OneMin - OneHour = Interval(time.Hour) - TwoHour = 2 * OneHour - ThreeHour = 3 * OneHour - FourHour = 4 * OneHour - SixHour = 6 * OneHour - SevenHour = 7 * OneHour - EightHour = 8 * OneHour - TwelveHour = 12 * OneHour - OneDay = 24 * OneHour - TwoDay = 2 * OneDay - ThreeDay = 3 * OneDay - SevenDay = 7 * OneDay - FifteenDay = 15 * OneDay - OneWeek = 7 * OneDay - TwoWeek = 2 * OneWeek - ThreeWeek = 3 * OneWeek - OneMonth = 30 * OneDay - ThreeMonth = 90 * OneDay - SixMonth = 2 * ThreeMonth - NineMonth = 3 * ThreeMonth - OneYear = 365 * OneDay - FiveDay = 5 * OneDay + Raw = Interval(-1) + TenMilliseconds = Interval(10 * time.Millisecond) + TwentyMilliseconds = 2 * TenMilliseconds + HundredMilliseconds = Interval(100 * time.Millisecond) + TwoHundredAndFiftyMilliseconds = Interval(250 * time.Millisecond) + ThousandMilliseconds = 10 * HundredMilliseconds + TenSecond = Interval(10 * time.Second) + FifteenSecond = Interval(15 * time.Second) + ThirtySecond = 2 * FifteenSecond + OneMin = Interval(time.Minute) + ThreeMin = 3 * OneMin + FiveMin = 5 * OneMin + TenMin = 10 * OneMin + FifteenMin = 15 * OneMin + ThirtyMin = 30 * OneMin + OneHour = Interval(time.Hour) + TwoHour = 2 * OneHour + ThreeHour = 3 * OneHour + FourHour = 4 * OneHour + SixHour = 6 * OneHour + SevenHour = 7 * OneHour + EightHour = 8 * OneHour + TwelveHour = 12 * OneHour + OneDay = 24 * OneHour + TwoDay = 2 * OneDay + ThreeDay = 3 * OneDay + SevenDay = 7 * OneDay + FifteenDay = 15 * OneDay + OneWeek = 7 * OneDay + TwoWeek = 2 * OneWeek + ThreeWeek = 3 * OneWeek + OneMonth = 30 * OneDay + ThreeMonth = 90 * OneDay + SixMonth = 2 * ThreeMonth + NineMonth = 3 * ThreeMonth + OneYear = 365 * OneDay + FiveDay = 5 * OneDay ) var ( diff --git a/exchanges/orderbook/orderbook_types.go b/exchanges/orderbook/orderbook_types.go index c5721bb0..47231c04 100644 --- a/exchanges/orderbook/orderbook_types.go +++ b/exchanges/orderbook/orderbook_types.go @@ -166,7 +166,7 @@ const ( // Update and things and stuff type Update struct { - UpdateID int64 // Used when no time is provided + UpdateID int64 UpdateTime time.Time UpdatePushedAt time.Time Asset asset.Item @@ -176,6 +176,8 @@ type Update struct { Pair currency.Pair // Checksum defines the expected value when the books have been verified Checksum uint32 + // AllowEmpty, when true, permits loading an empty order book update to set an UpdateID without including actual data. + AllowEmpty bool } // Movement defines orderbook traversal details from either hitting the bids or diff --git a/exchanges/subscription/fixtures_test.go b/exchanges/subscription/fixtures_test.go index e37272a5..f11888d7 100644 --- a/exchanges/subscription/fixtures_test.go +++ b/exchanges/subscription/fixtures_test.go @@ -1,6 +1,8 @@ package subscription import ( + "errors" + "fmt" "maps" "strings" "testing" @@ -13,6 +15,33 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" ) +var errValidateSubscriptionsTestError = errors.New("validate subscriptions test error") + +type mockExWithSubValidator struct { + GenerateBadSubscription bool + FailGetSubscriptions bool + *mockEx +} + +func (m *mockExWithSubValidator) ValidateSubscriptions(in List) error { + for _, sub := range in { + if sub.Channel == "fail-channel" { + return fmt.Errorf("%w: '%s'", errValidateSubscriptionsTestError, sub.String()) + } + } + return nil +} + +func (m *mockExWithSubValidator) GetSubscriptions() (List, error) { + if m.GenerateBadSubscription { + return List{{Channel: "fail-channel"}}, nil + } + if m.FailGetSubscriptions { + return nil, ErrNotFound + } + return nil, nil +} + type mockEx struct { pairs assetPairs assets asset.Items @@ -76,6 +105,7 @@ func (m *mockEx) GetSubscriptionTemplate(s *Subscription) (*template.Template, e func (m *mockEx) GetAssetTypes(_ bool) asset.Items { return m.assets } func (m *mockEx) CanUseAuthenticatedWebsocketEndpoints() bool { return m.auth } +func (m *mockEx) GetSubscriptions() (List, error) { return nil, nil } // equalLists is a utility function to compare subscription lists and show a pretty failure message // It overcomes the verbose depth of assert.ElementsMatch spewConfig diff --git a/exchanges/subscription/list.go b/exchanges/subscription/list.go index 8bd71296..3e896b28 100644 --- a/exchanges/subscription/list.go +++ b/exchanges/subscription/list.go @@ -22,6 +22,7 @@ type IExchange interface { GetSubscriptionTemplate(*Subscription) (*template.Template, error) CanUseAuthenticatedWebsocketEndpoints() bool IsAssetWebsocketSupported(a asset.Item) bool + GetSubscriptions() (List, error) } // Strings returns a sorted slice of subscriptions diff --git a/exchanges/subscription/subscription.go b/exchanges/subscription/subscription.go index 6b2cfc24..8a3fcfda 100644 --- a/exchanges/subscription/subscription.go +++ b/exchanges/subscription/subscription.go @@ -38,19 +38,27 @@ const ( // Public errors var ( - ErrNotFound = errors.New("subscription not found") - ErrNotSinglePair = errors.New("only single pair subscriptions expected") - ErrBatchingNotSupported = errors.New("subscription batching not supported") - ErrInStateAlready = errors.New("subscription already in state") - ErrInvalidState = errors.New("invalid subscription state") - ErrDuplicate = errors.New("duplicate subscription") - ErrUseConstChannelName = errors.New("must use standard channel name constants") - ErrNotSupported = errors.New("subscription channel not supported") + ErrNotFound = errors.New("subscription not found") + ErrNotSinglePair = errors.New("only single pair subscriptions expected") + ErrBatchingNotSupported = errors.New("subscription batching not supported") + ErrInStateAlready = errors.New("subscription already in state") + ErrInvalidState = errors.New("invalid subscription state") + ErrDuplicate = errors.New("duplicate subscription") + ErrUseConstChannelName = errors.New("must use standard channel name constants") + ErrNotSupported = errors.New("subscription channel not supported") + ErrExclusiveSubscription = errors.New("exclusive subscription detected") + ErrInvalidInterval = errors.New("invalid interval") + ErrInvalidLevel = errors.New("invalid level") ) // State tracks the status of a subscription channel type State uint8 +// ListValidator validates a list of subscriptions, this is optionally handled through expand templates method +type ListValidator interface { + ValidateSubscriptions(List) error +} + // Subscription container for streaming subscriptions type Subscription struct { Enabled bool `json:"enabled"` diff --git a/exchanges/subscription/template.go b/exchanges/subscription/template.go index 69c88fa7..f4a1d825 100644 --- a/exchanges/subscription/template.go +++ b/exchanges/subscription/template.go @@ -40,6 +40,7 @@ type tplCtx struct { // Calls e.GetSubscriptionTemplate to find a template for each subscription // Filters out Authenticated subscriptions if !e.CanUseAuthenticatedEndpoints // See README.md for more details +// The exchange can optionally implement ListValidator to have custom validation on subscriptions func (l List) ExpandTemplates(e IExchange) (List, error) { if !slices.ContainsFunc(l, func(s *Subscription) bool { return s.QualifiedChannel == "" }) { // Empty list, or already processed @@ -72,8 +73,21 @@ func (l List) ExpandTemplates(e IExchange) (List, error) { expanded, err2 := expandTemplate(e, s, maps.Clone(ap), assets) if err2 != nil { err = common.AppendError(err, fmt.Errorf("%s: %w", s, err2)) - } else { - subs = append(subs, expanded...) + continue + } + + subs = append(subs, expanded...) + } + + // Validate the subscriptions after expansion to capture fields that will be used in the template + if v, ok := e.(ListValidator); ok { + // Need to check against the already stored subscriptions, as we add additional subscriptions + storedSubs, err := e.GetSubscriptions() + if err != nil { + return nil, err + } + if err := v.ValidateSubscriptions(slices.Concat(subs, storedSubs)); err != nil { + return nil, fmt.Errorf("error validating subscriptions: %w", err) } } diff --git a/exchanges/subscription/template_test.go b/exchanges/subscription/template_test.go index f6badea5..28e5bad6 100644 --- a/exchanges/subscription/template_test.go +++ b/exchanges/subscription/template_test.go @@ -31,6 +31,14 @@ func TestExpandTemplates(t *testing.T) { {Channel: "batching", Asset: asset.Spot}, {Channel: "single-channel", Authenticated: true}, } + + _, err := l.ExpandTemplates(&mockExWithSubValidator{mockEx: e, GenerateBadSubscription: true}) + require.ErrorIs(t, err, errValidateSubscriptionsTestError) + _, err = l.ExpandTemplates(&mockExWithSubValidator{mockEx: e}) + require.NoError(t, err) + _, err = l.ExpandTemplates(&mockExWithSubValidator{mockEx: e, FailGetSubscriptions: true}) + require.ErrorIs(t, err, ErrNotFound) + got, err := l.ExpandTemplates(e) require.NoError(t, err, "ExpandTemplates must not error") exp := List{