Subscriptions: Fix no enabled pairs for one asset (#1792)

If one asset is enabled but has no enabled pairs, we should ignore that
asset, even for non-pair related subscriptions.
That matches the existing code, but wasn't happening in the context of
asset.All subscriptions with just one asset in this state.
If a user wanted to have non-pair subscriptions, they should use
asset.Empty. Would expect other breakages with no pairs enabled, too.

Note: No enabled pairs for an enabled asset is a transient issue which
can occur due to enableOnePair racing against no available pairs. The
second run, available pairs would be populated and enableOnePair would
work. This situation could persist due to running with --dry or
containerised, though.

Fixes:
`
Okx websocket: subscription failure, myOrders all : subscription template did not generate the expected number of pair records for spread: Got 1;
Expected 0
`

Relates to #1420
This commit is contained in:
Gareth Kirwan
2025-03-06 01:08:08 +00:00
committed by GitHub
parent 0796e44063
commit d069ff2bf4
6 changed files with 84 additions and 44 deletions

View File

@@ -14,7 +14,7 @@ import (
)
type mockEx struct {
pairs currency.Pairs
pairs assetPairs
assets asset.Items
tpl string
auth bool
@@ -23,16 +23,13 @@ type mockEx struct {
}
func newMockEx() *mockEx {
pairs := currency.Pairs{btcusdtPair, ethusdcPair}
for _, b := range []currency.Code{currency.LTC, currency.XRP, currency.TRX} {
for _, q := range []currency.Code{currency.USDT, currency.USDC} {
pairs = append(pairs, currency.NewPair(b, q))
}
}
return &mockEx{
assets: asset.Items{asset.Spot, asset.Futures, asset.Index},
pairs: pairs,
pairs: assetPairs{
asset.Spot: currency.Pairs{ethusdcPair, btcusdtPair, currency.NewPair(currency.LTC, currency.USDT)},
asset.Futures: currency.Pairs{ethusdcPair, btcusdtPair},
asset.Index: currency.Pairs{btcusdtPair},
},
}
}
@@ -40,12 +37,12 @@ func (m *mockEx) IsAssetWebsocketSupported(a asset.Item) bool {
return a != asset.Index
}
func (m *mockEx) GetEnabledPairs(_ asset.Item) (currency.Pairs, error) {
return m.pairs, m.errPairs
func (m *mockEx) GetEnabledPairs(a asset.Item) (currency.Pairs, error) {
return m.pairs[a], m.errPairs
}
func (m *mockEx) GetPairFormat(_ asset.Item, _ bool) (currency.PairFormat, error) {
return currency.PairFormat{Uppercase: true}, m.errFormat
return currency.PairFormat{Uppercase: true, Delimiter: "-"}, m.errFormat
}
func (m *mockEx) GetSubscriptionTemplate(s *Subscription) (*template.Template, error) {
@@ -65,7 +62,14 @@ func (m *mockEx) GetSubscriptionTemplate(s *Subscription) (*template.Template, e
ap[asset.Spot] = ap[asset.Spot][0:1]
return ""
},
"batch": common.Batch[currency.Pairs],
"batch": func(pairs currency.Pairs, size int) []string {
s := []string{}
for _, p := range common.Batch(pairs, size) {
p = p.Format(currency.PairFormat{Uppercase: true})
s = append(s, p.Join())
}
return s
},
}).
ParseFiles("testdata/" + m.tpl)
}

View File

@@ -94,19 +94,6 @@ func (l List) SetStates(state State) error {
return err
}
func fillAssetPairs(ap assetPairs, a asset.Item, e IExchange) error {
p, err := e.GetEnabledPairs(a)
if err != nil {
return err
}
f, err := e.GetPairFormat(a, true)
if err != nil {
return err
}
ap[a] = common.SortStrings(p.Format(f))
return nil
}
// assetPairs returns a map of enabled pairs for the subscriptions in the list, formatted for the asset
func (l List) assetPairs(e IExchange) (assetPairs, error) {
at := []asset.Item{}
@@ -122,13 +109,13 @@ func (l List) assetPairs(e IExchange) (assetPairs, error) {
// Nothing to do
case asset.All:
for _, a := range at {
if err := fillAssetPairs(ap, a, e); err != nil {
if err := ap.populate(e, a); err != nil {
return nil, err
}
}
default:
if slices.Contains(at, s.Asset) {
if err := fillAssetPairs(ap, s.Asset, e); err != nil {
if err := ap.populate(e, s.Asset); err != nil {
return nil, err
}
}
@@ -169,3 +156,17 @@ func (l List) Public() List {
}
return n
}
// populate adds all enabled pairs for an asset to the assetPair map
func (ap assetPairs) populate(e IExchange, a asset.Item) error {
p, err := e.GetEnabledPairs(a)
if err != nil || len(p) == 0 {
return err
}
f, err := e.GetPairFormat(a, true)
if err != nil {
return err
}
ap[a] = common.SortStrings(p.Format(f))
return nil
}

View File

@@ -114,6 +114,31 @@ func TestAssetPairs(t *testing.T) {
}
}
// TestAssetPairsPopulate exercises assetPairs Populate
func TestAssetPairsPopulate(t *testing.T) {
e := newMockEx()
ap := assetPairs{}
err := ap.populate(e, asset.Spot)
require.NoError(t, err)
require.NotEmpty(t, ap)
assert.Equal(t, 3, len(ap[asset.Spot]), "populate should return correct number of pairs for spot")
assert.Equal(t, "BTC-USDT", ap[asset.Spot][0].String(), "populate should respect format and sort the pairs")
err = ap.populate(e, asset.Futures)
require.NoError(t, err)
assert.Equal(t, 2, len(ap[asset.Futures]), "populate should return correct number of pairs for futures")
exp := errors.New("expected error")
e.errFormat = exp
err = ap.populate(e, asset.Spot)
assert.ErrorIs(t, err, exp, "populate should error correctly on format error")
e.pairs = assetPairs{}
ap = assetPairs{}
err = ap.populate(e, asset.Spot)
require.NoError(t, err, "populate must not error with no pairs enabled")
assert.Empty(t, ap, "populate should return an empty map with no pairs enabled")
}
func TestListClone(t *testing.T) {
t.Parallel()
l := List{{Channel: TickerChannel}, {Channel: OrderbookChannel}}

View File

@@ -27,7 +27,7 @@ func TestExpandTemplates(t *testing.T) {
{Channel: "expand-pairs", Asset: asset.Spot, Levels: 2},
{Channel: "single-channel", QualifiedChannel: "just one sub already processed"},
{Channel: "update-asset-pairs", Asset: asset.All},
{Channel: "expand-pairs", Asset: asset.Spot, Pairs: e.pairs[0:2], Levels: 3},
{Channel: "expand-pairs", Asset: asset.Spot, Pairs: e.pairs[asset.Spot][0:2], Levels: 3},
{Channel: "batching", Asset: asset.Spot},
{Channel: "single-channel", Authenticated: true},
}
@@ -35,21 +35,32 @@ func TestExpandTemplates(t *testing.T) {
require.NoError(t, err, "ExpandTemplates must not error")
exp := List{
{Channel: "single-channel", QualifiedChannel: "single-channel"},
{Channel: "expand-assets", QualifiedChannel: "spot-expand-assets@15m", Asset: asset.Spot, Pairs: e.pairs, Interval: kline.FifteenMin},
{Channel: "expand-assets", QualifiedChannel: "future-expand-assets@15m", Asset: asset.Futures, Pairs: e.pairs, Interval: kline.FifteenMin},
{Channel: "expand-assets", QualifiedChannel: "spot-expand-assets@15m", Asset: asset.Spot, Pairs: e.pairs[asset.Spot], Interval: kline.FifteenMin},
{Channel: "expand-assets", QualifiedChannel: "future-expand-assets@15m", Asset: asset.Futures, Pairs: e.pairs[asset.Futures], Interval: kline.FifteenMin},
{Channel: "single-channel", QualifiedChannel: "just one sub already processed"},
{Channel: "update-asset-pairs", QualifiedChannel: "spot-btcusdt-update-asset-pairs", Asset: asset.Spot, Pairs: currency.Pairs{btcusdtPair}},
{Channel: "expand-pairs", QualifiedChannel: "spot-USDTBTC-expand-pairs@3", Asset: asset.Spot, Pairs: e.pairs[:1], Levels: 3},
{Channel: "expand-pairs", QualifiedChannel: "spot-USDCETH-expand-pairs@3", Asset: asset.Spot, Pairs: e.pairs[1:2], Levels: 3},
{Channel: "expand-pairs", QualifiedChannel: "spot-USDTBTC-expand-pairs@3", Asset: asset.Spot, Pairs: e.pairs[asset.Spot][1:2], Levels: 3},
{Channel: "expand-pairs", QualifiedChannel: "spot-USDCETH-expand-pairs@3", Asset: asset.Spot, Pairs: e.pairs[asset.Spot][:1], Levels: 3},
}
for _, p := range e.pairs {
exp = append(exp, List{
{Channel: "expand-pairs", QualifiedChannel: "spot-" + p.Swap().String() + "-expand-pairs@1", Asset: asset.Spot, Pairs: currency.Pairs{p}, Levels: 1},
{Channel: "expand-pairs", QualifiedChannel: "future-" + p.Swap().String() + "-expand-pairs@1", Asset: asset.Futures, Pairs: currency.Pairs{p}, Levels: 1},
{Channel: "expand-pairs", QualifiedChannel: "spot-" + p.Swap().String() + "-expand-pairs@2", Asset: asset.Spot, Pairs: currency.Pairs{p}, Levels: 2},
}...)
for a, pairs := range e.pairs {
if a == asset.Index { // Not IsAssetWebsocketEnabled
continue
}
for _, p := range common.SortStrings(pairs) {
pStr := p.Swap().String()
if a == asset.Spot {
exp = append(exp, List{
{Channel: "expand-pairs", QualifiedChannel: "spot-" + pStr + "-expand-pairs@1", Asset: a, Pairs: currency.Pairs{p}, Levels: 1},
&Subscription{Channel: "expand-pairs", QualifiedChannel: "spot-" + pStr + "-expand-pairs@2", Asset: a, Pairs: currency.Pairs{p}, Levels: 2},
}...)
} else {
exp = append(exp,
&Subscription{Channel: "expand-pairs", QualifiedChannel: "future-" + pStr + "-expand-pairs@1", Asset: a, Pairs: currency.Pairs{p}, Levels: 1},
)
}
}
}
for _, b := range common.Batch(common.SortStrings(e.pairs), 3) {
for _, b := range common.Batch(common.SortStrings(e.pairs[asset.Spot]), 3) {
exp = append(exp, &Subscription{Channel: "batching", QualifiedChannel: "spot-" + b.Join() + "-batching", Asset: asset.Spot, Pairs: b})
}
@@ -73,9 +84,9 @@ func TestExpandTemplates(t *testing.T) {
got, err = l.ExpandTemplates(e)
require.NoError(t, err, "ExpandTemplates must not error")
exp = List{
{Channel: "expand-assets", QualifiedChannel: "spot-expand-assets@1h", Asset: asset.Spot, Pairs: e.pairs, Interval: kline.OneHour},
{Channel: "expand-assets", QualifiedChannel: "spot-expand-assets@1h", Asset: asset.Spot, Pairs: e.pairs[asset.Spot], Interval: kline.OneHour},
}
for _, p := range e.pairs {
for _, p := range e.pairs[asset.Spot] {
exp = append(exp, List{
{Channel: "expand-pairs", QualifiedChannel: "spot-" + p.Swap().String() + "-expand-pairs@4", Asset: asset.Spot, Pairs: currency.Pairs{p}, Levels: 4},
}...)

View File

@@ -10,7 +10,6 @@
{{/* Incorrect number of pair entries */}}
{{- .PairSeparator -}}
{{- .PairSeparator -}}
{{- .PairSeparator -}}
{{- else if eq .S.Channel "error4" }}
{{/* Too many BatchSize commands */}}
{{- range $asset, $pairs := $.AssetPairs }}

View File

@@ -27,7 +27,7 @@
{{- range $asset, $pairs := $.AssetPairs }}
{{- if eq $asset.String "spot" }}
{{- range $batch := batch $pairs 3 -}}
{{ assetName $asset }}-{{ $batch.Join -}} -batching
{{ assetName $asset }}-{{ $batch -}} -batching
{{- $.PairSeparator -}}
{{- end -}}
{{- $.BatchSize -}} 3