gateio: Fix websocket orderbook incremental updates (#1863)

* gateio: Add websocket orderbook update manager

* RM println

* glorious: nit

* Adds delivery futures update processing as well

* Change to const value for delivery

* Drop check out of order, can reinstate if required.

* Adds in validation methods to ensure config changes are correct when expanding templates and return errors with correct info if not.

* fix some things and add in todo when this gets updated

* fix spelling

* linter: fix

* gk: initial nits

* gk: nits shift to template only verification with funcmap, rm interface for single sub checking.

* rm unused error

* linter: fix

* update to const frequency

* gk: wrap with panic and single invocation in template, change name

* gk: nits to check across stored subs with incoming subs

* linter: fix

* updates names, makes things slightly more efficient and adds tests

* linter: fix

* gk: sexc patch v2

* glorious: nits

* gk: nits

* Update exchanges/subscription/template.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* gk: nits

* linter: make peace with linter regulations

* glorious: Add TODO for future template integration

* glorious: commentary nits

* fix name

* give me a break, have a kit kat

* revert whoops

* update wording on comment

* revert secondary call to expand templates and update tests

* misc lint: fix

* Add spot orderbook update interval for 20ms, expand tests, piggy back limit/level off loaded subscription. Thanks to @thrasher-

* linter/spell: fix

* ai nits: drop go routine on mtx RUnlock

* Update exchanges/gateio/ws_ob_update_manager.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* gk: revert to 100ms from 20ms waiting for config upgrade patch

* test: fix

* cranktakular: nits

* strings quoted in fmt call

* thrasher-: nits

* Update exchanges/gateio/gateio_test.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/gateio/gateio_test.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* gk: nits

---------

Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>
This commit is contained in:
Ryan O'Hara-Reid
2025-05-23 17:29:39 +10:00
committed by GitHub
parent 640960aec1
commit b281759573
22 changed files with 1145 additions and 373 deletions

View File

@@ -15,6 +15,11 @@ import (
const packageError = "websocket orderbook buffer error: %w"
// Public err vars
var (
ErrDepthNotFound = errors.New("orderbook depth not found")
)
var (
errExchangeConfigNil = errors.New("exchange config is nil")
errBufferConfigNil = errors.New("buffer config is nil")
@@ -22,7 +27,6 @@ var (
errIssueBufferEnabledButNoLimit = errors.New("buffer enabled but no limit set")
errUpdateIsNil = errors.New("update is nil")
errUpdateNoTargets = errors.New("update bid/ask targets cannot be nil")
errDepthNotFound = errors.New("orderbook depth not found")
errRESTOverwrite = errors.New("orderbook has been overwritten by REST protocol")
errInvalidAction = errors.New("invalid action")
errAmendFailure = errors.New("orderbook amend update failure")
@@ -70,7 +74,7 @@ func (w *Orderbook) validate(u *orderbook.Update) error {
if u == nil {
return fmt.Errorf(packageError, errUpdateIsNil)
}
if len(u.Bids) == 0 && len(u.Asks) == 0 {
if len(u.Bids) == 0 && len(u.Asks) == 0 && !u.AllowEmpty {
return fmt.Errorf(packageError, errUpdateNoTargets)
}
return nil
@@ -87,7 +91,7 @@ func (w *Orderbook) Update(u *orderbook.Update) error {
book, ok := w.ob[key.PairAsset{Base: u.Pair.Base.Item, Quote: u.Pair.Quote.Item, Asset: u.Asset}]
if !ok {
return fmt.Errorf("%w for Exchange %s CurrencyPair: %s AssetType: %s",
errDepthNotFound,
ErrDepthNotFound,
w.exchangeName,
u.Pair,
u.Asset)
@@ -309,15 +313,38 @@ func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error {
// GetOrderbook returns an orderbook copy as orderbook.Base
func (w *Orderbook) GetOrderbook(p currency.Pair, a asset.Item) (*orderbook.Base, error) {
if p.IsEmpty() {
return nil, currency.ErrCurrencyPairEmpty
}
if !a.IsValid() {
return nil, asset.ErrInvalidAsset
}
w.mtx.Lock()
defer w.mtx.Unlock()
book, ok := w.ob[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}]
if !ok {
return nil, fmt.Errorf("%s %s %s %w", w.exchangeName, p, a, errDepthNotFound)
return nil, fmt.Errorf("%s %w: %s.%s", w.exchangeName, ErrDepthNotFound, a, p)
}
return book.ob.Retrieve()
}
// LastUpdateID returns the last update ID of the orderbook
func (w *Orderbook) LastUpdateID(p currency.Pair, a asset.Item) (int64, error) {
if p.IsEmpty() {
return 0, currency.ErrCurrencyPairEmpty
}
if !a.IsValid() {
return 0, asset.ErrInvalidAsset
}
w.mtx.Lock()
defer w.mtx.Unlock()
book, ok := w.ob[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}]
if !ok {
return 0, fmt.Errorf("%s %w: %s.%s", w.exchangeName, ErrDepthNotFound, a, p)
}
return book.ob.LastUpdateID()
}
// FlushBuffer flushes w.ob data to be garbage collected and refreshed when a
// connection is lost and reconnected
func (w *Orderbook) FlushBuffer() {
@@ -332,11 +359,7 @@ func (w *Orderbook) FlushOrderbook(p currency.Pair, a asset.Item) error {
defer w.mtx.Unlock()
book, ok := w.ob[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}]
if !ok {
return fmt.Errorf("cannot flush orderbook %s %s %s %w",
w.exchangeName,
p,
a,
errDepthNotFound)
return fmt.Errorf("cannot flush orderbook %s %s %s %w", w.exchangeName, p, a, ErrDepthNotFound)
}
// error not needed in this return
_ = book.ob.Invalidate(errOrderbookFlushed)

View File

@@ -50,6 +50,7 @@ func createSnapshot(pair currency.Pair, bookVerifiy ...bool) (holder *Orderbook,
PriceDuplication: true,
LastUpdated: time.Now(),
VerifyOrderbook: len(bookVerifiy) > 0 && bookVerifiy[0],
LastUpdateID: 69420,
}
newBook := make(map[key.PairAsset]*orderbookHolder)
@@ -462,7 +463,7 @@ func TestOrderbookLastUpdateID(t *testing.T) {
err = holder.Update(&orderbook.Update{
Asks: asks,
Pair: cp,
UpdateID: int64(i) + 1,
UpdateID: int64(i) + 1 + 69420,
Asset: asset.Spot,
UpdateTime: time.Now(),
})
@@ -480,7 +481,7 @@ func TestOrderbookLastUpdateID(t *testing.T) {
ob, err := holder.GetOrderbook(cp, asset.Spot)
require.NoError(t, err)
assert.Equal(t, int64(len(itemArray)), ob.LastUpdateID)
assert.Equal(t, int64(len(itemArray)+69420), ob.LastUpdateID)
}
// TestRunUpdateWithoutSnapshot logic test
@@ -500,7 +501,7 @@ func TestRunUpdateWithoutSnapshot(t *testing.T) {
UpdateTime: time.Now(),
Asset: asset.Spot,
})
require.ErrorIs(t, err, errDepthNotFound)
require.ErrorIs(t, err, ErrDepthNotFound)
}
// TestRunUpdateWithoutAnyUpdates logic test
@@ -723,6 +724,12 @@ func TestGetOrderbook(t *testing.T) {
holder, _, _, err := createSnapshot(cp)
require.NoError(t, err)
_, err = holder.GetOrderbook(currency.EMPTYPAIR, asset.Spot)
require.ErrorIs(t, err, currency.ErrCurrencyPairEmpty)
_, err = holder.GetOrderbook(cp, 0)
require.ErrorIs(t, err, asset.ErrInvalidAsset)
ob, err := holder.GetOrderbook(cp, asset.Spot)
require.NoError(t, err)
@@ -736,15 +743,35 @@ func TestGetOrderbook(t *testing.T) {
bidLen, err := bufferOb.ob.GetBidLength()
require.NoError(t, err)
if askLen != len(ob.Asks) ||
bidLen != len(ob.Bids) ||
b.Asset != ob.Asset ||
b.Exchange != ob.Exchange ||
b.LastUpdateID != ob.LastUpdateID ||
b.PriceDuplication != ob.PriceDuplication ||
b.Pair != ob.Pair {
t.Fatal("data on both books should be the same")
}
assert.Equal(t, askLen, len(ob.Asks), "ask length mismatch")
assert.Equal(t, bidLen, len(ob.Bids), "bid length mismatch")
assert.Equal(t, b.Asset, ob.Asset, "asset mismatch")
assert.Equal(t, b.Exchange, ob.Exchange, "exchange name mismatch")
assert.Equal(t, b.LastUpdateID, ob.LastUpdateID, "last update ID mismatch")
assert.Equal(t, b.PriceDuplication, ob.PriceDuplication, "price duplication mismatch")
assert.Equal(t, b.Pair, ob.Pair, "pair mismatch")
}
func TestLastUpdateID(t *testing.T) {
t.Parallel()
cp, err := getExclusivePair()
require.NoError(t, err)
holder, _, _, err := createSnapshot(cp)
require.NoError(t, err)
_, err = holder.LastUpdateID(currency.EMPTYPAIR, asset.Spot)
require.ErrorIs(t, err, currency.ErrCurrencyPairEmpty)
_, err = holder.LastUpdateID(cp, 0)
require.ErrorIs(t, err, asset.ErrInvalidAsset)
_, err = holder.LastUpdateID(cp, asset.FutureCombo)
require.ErrorIs(t, err, ErrDepthNotFound)
ob, err := holder.LastUpdateID(cp, asset.Spot)
require.NoError(t, err)
require.Equal(t, int64(69420), ob)
}
func TestSetup(t *testing.T) {
@@ -982,7 +1009,7 @@ func TestFlushOrderbook(t *testing.T) {
}
_, err = w.GetOrderbook(cp, asset.Spot)
require.ErrorIs(t, err, errDepthNotFound)
require.ErrorIs(t, err, ErrDepthNotFound)
require.NoError(t, w.LoadSnapshot(&snapShot1))
require.NoError(t, w.FlushOrderbook(cp, asset.Spot))

View File

@@ -189,6 +189,7 @@ func timeInForceFromString(tif string) (order.TimeInForce, error) {
type Gateio struct {
Counter common.Counter // Must be first due to alignment requirements
exchange.Base
wsOBUpdateMgr *wsOBUpdateManager
}
// ***************************************** SubAccounts ********************************
@@ -334,43 +335,23 @@ func (g *Gateio) GetTicker(ctx context.Context, currencyPair, timezone string) (
return nil, fmt.Errorf("no ticker data found for currency pair %v", currencyPair)
}
// GetIntervalString returns a string representation of the interval according to the Gateio exchange representation.
func (g *Gateio) GetIntervalString(interval kline.Interval) (string, error) {
// getIntervalString returns a string representation of the interval according to the Gateio exchange representation
func getIntervalString(interval kline.Interval) (string, error) {
switch interval {
case kline.HundredMilliseconds:
return "100ms", nil
case kline.ThousandMilliseconds:
return "1000ms", nil
case kline.TenSecond:
return "10s", nil
case kline.ThirtySecond:
return "30s", nil
case kline.OneMin:
return "1m", nil
case kline.FiveMin:
return "5m", nil
case kline.FifteenMin:
return "15m", nil
case kline.ThirtyMin:
return "30m", nil
case kline.OneHour:
return "1h", nil
case kline.TwoHour:
return "2h", nil
case kline.FourHour:
return "4h", nil
case kline.EightHour:
return "8h", nil
case kline.TwelveHour:
return "12h", nil
case kline.OneDay:
return "1d", nil
case kline.SevenDay:
return "7d", nil
case kline.OneMonth:
return "30d", nil
case kline.TenMilliseconds, kline.TwentyMilliseconds, kline.HundredMilliseconds, kline.TwoHundredAndFiftyMilliseconds,
kline.TenSecond, kline.ThirtySecond, kline.OneMin, kline.FiveMin, kline.FifteenMin, kline.ThirtyMin,
kline.OneHour, kline.TwoHour, kline.FourHour, kline.EightHour, kline.TwelveHour:
return interval.Short(), nil
default:
return "", kline.ErrUnsupportedInterval
return "", fmt.Errorf("%q: %w", interval.String(), kline.ErrUnsupportedInterval)
}
}
@@ -478,7 +459,7 @@ func (g *Gateio) GetCandlesticks(ctx context.Context, currencyPair currency.Pair
var err error
if interval.Duration().Microseconds() != 0 {
var intervalString string
intervalString, err = g.GetIntervalString(interval)
intervalString, err = getIntervalString(interval)
if err != nil {
return nil, err
}
@@ -1963,7 +1944,7 @@ func (g *Gateio) GetFuturesCandlesticks(ctx context.Context, settle currency.Cod
params.Set("limit", strconv.FormatUint(limit, 10))
}
if interval.Duration().Microseconds() != 0 {
intervalString, err := g.GetIntervalString(interval)
intervalString, err := getIntervalString(interval)
if err != nil {
return nil, err
}
@@ -1993,7 +1974,7 @@ func (g *Gateio) PremiumIndexKLine(ctx context.Context, settleCurrency currency.
if limit > 0 {
params.Set("limit", strconv.FormatInt(limit, 10))
}
intervalString, err := g.GetIntervalString(interval)
intervalString, err := getIntervalString(interval)
if err != nil {
return nil, err
}
@@ -2059,7 +2040,7 @@ func (g *Gateio) GetFutureStats(ctx context.Context, settle currency.Code, contr
params.Set("from", strconv.FormatInt(from.Unix(), 10))
}
if int64(interval) != 0 {
intervalString, err := g.GetIntervalString(interval)
intervalString, err := getIntervalString(interval)
if err != nil {
return nil, err
}
@@ -2735,7 +2716,7 @@ func (g *Gateio) GetDeliveryFuturesCandlesticks(ctx context.Context, settle curr
params.Set("limit", strconv.FormatUint(limit, 10))
}
if int64(interval) != 0 {
intervalString, err := g.GetIntervalString(interval)
intervalString, err := getIntervalString(interval)
if err != nil {
return nil, err
}
@@ -3502,7 +3483,7 @@ func (g *Gateio) GetOptionFuturesCandlesticks(ctx context.Context, contract curr
if !to.IsZero() {
params.Set("to", strconv.FormatInt(to.Unix(), 10))
}
intervalString, err := g.GetIntervalString(interval)
intervalString, err := getIntervalString(interval)
if err != nil {
return nil, err
}
@@ -3528,7 +3509,7 @@ func (g *Gateio) GetOptionFuturesMarkPriceCandlesticks(ctx context.Context, unde
params.Set("to", strconv.FormatInt(to.Unix(), 10))
}
if int64(interval) != 0 {
intervalString, err := g.GetIntervalString(interval)
intervalString, err := getIntervalString(interval)
if err != nil {
return nil, err
}

View File

@@ -8,6 +8,7 @@ import (
"log"
"os"
"slices"
"strconv"
"strings"
"sync"
"testing"
@@ -2260,9 +2261,6 @@ func TestGenerateSubscriptionsSpot(t *testing.T) {
require.NoError(t, testexch.Setup(g), "Test instance Setup must not error")
g.Websocket.SetCanUseAuthenticatedEndpoints(true)
g.Features.Subscriptions = append(g.Features.Subscriptions, &subscription.Subscription{
Enabled: true, Channel: spotOrderbookChannel, Asset: asset.Spot, Interval: kline.ThousandMilliseconds, Levels: 5,
})
subs, err := g.generateSubscriptionsSpot()
require.NoError(t, err, "generateSubscriptions must not error")
exp := subscription.List{}
@@ -3345,3 +3343,224 @@ func TestTimeInForceString(t *testing.T) {
assert.Equal(t, valid.String, timeInForceString(valid.TimeInForce))
}
}
func TestIsSingleOrderbookChannel(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
channel string
expected bool
}{
{channel: spotOrderbookUpdateChannel, expected: true},
{channel: spotOrderbookChannel, expected: true},
{channel: spotOrderbookTickerChannel, expected: true},
{channel: futuresOrderbookChannel, expected: true},
{channel: futuresOrderbookTickerChannel, expected: true},
{channel: futuresOrderbookUpdateChannel, expected: true},
{channel: optionsOrderbookChannel, expected: true},
{channel: optionsOrderbookTickerChannel, expected: true},
{channel: optionsOrderbookUpdateChannel, expected: true},
{channel: spotTickerChannel, expected: false},
{channel: "sad", expected: false},
} {
assert.Equal(t, tc.expected, isSingleOrderbookChannel(tc.channel))
}
}
func TestValidateSubscriptions(t *testing.T) {
t.Parallel()
require.NoError(t, g.ValidateSubscriptions(nil))
require.NoError(t, g.ValidateSubscriptions([]*subscription.Subscription{{Channel: spotTickerChannel, Pairs: []currency.Pair{currency.NewBTCUSDT()}}}))
require.NoError(t, g.ValidateSubscriptions([]*subscription.Subscription{
{Channel: spotTickerChannel, Pairs: []currency.Pair{currency.NewBTCUSD()}},
{Channel: spotOrderbookUpdateChannel, Pairs: []currency.Pair{currency.NewBTCUSD()}},
}))
require.NoError(t, g.ValidateSubscriptions([]*subscription.Subscription{
{Channel: spotTickerChannel, Pairs: []currency.Pair{currency.NewBTCUSD()}},
{Channel: spotOrderbookUpdateChannel, Pairs: []currency.Pair{currency.NewBTCUSD(), currency.NewBTCUSDT()}},
}))
require.NoError(t, g.ValidateSubscriptions([]*subscription.Subscription{
{Channel: spotTickerChannel, Pairs: []currency.Pair{currency.NewBTCUSD()}},
{Channel: spotOrderbookUpdateChannel, Pairs: []currency.Pair{currency.NewBTCUSD()}},
{Channel: spotOrderbookUpdateChannel, Pairs: []currency.Pair{currency.NewBTCUSDT()}},
}))
require.ErrorIs(t, g.ValidateSubscriptions([]*subscription.Subscription{
{Channel: spotTickerChannel, Pairs: []currency.Pair{currency.NewBTCUSD()}},
{Channel: spotOrderbookUpdateChannel, Pairs: []currency.Pair{currency.NewBTCUSD()}},
{Channel: spotOrderbookChannel, Pairs: []currency.Pair{currency.NewBTCUSD()}},
}), subscription.ErrExclusiveSubscription)
}
func TestCandlesChannelIntervals(t *testing.T) {
t.Parallel()
s := &subscription.Subscription{Channel: subscription.CandlesChannel, Asset: asset.Spot, Interval: 0}
_, err := candlesChannelInterval(s)
require.ErrorIs(t, err, kline.ErrUnsupportedInterval, "candlestickChannelInterval must error correctly with a 0 interval")
s.Interval = kline.ThousandMilliseconds
i, err := candlesChannelInterval(s)
require.NoError(t, err)
assert.Equal(t, "1000ms", i)
}
func TestOrderbookChannelIntervals(t *testing.T) {
t.Parallel()
s := &subscription.Subscription{Channel: futuresOrderbookUpdateChannel, Interval: kline.TwentyMilliseconds, Levels: 100}
_, err := orderbookChannelInterval(s, asset.Futures)
require.ErrorIs(t, err, subscription.ErrInvalidInterval)
require.ErrorContains(t, err, "20ms only valid with Levels 20")
s.Levels = 20
i, err := orderbookChannelInterval(s, asset.Futures)
require.NoError(t, err)
assert.Equal(t, "20ms", i)
for s, exp := range map[*subscription.Subscription]error{
{Asset: asset.Binary, Channel: "unknown_channel", Interval: kline.OneYear}: nil,
{Asset: asset.Spot, Channel: spotOrderbookTickerChannel, Interval: kline.OneDay}: subscription.ErrInvalidInterval,
{Asset: asset.Spot, Channel: spotOrderbookTickerChannel, Interval: 0}: nil,
{Asset: asset.Spot, Channel: spotOrderbookChannel, Interval: kline.OneDay}: subscription.ErrInvalidInterval,
{Asset: asset.Spot, Channel: spotOrderbookChannel, Interval: kline.HundredMilliseconds}: nil,
{Asset: asset.Spot, Channel: spotOrderbookChannel, Interval: kline.ThousandMilliseconds}: nil,
{Asset: asset.Spot, Channel: spotOrderbookUpdateChannel, Interval: kline.OneDay}: subscription.ErrInvalidInterval,
{Asset: asset.Spot, Channel: spotOrderbookUpdateChannel, Interval: kline.HundredMilliseconds}: nil,
{Asset: asset.Futures, Channel: futuresOrderbookTickerChannel, Interval: kline.TenMilliseconds}: subscription.ErrInvalidInterval,
{Asset: asset.Futures, Channel: futuresOrderbookTickerChannel, Interval: 0}: nil,
{Asset: asset.Futures, Channel: futuresOrderbookChannel, Interval: kline.TenMilliseconds}: subscription.ErrInvalidInterval,
{Asset: asset.Futures, Channel: futuresOrderbookChannel, Interval: 0}: nil,
{Asset: asset.Futures, Channel: futuresOrderbookUpdateChannel, Interval: kline.OneDay}: subscription.ErrInvalidInterval,
{Asset: asset.Futures, Channel: futuresOrderbookUpdateChannel, Interval: kline.HundredMilliseconds}: nil,
{Asset: asset.DeliveryFutures, Channel: futuresOrderbookTickerChannel, Interval: kline.TenMilliseconds}: subscription.ErrInvalidInterval,
{Asset: asset.DeliveryFutures, Channel: futuresOrderbookTickerChannel, Interval: 0}: nil,
{Asset: asset.DeliveryFutures, Channel: futuresOrderbookChannel, Interval: kline.TenMilliseconds}: subscription.ErrInvalidInterval,
{Asset: asset.DeliveryFutures, Channel: futuresOrderbookChannel, Interval: 0}: nil,
{Asset: asset.DeliveryFutures, Channel: futuresOrderbookUpdateChannel, Interval: kline.OneDay}: subscription.ErrInvalidInterval,
{Asset: asset.DeliveryFutures, Channel: futuresOrderbookUpdateChannel, Interval: kline.HundredMilliseconds}: nil,
{Asset: asset.DeliveryFutures, Channel: futuresOrderbookUpdateChannel, Interval: kline.ThousandMilliseconds}: nil,
{Asset: asset.Options, Channel: optionsOrderbookTickerChannel, Interval: kline.TenMilliseconds}: subscription.ErrInvalidInterval,
{Asset: asset.Options, Channel: optionsOrderbookTickerChannel, Interval: 0}: nil,
{Asset: asset.Options, Channel: optionsOrderbookChannel, Interval: kline.TwoHundredAndFiftyMilliseconds}: subscription.ErrInvalidInterval,
{Asset: asset.Options, Channel: optionsOrderbookChannel, Interval: 0}: nil,
{Asset: asset.Options, Channel: optionsOrderbookUpdateChannel, Interval: kline.OneDay}: subscription.ErrInvalidInterval,
{Asset: asset.Options, Channel: optionsOrderbookUpdateChannel, Interval: kline.HundredMilliseconds}: nil,
{Asset: asset.Options, Channel: optionsOrderbookUpdateChannel, Interval: kline.ThousandMilliseconds}: nil,
} {
t.Run(s.Asset.String()+"/"+s.Channel+"/"+s.Interval.Short(), func(t *testing.T) {
t.Parallel()
i, err := orderbookChannelInterval(s, s.Asset)
if exp != nil {
require.ErrorIs(t, err, exp)
} else {
switch {
case s.Channel == "unknown_channel":
assert.Empty(t, i, "orderbookChannelInterval should return empty for unknown channels")
case strings.HasSuffix(s.Channel, "_ticker"):
assert.Empty(t, i)
case s.Interval == 0:
assert.Equal(t, "0", i)
default:
exp, err2 := getIntervalString(s.Interval)
require.NoError(t, err2, "getIntervalString must not error for validating expected value")
require.Equal(t, exp, i)
}
}
})
}
}
func TestChannelLevels(t *testing.T) {
t.Parallel()
for s, exp := range map[*subscription.Subscription]error{
{Channel: "unknown_channel", Asset: asset.Binary}: nil,
{Channel: spotOrderbookTickerChannel, Asset: asset.Spot}: nil,
{Channel: spotOrderbookTickerChannel, Asset: asset.Spot, Levels: 1}: subscription.ErrInvalidLevel,
{Channel: spotOrderbookUpdateChannel, Asset: asset.Spot}: nil,
{Channel: spotOrderbookUpdateChannel, Asset: asset.Spot, Levels: 100}: subscription.ErrInvalidLevel,
{Channel: spotOrderbookChannel, Asset: asset.Spot}: subscription.ErrInvalidLevel,
{Channel: spotOrderbookChannel, Asset: asset.Spot, Levels: 5}: nil,
{Channel: spotOrderbookChannel, Asset: asset.Spot, Levels: 10}: nil,
{Channel: spotOrderbookChannel, Asset: asset.Spot, Levels: 20}: nil,
{Channel: spotOrderbookChannel, Asset: asset.Spot, Levels: 50}: nil,
{Channel: spotOrderbookChannel, Asset: asset.Spot, Levels: 100}: nil,
{Channel: futuresOrderbookChannel, Asset: asset.Futures}: subscription.ErrInvalidLevel,
{Channel: futuresOrderbookChannel, Asset: asset.Futures, Levels: 1}: nil,
{Channel: futuresOrderbookChannel, Asset: asset.Futures, Levels: 5}: nil,
{Channel: futuresOrderbookChannel, Asset: asset.Futures, Levels: 10}: nil,
{Channel: futuresOrderbookChannel, Asset: asset.Futures, Levels: 20}: nil,
{Channel: futuresOrderbookChannel, Asset: asset.Futures, Levels: 50}: nil,
{Channel: futuresOrderbookChannel, Asset: asset.Futures, Levels: 100}: nil,
{Channel: futuresOrderbookTickerChannel, Asset: asset.Futures}: nil,
{Channel: futuresOrderbookTickerChannel, Asset: asset.Futures, Levels: 1}: subscription.ErrInvalidLevel,
{Channel: futuresOrderbookUpdateChannel, Asset: asset.Futures}: subscription.ErrInvalidLevel,
{Channel: futuresOrderbookUpdateChannel, Asset: asset.Futures, Levels: 20}: nil,
{Channel: futuresOrderbookUpdateChannel, Asset: asset.Futures, Levels: 50}: nil,
{Channel: futuresOrderbookUpdateChannel, Asset: asset.DeliveryFutures}: subscription.ErrInvalidLevel,
{Channel: futuresOrderbookUpdateChannel, Asset: asset.DeliveryFutures, Levels: 5}: nil,
{Channel: futuresOrderbookUpdateChannel, Asset: asset.DeliveryFutures, Levels: 10}: nil,
{Channel: futuresOrderbookUpdateChannel, Asset: asset.DeliveryFutures, Levels: 20}: nil,
{Channel: futuresOrderbookUpdateChannel, Asset: asset.DeliveryFutures, Levels: 50}: nil,
{Channel: futuresOrderbookUpdateChannel, Asset: asset.DeliveryFutures, Levels: 100}: nil,
{Channel: optionsOrderbookTickerChannel, Asset: asset.Options}: nil,
{Channel: optionsOrderbookTickerChannel, Asset: asset.Options, Levels: 1}: subscription.ErrInvalidLevel,
{Channel: optionsOrderbookUpdateChannel, Asset: asset.Options}: subscription.ErrInvalidLevel,
{Channel: optionsOrderbookUpdateChannel, Asset: asset.Options, Levels: 5}: nil,
{Channel: optionsOrderbookUpdateChannel, Asset: asset.Options, Levels: 10}: nil,
{Channel: optionsOrderbookUpdateChannel, Asset: asset.Options, Levels: 20}: nil,
{Channel: optionsOrderbookUpdateChannel, Asset: asset.Options, Levels: 50}: nil,
{Channel: optionsOrderbookChannel, Asset: asset.Options}: subscription.ErrInvalidLevel,
{Channel: optionsOrderbookChannel, Asset: asset.Options, Levels: 5}: nil,
{Channel: optionsOrderbookChannel, Asset: asset.Options, Levels: 10}: nil,
{Channel: optionsOrderbookChannel, Asset: asset.Options, Levels: 20}: nil,
{Channel: optionsOrderbookChannel, Asset: asset.Options, Levels: 50}: nil,
} {
t.Run(s.Asset.String()+"/"+s.Channel+"/"+strconv.Itoa(s.Levels), func(t *testing.T) {
t.Parallel()
l, err := channelLevels(s, s.Asset)
switch {
case exp != nil:
require.ErrorIs(t, err, exp)
case s.Levels == 0:
assert.Empty(t, l)
default:
require.NoError(t, err)
require.NotEmpty(t, l)
}
})
}
}
func TestGetIntervalString(t *testing.T) {
t.Parallel()
for k, exp := range map[kline.Interval]string{
kline.TenMilliseconds: "10ms",
kline.TwentyMilliseconds: "20ms",
kline.HundredMilliseconds: "100ms",
kline.TwoHundredAndFiftyMilliseconds: "250ms",
kline.ThousandMilliseconds: "1000ms",
kline.TenSecond: "10s",
kline.ThirtySecond: "30s",
kline.OneMin: "1m",
kline.FiveMin: "5m",
kline.FifteenMin: "15m",
kline.ThirtyMin: "30m",
kline.OneHour: "1h",
kline.TwoHour: "2h",
kline.FourHour: "4h",
kline.EightHour: "8h",
kline.TwelveHour: "12h",
kline.OneDay: "1d",
kline.SevenDay: "7d",
kline.OneMonth: "30d",
} {
t.Run(exp, func(t *testing.T) {
t.Parallel()
s, err := getIntervalString(k)
require.NoError(t, err)
assert.Equal(t, exp, s)
})
}
_, err := getIntervalString(0)
assert.ErrorIs(t, err, kline.ErrUnsupportedInterval, "0 should be an invalid interval")
_, err = getIntervalString(kline.FiveDay)
assert.ErrorIs(t, err, kline.ErrUnsupportedInterval, "Any other random interval should also be invalid")
}

View File

@@ -2050,7 +2050,7 @@ type WsCandlesticks struct {
type WsOrderbookTickerData struct {
UpdateTime types.Time `json:"t"`
UpdateOrderID int64 `json:"u"`
CurrencyPair currency.Pair `json:"s"`
Pair currency.Pair `json:"s"`
BestBidPrice types.Number `json:"b"`
BestBidAmount types.Number `json:"B"`
BestAskPrice types.Number `json:"a"`
@@ -2059,12 +2059,12 @@ type WsOrderbookTickerData struct {
// WsOrderbookUpdate represents websocket orderbook update push data
type WsOrderbookUpdate struct {
UpdateTime types.Time `json:"t"`
CurrencyPair currency.Pair `json:"s"`
FirstOrderbookUpdatedID int64 `json:"U"` // First update order book id in this event since last update
LastOrderbookUpdatedID int64 `json:"u"`
Bids [][2]types.Number `json:"b"`
Asks [][2]types.Number `json:"a"`
UpdateTime types.Time `json:"t"`
Pair currency.Pair `json:"s"`
FirstUpdateID int64 `json:"U"` // First update order book id in this event since last update
LastUpdateID int64 `json:"u"`
Bids [][2]types.Number `json:"b"`
Asks [][2]types.Number `json:"a"`
}
// WsOrderbookSnapshot represents a websocket orderbook snapshot push data
@@ -2225,14 +2225,14 @@ type WsFuturesAndOptionsOrderbookUpdate struct {
ContractName currency.Pair `json:"s"`
FirstUpdatedID int64 `json:"U"`
LastUpdatedID int64 `json:"u"`
Bids []struct {
Price types.Number `json:"p"`
Size float64 `json:"s"`
} `json:"b"`
Asks []struct {
Price types.Number `json:"p"`
Size float64 `json:"s"`
} `json:"a"`
Bids []Tranche `json:"b"`
Asks []Tranche `json:"a"`
}
// Tranche represents a tranche of orderbook data
type Tranche struct {
Price types.Number `json:"p"`
Size float64 `json:"s"`
}
// WsFuturesOrderbookSnapshot represents a futures orderbook snapshot push data
@@ -2240,14 +2240,8 @@ type WsFuturesOrderbookSnapshot struct {
Timestamp types.Time `json:"t"`
Contract currency.Pair `json:"contract"`
OrderbookID int64 `json:"id"`
Asks []struct {
Price types.Number `json:"p"`
Size float64 `json:"s"`
} `json:"asks"`
Bids []struct {
Price types.Number `json:"p"`
Size float64 `json:"s"`
} `json:"bids"`
Asks []Tranche `json:"asks"`
Bids []Tranche `json:"bids"`
}
// WsFuturesOrderbookUpdateEvent represents futures orderbook push data with the event 'update'

View File

@@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"net/http"
"slices"
"strconv"
"strings"
"text/template"
@@ -17,6 +18,7 @@ import (
"github.com/buger/jsonparser"
gws "github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/common/key"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/encoding/json"
"github.com/thrasher-corp/gocryptotrader/exchange/websocket"
@@ -59,14 +61,14 @@ var defaultSubscriptions = subscription.List{
{Enabled: true, Channel: subscription.TickerChannel, Asset: asset.Spot},
{Enabled: true, Channel: subscription.CandlesChannel, Asset: asset.Spot, Interval: kline.FiveMin},
{Enabled: true, Channel: subscription.OrderbookChannel, Asset: asset.Spot, Interval: kline.HundredMilliseconds},
{Enabled: false, Channel: spotOrderbookTickerChannel, Asset: asset.Spot, Interval: kline.TenMilliseconds, Levels: 1},
{Enabled: false, Channel: spotOrderbookChannel, Asset: asset.Spot, Interval: kline.HundredMilliseconds, Levels: 100},
{Enabled: true, Channel: spotBalancesChannel, Asset: asset.Spot, Authenticated: true},
{Enabled: true, Channel: crossMarginBalanceChannel, Asset: asset.CrossMargin, Authenticated: true},
{Enabled: true, Channel: marginBalancesChannel, Asset: asset.Margin, Authenticated: true},
{Enabled: false, Channel: subscription.AllTradesChannel, Asset: asset.Spot},
}
var fetchedCurrencyPairSnapshotOrderbook = make(map[string]bool)
var subscriptionNames = map[string]string{
subscription.TickerChannel: spotTickerChannel,
subscription.OrderbookChannel: spotOrderbookUpdateChannel,
@@ -187,7 +189,7 @@ func (g *Gateio) WsHandleSpotData(ctx context.Context, respRaw []byte) error {
case spotOrderbookTickerChannel:
return g.processOrderbookTicker(push.Result, push.Time)
case spotOrderbookUpdateChannel:
return g.processOrderbookUpdate(push.Result, push.Time)
return g.processOrderbookUpdate(ctx, push.Result, push.Time)
case spotOrderbookChannel:
return g.processOrderbookSnapshot(push.Result, push.Time)
case spotOrdersChannel:
@@ -358,7 +360,7 @@ func (g *Gateio) processOrderbookTicker(incoming []byte, updatePushedAt time.Tim
}
return g.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{
Exchange: g.Name,
Pair: data.CurrencyPair,
Pair: data.Pair,
Asset: asset.Spot,
LastUpdated: data.UpdateTime.Time(),
UpdatePushedAt: updatePushedAt,
@@ -367,41 +369,11 @@ func (g *Gateio) processOrderbookTicker(incoming []byte, updatePushedAt time.Tim
})
}
func (g *Gateio) processOrderbookUpdate(incoming []byte, updatePushedAt time.Time) error {
func (g *Gateio) processOrderbookUpdate(ctx context.Context, incoming []byte, updatePushedAt time.Time) error {
var data WsOrderbookUpdate
if err := json.Unmarshal(incoming, &data); err != nil {
return err
}
if len(data.Asks) == 0 && len(data.Bids) == 0 {
return nil
}
enabledAssets := make([]asset.Item, 0, len(standardMarginAssetTypes))
for _, a := range standardMarginAssetTypes {
if enabled, _ := g.CurrencyPairs.IsPairEnabled(data.CurrencyPair, a); enabled {
enabledAssets = append(enabledAssets, a)
}
}
sPair := data.CurrencyPair.String()
if !fetchedCurrencyPairSnapshotOrderbook[sPair] {
orderbooks, err := g.UpdateOrderbook(context.Background(), data.CurrencyPair, asset.Spot) // currency pair orderbook data for Spot, Margin, and Cross Margin is same
if err != nil {
return err
}
// TODO: handle orderbook update synchronisation
for _, a := range enabledAssets {
assetOrderbook := *orderbooks
assetOrderbook.Asset = a
err = g.Websocket.Orderbook.LoadSnapshot(&assetOrderbook)
if err != nil {
return err
}
}
fetchedCurrencyPairSnapshotOrderbook[sPair] = true
}
asks := make([]orderbook.Tranche, len(data.Asks))
for x := range data.Asks {
asks[x].Price = data.Asks[x][0].Float64()
@@ -412,21 +384,16 @@ func (g *Gateio) processOrderbookUpdate(incoming []byte, updatePushedAt time.Tim
bids[x].Price = data.Bids[x][0].Float64()
bids[x].Amount = data.Bids[x][1].Float64()
}
for _, a := range enabledAssets {
if err := g.Websocket.Orderbook.Update(&orderbook.Update{
UpdateTime: data.UpdateTime.Time(),
UpdatePushedAt: updatePushedAt,
Pair: data.CurrencyPair,
Asset: a,
Asks: asks,
Bids: bids,
}); err != nil {
return err
}
}
return nil
return g.wsOBUpdateMgr.ProcessUpdate(ctx, g, data.FirstUpdateID, &orderbook.Update{
UpdateID: data.LastUpdateID,
UpdateTime: data.UpdateTime.Time(),
UpdatePushedAt: updatePushedAt,
Pair: data.Pair,
Asset: asset.Spot,
Asks: asks,
Bids: bids,
AllowEmpty: true,
})
}
func (g *Gateio) processOrderbookSnapshot(incoming []byte, updatePushedAt time.Time) error {
@@ -682,9 +649,10 @@ func (g *Gateio) GetSubscriptionTemplate(_ *subscription.Subscription) (*templat
Funcs(template.FuncMap{
"channelName": channelName,
"singleSymbolChannel": singleSymbolChannel,
"interval": g.GetIntervalString,
}).
Parse(subTplText)
"orderbookInterval": orderbookChannelInterval,
"candlesInterval": candlesChannelInterval,
"levels": channelLevels,
}).Parse(subTplText)
}
// manageSubs sends a websocket message to subscribe or unsubscribe from a list of channel
@@ -782,20 +750,166 @@ func singleSymbolChannel(name string) bool {
return false
}
// ValidateSubscriptions implements the subscription.ListValidator interface.
// It ensures that, for each orderbook pair asset, only one type of subscription (e.g., best bid/ask, orderbook update, or orderbook snapshot)
// is active at a time. Multiple concurrent subscriptions for the same asset are disallowed to prevent orderbook data corruption.
func (g *Gateio) ValidateSubscriptions(l subscription.List) error {
orderbookGuard := map[key.PairAsset]string{}
for _, s := range l {
n := channelName(s)
if !isSingleOrderbookChannel(n) {
continue
}
for _, p := range s.Pairs {
k := key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: s.Asset}
existingChanName, ok := orderbookGuard[k]
if !ok {
orderbookGuard[k] = n
continue
}
if existingChanName != n {
return fmt.Errorf("%w for %q %q between %q and %q, please enable only one type", subscription.ErrExclusiveSubscription, k.Pair(), k.Asset, existingChanName, n)
}
}
}
return nil
}
// isSingleOrderbookChannel checks if the specified channel represents a single orderbook subscription.
// It returns true for channels like orderbook updates, snapshots, or tickers, as multiple subscriptions
// for the same pair asset could corrupt the stored orderbook data.
func isSingleOrderbookChannel(name string) bool {
switch name {
case spotOrderbookUpdateChannel,
spotOrderbookChannel,
spotOrderbookTickerChannel,
futuresOrderbookChannel,
futuresOrderbookTickerChannel,
futuresOrderbookUpdateChannel,
optionsOrderbookChannel,
optionsOrderbookTickerChannel,
optionsOrderbookUpdateChannel:
return true
}
return false
}
var channelIntervalsMap = map[asset.Item]map[string][]kline.Interval{
asset.Spot: {
spotOrderbookTickerChannel: {},
spotOrderbookChannel: {kline.HundredMilliseconds, kline.ThousandMilliseconds},
spotOrderbookUpdateChannel: {kline.TwentyMilliseconds, kline.HundredMilliseconds},
},
asset.Futures: {
futuresOrderbookTickerChannel: {},
futuresOrderbookChannel: {0},
futuresOrderbookUpdateChannel: {kline.TwentyMilliseconds, kline.HundredMilliseconds},
},
asset.DeliveryFutures: {
futuresOrderbookTickerChannel: {},
futuresOrderbookChannel: {0},
futuresOrderbookUpdateChannel: {kline.HundredMilliseconds, kline.ThousandMilliseconds},
},
asset.Options: {
optionsOrderbookTickerChannel: {},
optionsOrderbookChannel: {0},
optionsOrderbookUpdateChannel: {kline.HundredMilliseconds, kline.ThousandMilliseconds},
},
}
func candlesChannelInterval(s *subscription.Subscription) (string, error) {
if s.Channel == subscription.CandlesChannel {
return getIntervalString(s.Interval)
}
return "", nil
}
func orderbookChannelInterval(s *subscription.Subscription, a asset.Item) (string, error) {
cName := channelName(s)
assetChannels, ok := channelIntervalsMap[a]
if !ok {
return "", nil
}
switch intervals, ok := assetChannels[cName]; {
case !ok:
return "", nil
case len(intervals) == 0:
if s.Interval != 0 {
return "", fmt.Errorf("%w for %s: %q; interval not supported for channel", subscription.ErrInvalidInterval, cName, s.Interval)
}
return "", nil
case !slices.Contains(intervals, s.Interval):
return "", fmt.Errorf("%w for %s: %q; supported: %q", subscription.ErrInvalidInterval, cName, s.Interval, intervals)
case cName == futuresOrderbookUpdateChannel && s.Interval == kline.TwentyMilliseconds && s.Levels != 20:
return "", fmt.Errorf("%w for %q: 20ms only valid with Levels 20", subscription.ErrInvalidInterval, cName)
case s.Interval == 0:
return "0", nil // Do not move this into getIntervalString, it's only valid for ws subs
}
return getIntervalString(s.Interval)
}
var channelLevelsMap = map[asset.Item]map[string][]int{
asset.Spot: {
spotOrderbookTickerChannel: {},
spotOrderbookUpdateChannel: {},
spotOrderbookChannel: {1, 5, 10, 20, 50, 100},
},
asset.Futures: {
futuresOrderbookChannel: {1, 5, 10, 20, 50, 100},
futuresOrderbookTickerChannel: {},
futuresOrderbookUpdateChannel: {20, 50, 100},
},
asset.DeliveryFutures: {
futuresOrderbookChannel: {1, 5, 10, 20, 50, 100},
futuresOrderbookTickerChannel: {},
futuresOrderbookUpdateChannel: {5, 10, 20, 50, 100},
},
asset.Options: {
optionsOrderbookTickerChannel: {},
optionsOrderbookUpdateChannel: {5, 10, 20, 50},
optionsOrderbookChannel: {5, 10, 20, 50},
},
}
func channelLevels(s *subscription.Subscription, a asset.Item) (string, error) {
cName := channelName(s)
assetChannels, ok := channelLevelsMap[a]
if !ok {
return "", nil
}
switch levels, ok := assetChannels[cName]; {
case !ok:
return "", nil
case len(levels) == 0:
if s.Levels != 0 {
return "", fmt.Errorf("%w for %s: `%d`; levels not supported for channel", subscription.ErrInvalidLevel, cName, s.Levels)
}
return "", nil
case !slices.Contains(levels, s.Levels):
return "", fmt.Errorf("%w for %s: %d; supported: %v", subscription.ErrInvalidLevel, cName, s.Levels, levels)
}
return strconv.Itoa(s.Levels), nil
}
const subTplText = `
{{- with $name := channelName $.S }}
{{- range $asset, $pairs := $.AssetPairs }}
{{- if singleSymbolChannel $name }}
{{- range $i, $p := $pairs -}}
{{- if eq $name "spot.candlesticks" }}{{ interval $.S.Interval -}} , {{- end }}
{{- with $i := candlesInterval $.S }}{{ $i -}} , {{- end }}
{{- $p }}
{{- if eq "spot.order_book" $name -}} , {{- $.S.Levels }}{{ end }}
{{- if hasPrefix "spot.order_book" $name -}} , {{- interval $.S.Interval }}{{ end }}
{{- with $l := levels $.S $asset -}} , {{- $l }}{{ end }}
{{- with $i := orderbookInterval $.S $asset -}} , {{- $i }}{{- end }}
{{- $.PairSeparator }}
{{- end }}
{{- $.AssetSeparator }}
{{- else }}
{{- $pairs.Join }}
{{- $pairs.Join }}
{{- end }}
{{- end }}
{{- end }}

View File

@@ -25,25 +25,23 @@ const (
// delivery testnet urls
deliveryTestNetBTCTradingURL = "wss://fx-ws-testnet.gateio.ws/v4/ws/delivery/btc" //nolint:unused // Can be used for testing
deliveryTestNetUSDTTradingURL = "wss://fx-ws-testnet.gateio.ws/v4/ws/delivery/usdt" //nolint:unused // Can be used for testing
deliveryFuturesUpdateLimit uint64 = 100
)
var defaultDeliveryFuturesSubscriptions = []string{
futuresTickersChannel,
futuresTradesChannel,
futuresOrderbookChannel,
futuresOrderbookUpdateChannel,
futuresCandlesticksChannel,
}
var fetchedFuturesCurrencyPairSnapshotOrderbook = make(map[string]bool)
// WsDeliveryFuturesConnect initiates a websocket connection for delivery futures account
func (g *Gateio) WsDeliveryFuturesConnect(ctx context.Context, conn websocket.Connection) error {
err := g.CurrencyPairs.IsAssetEnabled(asset.DeliveryFutures)
if err != nil {
if err := g.CurrencyPairs.IsAssetEnabled(asset.DeliveryFutures); err != nil {
return err
}
err = conn.DialContext(ctx, &gws.Dialer{}, http.Header{})
if err != nil {
if err := conn.DialContext(ctx, &gws.Dialer{}, http.Header{}); err != nil {
return err
}
pingMessage, err := json.Marshal(WsInput{
@@ -64,6 +62,7 @@ func (g *Gateio) WsDeliveryFuturesConnect(ctx context.Context, conn websocket.Co
}
// GenerateDeliveryFuturesDefaultSubscriptions returns delivery futures default subscriptions params.
// TODO: Update to use the new subscription template system
func (g *Gateio) GenerateDeliveryFuturesDefaultSubscriptions() (subscription.List, error) {
_, err := g.GetCredentials(context.Background())
if err != nil {
@@ -92,6 +91,9 @@ func (g *Gateio) GenerateDeliveryFuturesDefaultSubscriptions() (subscription.Lis
params["interval"] = "0"
case futuresCandlesticksChannel:
params["interval"] = kline.FiveMin
case futuresOrderbookUpdateChannel:
params["frequency"] = kline.HundredMilliseconds
params["level"] = strconv.FormatUint(deliveryFuturesUpdateLimit, 10)
}
fPair, err := g.FormatExchangeCurrency(pairs[j], asset.DeliveryFutures)
if err != nil {
@@ -165,7 +167,7 @@ func (g *Gateio) generateDeliveryFuturesPayload(ctx context.Context, conn websoc
frequency, okay := channelsToSubscribe[i].Params["frequency"].(kline.Interval)
if okay {
var frequencyString string
frequencyString, err = g.GetIntervalString(frequency)
frequencyString, err = getIntervalString(frequency)
if err != nil {
return nil, err
}
@@ -188,7 +190,7 @@ func (g *Gateio) generateDeliveryFuturesPayload(ctx context.Context, conn websoc
interval, okay := channelsToSubscribe[i].Params["interval"].(kline.Interval)
if okay {
var intervalString string
intervalString, err = g.GetIntervalString(interval)
intervalString, err = getIntervalString(interval)
if err != nil {
return nil, err
}

View File

@@ -45,12 +45,13 @@ const (
futuresReduceRiskLimitsChannel = "futures.reduce_risk_limits"
futuresPositionsChannel = "futures.positions"
futuresAutoOrdersChannel = "futures.autoorders"
futuresOrderbookUpdateLimit uint64 = 20
)
var defaultFuturesSubscriptions = []string{
futuresTickersChannel,
futuresTradesChannel,
futuresOrderbookChannel,
futuresOrderbookUpdateChannel,
futuresCandlesticksChannel,
}
@@ -85,6 +86,7 @@ func (g *Gateio) WsFuturesConnect(ctx context.Context, conn websocket.Connection
}
// GenerateFuturesDefaultSubscriptions returns default subscriptions information.
// TODO: Update to use the new subscription template system
func (g *Gateio) GenerateFuturesDefaultSubscriptions(a asset.Item) (subscription.List, error) {
channelsToSubscribe := defaultFuturesSubscriptions
if g.Websocket.CanUseAuthenticatedEndpoints() {
@@ -110,8 +112,9 @@ func (g *Gateio) GenerateFuturesDefaultSubscriptions(a asset.Item) (subscription
case futuresCandlesticksChannel:
params["interval"] = kline.FiveMin
case futuresOrderbookUpdateChannel:
params["frequency"] = kline.ThousandMilliseconds
params["level"] = "100"
// This is the fastest frequency available for futures orderbook updates 20 levels every 20ms
params["frequency"] = kline.TwentyMilliseconds
params["level"] = strconv.FormatUint(futuresOrderbookUpdateLimit, 10)
}
fPair, err := g.FormatExchangeCurrency(pairs[j], a)
if err != nil {
@@ -163,7 +166,7 @@ func (g *Gateio) WsHandleFuturesData(ctx context.Context, respRaw []byte, a asse
case futuresOrderbookTickerChannel:
return g.processFuturesOrderbookTicker(push.Result)
case futuresOrderbookUpdateChannel:
return g.processFuturesAndOptionsOrderbookUpdate(push.Result, a)
return g.processFuturesOrderbookUpdate(ctx, push.Result, a, push.Time)
case futuresCandlesticksChannel:
return g.processFuturesCandlesticks(respRaw, a)
case futuresOrdersChannel:
@@ -247,7 +250,7 @@ func (g *Gateio) generateFuturesPayload(ctx context.Context, conn websocket.Conn
frequency, okay := channelsToSubscribe[i].Params["frequency"].(kline.Interval)
if okay {
var frequencyString string
frequencyString, err = g.GetIntervalString(frequency)
frequencyString, err = getIntervalString(frequency)
if err != nil {
return nil, err
}
@@ -270,7 +273,7 @@ func (g *Gateio) generateFuturesPayload(ctx context.Context, conn websocket.Conn
interval, okay := channelsToSubscribe[i].Params["interval"].(kline.Interval)
if okay {
var intervalString string
intervalString, err = g.GetIntervalString(interval)
intervalString, err = getIntervalString(interval)
if err != nil {
return nil, err
}
@@ -403,50 +406,32 @@ func (g *Gateio) processFuturesOrderbookTicker(incoming []byte) error {
return nil
}
func (g *Gateio) processFuturesAndOptionsOrderbookUpdate(incoming []byte, assetType asset.Item) error {
func (g *Gateio) processFuturesOrderbookUpdate(ctx context.Context, incoming []byte, a asset.Item, pushTime time.Time) error {
var data WsFuturesAndOptionsOrderbookUpdate
err := json.Unmarshal(incoming, &data)
if err != nil {
if err := json.Unmarshal(incoming, &data); err != nil {
return err
}
if (assetType == asset.Options && !fetchedOptionsCurrencyPairSnapshotOrderbook[data.ContractName.String()]) ||
(assetType != asset.Options && !fetchedFuturesCurrencyPairSnapshotOrderbook[data.ContractName.String()]) {
orderbooks, err := g.UpdateOrderbook(context.Background(), data.ContractName, assetType)
if err != nil {
return err
}
if orderbooks.LastUpdateID < data.FirstUpdatedID || orderbooks.LastUpdateID > data.LastUpdatedID {
return nil
}
err = g.Websocket.Orderbook.LoadSnapshot(orderbooks)
if err != nil {
return err
}
if assetType == asset.Options {
fetchedOptionsCurrencyPairSnapshotOrderbook[data.ContractName.String()] = true
} else {
fetchedFuturesCurrencyPairSnapshotOrderbook[data.ContractName.String()] = true
}
}
updates := orderbook.Update{
UpdateTime: data.Timestamp.Time(),
Pair: data.ContractName,
Asset: assetType,
}
updates.Asks = make([]orderbook.Tranche, len(data.Asks))
asks := make([]orderbook.Tranche, len(data.Asks))
for x := range data.Asks {
updates.Asks[x].Amount = data.Asks[x].Size
updates.Asks[x].Price = data.Asks[x].Price.Float64()
asks[x].Price = data.Asks[x].Price.Float64()
asks[x].Amount = data.Asks[x].Size
}
updates.Bids = make([]orderbook.Tranche, len(data.Bids))
bids := make([]orderbook.Tranche, len(data.Bids))
for x := range data.Bids {
updates.Bids[x].Amount = data.Bids[x].Size
updates.Bids[x].Price = data.Bids[x].Price.Float64()
bids[x].Price = data.Bids[x].Price.Float64()
bids[x].Amount = data.Bids[x].Size
}
if len(updates.Asks) == 0 && len(updates.Bids) == 0 {
return errors.New("malformed orderbook data")
}
return g.Websocket.Orderbook.Update(&updates)
return g.wsOBUpdateMgr.ProcessUpdate(ctx, g, data.FirstUpdatedID, &orderbook.Update{
UpdateID: data.LastUpdatedID,
UpdateTime: data.Timestamp.Time(),
UpdatePushedAt: pushTime,
Pair: data.ContractName,
Asset: a,
Asks: asks,
Bids: bids,
AllowEmpty: true,
})
}
func (g *Gateio) processFuturesOrderbookSnapshot(event string, incoming []byte, assetType asset.Item, updatePushedAt time.Time) error {

View File

@@ -51,6 +51,8 @@ const (
optionsPositionCloseChannel = "options.position_closes"
optionsBalancesChannel = "options.balances"
optionsPositionsChannel = "options.positions"
optionOrderbookUpdateLimit uint64 = 50
)
var defaultOptionsSubscriptions = []string{
@@ -60,12 +62,9 @@ var defaultOptionsSubscriptions = []string{
optionsUnderlyingTradesChannel,
optionsContractCandlesticksChannel,
optionsUnderlyingCandlesticksChannel,
optionsOrderbookChannel,
optionsOrderbookUpdateChannel,
}
var fetchedOptionsCurrencyPairSnapshotOrderbook = make(map[string]bool)
// WsOptionsConnect initiates a websocket connection to options websocket endpoints.
func (g *Gateio) WsOptionsConnect(ctx context.Context, conn websocket.Connection) error {
err := g.CurrencyPairs.IsAssetEnabled(asset.Options)
@@ -94,6 +93,7 @@ func (g *Gateio) WsOptionsConnect(ctx context.Context, conn websocket.Connection
}
// GenerateOptionsDefaultSubscriptions generates list of channel subscriptions for options asset type.
// TODO: Update to use the new subscription template system
func (g *Gateio) GenerateOptionsDefaultSubscriptions() (subscription.List, error) {
channelsToSubscribe := defaultOptionsSubscriptions
var userID int64
@@ -140,8 +140,8 @@ getEnabledPairs:
case optionsContractCandlesticksChannel, optionsUnderlyingCandlesticksChannel:
params["interval"] = kline.FiveMin
case optionsOrderbookUpdateChannel:
params["interval"] = kline.ThousandMilliseconds
params["level"] = "20"
params["interval"] = kline.HundredMilliseconds
params["level"] = strconv.FormatUint(optionOrderbookUpdateLimit, 10)
case optionsOrdersChannel,
optionsUserTradesChannel,
optionsLiquidatesChannel,
@@ -247,7 +247,7 @@ func (g *Gateio) generateOptionsPayload(ctx context.Context, conn websocket.Conn
if !ok {
return nil, fmt.Errorf("%w, missing options orderbook interval", orderbook.ErrOrderbookInvalid)
}
intervalString, err = g.GetIntervalString(interval)
intervalString, err = getIntervalString(interval)
if err != nil {
return nil, err
}
@@ -262,7 +262,7 @@ func (g *Gateio) generateOptionsPayload(ctx context.Context, conn websocket.Conn
if !ok {
return nil, errors.New("missing options underlying candlesticks interval")
}
intervalString, err = g.GetIntervalString(interval)
intervalString, err = getIntervalString(interval)
if err != nil {
return nil, err
}
@@ -327,7 +327,7 @@ func (g *Gateio) WsHandleOptionsData(ctx context.Context, respRaw []byte) error
case optionsOrderbookTickerChannel:
return g.processOrderbookTickerPushData(respRaw)
case optionsOrderbookUpdateChannel:
return g.processFuturesAndOptionsOrderbookUpdate(push.Result, asset.Options)
return g.processOptionsOrderbookUpdate(ctx, push.Result, asset.Options, push.Time)
case optionsOrdersChannel:
return g.processOptionsOrderPushData(respRaw)
case optionsUserTradesChannel:
@@ -498,6 +498,33 @@ func (g *Gateio) processOrderbookTickerPushData(incoming []byte) error {
return nil
}
func (g *Gateio) processOptionsOrderbookUpdate(ctx context.Context, incoming []byte, a asset.Item, pushTime time.Time) error {
var data WsFuturesAndOptionsOrderbookUpdate
if err := json.Unmarshal(incoming, &data); err != nil {
return err
}
asks := make([]orderbook.Tranche, len(data.Asks))
for x := range data.Asks {
asks[x].Price = data.Asks[x].Price.Float64()
asks[x].Amount = data.Asks[x].Size
}
bids := make([]orderbook.Tranche, len(data.Bids))
for x := range data.Bids {
bids[x].Price = data.Bids[x].Price.Float64()
bids[x].Amount = data.Bids[x].Size
}
return g.wsOBUpdateMgr.ProcessUpdate(ctx, g, data.FirstUpdatedID, &orderbook.Update{
UpdateID: data.LastUpdatedID,
UpdateTime: data.Timestamp.Time(),
UpdatePushedAt: pushTime,
Pair: data.ContractName,
Asset: a,
Asks: asks,
Bids: bids,
AllowEmpty: true,
})
}
func (g *Gateio) processOptionsOrderbookSnapshotPushData(event string, incoming []byte, updatePushedAt time.Time) error {
if event == "all" {
var data WsOptionsOrderbookSnapshot

View File

@@ -185,7 +185,7 @@ func TestWebsocketSpotGetOrderStatus(t *testing.T) {
sharedtestvalues.SkipTestIfCredentialsUnset(t, g, canManipulateRealOrders)
testexch.UpdatePairsOnce(t, g)
g := newExchangeWithWebsocket(t, asset.Spot)
g := newExchangeWithWebsocket(t, asset.Spot) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
got, err := g.WebsocketSpotGetOrderStatus(t.Context(), "644999650452", BTCUSDT, "")
require.NoError(t, err)
@@ -199,7 +199,7 @@ func newExchangeWithWebsocket(t *testing.T, a asset.Item) *Gateio {
if apiKey == "" || apiSecret == "" {
t.Skip()
}
g := new(Gateio)
g := new(Gateio) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.Setup(g), "Test instance Setup must not error")
testexch.UpdatePairsOnce(t, g)
g.API.AuthenticatedSupport = true

View File

@@ -177,6 +177,7 @@ func (g *Gateio) SetDefaults() {
g.WebsocketResponseMaxLimit = exchange.DefaultWebsocketResponseMaxLimit
g.WebsocketResponseCheckTimeout = exchange.DefaultWebsocketResponseCheckTimeout
g.WebsocketOrderbookBufferLimit = exchange.DefaultWebsocketOrderbookBufferLimit
g.wsOBUpdateMgr = newWsOBUpdateManager(defaultWSSnapshotSyncDelay)
}
// Setup sets user configuration
@@ -649,6 +650,11 @@ func (g *Gateio) UpdateTickers(ctx context.Context, a asset.Item) error {
// UpdateOrderbook updates and returns the orderbook for a currency pair
func (g *Gateio) UpdateOrderbook(ctx context.Context, p currency.Pair, a asset.Item) (*orderbook.Base, error) {
return g.UpdateOrderbookWithLimit(ctx, p, a, 0)
}
// UpdateOrderbookWithLimit updates and returns the orderbook for a currency pair with a set orderbook size limit
func (g *Gateio) UpdateOrderbookWithLimit(ctx context.Context, p currency.Pair, a asset.Item, limit uint64) (*orderbook.Base, error) {
p, err := g.FormatExchangeCurrency(p, a)
if err != nil {
return nil, err
@@ -664,18 +670,18 @@ func (g *Gateio) UpdateOrderbook(ctx context.Context, p currency.Pair, a asset.I
if a != asset.Spot && !available {
return nil, fmt.Errorf("%v instrument %v does not have orderbook data", a, p)
}
o, err = g.GetOrderbook(ctx, p.String(), "", 0, true)
o, err = g.GetOrderbook(ctx, p.String(), "", limit, true)
case asset.CoinMarginedFutures, asset.USDTMarginedFutures:
var settle currency.Code
settle, err = getSettlementCurrency(p, a)
if err != nil {
return nil, err
}
o, err = g.GetFuturesOrderbook(ctx, settle, p.String(), "", 0, true)
o, err = g.GetFuturesOrderbook(ctx, settle, p.String(), "", limit, true)
case asset.DeliveryFutures:
o, err = g.GetDeliveryOrderbook(ctx, currency.USDT, "", p, 0, true)
o, err = g.GetDeliveryOrderbook(ctx, currency.USDT, "", p, limit, true)
case asset.Options:
o, err = g.GetOptionsOrderbook(ctx, p, "", 0, true)
o, err = g.GetOptionsOrderbook(ctx, p, "", limit, true)
default:
return nil, fmt.Errorf("%w %v", asset.ErrNotSupported, a)
}

View File

@@ -0,0 +1,207 @@
package gateio
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/thrasher-corp/gocryptotrader/common/key"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchange/websocket/buffer"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
)
const defaultWSSnapshotSyncDelay = 2 * time.Second
var errOrderbookSnapshotOutdated = errors.New("orderbook snapshot is outdated")
type wsOBUpdateManager struct {
lookup map[key.PairAsset]*updateCache
snapshotSyncDelay time.Duration
mtx sync.RWMutex
}
type updateCache struct {
updates []pendingUpdate
updating bool
mtx sync.Mutex
}
type pendingUpdate struct {
update *orderbook.Update
firstUpdateID int64
}
func newWsOBUpdateManager(snapshotSyncDelay time.Duration) *wsOBUpdateManager {
return &wsOBUpdateManager{lookup: make(map[key.PairAsset]*updateCache), snapshotSyncDelay: snapshotSyncDelay}
}
// ProcessUpdate processes an orderbook update by syncing snapshot, caching updates and applying them
func (m *wsOBUpdateManager) ProcessUpdate(ctx context.Context, g *Gateio, firstUpdateID int64, update *orderbook.Update) error {
cache := m.LoadCache(update.Pair, update.Asset)
cache.mtx.Lock()
defer cache.mtx.Unlock()
if cache.updating {
cache.updates = append(cache.updates, pendingUpdate{update: update, firstUpdateID: firstUpdateID})
return nil
}
lastUpdateID, err := g.Websocket.Orderbook.LastUpdateID(update.Pair, update.Asset)
if err != nil && !errors.Is(err, buffer.ErrDepthNotFound) {
return err
}
if lastUpdateID+1 >= firstUpdateID {
return applyOrderbookUpdate(g, update)
}
// Orderbook is behind notifications, flush store to prevent trading on stale data
if err := g.Websocket.Orderbook.FlushOrderbook(update.Pair, update.Asset); err != nil && !errors.Is(err, buffer.ErrDepthNotFound) {
return err
}
cache.updating = true
cache.updates = append(cache.updates, pendingUpdate{update: update, firstUpdateID: firstUpdateID})
go func() {
select {
case <-ctx.Done():
return
case <-time.After(m.snapshotSyncDelay):
if err := cache.SyncOrderbook(ctx, g, update.Pair, update.Asset); err != nil {
g.Websocket.DataHandler <- fmt.Errorf("failed to sync orderbook for %v %v: %w", update.Pair, update.Asset, err)
}
}
}()
return nil
}
// LoadCache loads the cache for the given pair and asset. If the cache does not exist, it creates a new one.
func (m *wsOBUpdateManager) LoadCache(p currency.Pair, a asset.Item) *updateCache {
m.mtx.RLock()
cache, ok := m.lookup[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}]
m.mtx.RUnlock()
if !ok {
m.mtx.Lock()
cache = &updateCache{}
m.lookup[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}] = cache
m.mtx.Unlock()
}
return cache
}
// SyncOrderbook fetches and synchronises an orderbook snapshot to the limit size so that pending updates can be
// applied to the orderbook.
func (c *updateCache) SyncOrderbook(ctx context.Context, g *Gateio, pair currency.Pair, a asset.Item) error {
// TODO: When subscription config is added for all assets update limits to use sub.Levels
var limit uint64
switch a {
case asset.Spot:
sub := g.Websocket.GetSubscription(spotOrderbookUpdateKey)
if sub == nil {
return fmt.Errorf("no subscription found for %q", spotOrderbookUpdateKey)
}
// There is no way to set levels when we subscribe for this specific subscription case.
// Extract limit from interval e.g. 20ms == 20 limit book and 100ms == 100 limit book.
limit = uint64(sub.Interval.Duration().Milliseconds()) //nolint:gosec // No overflow risk
case asset.USDTMarginedFutures, asset.USDCMarginedFutures:
limit = futuresOrderbookUpdateLimit
case asset.DeliveryFutures:
limit = deliveryFuturesUpdateLimit
case asset.Options:
limit = optionOrderbookUpdateLimit
}
book, err := g.UpdateOrderbookWithLimit(ctx, pair, a, limit)
c.mtx.Lock() // lock here to prevent ws handle data interference with REST request above
defer func() {
c.updates = nil
c.updating = false
c.mtx.Unlock()
}()
if err != nil {
return err
}
if a != asset.Spot {
if err := g.Websocket.Orderbook.LoadSnapshot(book); err != nil {
return err
}
} else {
// Spot, Margin, and Cross Margin books are all classified as spot
for i := range standardMarginAssetTypes {
if enabled, _ := g.IsPairEnabled(pair, standardMarginAssetTypes[i]); !enabled {
continue
}
book.Asset = standardMarginAssetTypes[i]
if err := g.Websocket.Orderbook.LoadSnapshot(book); err != nil {
return err
}
}
}
return c.applyPendingUpdates(g, a)
}
// ApplyPendingUpdates applies all pending updates to the orderbook
func (c *updateCache) applyPendingUpdates(g *Gateio, a asset.Item) error {
for _, data := range c.updates {
lastUpdateID, err := g.Websocket.Orderbook.LastUpdateID(data.update.Pair, a)
if err != nil {
return err
}
nextID := lastUpdateID + 1
if data.firstUpdateID > nextID {
return errOrderbookSnapshotOutdated
}
if data.update.UpdateID < nextID {
continue // skip updates that are behind the current orderbook
}
if err := applyOrderbookUpdate(g, data.update); err != nil {
return err
}
}
return nil
}
// applyOrderbookUpdate applies an orderbook update to the orderbook
func applyOrderbookUpdate(g *Gateio, update *orderbook.Update) error {
if update.Asset != asset.Spot {
return g.Websocket.Orderbook.Update(update)
}
for i := range standardMarginAssetTypes {
if enabled, _ := g.IsPairEnabled(update.Pair, standardMarginAssetTypes[i]); !enabled {
continue
}
update.Asset = standardMarginAssetTypes[i]
if err := g.Websocket.Orderbook.Update(update); err != nil {
return err
}
}
return nil
}
var spotOrderbookUpdateKey = channelKey{&subscription.Subscription{Channel: subscription.OrderbookChannel}}
var _ subscription.MatchableKey = channelKey{}
type channelKey struct {
*subscription.Subscription
}
func (k channelKey) Match(eachKey subscription.MatchableKey) bool {
return k.Subscription.Channel == eachKey.GetSubscription().Channel
}
func (k channelKey) GetSubscription() *subscription.Subscription {
return k.Subscription
}

View File

@@ -0,0 +1,205 @@
package gateio
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchange/websocket/buffer"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
)
func TestProcessUpdate(t *testing.T) {
t.Parallel()
m := newWsOBUpdateManager(0)
err := m.ProcessUpdate(t.Context(), g, 1337, &orderbook.Update{})
assert.ErrorIs(t, err, currency.ErrCurrencyPairEmpty)
pair := currency.NewPair(currency.BABY, currency.BABYDOGE)
err = g.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{
Exchange: g.Name,
Pair: pair,
Asset: asset.USDTMarginedFutures,
Bids: []orderbook.Tranche{{Price: 1, Amount: 1}},
Asks: []orderbook.Tranche{{Price: 1, Amount: 1}},
LastUpdated: time.Now(),
UpdatePushedAt: time.Now(),
LastUpdateID: 1336,
})
require.NoError(t, err)
err = m.ProcessUpdate(t.Context(), g, 1337, &orderbook.Update{
UpdateID: 1338,
Pair: pair,
Asset: asset.USDTMarginedFutures,
AllowEmpty: true,
UpdateTime: time.Now(),
})
require.NoError(t, err)
// Test orderbook snapshot is behind update
err = m.ProcessUpdate(t.Context(), g, 1340, &orderbook.Update{
UpdateID: 1341,
Pair: pair,
Asset: asset.USDTMarginedFutures,
AllowEmpty: true,
UpdateTime: time.Now(),
})
require.NoError(t, err)
cache := m.LoadCache(pair, asset.USDTMarginedFutures)
cache.mtx.Lock()
assert.Len(t, cache.updates, 1)
assert.True(t, cache.updating)
cache.mtx.Unlock()
// Test orderbook snapshot is behind update
err = m.ProcessUpdate(t.Context(), g, 1342, &orderbook.Update{
UpdateID: 1343,
Pair: pair,
Asset: asset.USDTMarginedFutures,
AllowEmpty: true,
UpdateTime: time.Now(),
})
require.NoError(t, err)
cache.mtx.Lock()
assert.Len(t, cache.updates, 2)
assert.True(t, cache.updating)
cache.mtx.Unlock()
time.Sleep(time.Millisecond * 2) // Allow sync delay to pass
cache.mtx.Lock()
assert.Empty(t, cache.updates)
assert.False(t, cache.updating)
cache.mtx.Unlock()
}
func TestLoadCache(t *testing.T) {
t.Parallel()
m := newWsOBUpdateManager(0)
pair := currency.NewPair(currency.BABY, currency.BABYDOGE)
cache := m.LoadCache(pair, asset.USDTMarginedFutures)
assert.NotNil(t, cache)
assert.Len(t, m.lookup, 1)
// Test cache is reused
cache2 := m.LoadCache(pair, asset.USDTMarginedFutures)
assert.Equal(t, cache, cache2)
}
func TestSyncOrderbook(t *testing.T) {
t.Parallel()
g := new(Gateio)
require.NoError(t, testexch.Setup(g), "Setup must not error")
require.NoError(t, g.UpdateTradablePairs(t.Context(), false))
// Add dummy subscription so that it can be matched and a limit/level can be extracted for initial orderbook sync spot.
err := g.Websocket.AddSubscriptions(nil, &subscription.Subscription{Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds})
require.NoError(t, err)
m := newWsOBUpdateManager(defaultWSSnapshotSyncDelay)
for _, a := range []asset.Item{asset.Spot, asset.USDTMarginedFutures} {
pair := currency.NewPair(currency.ETH, currency.USDT)
err := g.CurrencyPairs.EnablePair(a, pair)
require.NoError(t, err)
cache := m.LoadCache(pair, a)
cache.updates = []pendingUpdate{{update: &orderbook.Update{Pair: pair, Asset: a}}}
cache.updating = true
err = cache.SyncOrderbook(t.Context(), g, pair, a)
require.NoError(t, err)
require.False(t, cache.updating)
require.Empty(t, cache.updates)
expectedLimit := 20
if a == asset.Spot {
expectedLimit = 100
}
b, err := g.Websocket.Orderbook.GetOrderbook(pair, a)
require.NoError(t, err)
require.Len(t, b.Bids, expectedLimit)
require.Len(t, b.Asks, expectedLimit)
}
}
func TestApplyPendingUpdates(t *testing.T) {
t.Parallel()
g := new(Gateio)
require.NoError(t, testexch.Setup(g), "Setup must not error")
require.NoError(t, g.UpdateTradablePairs(t.Context(), false))
m := newWsOBUpdateManager(defaultWSSnapshotSyncDelay)
pair := currency.NewPair(currency.LTC, currency.USDT)
err := g.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{
Exchange: g.Name,
Pair: pair,
Asset: asset.USDTMarginedFutures,
Bids: []orderbook.Tranche{{Price: 1, Amount: 1}},
Asks: []orderbook.Tranche{{Price: 1, Amount: 1}},
LastUpdated: time.Now(),
UpdatePushedAt: time.Now(),
LastUpdateID: 1335,
})
require.NoError(t, err)
cache := m.LoadCache(pair, asset.USDTMarginedFutures)
update := &orderbook.Update{
UpdateID: 1339,
Pair: pair,
Asset: asset.USDTMarginedFutures,
AllowEmpty: true,
UpdateTime: time.Now(),
}
cache.updates = []pendingUpdate{{update: update, firstUpdateID: 1337}}
err = cache.applyPendingUpdates(g, asset.USDTMarginedFutures)
require.ErrorIs(t, err, errOrderbookSnapshotOutdated)
cache.updates[0].firstUpdateID = 1336
err = cache.applyPendingUpdates(g, asset.USDTMarginedFutures)
require.NoError(t, err)
}
func TestApplyOrderbookUpdate(t *testing.T) {
t.Parallel()
g := new(Gateio)
require.NoError(t, testexch.Setup(g), "Setup must not error")
require.NoError(t, g.UpdateTradablePairs(t.Context(), false))
pair := currency.NewBTCUSDT()
update := &orderbook.Update{
Pair: pair,
Asset: asset.USDTMarginedFutures,
AllowEmpty: true,
UpdateTime: time.Now(),
}
err := applyOrderbookUpdate(g, update)
require.ErrorIs(t, err, buffer.ErrDepthNotFound)
update.Asset = asset.Spot
err = applyOrderbookUpdate(g, update)
require.ErrorIs(t, err, buffer.ErrDepthNotFound)
update.Pair = currency.NewPair(currency.BABY, currency.BABYDOGE)
err = applyOrderbookUpdate(g, update)
require.NoError(t, err)
}

View File

@@ -288,8 +288,14 @@ func durationToWord(in Interval) string {
switch in {
case Raw:
return "raw"
case TenMilliseconds:
return "tenmillisec"
case TwentyMilliseconds:
return "twentymillisec"
case HundredMilliseconds:
return "hundredmillisec"
case TwoHundredAndFiftyMilliseconds:
return "twohundredfiftymillisec"
case ThousandMilliseconds:
return "thousandmillisec"
case TenSecond:

View File

@@ -6,7 +6,6 @@ import (
"math/rand"
"os"
"path/filepath"
"strings"
"testing"
"time"
@@ -155,129 +154,40 @@ func TestDurationToWord(t *testing.T) {
name string
interval Interval
}{
{
"raw",
Raw,
},
{
"hundredmillisec",
HundredMilliseconds,
},
{
"thousandmillisec",
ThousandMilliseconds,
},
{
"tensec",
TenSecond,
},
{
"FifteenSecond",
FifteenSecond,
},
{
"OneMin",
OneMin,
},
{
"ThreeMin",
ThreeMin,
},
{
"FiveMin",
FiveMin,
},
{
"TenMin",
TenMin,
},
{
"FifteenMin",
FifteenMin,
},
{
"ThirtyMin",
ThirtyMin,
},
{
"OneHour",
OneHour,
},
{
"TwoHour",
TwoHour,
},
{
"FourHour",
FourHour,
},
{
"SixHour",
SixHour,
},
{
"EightHour",
OneHour * 8,
},
{
"TwelveHour",
TwelveHour,
},
{
"OneDay",
OneDay,
},
{
"ThreeDay",
ThreeDay,
},
{
"FiveDay",
FiveDay,
},
{
"FifteenDay",
FifteenDay,
},
{
"OneWeek",
OneWeek,
},
{
"TwoWeek",
TwoWeek,
},
{
"OneMonth",
OneMonth,
},
{
"ThreeMonth",
ThreeMonth,
},
{
"SixMonth",
SixMonth,
},
{
"OneYear",
OneYear,
},
{
"notfound",
Interval(time.Hour * 1337),
},
{"raw", Raw},
{"tenmillisec", TenMilliseconds},
{"twentymillisec", TwentyMilliseconds},
{"hundredmillisec", HundredMilliseconds},
{"twohundredfiftymillisec", TwoHundredAndFiftyMilliseconds},
{"thousandmillisec", ThousandMilliseconds},
{"tensec", TenSecond},
{"fifteensecond", FifteenSecond},
{"onemin", OneMin},
{"threemin", ThreeMin},
{"fivemin", FiveMin},
{"tenmin", TenMin},
{"fifteenmin", FifteenMin},
{"thirtymin", ThirtyMin},
{"onehour", OneHour},
{"twohour", TwoHour},
{"fourhour", FourHour},
{"sixhour", SixHour},
{"eighthour", OneHour * 8},
{"twelvehour", TwelveHour},
{"oneday", OneDay},
{"threeday", ThreeDay},
{"fiveday", FiveDay},
{"fifteenday", FifteenDay},
{"oneweek", OneWeek},
{"twoweek", TwoWeek},
{"onemonth", OneMonth},
{"threemonth", ThreeMonth},
{"sixmonth", SixMonth},
{"oneyear", OneYear},
{"notfound", Interval(time.Hour * 1337)},
}
for x := range testCases {
test := testCases[x]
t.Run(test.name, func(t *testing.T) {
t.Parallel()
t.Helper()
v := durationToWord(test.interval)
if !strings.EqualFold(v, test.name) {
t.Fatalf("%v: received %v expected %v", test.name, v, test.name)
}
})
for _, tc := range testCases {
require.Equal(t, tc.name, durationToWord(tc.interval))
}
}

View File

@@ -11,40 +11,43 @@ import (
// Consts here define basic time intervals
const (
Raw = Interval(-1)
HundredMilliseconds = Interval(100 * time.Millisecond)
ThousandMilliseconds = 10 * HundredMilliseconds
TenSecond = Interval(10 * time.Second)
FifteenSecond = Interval(15 * time.Second)
ThirtySecond = 2 * FifteenSecond
OneMin = Interval(time.Minute)
ThreeMin = 3 * OneMin
FiveMin = 5 * OneMin
TenMin = 10 * OneMin
FifteenMin = 15 * OneMin
ThirtyMin = 30 * OneMin
OneHour = Interval(time.Hour)
TwoHour = 2 * OneHour
ThreeHour = 3 * OneHour
FourHour = 4 * OneHour
SixHour = 6 * OneHour
SevenHour = 7 * OneHour
EightHour = 8 * OneHour
TwelveHour = 12 * OneHour
OneDay = 24 * OneHour
TwoDay = 2 * OneDay
ThreeDay = 3 * OneDay
SevenDay = 7 * OneDay
FifteenDay = 15 * OneDay
OneWeek = 7 * OneDay
TwoWeek = 2 * OneWeek
ThreeWeek = 3 * OneWeek
OneMonth = 30 * OneDay
ThreeMonth = 90 * OneDay
SixMonth = 2 * ThreeMonth
NineMonth = 3 * ThreeMonth
OneYear = 365 * OneDay
FiveDay = 5 * OneDay
Raw = Interval(-1)
TenMilliseconds = Interval(10 * time.Millisecond)
TwentyMilliseconds = 2 * TenMilliseconds
HundredMilliseconds = Interval(100 * time.Millisecond)
TwoHundredAndFiftyMilliseconds = Interval(250 * time.Millisecond)
ThousandMilliseconds = 10 * HundredMilliseconds
TenSecond = Interval(10 * time.Second)
FifteenSecond = Interval(15 * time.Second)
ThirtySecond = 2 * FifteenSecond
OneMin = Interval(time.Minute)
ThreeMin = 3 * OneMin
FiveMin = 5 * OneMin
TenMin = 10 * OneMin
FifteenMin = 15 * OneMin
ThirtyMin = 30 * OneMin
OneHour = Interval(time.Hour)
TwoHour = 2 * OneHour
ThreeHour = 3 * OneHour
FourHour = 4 * OneHour
SixHour = 6 * OneHour
SevenHour = 7 * OneHour
EightHour = 8 * OneHour
TwelveHour = 12 * OneHour
OneDay = 24 * OneHour
TwoDay = 2 * OneDay
ThreeDay = 3 * OneDay
SevenDay = 7 * OneDay
FifteenDay = 15 * OneDay
OneWeek = 7 * OneDay
TwoWeek = 2 * OneWeek
ThreeWeek = 3 * OneWeek
OneMonth = 30 * OneDay
ThreeMonth = 90 * OneDay
SixMonth = 2 * ThreeMonth
NineMonth = 3 * ThreeMonth
OneYear = 365 * OneDay
FiveDay = 5 * OneDay
)
var (

View File

@@ -166,7 +166,7 @@ const (
// Update and things and stuff
type Update struct {
UpdateID int64 // Used when no time is provided
UpdateID int64
UpdateTime time.Time
UpdatePushedAt time.Time
Asset asset.Item
@@ -176,6 +176,8 @@ type Update struct {
Pair currency.Pair
// Checksum defines the expected value when the books have been verified
Checksum uint32
// AllowEmpty, when true, permits loading an empty order book update to set an UpdateID without including actual data.
AllowEmpty bool
}
// Movement defines orderbook traversal details from either hitting the bids or

View File

@@ -1,6 +1,8 @@
package subscription
import (
"errors"
"fmt"
"maps"
"strings"
"testing"
@@ -13,6 +15,33 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
)
var errValidateSubscriptionsTestError = errors.New("validate subscriptions test error")
type mockExWithSubValidator struct {
GenerateBadSubscription bool
FailGetSubscriptions bool
*mockEx
}
func (m *mockExWithSubValidator) ValidateSubscriptions(in List) error {
for _, sub := range in {
if sub.Channel == "fail-channel" {
return fmt.Errorf("%w: '%s'", errValidateSubscriptionsTestError, sub.String())
}
}
return nil
}
func (m *mockExWithSubValidator) GetSubscriptions() (List, error) {
if m.GenerateBadSubscription {
return List{{Channel: "fail-channel"}}, nil
}
if m.FailGetSubscriptions {
return nil, ErrNotFound
}
return nil, nil
}
type mockEx struct {
pairs assetPairs
assets asset.Items
@@ -76,6 +105,7 @@ func (m *mockEx) GetSubscriptionTemplate(s *Subscription) (*template.Template, e
func (m *mockEx) GetAssetTypes(_ bool) asset.Items { return m.assets }
func (m *mockEx) CanUseAuthenticatedWebsocketEndpoints() bool { return m.auth }
func (m *mockEx) GetSubscriptions() (List, error) { return nil, nil }
// equalLists is a utility function to compare subscription lists and show a pretty failure message
// It overcomes the verbose depth of assert.ElementsMatch spewConfig

View File

@@ -22,6 +22,7 @@ type IExchange interface {
GetSubscriptionTemplate(*Subscription) (*template.Template, error)
CanUseAuthenticatedWebsocketEndpoints() bool
IsAssetWebsocketSupported(a asset.Item) bool
GetSubscriptions() (List, error)
}
// Strings returns a sorted slice of subscriptions

View File

@@ -38,19 +38,27 @@ const (
// Public errors
var (
ErrNotFound = errors.New("subscription not found")
ErrNotSinglePair = errors.New("only single pair subscriptions expected")
ErrBatchingNotSupported = errors.New("subscription batching not supported")
ErrInStateAlready = errors.New("subscription already in state")
ErrInvalidState = errors.New("invalid subscription state")
ErrDuplicate = errors.New("duplicate subscription")
ErrUseConstChannelName = errors.New("must use standard channel name constants")
ErrNotSupported = errors.New("subscription channel not supported")
ErrNotFound = errors.New("subscription not found")
ErrNotSinglePair = errors.New("only single pair subscriptions expected")
ErrBatchingNotSupported = errors.New("subscription batching not supported")
ErrInStateAlready = errors.New("subscription already in state")
ErrInvalidState = errors.New("invalid subscription state")
ErrDuplicate = errors.New("duplicate subscription")
ErrUseConstChannelName = errors.New("must use standard channel name constants")
ErrNotSupported = errors.New("subscription channel not supported")
ErrExclusiveSubscription = errors.New("exclusive subscription detected")
ErrInvalidInterval = errors.New("invalid interval")
ErrInvalidLevel = errors.New("invalid level")
)
// State tracks the status of a subscription channel
type State uint8
// ListValidator validates a list of subscriptions, this is optionally handled through expand templates method
type ListValidator interface {
ValidateSubscriptions(List) error
}
// Subscription container for streaming subscriptions
type Subscription struct {
Enabled bool `json:"enabled"`

View File

@@ -40,6 +40,7 @@ type tplCtx struct {
// Calls e.GetSubscriptionTemplate to find a template for each subscription
// Filters out Authenticated subscriptions if !e.CanUseAuthenticatedEndpoints
// See README.md for more details
// The exchange can optionally implement ListValidator to have custom validation on subscriptions
func (l List) ExpandTemplates(e IExchange) (List, error) {
if !slices.ContainsFunc(l, func(s *Subscription) bool { return s.QualifiedChannel == "" }) {
// Empty list, or already processed
@@ -72,8 +73,21 @@ func (l List) ExpandTemplates(e IExchange) (List, error) {
expanded, err2 := expandTemplate(e, s, maps.Clone(ap), assets)
if err2 != nil {
err = common.AppendError(err, fmt.Errorf("%s: %w", s, err2))
} else {
subs = append(subs, expanded...)
continue
}
subs = append(subs, expanded...)
}
// Validate the subscriptions after expansion to capture fields that will be used in the template
if v, ok := e.(ListValidator); ok {
// Need to check against the already stored subscriptions, as we add additional subscriptions
storedSubs, err := e.GetSubscriptions()
if err != nil {
return nil, err
}
if err := v.ValidateSubscriptions(slices.Concat(subs, storedSubs)); err != nil {
return nil, fmt.Errorf("error validating subscriptions: %w", err)
}
}

View File

@@ -31,6 +31,14 @@ func TestExpandTemplates(t *testing.T) {
{Channel: "batching", Asset: asset.Spot},
{Channel: "single-channel", Authenticated: true},
}
_, err := l.ExpandTemplates(&mockExWithSubValidator{mockEx: e, GenerateBadSubscription: true})
require.ErrorIs(t, err, errValidateSubscriptionsTestError)
_, err = l.ExpandTemplates(&mockExWithSubValidator{mockEx: e})
require.NoError(t, err)
_, err = l.ExpandTemplates(&mockExWithSubValidator{mockEx: e, FailGetSubscriptions: true})
require.ErrorIs(t, err, ErrNotFound)
got, err := l.ExpandTemplates(e)
require.NoError(t, err, "ExpandTemplates must not error")
exp := List{