From b41fe276846bde36fae6da4354f7a1381191d70a Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Fri, 9 Aug 2024 03:33:15 +0100 Subject: [PATCH] Kucoin: Add subscription templating and various fixes (#1579) * Currency: Variadic Pairs.Add This version of Pairs.Add is simpler and [more performant](https://gist.github.com/gbjk/06a1fc1832d04ee41213ca518938cf74) Behavioural difference: If there's nothing to add, the same slice is returned unaltered. This seems like good sauce * Currency: Variadic Remove * Common: Add Batch function * Common: Add common.SortStrings for stringers * Subscriptions: Add batching to templates * Subscriptions: Sort list of pairs * Kucoin: Switch to sub templating * Kucoin: Simplify channel prefix usage * Kucoin: Fix race on fetchedFuturesOrderbook * Subscriptions: Filter AssetPairs Now only the assetPairs relevant to the subscription are in the context * Subscriptions: Respect subscription Pairs * Subscriptions: Trim AssetSeparator early We want to trim before checking for "AssetSeparator vs All" because a template should be allowed to reuse a range template and generate just one trailing AssetSeparator whilst using a specific Asset * Kucoin: Fix empty margin asset added * Kucoin: Add Subscription batching Turns out that contary to the documentation, kucoin supports batching of all symbols and currencies * Kucoin: Fix checkSubscriptions and coverage * Subscriptions: Simplify error checking This reduces the complexity of error checking to just be "do we get the correct numbers". Fixes Asset.All with only one asset erroring on xpandPairs, because we trimmed the only asset separator, and then errored that we're not xpanding Assets and the asset on the sub is asset.All This use-case conflicted with commit 6bbd546d74, which required: ``` Subscriptions: Trim AssetSeparator early We want to trim before checking for "AssetSeparator vs All" because a template should be allowed to reuse a range template and generate just one trailing AssetSeparator whilst using a specific Asset ``` Now we set up the assets earlier, and we remove the check for xpandAssets, since the number of asset lines matching is all that matters. I've removed the asset tests for this, but they were correctly erroring on the number of asset lines instead. Everything hits coverage, as well. * Kucoin: Remove deprecated fundingBook endpoint * BTCMarkets: Use common.Batch --- README.md | 10 +- .../exchanges_templates/kucoin.tmpl | 40 ++ .../exchanges_templates/subscription.tmpl | 20 +- common/common.go | 46 +- common/common_test.go | 53 +- currency/manager.go | 11 +- currency/manager_test.go | 40 +- currency/pairs.go | 28 +- currency/pairs_test.go | 121 +--- exchanges/asset/asset.go | 3 +- exchanges/btcmarkets/btcmarkets_wrapper.go | 22 +- exchanges/exchange.go | 14 +- exchanges/kucoin/README.md | 106 +-- exchanges/kucoin/kucoin.go | 2 +- exchanges/kucoin/kucoin_test.go | 298 ++++++--- exchanges/kucoin/kucoin_types.go | 18 +- exchanges/kucoin/kucoin_websocket.go | 622 ++++++++---------- exchanges/kucoin/kucoin_wrapper.go | 23 +- exchanges/kucoin/testdata/wsHandleData.json | 2 - exchanges/orderbook/orderbook.go | 15 +- exchanges/subscription/README.md | 20 +- exchanges/subscription/fixtures_test.go | 35 +- exchanges/subscription/list.go | 2 +- exchanges/subscription/list_test.go | 12 +- exchanges/subscription/template.go | 207 +++--- exchanges/subscription/template_test.go | 100 ++- exchanges/subscription/testdata/errors.tmpl | 39 +- .../subscription/testdata/subscriptions.tmpl | 53 +- 28 files changed, 1039 insertions(+), 923 deletions(-) create mode 100644 cmd/documentation/exchanges_templates/kucoin.tmpl diff --git a/README.md b/README.md index 5b885954..be393b5a 100644 --- a/README.md +++ b/README.md @@ -142,12 +142,12 @@ Binaries will be published once the codebase reaches a stable condition. |User|Contribution Amount| |--|--| -| [thrasher-](https://github.com/thrasher-) | 690 | -| [shazbert](https://github.com/shazbert) | 330 | -| [dependabot[bot]](https://github.com/apps/dependabot) | 287 | +| [thrasher-](https://github.com/thrasher-) | 692 | +| [shazbert](https://github.com/shazbert) | 333 | +| [dependabot[bot]](https://github.com/apps/dependabot) | 293 | | [gloriousCode](https://github.com/gloriousCode) | 234 | | [dependabot-preview[bot]](https://github.com/apps/dependabot-preview) | 88 | -| [gbjk](https://github.com/gbjk) | 76 | +| [gbjk](https://github.com/gbjk) | 80 | | [xtda](https://github.com/xtda) | 47 | | [lrascao](https://github.com/lrascao) | 27 | | [Beadko](https://github.com/Beadko) | 17 | @@ -162,8 +162,8 @@ Binaries will be published once the codebase reaches a stable condition. | [marcofranssen](https://github.com/marcofranssen) | 8 | | [140am](https://github.com/140am) | 8 | | [TaltaM](https://github.com/TaltaM) | 6 | +| [cranktakular](https://github.com/cranktakular) | 6 | | [dackroyd](https://github.com/dackroyd) | 5 | -| [cranktakular](https://github.com/cranktakular) | 5 | | [khcchiu](https://github.com/khcchiu) | 5 | | [yangrq1018](https://github.com/yangrq1018) | 4 | | [woshidama323](https://github.com/woshidama323) | 3 | diff --git a/cmd/documentation/exchanges_templates/kucoin.tmpl b/cmd/documentation/exchanges_templates/kucoin.tmpl new file mode 100644 index 00000000..1eb9ca85 --- /dev/null +++ b/cmd/documentation/exchanges_templates/kucoin.tmpl @@ -0,0 +1,40 @@ +{{define "exchanges kucoin" -}} +{{template "header" .}} +## Kucoin Exchange + +### Current Features + ++ REST Support ++ Websocket Support + +### Subscriptions + +Default Public Subscriptions: +- Ticker for spot, margin and futures +- Orderbook for spot, margin and futures +- All trades for spot and margin + +Default Authenticated Subscriptions: +- All trades for futures +- Stop Order Lifecycle events for futures +- Account Balance events for spot, margin and futures +- Margin Position updates +- Margin Loan updates + +Subscriptions are subject to enabled assets and pairs. + +Limitations: +- 100 symbols per subscription +- 300 symbols per connection + +Due to these limitations, if more than 10 symbols are enabled, ticker will subscribe to ticker:all. + +Unimplemented subscriptions: +- Candles for Futures +- Market snapshot for currency + +### Please click GoDocs chevron above to view current GoDoc information for this package + +{{template "contributions"}} +{{template "donations" .}} +{{end}} diff --git a/cmd/documentation/exchanges_templates/subscription.tmpl b/cmd/documentation/exchanges_templates/subscription.tmpl index b54b7cd5..4df4403a 100644 --- a/cmd/documentation/exchanges_templates/subscription.tmpl +++ b/cmd/documentation/exchanges_templates/subscription.tmpl @@ -20,12 +20,30 @@ The template is provided with a single context structure: AssetPairs map[asset.Item]currency.Pairs AssetSeparator string PairSeparator string + BatchSize string ``` Subscriptions may fan out many channels for assets and pairs, to support exchanges which require individual subscriptions. -To allow the template to communicate how to handle its output it should use the provided separators: +To allow the template to communicate how to handle its output it should use the provided directives: - AssetSeparator should be added at the end of each section related to assets - PairSeparator should be added at the end of each pair +- BatchSize should be added with a number directly before AssetSeparator to indicate pairs have been batched + +Example: +```{{` +{{- range $asset, $pairs := $.AssetPairs }} + {{- range $b := batch $pairs 30 -}} + {{- $.S.Channel -}} : {{- $b.Join -}} + {{ $.PairSeparator }} + {{- end -}} + {{- $.BatchSize -}} 30 + {{- $.AssetSeparator }} +{{- end }} +`}}``` + +Assets and pairs should be output in the sequence in AssetPairs since text/template range function uses an sorted order for map keys. + +Template functions may modify AssetPairs to update the subscription's pairs, e.g. Filtering out margin pairs already in spot subscription We use separators like this because it allows mono-templates to decide at runtime whether to fan out. diff --git a/common/common.go b/common/common.go index 9b8ec990..2aec8fc8 100644 --- a/common/common.go +++ b/common/common.go @@ -15,6 +15,7 @@ import ( "path/filepath" "reflect" "regexp" + "slices" "strconv" "strings" "sync" @@ -388,20 +389,6 @@ func ChangePermission(directory string) error { }) } -// SplitStringSliceByLimit splits a slice of strings into slices by input limit and returns a slice of slice of strings -func SplitStringSliceByLimit(in []string, limit uint) [][]string { - var stringSlice []string - sliceSlice := make([][]string, 0, len(in)/int(limit)+1) - for len(in) >= int(limit) { - stringSlice, in = in[:limit], in[limit:] - sliceSlice = append(sliceSlice, stringSlice) - } - if len(in) > 0 { - sliceSlice = append(sliceSlice, in) - } - return sliceSlice -} - // AddPaddingOnUpperCase adds padding to a string when detecting an upper case letter. If // there are multiple upper case items like `ThisIsHTTPExample`, it will only // pad between like this `This Is HTTP Example`. @@ -653,3 +640,34 @@ func GetTypeAssertError(required string, received interface{}, fieldDescription } return fmt.Errorf("%w from %T to %s%s", ErrTypeAssertFailure, received, required, description) } + +// Batch takes a slice type and converts it into a slice of containing slices of length batchSize, and any remainder in the final batch +// batchSize <= 0 will return the entire input slice in one batch +func Batch[S ~[]E, E any](blobs S, batchSize int) []S { + if len(blobs) == 0 { + return []S{} + } + blobs = slices.Clone(blobs) + if batchSize <= 0 { + return []S{blobs} + } + i := 0 + batches := make([]S, (len(blobs)+batchSize-1)/batchSize) + for batchSize < len(blobs) { + blobs, batches[i] = blobs[batchSize:], blobs[:batchSize:batchSize] + i++ + } + if len(blobs) > 0 { + batches[i] = blobs + } + return batches +} + +// SortStrings takes a slice of fmt.Stringer implementers and returns a new ascending sorted slice +func SortStrings[S ~[]E, E fmt.Stringer](x S) S { + n := slices.Clone(x) + slices.SortFunc(n, func(a, b E) int { + return strings.Compare(a.String(), b.String()) + }) + return n +} diff --git a/common/common_test.go b/common/common_test.go index a468392b..93ebfe7a 100644 --- a/common/common_test.go +++ b/common/common_test.go @@ -565,26 +565,6 @@ func initStringSlice(size int) (out []string) { return } -func TestSplitStringSliceByLimit(t *testing.T) { - t.Parallel() - slice50 := initStringSlice(50) - out := SplitStringSliceByLimit(slice50, 20) - if len(out) != 3 { - t.Errorf("expected len() to be 3 instead received: %v", len(out)) - } - if len(out[0]) != 20 { - t.Errorf("expected len() to be 20 instead received: %v", len(out[0])) - } - - out = SplitStringSliceByLimit(slice50, 50) - if len(out) != 1 { - t.Errorf("expected len() to be 3 instead received: %v", len(out)) - } - if len(out[0]) != 50 { - t.Errorf("expected len() to be 20 instead received: %v", len(out[0])) - } -} - func TestAddPaddingOnUpperCase(t *testing.T) { t.Parallel() @@ -856,3 +836,36 @@ func TestErrorCollector(t *testing.T) { require.True(t, ok, "Must return a multiError") assert.Len(t, errs.Unwrap(), 2, "Should have 2 errors") } + +// TestBatch ensures the Batch function does not regress into common behavioural faults if implementation changes +func TestBatch(t *testing.T) { + s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + b := Batch(s, 3) + require.Len(t, b, 4) + assert.Len(t, b[0], 3) + assert.Len(t, b[3], 1) + + b[0][0] = 42 + assert.Equal(t, 1, s[0], "Changing the batches must not change the source") + + require.NotPanics(t, func() { Batch(s, -1) }, "Must not panic on negative batch size") + done := make(chan any, 1) + go func() { done <- Batch(s, 0) }() + require.Eventually(t, func() bool { return len(done) > 0 }, time.Second, time.Millisecond, "Batch 0 must not hang") + + for _, i := range []int{-1, 0, 50} { + b = Batch(s, i) + require.Lenf(t, b, 1, "A batch size of %v should produce a single batch", i) + assert.Lenf(t, b[0], len(s), "A batch size of %v should produce a single batch", i) + } +} + +type A int + +func (a A) String() string { + return strconv.Itoa(int(a)) +} + +func TestSortStrings(t *testing.T) { + assert.Equal(t, []A{1, 2, 5, 6}, SortStrings([]A{6, 2, 5, 1})) +} diff --git a/currency/manager.go b/currency/manager.go index d7639f2c..8486aa78 100644 --- a/currency/manager.go +++ b/currency/manager.go @@ -289,11 +289,14 @@ func (p *PairsManager) DisablePair(a asset.Item, pair Pair) error { return err } - enabled, err := pairStore.Enabled.Remove(pair) - if err != nil { - return err + enabledLen := len(pairStore.Enabled) + + pairStore.Enabled = pairStore.Enabled.Remove(pair) + + if enabledLen == len(pairStore.Enabled) { + return fmt.Errorf("%w %s", ErrPairNotFound, pair) } - pairStore.Enabled = enabled + return nil } diff --git a/currency/manager_test.go b/currency/manager_test.go index 4f50b929..a7064846 100644 --- a/currency/manager_test.go +++ b/currency/manager_test.go @@ -394,42 +394,30 @@ func TestDisablePair(t *testing.T) { t.Parallel() p := initTest(t) - if err := p.DisablePair(asset.Empty, EMPTYPAIR); !errors.Is(err, asset.ErrNotSupported) { - t.Fatalf("received: '%v' but expected: '%v'", err, asset.ErrNotSupported) - } + err := p.DisablePair(asset.Empty, EMPTYPAIR) + assert.ErrorIs(t, err, asset.ErrNotSupported, "Empty asset should error") - if err := p.DisablePair(asset.Spot, EMPTYPAIR); !errors.Is(err, ErrCurrencyPairEmpty) { - t.Fatalf("received: '%v' but expected: '%v'", err, ErrCurrencyPairEmpty) - } + err = p.DisablePair(asset.Spot, EMPTYPAIR) + assert.ErrorIs(t, err, ErrCurrencyPairEmpty, "Empty pair should error") p.Pairs = nil - // Test disabling a pair when the pair manager is not initialised - if err := p.DisablePair(asset.Spot, NewPair(BTC, USD)); err == nil { - t.Error("unexpected result") - } + err = p.DisablePair(asset.Spot, NewPair(BTC, USD)) + assert.ErrorIs(t, err, ErrPairManagerNotInitialised, "Uninitialised PairManager should error") - // Test asset type which doesn't exist p = initTest(t) - if err := p.DisablePair(asset.Futures, EMPTYPAIR); err == nil { - t.Error("unexpected result") - } + err = p.DisablePair(asset.CoinMarginedFutures, EMPTYPAIR) + assert.ErrorIs(t, err, ErrCurrencyPairEmpty, "Non-existent asset type should error") - // Test asset type which has an empty pair store p.Pairs[asset.Spot] = nil - if err := p.DisablePair(asset.Spot, EMPTYPAIR); err == nil { - t.Error("unexpected result") - } + err = p.DisablePair(asset.Spot, EMPTYPAIR) + assert.ErrorIs(t, err, ErrCurrencyPairEmpty, "Empty pair store should error") - // Test disabling a pair which isn't enabled p = initTest(t) - if err := p.DisablePair(asset.Spot, NewPair(LTC, USD)); err == nil { - t.Error("unexpected result") - } + err = p.DisablePair(asset.Spot, NewPair(LTC, USD)) + assert.ErrorIs(t, err, ErrPairNotFound, "Not Enabled pair should error") - // Test disabling a valid pair and ensure nil is empty - if err := p.DisablePair(asset.Spot, NewPair(BTC, USD)); err != nil { - t.Error("unexpected result") - } + err = p.DisablePair(asset.Spot, NewPair(BTC, USD)) + assert.NoError(t, err, "DisablePair should not error") } func TestEnablePair(t *testing.T) { diff --git a/currency/pairs.go b/currency/pairs.go index aea9f168..1d9dc884 100644 --- a/currency/pairs.go +++ b/currency/pairs.go @@ -199,28 +199,26 @@ func (p Pairs) GetPairsByCurrencies(currencies Currencies) Pairs { return pairs } -// Remove removes the specified pair from the list of pairs if it exists -func (p Pairs) Remove(pair Pair) (Pairs, error) { - pairs := slices.Clone(p) - for x := range p { - if p[x].Equal(pair) { - return append(pairs[:x], pairs[x+1:]...), nil +// Remove removes the specified pairs from the list of pairs if they exist +func (p Pairs) Remove(rem ...Pair) Pairs { + n := make(Pairs, 0, len(p)) + for _, pN := range p { + if !slices.ContainsFunc(rem, func(pX Pair) bool { return pX.Equal(pN) }) { + n = append(n, pN) } } - return nil, fmt.Errorf("%s %w", pair, ErrPairNotFound) + return slices.Clip(n) } -// Add adds a specified pair to the list of pairs if it doesn't exist +// Add adds pairs to the list of pairs ignoring duplicates func (p Pairs) Add(pairs ...Pair) Pairs { - merge := append(slices.Clone(p), pairs...) - var filterInt int - for x := len(p); x < len(merge); x++ { - if !merge[:len(p)+filterInt].Contains(merge[x], true) { - merge[len(p)+filterInt] = merge[x] - filterInt++ + n := slices.Clone(p) + for _, a := range pairs { + if !n.Contains(a, true) { + n = append(n, a) } } - return merge[:len(p)+filterInt] + return n } // GetMatch returns either the pair that is equal including the reciprocal for diff --git a/currency/pairs_test.go b/currency/pairs_test.go index f305f846..96191748 100644 --- a/currency/pairs_test.go +++ b/currency/pairs_test.go @@ -3,6 +3,7 @@ package currency import ( "encoding/json" "errors" + "slices" "testing" "github.com/stretchr/testify/assert" @@ -202,108 +203,40 @@ func TestRemove(t *testing.T) { NewPair(LTC, USDT), } - compare := make(Pairs, len(oldPairs)) - copy(compare, oldPairs) + compare := slices.Clone(oldPairs) - p := NewPair(BTC, USD) - newPairs, err := oldPairs.Remove(p) - if !errors.Is(err, nil) { - t.Fatalf("received: '%v' but expected '%v'", err, nil) - } + newPairs := oldPairs.Remove(oldPairs[:2]...) - err = compare.ContainsAll(oldPairs, true) - if err != nil { - t.Fatal(err) - } + err := compare.ContainsAll(oldPairs, true) + assert.NoError(t, err, "Remove should not affect the original pairs") - if newPairs.Contains(p, true) || len(newPairs) != 2 { - t.Error("TestRemove unexpected result") - } + require.Len(t, newPairs, 1, "Remove should remove a pair") + require.Equal(t, oldPairs[2], newPairs[0], "Remove should leave the final pair") - _, err = newPairs.Remove(p) - if !errors.Is(err, ErrPairNotFound) { - t.Fatalf("received: '%v' but expected '%v'", err, ErrPairNotFound) - } - - newPairs, err = oldPairs.Remove(p) - if !errors.Is(err, nil) { - t.Fatalf("received: '%v' but expected '%v'", err, nil) - } - - newPairs, err = newPairs.Remove(NewPair(LTC, USD)) - if !errors.Is(err, nil) { - t.Fatalf("received: '%v' but expected '%v'", err, nil) - } - - err = compare.ContainsAll(oldPairs, true) - if err != nil { - t.Fatal(err) - } - - _, err = newPairs.Remove(NewPair(LTC, USD)) - if !errors.Is(err, ErrPairNotFound) { - t.Fatalf("received: '%v' but expected '%v'", err, ErrPairNotFound) - } - - newPairs, err = newPairs.Remove(NewPair(LTC, USDT)) - if !errors.Is(err, nil) { - t.Fatalf("received: '%v' but expected '%v'", err, nil) - } - - if len(newPairs) != 0 { - t.Error("unexpected value") - } - - _, err = newPairs.Remove(NewPair(LTC, USDT)) - if !errors.Is(err, ErrPairNotFound) { - t.Fatalf("received: '%v' but expected '%v'", err, ErrPairNotFound) - } + newPairs = newPairs.Remove(oldPairs[0]) + assert.Len(t, newPairs, 1, newPairs, "Remove have no effect on non-included pairs") } func TestAdd(t *testing.T) { t.Parallel() - var pairs = Pairs{ - NewPair(BTC, USD), - NewPair(LTC, USD), - NewPair(LTC, USDT), - } - // Test adding a new pair to the list of pairs - p := NewPair(BTC, USDT) - pairs = pairs.Add(p) - if !pairs.Contains(p, true) || len(pairs) != 4 { - t.Error("TestAdd unexpected result") - } - // Now test adding a pair which already exists - pairs = pairs.Add(p) - if len(pairs) != 4 { - t.Error("TestAdd unexpected result") - } - // Test adding multiple pairs - pairs = pairs.Add(NewPair(BTC, LTC), NewPair(ETH, USD)) - if len(pairs) != 6 { - t.Error("TestAdd unexpected result") - } - // Test adding multiple duplicate pairs - pairs = pairs.Add(NewPair(ETH, USDT), NewPair(ETH, USDT)) - if len(pairs) != 7 { - t.Error("TestAdd unexpected result") - } - // Test whether the original pairs have been modified - pairsWithExtraBaggage := make(Pairs, 0, len(pairs)+3) - pairsWithExtraBaggage = append(pairsWithExtraBaggage, pairs...) - brain := NewPair(BRAIN, USD) - withBrain := pairsWithExtraBaggage.Add(NewPair(BTC, LTC), brain) - if len(pairs) != 7 { - t.Error("TestAdd unexpected result") - } - assert.Equal(t, brain, withBrain[len(withBrain)-1]) - badger := NewPair(BADGER, USD) - withBadger := pairsWithExtraBaggage.Add(NewPair(BTC, LTC), badger) - if len(pairs) != 7 { - t.Error("TestAdd unexpected result") - } - assert.Equal(t, badger, withBadger[len(withBadger)-1]) - assert.Equal(t, brain, withBrain[len(withBrain)-1]) + orig := Pairs{NewPair(BTC, USD), NewPair(LTC, USD), NewPair(LTC, USDT)} + p := slices.Clone(orig) + p2 := Pairs{NewPair(BTC, USDT), NewPair(ETH, USD), NewPair(BTC, ETH)} + + pT := p.Add(p...) + assert.Equal(t, pT.Join(), orig.Join(), "Adding only existing pairs should return same Pairs") + assert.Equal(t, p.Join(), orig.Join(), "Should not effect original") + + pT = p.Add(p2...) + assert.Equal(t, pT.Join(), append(orig, p2...).Join(), "Adding new pairs should return correct Pairs") + assert.Equal(t, p.Join(), orig.Join(), "Should not effect original") + + p = slices.Grow(slices.Clone(orig), len(p2)) // Grow so that append doesn't alloc + pT1 := p.Add(p2[0]) + pT2 := p.Add(p2[1]) + pT1[3] = p2[2] // If Add doesn't allocate an new underlying array, this would affect PT2 as well + assert.Equal(t, p.Join(), orig.Join(), "Pairs underlying array should not be shared with original") + assert.Equal(t, pT2.Join(), append(orig, p2[1]).Join(), "Pairs underlying array should not be shared with siblings") } func TestContains(t *testing.T) { diff --git a/exchanges/asset/asset.go b/exchanges/asset/asset.go index 52f13cf4..e229dffe 100644 --- a/exchanges/asset/asset.go +++ b/exchanges/asset/asset.go @@ -153,8 +153,7 @@ func (a Items) JoinToString(separator string) string { return strings.Join(a.Strings(), separator) } -// IsValid returns whether or not the supplied asset type is valid or -// not +// IsValid returns whether or not the supplied asset type is valid or not func (a Item) IsValid() bool { return a != Empty && supportedFlag&a == a } diff --git a/exchanges/btcmarkets/btcmarkets_wrapper.go b/exchanges/btcmarkets/btcmarkets_wrapper.go index ece27598..e189ddab 100644 --- a/exchanges/btcmarkets/btcmarkets_wrapper.go +++ b/exchanges/btcmarkets/btcmarkets_wrapper.go @@ -613,7 +613,7 @@ func (b *BTCMarkets) CancelBatchOrders(ctx context.Context, o []order.Cancel) (* // CancelAllOrders cancels all orders associated with a currency pair func (b *BTCMarkets) CancelAllOrders(ctx context.Context, _ *order.Cancel) (order.CancelAllResponse, error) { - var resp order.CancelAllResponse + resp := order.CancelAllResponse{Status: map[string]string{}} orders, err := b.GetOrders(ctx, "", -1, -1, -1, true) if err != nil { return resp, err @@ -623,21 +623,18 @@ func (b *BTCMarkets) CancelAllOrders(ctx context.Context, _ *order.Cancel) (orde for x := range orders { orderIDs[x] = orders[x].OrderID } - splitOrders := common.SplitStringSliceByLimit(orderIDs, 20) - tempMap := make(map[string]string) - for z := range splitOrders { - tempResp, err := b.CancelBatch(ctx, splitOrders[z]) + for _, batch := range common.Batch(orderIDs, 20) { + cancelResp, err := b.CancelBatch(ctx, batch) if err != nil { return resp, err } - for y := range tempResp.CancelOrders { - tempMap[tempResp.CancelOrders[y].OrderID] = "Success" + for _, r := range cancelResp.CancelOrders { + resp.Status[r.OrderID] = "Success" } - for z := range tempResp.UnprocessedRequests { - tempMap[tempResp.UnprocessedRequests[z].RequestID] = "Cancellation Failed" + for _, r := range cancelResp.UnprocessedRequests { + resp.Status[r.RequestID] = "Cancellation Failed" } } - resp.Status = tempMap return resp, nil } @@ -883,9 +880,8 @@ func (b *BTCMarkets) GetOrderHistory(ctx context.Context, req *order.MultiOrderR tempArray = append(tempArray, orders[z].OrderID) } } - splitOrders := common.SplitStringSliceByLimit(tempArray, 50) - for x := range splitOrders { - tempData, err := b.GetBatchTrades(ctx, splitOrders[x]) + for _, batch := range common.Batch(tempArray, 50) { + tempData, err := b.GetBatchTrades(ctx, batch) if err != nil { return resp, err } diff --git a/exchanges/exchange.go b/exchanges/exchange.go index 318f2d0c..17646350 100644 --- a/exchanges/exchange.go +++ b/exchanges/exchange.go @@ -776,10 +776,7 @@ func (b *Base) UpdatePairs(incoming currency.Pairs, a asset.Item, enabled, force if err != nil { continue } - diff.Remove, err = diff.Remove.Remove(enabledPairs[x]) - if err != nil { - return err - } + diff.Remove = diff.Remove.Remove(enabledPairs[x]) enabledPairs[target] = match.Format(pFmt) } target++ @@ -1823,19 +1820,14 @@ func (b *Base) ParallelChanOp(channels subscription.List, m func(subscription.Li return errBatchSizeZero } - var j int - for i := 0; i < len(channels); i += batchSize { - j += batchSize - if j >= len(channels) { - j = len(channels) - } + for _, b := range common.Batch(channels, batchSize) { wg.Add(1) go func(c subscription.List) { defer wg.Done() if err := m(c); err != nil { errC <- err } - }(channels[i:j]) + }(b) } wg.Wait() diff --git a/exchanges/kucoin/README.md b/exchanges/kucoin/README.md index ab42b51c..3e122a4d 100644 --- a/exchanges/kucoin/README.md +++ b/exchanges/kucoin/README.md @@ -25,101 +25,35 @@ Join our slack to discuss all things related to GoCryptoTrader! [GoCryptoTrader + REST Support + Websocket Support -### How to enable +### Subscriptions -+ [Enable via configuration](https://github.com/thrasher-corp/gocryptotrader/tree/master/config#enable-exchange-via-config-example) +Default Public Subscriptions: +- Ticker for spot, margin and futures +- Orderbook for spot, margin and futures +- All trades for spot and margin -+ Individual package example below: +Default Authenticated Subscriptions: +- All trades for futures +- Stop Order Lifecycle events for futures +- Account Balance events for spot, margin and futures +- Margin Position updates +- Margin Loan updates -```go - // Exchanges will be abstracted out in further updates and examples will be - // supplied then -``` +Subscriptions are subject to enabled assets and pairs. -### How to do REST public/private calls +Limitations: +- 100 symbols per subscription +- 300 symbols per connection -+ If enabled via "configuration".json file the exchange will be added to the -IBotExchange array in the ```go var bot Bot``` and you will only be able to use -the wrapper interface functions for accessing exchange data. View routines.go -for an example of integration usage with GoCryptoTrader. Rudimentary example -below: +Due to these limitations, if more than 10 symbols are enabled, ticker will subscribe to ticker:all. -main.go -```go -var b exchange.IBotExchange - -for i := range bot.Exchanges { - if bot.Exchanges[i].GetName() == "Kucoin" { - b = bot.Exchanges[i] - } -} - -// Public calls - wrapper functions - -// Fetches current ticker information -tick, err := b.FetchTicker() -if err != nil { - // Handle error -} - -// Fetches current orderbook information -ob, err := b.FetchOrderbook() -if err != nil { - // Handle error -} - -// Private calls - wrapper functions - make sure your APIKEY and APISECRET are -// set and AuthenticatedAPISupport is set to true - -// Fetches current account information -accountInfo, err := b.GetAccountInfo() -if err != nil { - // Handle error -} -``` - -+ If enabled via individually importing package, rudimentary example below: - -```go -// Public calls - -// Fetches current ticker information -ticker, err := b.GetTicker() -if err != nil { - // Handle error -} - -// Fetches current orderbook information -ob, err := b.GetOrderBook() -if err != nil { - // Handle error -} - -// Private calls - make sure your APIKEY and APISECRET are set and -// AuthenticatedAPISupport is set to true - -// GetUserInfo returns account info -accountInfo, err := b.GetUserInfo(...) -if err != nil { - // Handle error -} - -// Submits an order and the exchange and returns its tradeID -tradeID, err := b.Trade(...) -if err != nil { - // Handle error -} -``` - -### How to do Websocket public/private calls - -```go - // Exchanges will be abstracted out in further updates and examples will be - // supplied then -``` +Unimplemented subscriptions: +- Candles for Futures +- Market snapshot for currency ### Please click GoDocs chevron above to view current GoDoc information for this package + ## Contribution Please feel free to submit any pull requests or suggest any desired features to be added. diff --git a/exchanges/kucoin/kucoin.go b/exchanges/kucoin/kucoin.go index 4543679c..53a13894 100644 --- a/exchanges/kucoin/kucoin.go +++ b/exchanges/kucoin/kucoin.go @@ -1764,7 +1764,7 @@ func (ku *Kucoin) SendAuthHTTPRequest(ctx context.Context, ePath exchange.URL, e return resp.GetError() } -func (ku *Kucoin) intervalToString(interval kline.Interval) (string, error) { +func intervalToString(interval kline.Interval) (string, error) { switch interval { case kline.OneMin: return "1min", nil diff --git a/exchanges/kucoin/kucoin_test.go b/exchanges/kucoin/kucoin_test.go index 5b88b128..863c239c 100644 --- a/exchanges/kucoin/kucoin_test.go +++ b/exchanges/kucoin/kucoin_test.go @@ -6,7 +6,6 @@ import ( "errors" "log" "os" - "strings" "testing" "time" @@ -15,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/key" + "github.com/thrasher-corp/gocryptotrader/config" "github.com/thrasher-corp/gocryptotrader/core" "github.com/thrasher-corp/gocryptotrader/currency" exchange "github.com/thrasher-corp/gocryptotrader/exchanges" @@ -28,6 +28,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" + testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions" "github.com/thrasher-corp/gocryptotrader/portfolio/withdraw" ) @@ -58,7 +59,7 @@ func TestMain(m *testing.M) { getFirstTradablePairOfAssets() ku.setupOrderbookManager() - fetchedFuturesSnapshotOrderbook = map[string]bool{} + fetchedFuturesOrderbook = map[string]bool{} os.Exit(m.Run()) } @@ -1973,130 +1974,141 @@ func TestPushData(t *testing.T) { testexch.FixtureToDataHandler(t, "testdata/wsHandleData.json", ku.wsHandleData) } -func verifySubs(tb testing.TB, subs subscription.List, a asset.Item, prefix string, expected ...string) { - tb.Helper() - var sub *subscription.Subscription - for i, s := range subs { - if s.Asset == a && strings.HasPrefix(s.Channel, prefix) { - if len(expected) == 1 && !strings.Contains(s.Channel, expected[0]) { - continue - } - if sub != nil { - assert.Failf(tb, "Too many subs with prefix", "Asset %s; Prefix %s", a.String(), prefix) - return - } - sub = subs[i] - } - } - if assert.NotNil(tb, sub, "Should find a sub for asset %s with prefix %s for %s", a.String(), prefix, strings.Join(expected, ", ")) { - suffix := strings.TrimPrefix(sub.Channel, prefix) - if len(expected) == 0 { - assert.Empty(tb, suffix, "Sub for asset %s with prefix %s should have no symbol suffix", a.String(), prefix) - } else { - currs := strings.Split(suffix, ",") - assert.ElementsMatch(tb, currs, expected, "Currencies should match in sub for asset %s with prefix %s", a.String(), prefix) - } - } -} - -// Pairs for Subscription tests: -// Only in Spot: BTC-USDT, ETH-USDT -// In Both: ETH-BTC, LTC-USDT -// Only in Margin: TRX-BTC, SOL-USDC - func TestGenerateSubscriptions(t *testing.T) { t.Parallel() + ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + + // Pairs overlap for spot/margin tests: + // Only in Spot: BTC-USDT, ETH-USDT + // In Both: ETH-BTC, LTC-USDT + // Only in Margin: TRX-BTC, SOL-USDC + subPairs := currency.Pairs{} + for _, pp := range [][]string{ + {"BTC", "USDT", "-"}, {"ETH", "BTC", "-"}, {"ETH", "USDT", "-"}, {"LTC", "USDT", "-"}, // Spot + {"ETH", "BTC", "-"}, {"LTC", "USDT", "-"}, {"SOL", "USDC", "-"}, {"TRX", "BTC", "-"}, // Margin + {"ETH", "USDCM", ""}, {"SOL", "USDTM", ""}, {"XBT", "USDCM", ""}, // Futures + } { + subPairs = append(subPairs, currency.NewPairWithDelimiter(pp[0], pp[1], pp[2])) + } + + exp := subscription.List{ + {Channel: subscription.TickerChannel, Asset: asset.Spot, Pairs: subPairs[0:4], QualifiedChannel: "/market/ticker:" + subPairs[0:4].Join()}, + {Channel: subscription.TickerChannel, Asset: asset.Margin, Pairs: subPairs[6:8], QualifiedChannel: "/market/ticker:" + subPairs[6:8].Join()}, + {Channel: subscription.TickerChannel, Asset: asset.Futures, Pairs: subPairs[8:], QualifiedChannel: "/contractMarket/tickerV2:" + subPairs[8:].Join()}, + {Channel: subscription.OrderbookChannel, Asset: asset.Spot, Pairs: subPairs[0:4], QualifiedChannel: "/spotMarket/level2Depth5:" + subPairs[0:4].Join(), + Interval: kline.HundredMilliseconds}, + {Channel: subscription.OrderbookChannel, Asset: asset.Margin, Pairs: subPairs[6:8], QualifiedChannel: "/spotMarket/level2Depth5:" + subPairs[6:8].Join(), + Interval: kline.HundredMilliseconds}, + {Channel: subscription.OrderbookChannel, Asset: asset.Futures, Pairs: subPairs[8:], QualifiedChannel: "/contractMarket/level2Depth5:" + subPairs[8:].Join(), + Interval: kline.HundredMilliseconds}, + {Channel: subscription.AllTradesChannel, Asset: asset.Spot, Pairs: subPairs[0:4], QualifiedChannel: "/market/match:" + subPairs[0:4].Join()}, + {Channel: subscription.AllTradesChannel, Asset: asset.Margin, Pairs: subPairs[6:8], QualifiedChannel: "/market/match:" + subPairs[6:8].Join()}, + } + subs, err := ku.generateSubscriptions() require.NoError(t, err, "generateSubscriptions must not error") + testsubs.EqualLists(t, exp, subs) - assert.Len(t, subs, 11, "Should generate the correct number of subs when not logged in") - - verifySubs(t, subs, asset.Spot, "/market/ticker:all") // This takes care of margin as well. - - verifySubs(t, subs, asset.Spot, "/market/match:", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC") - verifySubs(t, subs, asset.Margin, "/market/match:", "SOL-USDC", "TRX-BTC") - - verifySubs(t, subs, asset.Spot, "/spotMarket/level2Depth5:", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC") - verifySubs(t, subs, asset.Margin, "/spotMarket/level2Depth5:", "SOL-USDC", "TRX-BTC") - - for _, c := range []string{"ETHUSDCM", "XBTUSDCM", "SOLUSDTM"} { - verifySubs(t, subs, asset.Futures, "/contractMarket/tickerV2:", c) - verifySubs(t, subs, asset.Futures, "/contractMarket/level2Depth50:", c) - } -} - -func TestGenerateAuthSubscriptions(t *testing.T) { - t.Parallel() - - ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes ku.Websocket.SetCanUseAuthenticatedEndpoints(true) + var loanPairs currency.Pairs + loanCurrs := common.SortStrings(subPairs[0:8].GetCurrencies()) + for _, c := range loanCurrs { + loanPairs = append(loanPairs, currency.Pair{Base: c}) + } + + exp = append(exp, subscription.List{ + {Asset: asset.Futures, Channel: futuresTradeOrderChannel, QualifiedChannel: "/contractMarket/tradeOrders", Pairs: subPairs[8:]}, + {Asset: asset.Futures, Channel: futuresStopOrdersLifecycleEventChannel, QualifiedChannel: "/contractMarket/advancedOrders", Pairs: subPairs[8:]}, + {Asset: asset.Futures, Channel: futuresAccountBalanceEventChannel, QualifiedChannel: "/contractAccount/wallet", Pairs: subPairs[8:]}, + {Asset: asset.Margin, Channel: marginPositionChannel, QualifiedChannel: "/margin/position", Pairs: subPairs[4:8]}, + {Asset: asset.Margin, Channel: marginLoanChannel, QualifiedChannel: "/margin/loan:" + loanCurrs.Join(), Pairs: loanPairs}, + {Channel: accountBalanceChannel, QualifiedChannel: "/account/balance"}, + }...) + + subs, err = ku.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions with Auth must not error") + testsubs.EqualLists(t, exp, subs) +} + +func TestGenerateTickerAllSub(t *testing.T) { + t.Parallel() + + ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + avail, err := ku.GetAvailablePairs(asset.Spot) + require.NoError(t, err, "GetAvailablePairs must not error") + for i := 0; i <= 10; i++ { + err = ku.CurrencyPairs.EnablePair(asset.Spot, avail[i]) + require.NoError(t, common.ExcludeError(err, currency.ErrPairAlreadyEnabled), "EnablePair must not error") + } + + enabled, err := ku.GetEnabledPairs(asset.Spot) + require.NoError(t, err, "GetEnabledPairs must not error") + + ku.Features.Subscriptions = subscription.List{{Channel: subscription.TickerChannel, Asset: asset.Spot}} + exp := subscription.List{ + {Channel: subscription.TickerChannel, Asset: asset.Spot, QualifiedChannel: "/market/ticker:all", Pairs: enabled}, + } subs, err := ku.generateSubscriptions() require.NoError(t, err, "generateSubscriptions with Auth must not error") - assert.Len(t, subs, 24, "Should generate the correct number of subs when logged in") - - verifySubs(t, subs, asset.Spot, "/market/ticker:all") // This takes care of margin as well. - - verifySubs(t, subs, asset.Spot, "/market/match:", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC") - verifySubs(t, subs, asset.Margin, "/market/match:", "SOL-USDC", "TRX-BTC") - - verifySubs(t, subs, asset.Spot, "/spotMarket/level2Depth5:", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC") - verifySubs(t, subs, asset.Margin, "/spotMarket/level2Depth5:", "SOL-USDC", "TRX-BTC") - - for _, c := range []string{"ETHUSDCM", "XBTUSDCM", "SOLUSDTM"} { - verifySubs(t, subs, asset.Futures, "/contractMarket/tickerV2:", c) - verifySubs(t, subs, asset.Futures, "/contractMarket/level2Depth50:", c) - } - for _, c := range []string{"SOL", "BTC", "TRX", "LTC", "USDC", "USDT", "ETH"} { - verifySubs(t, subs, asset.Margin, "/margin/loan:", c) - } - verifySubs(t, subs, asset.Spot, "/account/balance") - verifySubs(t, subs, asset.Margin, "/margin/position") - verifySubs(t, subs, asset.Margin, "/margin/fundingBook:", "SOL", "BTC", "TRX", "LTC", "USDT", "USDC", "ETH") - verifySubs(t, subs, asset.Futures, "/contractAccount/wallet") - verifySubs(t, subs, asset.Futures, "/contractMarket/advancedOrders") - verifySubs(t, subs, asset.Futures, "/contractMarket/tradeOrders") + testsubs.EqualLists(t, exp, subs) } -func TestGenerateCandleSubscription(t *testing.T) { +// TestGenerateOtherSubscriptions exercises non-default subscriptions +func TestGenerateOtherSubscriptions(t *testing.T) { t.Parallel() ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes - ku.Features.Subscriptions = subscription.List{ - {Channel: subscription.CandlesChannel, Interval: kline.FourHour}, + + subs := subscription.List{ + {Channel: subscription.CandlesChannel, Asset: asset.Spot, Interval: kline.FourHour}, + {Channel: marketSnapshotChannel, Asset: asset.Spot}, } - subs, err := ku.generateSubscriptions() - assert.NoError(t, err, "generateSubscriptions with Candles should not error") - - assert.Len(t, subs, 6, "Should generate the correct number of subs for candles") - for _, c := range []string{"BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC"} { - verifySubs(t, subs, asset.Spot, "/market/candles:", c+"_4hour") - } - for _, c := range []string{"SOL-USDC", "TRX-BTC"} { - verifySubs(t, subs, asset.Margin, "/market/candles:", c+"_4hour") + for _, s := range subs { + ku.Features.Subscriptions = subscription.List{s} + got, err := ku.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions should not error") + require.Len(t, got, 1, "Should generate just one sub") + assert.NotEmpty(t, got[0].QualifiedChannel, "Qualified Channel should not be empty") + if got[0].Channel == subscription.CandlesChannel { + assert.Equal(t, "/market/candles:BTC-USDT_4hour,ETH-BTC_4hour,ETH-USDT_4hour,LTC-USDT_4hour", got[0].QualifiedChannel, "QualifiedChannel should be correct") + } } } -func TestGenerateMarketSubscription(t *testing.T) { +// TestCheckSubscriptions ensures checkSubscriptions upgrades user config correctly +func TestCheckSubscriptions(t *testing.T) { t.Parallel() - ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes - ku.Features.Subscriptions = subscription.List{ - {Channel: marketSnapshotChannel}, + ku := &Kucoin{ //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + Base: exchange.Base{ + Config: &config.Exchange{ + Features: &config.FeaturesConfig{ + Subscriptions: subscription.List{ + {Enabled: true, Channel: "ticker"}, + {Enabled: true, Channel: "allTrades"}, + {Enabled: true, Channel: "orderbook", Interval: kline.HundredMilliseconds}, + {Enabled: true, Channel: "/contractMarket/tickerV2:%s"}, + {Enabled: true, Channel: "/contractMarket/level2Depth50:%s"}, + {Enabled: true, Channel: "/margin/fundingBook:%s", Authenticated: true}, + {Enabled: true, Channel: "/account/balance", Authenticated: true}, + {Enabled: true, Channel: "/margin/position", Authenticated: true}, + {Enabled: true, Channel: "/margin/loan:%s", Authenticated: true}, + {Enabled: true, Channel: "/contractMarket/tradeOrders", Authenticated: true}, + {Enabled: true, Channel: "/contractMarket/advancedOrders", Authenticated: true}, + {Enabled: true, Channel: "/contractAccount/wallet", Authenticated: true}, + }, + }, + }, + Features: exchange.Features{}, + }, } - subs, err := ku.generateSubscriptions() - assert.NoError(t, err, "generateSubscriptions with MarketSnapshot should not error") - - assert.Len(t, subs, 7, "Should generate the correct number of subs for snapshot") - for _, c := range []string{"BTC", "ETH", "LTC", "USDT"} { - verifySubs(t, subs, asset.Spot, "/market/snapshot:", c) - } - for _, c := range []string{"SOL", "USDC", "TRX"} { - verifySubs(t, subs, asset.Margin, "/market/snapshot:", c) - } + ku.checkSubscriptions() + testsubs.EqualLists(t, defaultSubscriptions, ku.Features.Subscriptions) + testsubs.EqualLists(t, defaultSubscriptions, ku.Config.Features.Subscriptions) } func TestGetAvailableTransferChains(t *testing.T) { @@ -2490,14 +2502,90 @@ func TestProcessMarketSnapshot(t *testing.T) { } } -func TestSubscribeMarketSnapshot(t *testing.T) { +// TestSubscribeBatches ensures that endpoints support batching, contrary to kucoin api docs +func TestSubscribeBatches(t *testing.T) { t.Parallel() ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + ku.Features.Subscriptions = subscription.List{} testexch.SetupWs(t, ku) - err := ku.Subscribe(subscription.List{{Channel: marketSymbolSnapshotChannel, Pairs: currency.Pairs{currency.Pair{Base: currency.BTC}}}}) - assert.NoError(t, err, "Subscribe to MarketSnapshot should not error") + ku.Features.Subscriptions = subscription.List{ + {Asset: asset.Spot, Channel: subscription.CandlesChannel, Interval: kline.OneMin}, + {Asset: asset.Futures, Channel: subscription.TickerChannel}, + {Asset: asset.Spot, Channel: marketSnapshotChannel}, + } + + subs, err := ku.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions must not error") + require.Len(t, subs, len(ku.Features.Subscriptions), "Must generate batched subscriptions") + + err = ku.Subscribe(subs) + require.NoError(t, err, "Subscribe to small batches should not error") +} + +// TestSubscribeTickerAll ensures that ticker subscriptions switch to using all and it works + +// TestSubscribeBatchLimit exercises the kucoin batch limits of 300 per connection +// Ensures batching of 100 pairs and the connection symbol limit is still 300 at Kucoin's end +func TestSubscribeBatchLimit(t *testing.T) { + t.Parallel() + + ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + ku.Features.Subscriptions = subscription.List{} + testexch.SetupWs(t, ku) + + avail, err := ku.GetAvailablePairs(asset.Spot) + require.NoError(t, err, "GetAvailablePairs must not error") + + err = ku.CurrencyPairs.StorePairs(asset.Spot, avail[:299], true) + require.NoError(t, err, "StorePairs must not error") + + ku.Features.Subscriptions = subscription.List{{Asset: asset.Spot, Channel: subscription.AllTradesChannel}} + subs, err := ku.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions must not error") + require.Len(t, subs, 3, "Must get 3 subs") + + err = ku.Subscribe(subs) + require.NoError(t, err, "Subscribe must not error") + + err = ku.Unsubscribe(subs) + require.NoError(t, err, "Unsubscribe must not error") + + err = ku.CurrencyPairs.StorePairs(asset.Spot, avail[:320], true) + require.NoError(t, err, "StorePairs must not error") + + ku.Features.Subscriptions = subscription.List{{Asset: asset.Spot, Channel: subscription.AllTradesChannel}} + subs, err = ku.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions must not error") + require.Len(t, subs, 4, "Must get 4 subs") + + err = ku.Subscribe(subs) + require.ErrorContains(t, err, "exceed max subscription count limitation of 300 per session", "Subscribe to MarketSnapshot must error above connection symbol limit") +} + +func TestSubscribeTickerAll(t *testing.T) { + t.Parallel() + + ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + ku.Features.Subscriptions = subscription.List{} + testexch.SetupWs(t, ku) + + avail, err := ku.GetAvailablePairs(asset.Spot) + require.NoError(t, err, "GetAvailablePairs must not error") + + err = ku.CurrencyPairs.StorePairs(asset.Spot, avail[:500], true) + require.NoError(t, err, "StorePairs must not error") + + ku.Features.Subscriptions = subscription.List{{Asset: asset.Spot, Channel: subscription.TickerChannel}} + + subs, err := ku.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions must not error") + require.Len(t, subs, 1, "Must generate one subscription") + require.Equal(t, "/market/ticker:all", subs[0].QualifiedChannel, "QualifiedChannel must be correct") + + err = ku.Subscribe(subs) + require.NoError(t, err, "Subscribe to must not error") } func TestSeedLocalCache(t *testing.T) { diff --git a/exchanges/kucoin/kucoin_types.go b/exchanges/kucoin/kucoin_types.go index c3ed39ba..1d710c09 100644 --- a/exchanges/kucoin/kucoin_types.go +++ b/exchanges/kucoin/kucoin_types.go @@ -35,7 +35,6 @@ var ( errInvalidLeverage = errors.New("invalid leverage value") errInvalidClientOrderID = errors.New("no client order ID supplied, this endpoint requires a UUID or similar string") errInvalidMsgType = errors.New("message type field not valid") - errSubscriptionPairRequired = errors.New("pair required for manual subscriptions") subAccountRegExp = regexp.MustCompile("^[a-zA-Z0-9]{7-32}$") subAccountPassphraseRegExp = regexp.MustCompile("^[a-zA-Z0-9]{7-24}$") @@ -956,19 +955,6 @@ type WsPriceIndicator struct { Value float64 `json:"value"` } -// WsMarginFundingBook represents order book changes on margin. -type WsMarginFundingBook struct { - Sequence int64 `json:"sequence"` - Currency string `json:"currency"` - DailyInterestRate float64 `json:"dailyIntRate"` - AnnualInterestRate float64 `json:"annualIntRate"` - Term int64 `json:"term"` - Size float64 `json:"size"` - Side string `json:"side"` - Timestamp convert.ExchangeTime `json:"ts"` // In Nanosecond - -} - // WsTradeOrder represents a private trade order push data. type WsTradeOrder struct { Symbol string `json:"symbol"` @@ -1079,8 +1065,8 @@ type WsFuturesTicker struct { FilledTime convert.ExchangeTime `json:"ts"` } -// WsFuturesOrderbokInfo represents Level 2 order book information. -type WsFuturesOrderbokInfo struct { +// WsFuturesOrderbookInfo represents Level 2 order book information. +type WsFuturesOrderbookInfo struct { Sequence int64 `json:"sequence"` Change string `json:"change"` Timestamp convert.ExchangeTime `json:"timestamp"` diff --git a/exchanges/kucoin/kucoin_websocket.go b/exchanges/kucoin/kucoin_websocket.go index 5b7c14bd..35738bfe 100644 --- a/exchanges/kucoin/kucoin_websocket.go +++ b/exchanges/kucoin/kucoin_websocket.go @@ -6,9 +6,11 @@ import ( "errors" "fmt" "net/http" + "slices" "strconv" "strings" "sync" + "text/template" "time" "github.com/buger/jsonparser" @@ -18,6 +20,7 @@ import ( exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/account" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/kline" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" "github.com/thrasher-corp/gocryptotrader/exchanges/request" @@ -28,60 +31,48 @@ import ( "github.com/thrasher-corp/gocryptotrader/log" ) -var fetchedFuturesSnapshotOrderbook map[string]bool +var fetchedFuturesOrderbookMutex sync.Mutex +var fetchedFuturesOrderbook map[string]bool const ( publicBullets = "/v1/bullet-public" privateBullets = "/v1/bullet-private" // spot channels - marketAllTickersChannel = "/market/ticker:all" - marketTickerChannel = "/market/ticker:%s" // /market/ticker:{symbol},{symbol}... - marketSymbolSnapshotChannel = "/market/snapshot:%s" // /market/snapshot:{symbol} - marketSnapshotChannel = "/market/snapshot:%v" // /market/snapshot:{market} <--- market represents a currency - marketOrderbookLevel2Channels = "/market/level2:%s" // /market/level2:{pair},{pair}... - marketOrderbookLevel2to5Channel = "/spotMarket/level2Depth5:%s" // /spotMarket/level2Depth5:{symbol},{symbol}... - marketOrderbokLevel2To50Channel = "/spotMarket/level2Depth50:%s" // /spotMarket/level2Depth50:{symbol},{symbol}... - marketCandlesChannel = "/market/candles:%s_%s" // /market/candles:{symbol}_{interval} - marketMatchChannel = "/market/match:%s" // /market/match:{symbol},{symbol}... - indexPriceIndicatorChannel = "/indicator/index:%s" // /indicator/index:{symbol0},{symbol1}.. - markPriceIndicatorChannel = "/indicator/markPrice:%s" // /indicator/markPrice:{symbol0},{symbol1}... - marginFundingbookChangeChannel = "/margin/fundingBook:%s" // /margin/fundingBook:{currency0},{currency1}... + marketTickerChannel = "/market/ticker" // /market/ticker:{symbol},... + marketSnapshotChannel = "/market/snapshot" // /market/snapshot:{symbol},... + marketOrderbookChannel = "/market/level2" // /market/level2:{symbol},... + marketOrderbookDepth5Channel = "/spotMarket/level2Depth5" // /spotMarket/level2Depth5:{symbol},... + marketOrderbookDepth50Channel = "/spotMarket/level2Depth50" // /spotMarket/level2Depth50:{symbol},... + marketCandlesChannel = "/market/candles" // /market/candles:{symbol}_{interval},... + marketMatchChannel = "/market/match" // /market/match:{symbol},... + indexPriceIndicatorChannel = "/indicator/index" // /indicator/index:{symbol},... + markPriceIndicatorChannel = "/indicator/markPrice" // /indicator/markPrice:{symbol},... // Private channels privateSpotTradeOrders = "/spotMarket/tradeOrders" accountBalanceChannel = "/account/balance" marginPositionChannel = "/margin/position" - marginLoanChannel = "/margin/loan:%s" // /margin/loan:{currency} + marginLoanChannel = "/margin/loan" // /margin/loan:{currency} spotMarketAdvancedChannel = "/spotMarket/advancedOrders" // futures channels - futuresTickerV2Channel = "/contractMarket/tickerV2:%s" // /contractMarket/tickerV2:{symbol} - futuresTickerChannel = "/contractMarket/ticker:%s" // /contractMarket/ticker:{symbol} - futuresOrderbookLevel2Channel = "/contractMarket/level2:%s" // /contractMarket/level2:{symbol} - futuresExecutionDataChannel = "/contractMarket/execution:%s" // /contractMarket/execution:{symbol} - futuresOrderbookLevel2Depth5Channel = "/contractMarket/level2Depth5:%s" // /contractMarket/level2Depth5:{symbol} - futuresOrderbookLevel2Depth50Channel = "/contractMarket/level2Depth50:%s" // /contractMarket/level2Depth50:{symbol} - futuresContractMarketDataChannel = "/contract/instrument:%s" // /contract/instrument:{symbol} + futuresTickerChannel = "/contractMarket/tickerV2" // /contractMarket/tickerV2:{symbol},... + futuresOrderbookChannel = "/contractMarket/level2" // /contractMarket/level2:{symbol},... + futuresOrderbookDepth5Channel = "/contractMarket/level2Depth5" // /contractMarket/level2Depth5:{symbol},... + futuresOrderbookDepth50Channel = "/contractMarket/level2Depth50" // /contractMarket/level2Depth50:{symbol},... + futuresExecutionDataChannel = "/contractMarket/execution" // /contractMarket/execution:{symbol},... + futuresContractMarketDataChannel = "/contract/instrument" // /contract/instrument:{symbol},... futuresSystemAnnouncementChannel = "/contract/announcement" - futuresTrasactionStatisticsTimerEventChannel = "/contractMarket/snapshot:%s" // /contractMarket/snapshot:{symbol} + futuresTrasactionStatisticsTimerEventChannel = "/contractMarket/snapshot" // /contractMarket/snapshot:{symbol},... // futures private channels - futuresTradeOrdersBySymbolChannel = "/contractMarket/tradeOrders:%s" // /contractMarket/tradeOrders:{symbol} - futuresTradeOrderChannel = "/contractMarket/tradeOrders" + futuresTradeOrderChannel = "/contractMarket/tradeOrders" // /contractMarket/tradeOrders:{symbol},... + futuresPositionChangeEventChannel = "/contract/position" // /contract/position:{symbol},... futuresStopOrdersLifecycleEventChannel = "/contractMarket/advancedOrders" futuresAccountBalanceEventChannel = "/contractAccount/wallet" - futuresPositionChangeEventChannel = "/contract/position:%s" // /contract/position:{symbol} ) -var subscriptionNames = map[string]string{ - subscription.TickerChannel: marketAllTickersChannel, // This allows more subscriptions on the orderbook channel for this specific connection. - subscription.OrderbookChannel: marketOrderbookLevel2to5Channel, // This does not require a REST request to get the orderbook. - subscription.CandlesChannel: marketCandlesChannel, - subscription.AllTradesChannel: marketMatchChannel, - // No equivalents for: AllOrders, MyTrades, MyOrders -} - var ( // maxWSUpdateBuffer defines max websocket updates to apply when an // orderbook is initially fetched @@ -94,12 +85,27 @@ var ( maxWSOrderbookWorkers = 10 ) +var defaultSubscriptions = subscription.List{ + {Enabled: true, Asset: asset.All, Channel: subscription.TickerChannel}, + {Enabled: true, Asset: asset.All, Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds}, + {Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel}, + {Enabled: true, Asset: asset.Margin, Channel: subscription.AllTradesChannel}, + {Enabled: true, Asset: asset.Futures, Channel: futuresTradeOrderChannel, Authenticated: true}, + {Enabled: true, Asset: asset.Futures, Channel: futuresStopOrdersLifecycleEventChannel, Authenticated: true}, + {Enabled: true, Asset: asset.Futures, Channel: futuresAccountBalanceEventChannel, Authenticated: true}, + {Enabled: true, Asset: asset.Margin, Channel: marginPositionChannel, Authenticated: true}, + {Enabled: true, Asset: asset.Margin, Channel: marginLoanChannel, Authenticated: true}, + {Enabled: true, Channel: accountBalanceChannel, Authenticated: true}, +} + // WsConnect creates a new websocket connection. func (ku *Kucoin) WsConnect() error { if !ku.Websocket.IsEnabled() || !ku.IsEnabled() { return stream.ErrWebsocketNotEnabled } - fetchedFuturesSnapshotOrderbook = map[string]bool{} + fetchedFuturesOrderbookMutex.Lock() + fetchedFuturesOrderbook = map[string]bool{} + fetchedFuturesOrderbookMutex.Unlock() var dialer websocket.Dialer dialer.HandshakeTimeout = ku.Config.HTTPTimeout dialer.Proxy = http.ProxyFromEnvironment @@ -206,9 +212,8 @@ func (ku *Kucoin) wsHandleData(respData []byte) error { return nil } topicInfo := strings.Split(resp.Topic, ":") - switch { - case strings.HasPrefix(marketAllTickersChannel, topicInfo[0]), - strings.HasPrefix(marketTickerChannel, topicInfo[0]): + switch topicInfo[0] { + case marketTickerChannel: var instruments string if topicInfo[1] == "all" { instruments = resp.Subject @@ -216,117 +221,74 @@ func (ku *Kucoin) wsHandleData(respData []byte) error { instruments = topicInfo[1] } return ku.processTicker(resp.Data, instruments, topicInfo[0]) - case strings.HasPrefix(marketSymbolSnapshotChannel, topicInfo[0]): + case marketSnapshotChannel: return ku.processMarketSnapshot(resp.Data, topicInfo[0]) - case strings.HasPrefix(marketOrderbookLevel2Channels, topicInfo[0]): + case marketOrderbookChannel: return ku.processOrderbookWithDepth(respData, topicInfo[1], topicInfo[0]) - case strings.HasPrefix(marketOrderbookLevel2to5Channel, topicInfo[0]), - strings.HasPrefix(marketOrderbokLevel2To50Channel, topicInfo[0]): + case marketOrderbookDepth5Channel, marketOrderbookDepth50Channel: return ku.processOrderbook(resp.Data, topicInfo[1], topicInfo[0]) - case strings.HasPrefix(marketCandlesChannel, topicInfo[0]): + case marketCandlesChannel: symbolAndInterval := strings.Split(topicInfo[1], currency.UnderscoreDelimiter) if len(symbolAndInterval) != 2 { return errMalformedData } return ku.processCandlesticks(resp.Data, symbolAndInterval[0], symbolAndInterval[1], topicInfo[0]) - case strings.HasPrefix(marketMatchChannel, topicInfo[0]): + case marketMatchChannel: return ku.processTradeData(resp.Data, topicInfo[1], topicInfo[0]) - case strings.HasPrefix(indexPriceIndicatorChannel, topicInfo[0]): + case indexPriceIndicatorChannel, markPriceIndicatorChannel: var response WsPriceIndicator return ku.processData(resp.Data, &response) - case strings.HasPrefix(markPriceIndicatorChannel, topicInfo[0]): - var response WsPriceIndicator - return ku.processData(resp.Data, &response) - case strings.HasPrefix(marginFundingbookChangeChannel, topicInfo[0]): - var response WsMarginFundingBook - return ku.processData(resp.Data, &response) - case strings.HasPrefix(privateSpotTradeOrders, topicInfo[0]): + case privateSpotTradeOrders: return ku.processOrderChangeEvent(resp.Data, topicInfo[0]) - case strings.HasPrefix(accountBalanceChannel, topicInfo[0]): + case accountBalanceChannel: return ku.processAccountBalanceChange(resp.Data) - case strings.HasPrefix(marginPositionChannel, topicInfo[0]): + case marginPositionChannel: if resp.Subject == "debt.ratio" { var response WsDebtRatioChange return ku.processData(resp.Data, &response) } var response WsPositionStatus return ku.processData(resp.Data, &response) - case strings.HasPrefix(marginLoanChannel, topicInfo[0]) && resp.Subject == "order.done": - var response WsMarginTradeOrderDoneEvent - return ku.processData(resp.Data, &response) - case strings.HasPrefix(marginLoanChannel, topicInfo[0]): - return ku.processMarginLendingTradeOrderEvent(resp.Data) - case strings.HasPrefix(spotMarketAdvancedChannel, topicInfo[0]): - return ku.processStopOrderEvent(resp.Data) - case strings.HasPrefix(futuresTickerV2Channel, topicInfo[0]), - strings.HasPrefix(futuresTickerChannel, topicInfo[0]): - return ku.processFuturesTickerV2(resp.Data) - case strings.HasPrefix(futuresOrderbookLevel2Channel, topicInfo[0]): - if !fetchedFuturesSnapshotOrderbook[topicInfo[1]] { - fetchedFuturesSnapshotOrderbook[topicInfo[1]] = true - var enabledPairs currency.Pairs - enabledPairs, err = ku.GetEnabledPairs(asset.Futures) - if err != nil { - return err - } - var cp currency.Pair - cp, err = enabledPairs.DeriveFrom(topicInfo[1]) - if err != nil { - return err - } - var orderbooks *orderbook.Base - orderbooks, err = ku.FetchOrderbook(context.Background(), cp, asset.Futures) - if err != nil { - return err - } - err = ku.Websocket.Orderbook.LoadSnapshot(orderbooks) - if err != nil { - return err - } + case marginLoanChannel: + if resp.Subject == "order.done" { + var response WsMarginTradeOrderDoneEvent + return ku.processData(resp.Data, &response) + } else { + return ku.processMarginLendingTradeOrderEvent(resp.Data) } - return ku.processFuturesOrderbookLevel2(resp.Data, topicInfo[1]) - case strings.HasPrefix(futuresExecutionDataChannel, topicInfo[0]): + case spotMarketAdvancedChannel: + return ku.processStopOrderEvent(resp.Data) + case futuresTickerChannel: + return ku.processFuturesTickerV2(resp.Data) + case futuresExecutionDataChannel: var response WsFuturesExecutionData return ku.processData(resp.Data, &response) - case strings.HasPrefix(futuresOrderbookLevel2Depth5Channel, topicInfo[0]), - strings.HasPrefix(futuresOrderbookLevel2Depth50Channel, topicInfo[0]): - if !fetchedFuturesSnapshotOrderbook[topicInfo[1]] { - fetchedFuturesSnapshotOrderbook[topicInfo[1]] = true - var enabledPairs currency.Pairs - enabledPairs, err = ku.GetEnabledPairs(asset.Futures) - if err != nil { - return err - } - cp, err := enabledPairs.DeriveFrom(topicInfo[1]) - if err != nil { - return err - } - orderbooks, err := ku.FetchOrderbook(context.Background(), cp, asset.Futures) - if err != nil { - return err - } - err = ku.Websocket.Orderbook.LoadSnapshot(orderbooks) - if err != nil { - return err - } + case futuresOrderbookChannel: + if err := ku.ensureFuturesOrderbookSnapshotLoaded(topicInfo[1]); err != nil { + return err } - return ku.processFuturesOrderbookLevel5(resp.Data, topicInfo[1]) - case strings.HasPrefix(futuresContractMarketDataChannel, topicInfo[0]): - if resp.Subject == "mark.index.price" { + return ku.processFuturesOrderbookLevel2(resp.Data, topicInfo[1]) + case futuresOrderbookDepth5Channel, futuresOrderbookDepth50Channel: + if err := ku.ensureFuturesOrderbookSnapshotLoaded(topicInfo[1]); err != nil { + return err + } + return ku.processFuturesOrderbookSnapshot(resp.Data, topicInfo[1]) + case futuresContractMarketDataChannel: + switch resp.Subject { + case "mark.index.price": return ku.processFuturesMarkPriceAndIndexPrice(resp.Data, topicInfo[1]) - } else if resp.Subject == "funding.rate" { + case "funding.rate": return ku.processFuturesFundingData(resp.Data, topicInfo[1]) } - case strings.HasPrefix(futuresSystemAnnouncementChannel, topicInfo[0]): + case futuresSystemAnnouncementChannel: return ku.processFuturesSystemAnnouncement(resp.Data, resp.Subject) - case strings.HasPrefix(futuresTrasactionStatisticsTimerEventChannel, topicInfo[0]): + case futuresTrasactionStatisticsTimerEventChannel: return ku.processFuturesTransactionStatistics(resp.Data, topicInfo[1]) - case strings.HasPrefix(futuresTradeOrdersBySymbolChannel, topicInfo[0]), - strings.HasPrefix(futuresTradeOrderChannel, topicInfo[0]): + case futuresTradeOrderChannel: return ku.processFuturesPrivateTradeOrders(resp.Data) - case strings.HasPrefix(futuresStopOrdersLifecycleEventChannel, topicInfo[0]): + case futuresStopOrdersLifecycleEventChannel: return ku.processFuturesStopOrderLifecycleEvent(resp.Data) - case strings.HasPrefix(futuresAccountBalanceEventChannel, topicInfo[0]): + case futuresAccountBalanceEventChannel: switch resp.Subject { case "orderMargin.change": var response WsFuturesOrderMarginEvent @@ -337,15 +299,16 @@ func (ku *Kucoin) wsHandleData(respData []byte) error { var response WsFuturesWithdrawalAmountAndTransferOutAmountEvent return ku.processData(resp.Data, &response) } - case strings.HasPrefix(futuresPositionChangeEventChannel, topicInfo[0]): - if resp.Subject == "position.change" { + case futuresPositionChangeEventChannel: + switch resp.Subject { + case "position.change": if resp.ChannelType == "private" { var response WsFuturesPosition return ku.processData(resp.Data, &response) } var response WsFuturesMarkPricePositionChanges return ku.processData(resp.Data, &response) - } else if resp.Subject == "position.settlement" { + case "position.settlement": var response WsFuturesPositionFundingSettlement return ku.processData(resp.Data, &response) } @@ -502,7 +465,30 @@ func (ku *Kucoin) processFuturesMarkPriceAndIndexPrice(respData []byte, instrume return nil } -func (ku *Kucoin) processFuturesOrderbookLevel5(respData []byte, instrument string) error { +// ensureFuturesOrderbookSnapshotLoaded makes sure an initial futures orderbook snapshot is loaded +func (ku *Kucoin) ensureFuturesOrderbookSnapshotLoaded(symbol string) error { + fetchedFuturesOrderbookMutex.Lock() + defer fetchedFuturesOrderbookMutex.Unlock() + if fetchedFuturesOrderbook[symbol] { + return nil + } + fetchedFuturesOrderbook[symbol] = true + enabledPairs, err := ku.GetEnabledPairs(asset.Futures) + if err != nil { + return err + } + cp, err := enabledPairs.DeriveFrom(symbol) + if err != nil { + return err + } + orderbooks, err := ku.FetchOrderbook(context.Background(), cp, asset.Futures) + if err != nil { + return err + } + return ku.Websocket.Orderbook.LoadSnapshot(orderbooks) +} + +func (ku *Kucoin) processFuturesOrderbookSnapshot(respData []byte, instrument string) error { response := WsOrderbookLevel5Response{} if err := json.Unmarshal(respData, &response); err != nil { return err @@ -527,7 +513,7 @@ func (ku *Kucoin) processFuturesOrderbookLevel5(respData []byte, instrument stri } func (ku *Kucoin) processFuturesOrderbookLevel2(respData []byte, instrument string) error { - resp := WsFuturesOrderbokInfo{} + resp := WsFuturesOrderbookInfo{} if err := json.Unmarshal(respData, &resp); err != nil { return err } @@ -959,40 +945,14 @@ func (ku *Kucoin) Unsubscribe(subscriptions subscription.List) error { return ku.manageSubscriptions(subscriptions, "unsubscribe") } -// expandManualSubscription takes a subscription list and expand all the subscriptions across the relevant assets and pairs -func (ku *Kucoin) expandManualSubscriptions(in subscription.List) (subscription.List, error) { - subs := make(subscription.List, 0, len(in)) - for _, s := range in { - if isSymbolChannel(s.Channel) { - if len(s.Pairs) == 0 { - return nil, errSubscriptionPairRequired - } - a := s.Asset - if !a.IsValid() { - a = getChannelsAssetType(s.Channel) - } - assetPairs := map[asset.Item]currency.Pairs{a: s.Pairs} - n, err := ku.expandSubscription(s, assetPairs) - if err != nil { - return nil, err - } - subs = append(subs, n...) - } else { - subs = append(subs, s) - } - } - return subs, nil -} - func (ku *Kucoin) manageSubscriptions(subs subscription.List, operation string) error { var errs error - subs, errs = ku.expandManualSubscriptions(subs) for _, s := range subs { msgID := strconv.FormatInt(ku.Websocket.Conn.GenerateMessageID(false), 10) req := WsSubscriptionInput{ ID: msgID, Type: operation, - Topic: s.Channel, + Topic: s.QualifiedChannel, PrivateChannel: s.Authenticated, Response: true, } @@ -1003,6 +963,13 @@ func (ku *Kucoin) manageSubscriptions(subs subscription.List, operation string) switch { case err != nil: errs = common.AppendError(errs, err) + case rType == "error": + code, _ := jsonparser.GetUnsafeString(respRaw, "code") + msg, msgErr := jsonparser.GetUnsafeString(respRaw, "data") + if msgErr != nil { + msg = "unknown error" + } + errs = common.AppendError(errs, fmt.Errorf("%s (%s)", msg, code)) case rType != "ack": errs = common.AppendError(errs, fmt.Errorf("%w: %s from %s", errInvalidMsgType, rType, respRaw)) default: @@ -1023,192 +990,28 @@ func (ku *Kucoin) manageSubscriptions(subs subscription.List, operation string) return errs } -// getChannelsAssetType returns the asset type to which the subscription channel belongs to or asset.Empty -func getChannelsAssetType(channelName string) asset.Item { - switch channelName { - case futuresTickerV2Channel, futuresTickerChannel, futuresOrderbookLevel2Channel, futuresExecutionDataChannel, futuresOrderbookLevel2Depth5Channel, futuresOrderbookLevel2Depth50Channel, futuresContractMarketDataChannel, futuresSystemAnnouncementChannel, futuresTrasactionStatisticsTimerEventChannel, futuresTradeOrdersBySymbolChannel, futuresTradeOrderChannel, futuresStopOrdersLifecycleEventChannel, futuresAccountBalanceEventChannel, futuresPositionChangeEventChannel: - return asset.Futures - case marketTickerChannel, marketAllTickersChannel, - marketSnapshotChannel, marketSymbolSnapshotChannel, - marketOrderbookLevel2Channels, marketOrderbookLevel2to5Channel, - marketOrderbokLevel2To50Channel, marketCandlesChannel, - marketMatchChannel, indexPriceIndicatorChannel, markPriceIndicatorChannel, - privateSpotTradeOrders, accountBalanceChannel, spotMarketAdvancedChannel: - return asset.Spot - case marginFundingbookChangeChannel, marginPositionChannel, marginLoanChannel: - return asset.Margin - default: - return asset.Empty - } -} - // generateSubscriptions returns a list of subscriptions from the configured subscriptions feature func (ku *Kucoin) generateSubscriptions() (subscription.List, error) { - assetPairs := map[asset.Item]currency.Pairs{} - for _, a := range ku.GetAssetTypes(false) { - if p, err := ku.GetEnabledPairs(a); err == nil { - assetPairs[a] = p - } else { - assetPairs[a] = currency.Pairs{} // err is probably that Asset isn't enabled, but we don't care about errors of any type - } - } - authed := ku.Websocket.CanUseAuthenticatedEndpoints() - subscriptions := subscription.List{} - for _, s := range ku.Features.Subscriptions { - if !authed && s.Authenticated { - continue - } - subs, err := ku.expandSubscription(s, assetPairs) - if err != nil { - return nil, err - } - subscriptions = append(subscriptions, subs...) - } - return subscriptions, nil + return ku.Features.Subscriptions.ExpandTemplates(ku) } -// expandSubscription takes a subscription and expands it across the relevant assets and pairs passed in -func (ku *Kucoin) expandSubscription(baseSub *subscription.Subscription, assetPairs map[asset.Item]currency.Pairs) (subscription.List, error) { - var subscriptions = subscription.List{} - if baseSub == nil { - return nil, common.ErrNilPointer - } - s := baseSub.Clone() - s.Channel = channelName(s.Channel) - if !s.Asset.IsValid() { - s.Asset = getChannelsAssetType(s.Channel) - } - - if len(assetPairs[s.Asset]) == 0 { - return nil, nil - } - - switch { - case s.Channel == marginLoanChannel: - for _, c := range assetPairs[asset.Margin].GetCurrencies() { - i := s.Clone() - i.Channel = fmt.Sprintf(s.Channel, c) - subscriptions = append(subscriptions, i) - } - case s.Channel == marketCandlesChannel: - interval, err := ku.intervalToString(s.Interval) - if err != nil { - return nil, err - } - subs := spotOrMarginPairSubs(assetPairs, s, false, interval) - subscriptions = append(subscriptions, subs...) - case s.Channel == marginFundingbookChangeChannel: - s.Channel = fmt.Sprintf(s.Channel, assetPairs[asset.Margin].GetCurrencies().Join()) - subscriptions = append(subscriptions, s) - case s.Channel == marketSnapshotChannel: - subs, err := spotOrMarginCurrencySubs(assetPairs, s) - if err != nil { - return nil, err - } - subscriptions = append(subscriptions, subs...) - case getChannelsAssetType(s.Channel) == asset.Futures && isSymbolChannel(s.Channel): - for _, p := range assetPairs[asset.Futures] { - c, err := ku.FormatExchangeCurrency(p, asset.Futures) - if err != nil { - continue - } - i := s.Clone() - i.Channel = fmt.Sprintf(s.Channel, c) - subscriptions = append(subscriptions, i) - } - case isSymbolChannel(s.Channel): - // Subscriptions which can use a single comma-separated sub per asset - subs := spotOrMarginPairSubs(assetPairs, s, true) - subscriptions = append(subscriptions, subs...) - default: - subscriptions = append(subscriptions, s) - } - return subscriptions, nil -} - -// isSymbolChannel returns true it this channel path ends in a formatting %s to accept a Symbol -func isSymbolChannel(c string) bool { - return strings.HasSuffix(c, "%s") || strings.HasSuffix(c, "%v") -} - -// channelName converts global channel Names used in config of channel input into kucoin channel names -// returns the name unchanged if no match is found -func channelName(name string) string { - if s, ok := subscriptionNames[name]; ok { - return s - } - return name -} - -// spotOrMarginPairSubs accepts a map of pairs and a template subscription and returns a list of subscriptions for Spot and Margin pairs -// If there's a Spot subscription, it won't be added again as a Margin subscription -// If joined param is true then one subscription per asset type with the currencies comma delimited -func spotOrMarginPairSubs(assetPairs map[asset.Item]currency.Pairs, b *subscription.Subscription, join bool, fmtArgs ...any) subscription.List { - subs := subscription.List{} - add := func(a asset.Item, pairs currency.Pairs) { - if len(pairs) == 0 { - return - } - if join { - f := append([]any{pairs.Join()}, fmtArgs...) - s := b.Clone() - s.Asset = a - s.Channel = fmt.Sprintf(b.Channel, f...) - subs = append(subs, s) - } else { - for i := range pairs { - f := append([]any{pairs[i].String()}, fmtArgs...) - s := b.Clone() - s.Asset = a - s.Channel = fmt.Sprintf(b.Channel, f...) - subs = append(subs, s) - } - } - } - - add(asset.Spot, assetPairs[asset.Spot]) - - marginPairs := currency.Pairs{} - for _, p := range assetPairs[asset.Margin] { - if !assetPairs[asset.Spot].Contains(p, false) { - marginPairs = marginPairs.Add(p) - } - } - add(asset.Margin, marginPairs) - - return subs -} - -// spotOrMarginCurrencySubs accepts a map of pairs and a template subscription and returns a list of subscriptions for every currency in Spot and Margin pairs -// If there's a Spot subscription, it won't be added again as a Margin subscription -func spotOrMarginCurrencySubs(assetPairs map[asset.Item]currency.Pairs, b *subscription.Subscription) (subscription.List, error) { - if b == nil { - return nil, common.ErrNilPointer - } - subs := subscription.List{} - add := func(a asset.Item, currs currency.Currencies) { - if len(currs) == 0 { - return - } - for _, c := range currs { - s := b.Clone() - s.Asset = a - s.Channel = fmt.Sprintf(b.Channel, c) - subs = append(subs, s) - } - } - - add(asset.Spot, assetPairs[asset.Spot].GetCurrencies()) - - marginCurrencies := currency.Currencies{} - for _, c := range assetPairs[asset.Margin].GetCurrencies() { - if !assetPairs[asset.Spot].ContainsCurrency(c) { - marginCurrencies = marginCurrencies.Add(c) - } - } - add(asset.Margin, marginCurrencies) - - return subs, nil +// GetSubscriptionTemplate returns a subscription channel template +func (ku *Kucoin) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) { + return template.New("master.tmpl"). + Funcs(template.FuncMap{ + "channelName": channelName, + "removeSpotFromMargin": func(s *subscription.Subscription, ap map[asset.Item]currency.Pairs) string { + spotPairs, _ := ku.GetEnabledPairs(asset.Spot) + return removeSpotFromMargin(s, ap, spotPairs) + }, + "isCurrencyChannel": isCurrencyChannel, + "isSymbolChannel": isSymbolChannel, + "channelInterval": channelInterval, + "assetCurrencies": assetCurrencies, + "joinPairsWithInterval": joinPairsWithInterval, + "batch": common.Batch[currency.Pairs], + }). + Parse(subTplText) } // orderbookManager defines a way of managing and maintaining synchronisation @@ -1777,3 +1580,158 @@ func (ku *Kucoin) CalculateAssets(topic string, cp currency.Pair) ([]asset.Item, return resp, nil } } + +// checkSubscriptions looks for any backwards incompatibilities with missing assets +// This should be unnecessary and removable by 2025 +func (ku *Kucoin) checkSubscriptions() { + upgraded := false + for _, s := range ku.Config.Features.Subscriptions { + if s.Asset != asset.Empty { + continue + } + upgraded = true + s.Channel = strings.TrimSuffix(s.Channel, ":%s") + switch s.Channel { + case subscription.TickerChannel, subscription.OrderbookChannel: + s.Asset = asset.All + case subscription.AllTradesChannel: + for _, d := range defaultSubscriptions { + if d.Channel == s.Channel { + ku.Config.Features.Subscriptions = append(ku.Config.Features.Subscriptions, d) + } + } + case futuresTradeOrderChannel, futuresStopOrdersLifecycleEventChannel, futuresAccountBalanceEventChannel: + s.Asset = asset.Futures + case marginPositionChannel, marginLoanChannel: + s.Asset = asset.Margin + } + } + ku.Config.Features.Subscriptions = slices.DeleteFunc(ku.Config.Features.Subscriptions, func(s *subscription.Subscription) bool { + switch s.Channel { + case "/contractMarket/level2Depth50", // Replaced by subsctiption.Orderbook for asset.All + "/contractMarket/tickerV2", // Replaced by subscription.Ticker for asset.All + "/margin/fundingBook": // Deprecated and removed + return true + case subscription.AllTradesChannel: + return s.Asset == asset.Empty + } + return false + }) + if upgraded { + ku.Features.Subscriptions = subscription.List{} + for _, s := range ku.Config.Features.Subscriptions { + if s.Enabled { + ku.Features.Subscriptions = append(ku.Features.Subscriptions, s) + } + } + } +} + +// channelName returns the correct channel name for the asset +func channelName(s *subscription.Subscription, a asset.Item) string { + switch s.Channel { + case subscription.TickerChannel: + if a == asset.Futures { + return futuresTickerChannel + } + return marketTickerChannel + case subscription.OrderbookChannel: + if a == asset.Futures { + return futuresOrderbookDepth5Channel + } + return marketOrderbookDepth5Channel // This does not require a REST request to get the orderbook. + case subscription.CandlesChannel: + return marketCandlesChannel // No support in GCT yet for Futures candles + case subscription.AllTradesChannel: + return marketMatchChannel // No support in GCT yet for Futures all trades + } + return s.Channel +} + +// removeSpotFromMargin removes spot pairs from margin pairs in the supplied AssetPairs map for subscriptions to non-margin endpoints +func removeSpotFromMargin(s *subscription.Subscription, ap map[asset.Item]currency.Pairs, spotPairs currency.Pairs) string { + if strings.HasPrefix(s.Channel, "/margin") { + return "" + } + if p, ok := ap[asset.Margin]; ok { + ap[asset.Margin] = p.Remove(spotPairs...) + } + return "" +} + +// isSymbolChannel returns if the channel expects receive a symbol +func isSymbolChannel(s *subscription.Subscription) bool { + switch channelName(s, s.Asset) { + case privateSpotTradeOrders, accountBalanceChannel, marginPositionChannel, spotMarketAdvancedChannel, futuresSystemAnnouncementChannel, + futuresTradeOrderChannel, futuresStopOrdersLifecycleEventChannel, futuresAccountBalanceEventChannel: + return false + } + return true +} + +// isCurrencyChannel returns if the channel expects receive a currency +func isCurrencyChannel(s *subscription.Subscription) bool { + return s.Channel == marginLoanChannel +} + +// channelInterval returns the channel interval if it has one +func channelInterval(s *subscription.Subscription) string { + if channelName(s, s.Asset) == marketCandlesChannel { + if i, err := intervalToString(s.Interval); err == nil { + return i + } + } + return "" +} + +// assetCurrencies returns the currencies from all pairs in an asset +// Updates the AssetPairs map parameter to contain only those currencies as Base items for expandTemplates to see +func assetCurrencies(s *subscription.Subscription, ap map[asset.Item]currency.Pairs) currency.Currencies { + cs := common.SortStrings(ap[s.Asset].GetCurrencies()) + p := currency.Pairs{} + for _, c := range cs { + p = append(p, currency.Pair{Base: c}) + } + ap[s.Asset] = p + return cs +} + +// joinPairsWithInterval returns a list of currency pair symbols joined by comma +// If the subscription has a viable interval it's appended after each symbol +func joinPairsWithInterval(b currency.Pairs, s *subscription.Subscription) string { + out := make([]string, len(b)) + suffix, err := intervalToString(s.Interval) + if err == nil { + suffix = "_" + suffix + } + for i, p := range b { + out[i] = p.String() + suffix + } + return strings.Join(out, ",") +} + +const subTplText = ` +{{- removeSpotFromMargin $.S $.AssetPairs -}} +{{- if isCurrencyChannel $.S }} + {{ channelName $.S $.S.Asset -}} : {{- (assetCurrencies $.S $.AssetPairs).Join -}} +{{- else if isSymbolChannel $.S }} + {{ range $asset, $pairs := $.AssetPairs }} + {{- with $name := channelName $.S $asset }} + {{- if and (eq $name "/market/ticker") (gt (len $pairs) 10) -}} + {{- $name -}} :all + {{- with $i := channelInterval $.S -}}_{{- $i -}}{{- end -}} + {{- $.BatchSize -}} {{ len $pairs }} + {{- else -}} + {{- range $b := batch $pairs 100 -}} + {{- $name -}} : {{- joinPairsWithInterval $b $.S -}} + {{ $.PairSeparator }} + {{- end -}} + {{- $.BatchSize -}} 100 + {{- end }} + {{- end }} + {{ $.AssetSeparator }} + {{- end }} +{{- else -}} + {{ channelName $.S $.S.Asset }} +{{- end }} +` diff --git a/exchanges/kucoin/kucoin_wrapper.go b/exchanges/kucoin/kucoin_wrapper.go index d7cb0299..6cf00ffa 100644 --- a/exchanges/kucoin/kucoin_wrapper.go +++ b/exchanges/kucoin/kucoin_wrapper.go @@ -28,7 +28,6 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" - "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" @@ -144,21 +143,7 @@ func (ku *Kucoin) SetDefaults() { GlobalResultLimit: 1500, }, }, - Subscriptions: subscription.List{ - // Where we can we use generic names - {Enabled: true, Channel: subscription.TickerChannel}, // marketTickerChannel - {Enabled: true, Channel: subscription.AllTradesChannel}, // marketMatchChannel - {Enabled: true, Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds}, // marketOrderbookLevel2Channels - {Enabled: true, Channel: futuresTickerV2Channel}, - {Enabled: true, Channel: futuresOrderbookLevel2Depth50Channel}, - {Enabled: true, Channel: marginFundingbookChangeChannel, Authenticated: true}, - {Enabled: true, Channel: accountBalanceChannel, Authenticated: true}, - {Enabled: true, Channel: marginPositionChannel, Authenticated: true}, - {Enabled: true, Channel: marginLoanChannel, Authenticated: true}, - {Enabled: true, Channel: futuresTradeOrderChannel, Authenticated: true}, - {Enabled: true, Channel: futuresStopOrdersLifecycleEventChannel, Authenticated: true}, - {Enabled: true, Channel: futuresAccountBalanceEventChannel, Authenticated: true}, - }, + Subscriptions: defaultSubscriptions.Clone(), } ku.Requester, err = request.New(ku.Name, common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout), @@ -197,6 +182,8 @@ func (ku *Kucoin) Setup(exch *config.Exchange) error { return err } + ku.checkSubscriptions() + wsRunningEndpoint, err := ku.API.Endpoints.GetURL(exchange.WebsocketSpot) if err != nil { return err @@ -1390,7 +1377,7 @@ func (ku *Kucoin) GetHistoricCandles(ctx context.Context, pair currency.Pair, a }) } case asset.Spot, asset.Margin: - intervalString, err := ku.intervalToString(interval) + intervalString, err := intervalToString(interval) if err != nil { return nil, err } @@ -1442,7 +1429,7 @@ func (ku *Kucoin) GetHistoricCandlesExtended(ctx context.Context, pair currency. } case asset.Spot, asset.Margin: var intervalString string - intervalString, err = ku.intervalToString(interval) + intervalString, err = intervalToString(interval) if err != nil { return nil, err } diff --git a/exchanges/kucoin/testdata/wsHandleData.json b/exchanges/kucoin/testdata/wsHandleData.json index 75f620a4..85d400b1 100644 --- a/exchanges/kucoin/testdata/wsHandleData.json +++ b/exchanges/kucoin/testdata/wsHandleData.json @@ -7,7 +7,6 @@ {"type":"message","topic":"/market/match:BTC-USDT","subject":"trade.l3match","data":{"sequence":"1545896669145","type":"match","symbol":"BTC-USDT","side":"buy","price":"0.08200000000000000000","size":"0.01022222000000000000","tradeId":"5c24c5da03aa673885cd67aa","takerOrderId":"5c24c5d903aa6772d55b371e","makerOrderId":"5c2187d003aa677bd09d5c93","time":"1545913818099033203"}} {"id":"","type":"message","topic":"/indicator/index:USDT-BTC","subject":"tick","data":{"symbol":"USDT-BTC","granularity":5000,"timestamp":1551770400000,"value":0.0001092}} {"type":"message","topic":"/indicator/markPrice:USDT-BTC","subject":"tick","data":{"symbol":"USDT-BTC","granularity":5000,"timestamp":1551770400000,"value":0.0001093}} -{"type":"message","topic":"/margin/fundingBook:USDT","subject":"funding.update","data":{"annualIntRate":0.0547,"currency":"USDT","dailyIntRate":0.00015,"sequence":87611418,"side":"lend","size":25040,"term":7,"ts":1671005721087508700}} {"type":"message","topic":"/spotMarket/tradeOrders","subject":"orderChange","channelType":"private","data":{"symbol":"KCS-USDT","orderType":"limit","side":"buy","orderId":"5efab07953bdea00089965d2","type":"open","orderTime":1593487481683297800,"size":"0.1","filledSize":"0","price":"0.937","clientOid":"1593487481000906","remainSize":"0.1","status":"open","ts":1593487481683297800}} {"type":"message","topic":"/spotMarket/tradeOrders","subject":"orderChange","channelType":"private","data":{"symbol":"KCS-USDT","orderType":"limit","side":"sell","orderId":"5efab07953bdea00089965fa","liquidity":"taker","type":"match","orderTime":1593487482038606000,"size":"0.1","filledSize":"0.1","price":"0.938","matchPrice":"0.96738","matchSize":"0.1","tradeId":"5efab07a4ee4c7000a82d6d9","clientOid":"1593487481000313","remainSize":"0","status":"match","ts":1593487482038606000}} {"type":"message","topic":"/spotMarket/tradeOrders","subject":"orderChange","channelType":"private","data":{"symbol":"KCS-USDT","orderType":"limit","side":"sell","orderId":"5efab07953bdea00089965fa","type":"filled","orderTime":1593487482038606000,"size":"0.1","filledSize":"0.1","price":"0.938","clientOid":"1593487481000313","remainSize":"0","status":"done","ts":1593487482038606000}} @@ -21,7 +20,6 @@ {"type":"message","topic":"/margin/loan:BTC","subject":"order.done","channelType":"private","data":{"currency":"BTC","orderId":"ac928c66ca53498f9c13a127a60e8","reason":"filled","side":"lend","ts":1553846081210005000}} {"type":"message","topic":"/spotMarket/advancedOrders","subject":"stopOrder","channelType":"private","data":{"createdAt":1589789942337,"orderId":"5ec244f6a8a75e0009958237","orderPrice":"0.00062","orderType":"stop","side":"sell","size":"1","stop":"entry","stopPrice":"0.00062","symbol":"KCS-BTC","tradeType":"TRADE","triggerSuccess":true,"ts":1589790121382281200,"type":"triggered"}} {"subject":"tickerV2","topic":"/contractMarket/tickerV2:ETHUSDCM","data":{"symbol":"ETHUSDCM","bestBidSize":795,"bestBidPrice":3200,"bestAskPrice":3600,"bestAskSize":284,"ts":1553846081210005000}} -{"subject":"ticker","topic":"/contractMarket/ticker:ETHUSDCM","data":{"symbol":"ETHUSDCM","sequence":45,"side":"sell","price":3600,"size":16,"tradeId":"5c9dcf4170744d6f5a3d32fb","bestBidSize":795,"bestBidPrice":3200,"bestAskPrice":3600,"bestAskSize":284,"ts":1553846081210005000}} {"subject":"level2","topic":"/contractMarket/level2:ETHUSDCM","type":"message","data":{"sequence":18,"change":"5000.0,sell,83","timestamp":1551770400000}} {"type":"message","topic":"/contractMarket/execution:ETHUSDCM","subject":"match","data":{"makerUserId":"6287c3015c27f000017d0c2f","symbol":"ETHUSDCM","sequence":31443494,"side":"buy","size":35,"price":23083,"takerOrderId":"63f94040839d00000193264b","makerOrderId":"63f94036839d0000019310c3","takerUserId":"6133f817230d8d000607b941","tradeId":"63f940400000650065f4996f","ts":1677279296134648800}} {"type":"message","topic":"/contractMarket/level2Depth5:ETHUSDCM","subject":"level2","data":{"sequence":1672332328701,"asks":[[23149,13703],[23150,1460],[23151,941],[23152,4591],[23153,4107]],"bids":[[23148,22801],[23147,4766],[23146,1388],[23145,2593],[23144,6286]],"ts":1677280435684,"timestamp":1677280435684}} diff --git a/exchanges/orderbook/orderbook.go b/exchanges/orderbook/orderbook.go index 60514d6c..877c25d1 100644 --- a/exchanges/orderbook/orderbook.go +++ b/exchanges/orderbook/orderbook.go @@ -122,8 +122,7 @@ func (s *Service) DeployDepth(exchange string, p currency.Pair, a asset.Item) (* return book, nil } -// GetDepth returns the actual depth struct for potential subsystems and -// strategies to interact with +// GetDepth returns the actual depth struct for potential subsystems and strategies to interact with func (s *Service) GetDepth(exchange string, p currency.Pair, a asset.Item) (*Depth, error) { s.mu.Lock() defer s.mu.Unlock() @@ -139,9 +138,7 @@ func (s *Service) GetDepth(exchange string, p currency.Pair, a asset.Item) (*Dep Asset: a, }] if !ok { - return nil, fmt.Errorf("%w associated with base currency %s", - errCannotFindOrderbook, - p.Quote) + return nil, fmt.Errorf("%w associated with base currency %s", errCannotFindOrderbook, p.Quote) } return book, nil } @@ -160,9 +157,7 @@ func (s *Service) Retrieve(exchange string, p currency.Pair, a asset.Item) (*Bas defer s.mu.Unlock() m1, ok := s.books[strings.ToLower(exchange)] if !ok { - return nil, fmt.Errorf("%w for %s exchange", - errCannotFindOrderbook, - exchange) + return nil, fmt.Errorf("%w for %s exchange", errCannotFindOrderbook, exchange) } book, ok := m1.m[key.PairAsset{ Base: p.Base.Item, @@ -170,9 +165,7 @@ func (s *Service) Retrieve(exchange string, p currency.Pair, a asset.Item) (*Bas Asset: a, }] if !ok { - return nil, fmt.Errorf("%w associated with base currency %s", - errCannotFindOrderbook, - p.Quote) + return nil, fmt.Errorf("%w associated with base currency %s", errCannotFindOrderbook, p.Quote) } return book.Retrieve() } diff --git a/exchanges/subscription/README.md b/exchanges/subscription/README.md index ff4d7a6f..bbc9492c 100644 --- a/exchanges/subscription/README.md +++ b/exchanges/subscription/README.md @@ -38,12 +38,30 @@ The template is provided with a single context structure: AssetPairs map[asset.Item]currency.Pairs AssetSeparator string PairSeparator string + BatchSize string ``` Subscriptions may fan out many channels for assets and pairs, to support exchanges which require individual subscriptions. -To allow the template to communicate how to handle its output it should use the provided separators: +To allow the template to communicate how to handle its output it should use the provided directives: - AssetSeparator should be added at the end of each section related to assets - PairSeparator should be added at the end of each pair +- BatchSize should be added with a number directly before AssetSeparator to indicate pairs have been batched + +Example: +``` +{{- range $asset, $pairs := $.AssetPairs }} + {{- range $b := batch $pairs 30 -}} + {{- $.S.Channel -}} : {{- $b.Join -}} + {{ $.PairSeparator }} + {{- end -}} + {{- $.BatchSize -}} 30 + {{- $.AssetSeparator }} +{{- end }} +``` + +Assets and pairs should be output in the sequence in AssetPairs since text/template range function uses an sorted order for map keys. + +Template functions may modify AssetPairs to update the subscription's pairs, e.g. Filtering out margin pairs already in spot subscription We use separators like this because it allows mono-templates to decide at runtime whether to fan out. diff --git a/exchanges/subscription/fixtures_test.go b/exchanges/subscription/fixtures_test.go index 918c1f81..32683472 100644 --- a/exchanges/subscription/fixtures_test.go +++ b/exchanges/subscription/fixtures_test.go @@ -8,26 +8,46 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" ) type mockEx struct { + pairs currency.Pairs + assets asset.Items tpl string auth bool errPairs error errFormat error } +func newMockEx() *mockEx { + pairs := currency.Pairs{btcusdtPair, ethusdcPair} + for _, b := range []currency.Code{currency.LTC, currency.XRP, currency.TRX} { + for _, q := range []currency.Code{currency.USDT, currency.USDC} { + pairs = append(pairs, currency.NewPair(b, q)) + } + } + + return &mockEx{ + assets: asset.Items{asset.Spot, asset.Futures}, + pairs: pairs, + } +} + func (m *mockEx) GetEnabledPairs(_ asset.Item) (currency.Pairs, error) { - return currency.Pairs{btcusdtPair, ethusdcPair}, m.errPairs + return m.pairs, m.errPairs } func (m *mockEx) GetPairFormat(_ asset.Item, _ bool) (currency.PairFormat, error) { return currency.PairFormat{Uppercase: true}, m.errFormat } -func (m *mockEx) GetSubscriptionTemplate(_ *Subscription) (*template.Template, error) { +func (m *mockEx) GetSubscriptionTemplate(s *Subscription) (*template.Template, error) { + if s.Channel == "nil" { + return nil, nil + } return template.New(m.tpl). Funcs(template.FuncMap{ "assetName": func(a asset.Item) string { @@ -35,11 +55,18 @@ func (m *mockEx) GetSubscriptionTemplate(_ *Subscription) (*template.Template, e return "future" } return a.String() - }}). + }, + "updateAssetPairs": func(ap assetPairs) string { + ap[asset.Futures] = nil + ap[asset.Spot] = ap[asset.Spot][0:1] + return "" + }, + "batch": common.Batch[currency.Pairs], + }). ParseFiles("testdata/" + m.tpl) } -func (m *mockEx) GetAssetTypes(_ bool) asset.Items { return asset.Items{asset.Spot, asset.Futures} } +func (m *mockEx) GetAssetTypes(_ bool) asset.Items { return m.assets } func (m *mockEx) CanUseAuthenticatedWebsocketEndpoints() bool { return m.auth } // equalLists is a utility function to compare subscription lists and show a pretty failure message diff --git a/exchanges/subscription/list.go b/exchanges/subscription/list.go index 51d6edfa..f250010d 100644 --- a/exchanges/subscription/list.go +++ b/exchanges/subscription/list.go @@ -85,7 +85,7 @@ func fillAssetPairs(ap assetPairs, a asset.Item, e iExchange) error { if err != nil { return err } - ap[a] = p.Format(f) + ap[a] = common.SortStrings(p.Format(f)) return nil } diff --git a/exchanges/subscription/list_test.go b/exchanges/subscription/list_test.go index 08eba5da..cd92d171 100644 --- a/exchanges/subscription/list_test.go +++ b/exchanges/subscription/list_test.go @@ -83,13 +83,15 @@ func TestListSetStates(t *testing.T) { // All other code is covered under TestExpandTemplates func TestAssetPairs(t *testing.T) { t.Parallel() - expErr := errors.New("Krypton is gone") for _, a := range []asset.Item{asset.Spot, asset.All} { + e := newMockEx() l := &List{{Channel: CandlesChannel, Asset: a}} - _, err := l.assetPairs(&mockEx{errPairs: expErr}) - assert.ErrorIs(t, err, expErr, "Should error correctly on GetEnabledPairs") - _, err = l.assetPairs(&mockEx{errFormat: expErr}) - assert.ErrorIs(t, err, expErr, "Should error correctly on GetPairFormat") + e.errFormat = errors.New("Krypton is back") + _, err := l.assetPairs(e) + assert.ErrorIs(t, err, e.errFormat, "Should error correctly on GetPairFormat") + e.errPairs = errors.New("Krypton is gone") + _, err = l.assetPairs(e) + assert.ErrorIs(t, err, e.errPairs, "Should error correctly on GetEnabledPairs") } } diff --git a/exchanges/subscription/template.go b/exchanges/subscription/template.go index fc67d2ec..ab0a18b4 100644 --- a/exchanges/subscription/template.go +++ b/exchanges/subscription/template.go @@ -4,25 +4,29 @@ import ( "bytes" "errors" "fmt" + "maps" "slices" + "strconv" "strings" - "github.com/thrasher-corp/gocryptotrader/currency" + "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" ) const ( + deviceControl = "\x11" groupSeparator = "\x1D" recordSeparator = "\x1E" ) var ( - errInvalidAssetExpandPairs = errors.New("subscription template containing PairSeparator with must contain either specific Asset or AssetSeparator") - errAssetRecords = errors.New("subscription template did not generate the expected number of asset records") - errPairRecords = errors.New("subscription template did not generate the expected number of pair records") - errAssetTemplateWithoutAll = errors.New("sub.Asset must be set to All if AssetSeparator is used in Channel template") - errNoTemplateContent = errors.New("subscription template did not generate content") - errInvalidTemplate = errors.New("GetSubscriptionTemplate did not return a template") + errInvalidAssetExpandPairs = errors.New("subscription template containing PairSeparator with must contain either specific Asset or AssetSeparator") + errAssetRecords = errors.New("subscription template did not generate the expected number of asset records") + errPairRecords = errors.New("subscription template did not generate the expected number of pair records") + errTooManyBatchSizePerAsset = errors.New("more than one BatchSize directive inside an AssetSeparator") + errAssetTemplateWithoutAll = errors.New("sub.Asset must be set to All if AssetSeparator is used in Channel template") + errNoTemplateContent = errors.New("subscription template did not generate content") + errInvalidTemplate = errors.New("GetSubscriptionTemplate did not return a template") ) type tplCtx struct { @@ -30,6 +34,7 @@ type tplCtx struct { AssetPairs assetPairs PairSeparator string AssetSeparator string + BatchSize string } // ExpandTemplates returns a list of Subscriptions with Template expanded @@ -63,86 +68,132 @@ func (l List) ExpandTemplates(e iExchange) (List, error) { assets = append(assets, k) } slices.Sort(assets) // text/template ranges maps in sorted order + + subs := List{} + for _, s := range l { + expanded, err2 := expandTemplate(e, s, maps.Clone(ap), assets) + if err2 != nil { + err = common.AppendError(err, fmt.Errorf("%s: %w", s, err2)) + } else { + subs = append(subs, expanded...) + } + } + + return subs, err +} + +func expandTemplate(e iExchange, s *Subscription, ap assetPairs, assets asset.Items) (List, error) { + if s.QualifiedChannel != "" { + return List{s}, nil + } + + t, err := e.GetSubscriptionTemplate(s) + if err != nil { + return nil, err + } + if t == nil { + return nil, errInvalidTemplate + } + + subCtx := &tplCtx{ + S: s, + PairSeparator: recordSeparator, + AssetSeparator: groupSeparator, + BatchSize: deviceControl + "BS", + } + subs := List{} - for _, s := range l { - if s.QualifiedChannel != "" { - subs = append(subs, s) + switch s.Asset { + case asset.All: + subCtx.AssetPairs = ap + default: + // This deliberately includes asset.Empty to harmonise handling + subCtx.AssetPairs = assetPairs{ + s.Asset: ap[s.Asset], + } + assets = asset.Items{s.Asset} + if s.Asset != asset.Empty && len(ap[s.Asset]) == 0 { + return List{}, nil // Nothing is enabled for this sub asset + } + } + + if len(s.Pairs) != 0 { + for a, pairs := range subCtx.AssetPairs { + if err := pairs.ContainsAll(s.Pairs, true); err != nil { //nolint:govet // Shadow, or gocritic will complain sloppyReassign + return nil, err + } + subCtx.AssetPairs[a] = s.Pairs + } + } + + buf := &bytes.Buffer{} + if err := t.Execute(buf, subCtx); err != nil { //nolint:govet // Shadow, or gocritic will complain sloppyReassign + return nil, err + } + + out := strings.TrimSpace(buf.String()) + + // Remove a single trailing AssetSeparator; don't use a cutset to avoid removing 2 or more + out = strings.TrimSpace(strings.TrimSuffix(out, subCtx.AssetSeparator)) + + assetRecords := strings.Split(out, subCtx.AssetSeparator) + if len(assetRecords) != len(assets) { + return nil, fmt.Errorf("%w: Got %d; Expected %d", errAssetRecords, len(assetRecords), len(assets)) + } + + for i, assetChannels := range assetRecords { + a := assets[i] + pairs := subCtx.AssetPairs[a] + + xpandPairs := strings.Contains(assetChannels, subCtx.PairSeparator) + + /* Batching: + - We start by assuming we'll get 1 batch sized to contain all pairs. Maybe a comma-separated list, or just the asset name + - If a BatchSize directive is found, we expect it to come right at the end, and be followed by the batch size as a number + - We'll then split into N batches of that size + - If no batchSize was declared, but we saw a PairSeparator, then we expect to see one line per pair, so batchSize is 1 + */ + batchSize := len(pairs) + if b := strings.Split(assetChannels, subCtx.BatchSize); len(b) > 2 { + return nil, fmt.Errorf("%w for %s", errTooManyBatchSizePerAsset, a) + } else if len(b) == 2 { + assetChannels = b[0] + if batchSize, err = strconv.Atoi(strings.TrimSpace(b[1])); err != nil { + return nil, fmt.Errorf("%s: %w", s, common.GetTypeAssertError("int", b[1], "batchSize")) + } + } else if xpandPairs { + batchSize = 1 + } + + // Trim space, then only one pair separator, then any more space. + assetChannels = strings.TrimSpace(strings.TrimSuffix(strings.TrimSpace(assetChannels), subCtx.PairSeparator)) + + if assetChannels == "" { continue } - subCtx := &tplCtx{ - S: s, - AssetPairs: ap, - PairSeparator: recordSeparator, - AssetSeparator: groupSeparator, + batches := common.Batch(pairs, batchSize) + + pairLines := strings.Split(assetChannels, subCtx.PairSeparator) + + if s.Asset != asset.Empty && len(pairLines) != len(batches) { + // The number of lines we get generated must match the number of pair batches we expect + return nil, fmt.Errorf("%w for %s: Got %d; Expected %d", errPairRecords, a, len(pairLines), len(batches)) } - t, err := e.GetSubscriptionTemplate(s) - if err != nil { - return nil, err - } - if t == nil { - return nil, errInvalidTemplate - } - - buf := &bytes.Buffer{} - if err := t.Execute(buf, subCtx); err != nil { - return nil, err - } - - out := buf.String() - - subAssets := assets - xpandPairs := strings.Contains(out, subCtx.PairSeparator) - if xpandAssets := strings.Contains(out, subCtx.AssetSeparator); xpandAssets { - if s.Asset != asset.All { - return nil, errAssetTemplateWithoutAll + for j, channel := range pairLines { + c := s.Clone() + c.Asset = a + channel = strings.TrimSpace(channel) + if channel == "" { + return nil, fmt.Errorf("%w for %s: %s", errNoTemplateContent, a, s) } - } else { - if xpandPairs && (s.Asset == asset.All || s.Asset == asset.Empty) { - // We don't currently support expanding Pairs without expanding Assets for All or Empty assets, but we could; waiting for a use-case - return nil, errInvalidAssetExpandPairs - } - // No expansion so update expected Assets for consistent behaviour below - subAssets = []asset.Item{s.Asset} - } - - out = strings.TrimRight(out, " \n\r\t"+subCtx.PairSeparator+subCtx.AssetSeparator) - - assetRecords := strings.Split(out, subCtx.AssetSeparator) - if len(assetRecords) != len(subAssets) { - return nil, fmt.Errorf("%w: Got %d; Expected %d", errAssetRecords, len(assetRecords), len(subAssets)) - } - - for i, assetChannels := range assetRecords { - a := subAssets[i] - assetChannels = strings.TrimRight(assetChannels, " \n\r\t"+recordSeparator) - pairLines := strings.Split(assetChannels, subCtx.PairSeparator) - pairs, ok := ap[a] - if xpandPairs { - if !ok { - return nil, fmt.Errorf("%w: %s", asset.ErrInvalidAsset, a) - } - if len(pairLines) != len(pairs) { - return nil, fmt.Errorf("%w: Got %d; Expected %d", errPairRecords, len(pairLines), len(pairs)) - } - } - for j, channel := range pairLines { - c := s.Clone() - c.Asset = a - channel = strings.TrimSpace(channel) - if channel == "" { - return nil, fmt.Errorf("%w: %s", errNoTemplateContent, s) - } - c.QualifiedChannel = strings.TrimSpace(channel) - if xpandPairs { - c.Pairs = currency.Pairs{pairs[j]} - } else { - c.Pairs = pairs - } - subs = append(subs, c) + c.QualifiedChannel = channel + if s.Asset != asset.Empty { + c.Pairs = batches[j] } + subs = append(subs, c) } } diff --git a/exchanges/subscription/template_test.go b/exchanges/subscription/template_test.go index 9df65b81..f14e851e 100644 --- a/exchanges/subscription/template_test.go +++ b/exchanges/subscription/template_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/kline" @@ -15,29 +16,41 @@ import ( func TestExpandTemplates(t *testing.T) { t.Parallel() - e := &mockEx{ - tpl: "subscriptions.tmpl", - } + e := newMockEx() + e.tpl = "subscriptions.tmpl" // Functionality tests l := List{ - {Channel: "feature1"}, - {Channel: "feature2", Asset: asset.All, Pairs: currency.Pairs{btcusdtPair, ethusdcPair}, Interval: kline.FifteenMin}, - {Channel: "feature3", Asset: asset.All, Pairs: currency.Pairs{btcusdtPair, ethusdcPair}, Levels: 100}, - {Channel: "feature4", Authenticated: true}, - {Channel: "feature1", QualifiedChannel: "just one sub already processed"}, + {Channel: "single-channel"}, + {Channel: "expand-assets", Asset: asset.All, Interval: kline.FifteenMin}, + {Channel: "expand-pairs", Asset: asset.All, Levels: 1}, + {Channel: "expand-pairs", Asset: asset.Spot, Levels: 2}, + {Channel: "single-channel", QualifiedChannel: "just one sub already processed"}, + {Channel: "update-asset-pairs", Asset: asset.All}, + {Channel: "expand-pairs", Asset: asset.Spot, Pairs: e.pairs[0:2], Levels: 3}, + {Channel: "batching", Asset: asset.Spot}, + {Channel: "single-channel", Authenticated: true}, } got, err := l.ExpandTemplates(e) require.NoError(t, err, "ExpandTemplates must not error") exp := List{ - {Channel: "feature1", QualifiedChannel: "feature1"}, - {Channel: "feature2", QualifiedChannel: "spot-feature2@15m", Asset: asset.Spot, Pairs: currency.Pairs{btcusdtPair, ethusdcPair}, Interval: kline.FifteenMin}, - {Channel: "feature2", QualifiedChannel: "future-feature2@15m", Asset: asset.Futures, Pairs: currency.Pairs{btcusdtPair, ethusdcPair}, Interval: kline.FifteenMin}, - {Channel: "feature3", QualifiedChannel: "spot-USDTBTC-feature3@100", Asset: asset.Spot, Pairs: currency.Pairs{btcusdtPair}, Levels: 100}, - {Channel: "feature3", QualifiedChannel: "spot-USDCETH-feature3@100", Asset: asset.Spot, Pairs: currency.Pairs{ethusdcPair}, Levels: 100}, - {Channel: "feature3", QualifiedChannel: "future-USDTBTC-feature3@100", Asset: asset.Futures, Pairs: currency.Pairs{btcusdtPair}, Levels: 100}, - {Channel: "feature3", QualifiedChannel: "future-USDCETH-feature3@100", Asset: asset.Futures, Pairs: currency.Pairs{ethusdcPair}, Levels: 100}, - {Channel: "feature1", QualifiedChannel: "just one sub already processed"}, + {Channel: "single-channel", QualifiedChannel: "single-channel"}, + {Channel: "expand-assets", QualifiedChannel: "spot-expand-assets@15m", Asset: asset.Spot, Pairs: e.pairs, Interval: kline.FifteenMin}, + {Channel: "expand-assets", QualifiedChannel: "future-expand-assets@15m", Asset: asset.Futures, Pairs: e.pairs, Interval: kline.FifteenMin}, + {Channel: "single-channel", QualifiedChannel: "just one sub already processed"}, + {Channel: "update-asset-pairs", QualifiedChannel: "spot-btcusdt-update-asset-pairs", Asset: asset.Spot, Pairs: currency.Pairs{btcusdtPair}}, + {Channel: "expand-pairs", QualifiedChannel: "spot-USDTBTC-expand-pairs@3", Asset: asset.Spot, Pairs: e.pairs[:1], Levels: 3}, + {Channel: "expand-pairs", QualifiedChannel: "spot-USDCETH-expand-pairs@3", Asset: asset.Spot, Pairs: e.pairs[1:2], Levels: 3}, + } + for _, p := range e.pairs { + exp = append(exp, List{ + {Channel: "expand-pairs", QualifiedChannel: "spot-" + p.Swap().String() + "-expand-pairs@1", Asset: asset.Spot, Pairs: currency.Pairs{p}, Levels: 1}, + {Channel: "expand-pairs", QualifiedChannel: "future-" + p.Swap().String() + "-expand-pairs@1", Asset: asset.Futures, Pairs: currency.Pairs{p}, Levels: 1}, + {Channel: "expand-pairs", QualifiedChannel: "spot-" + p.Swap().String() + "-expand-pairs@2", Asset: asset.Spot, Pairs: currency.Pairs{p}, Levels: 2}, + }...) + } + for _, b := range common.Batch(common.SortStrings(e.pairs), 3) { + exp = append(exp, &Subscription{Channel: "batching", QualifiedChannel: "spot-" + b.Join() + "-batching", Asset: asset.Spot, Pairs: b}) } if !equalLists(t, exp, got) { @@ -48,36 +61,58 @@ func TestExpandTemplates(t *testing.T) { got, err = l.ExpandTemplates(e) require.NoError(t, err, "ExpandTemplates must not error") exp = append(exp, - &Subscription{Channel: "feature4", QualifiedChannel: "feature4-authed"}, + &Subscription{Channel: "single-channel", QualifiedChannel: "single-channel-authed"}, ) equalLists(t, exp, got) - _, err = List{{Channel: "feature2", Asset: asset.Spot}}.ExpandTemplates(e) - assert.ErrorIs(t, err, errAssetTemplateWithoutAll, "Should error correctly on xpand assets without All") + // Test with just one asset to ensure asset.All works, and disabled assets don't error + e.assets = e.assets[:1] + l = List{ + {Channel: "expand-assets", Asset: asset.All, Interval: kline.OneHour}, + {Channel: "expand-pairs", Asset: asset.All, Levels: 4}, + {Channel: "single-channel", Asset: asset.Futures}, + } + got, err = l.ExpandTemplates(e) + require.NoError(t, err, "ExpandTemplates must not error") + exp = List{ + {Channel: "expand-assets", QualifiedChannel: "spot-expand-assets@1h", Asset: asset.Spot, Pairs: e.pairs, Interval: kline.OneHour}, + } + for _, p := range e.pairs { + exp = append(exp, List{ + {Channel: "expand-pairs", QualifiedChannel: "spot-" + p.Swap().String() + "-expand-pairs@4", Asset: asset.Spot, Pairs: currency.Pairs{p}, Levels: 4}, + }...) + } + equalLists(t, exp, got) + + // Error cases + _, err = List{{Channel: "nil"}}.ExpandTemplates(e) + assert.ErrorIs(t, err, errInvalidTemplate, "Should get correct error on nil template") + + _, err = List{{Channel: "single-channel", Asset: asset.Spot, Pairs: currency.Pairs{currency.NewPairWithDelimiter("NOPE", "POPE", "🐰")}}}.ExpandTemplates(e) + assert.ErrorIs(t, err, currency.ErrPairNotContainedInAvailablePairs, "Should error correctly when pair not available") e.tpl = "errors.tmpl" - _, err = List{{Channel: "error1", Asset: asset.All}}.ExpandTemplates(e) - assert.ErrorIs(t, err, errInvalidAssetExpandPairs, "Should error correctly on xpand pairs but not assets") _, err = List{{Channel: "error1"}}.ExpandTemplates(e) - assert.ErrorIs(t, err, errInvalidAssetExpandPairs, "Should error correctly on xpand pairs but not assets") - - _, err = List{{Channel: "error2"}}.ExpandTemplates(e) assert.ErrorContains(t, err, "wrong number of args for String", "Should error correctly with execution error") - _, err = List{{Channel: "non-existent"}}.ExpandTemplates(e) + _, err = List{{Channel: "empty-content", Asset: asset.Spot}}.ExpandTemplates(e) assert.ErrorIs(t, err, errNoTemplateContent, "Should error correctly when no content generated") - assert.ErrorContains(t, err, "non-existent", "Should error correctly when no content generated") + assert.ErrorContains(t, err, "empty-content", "Should error correctly when no content generated") - _, err = List{{Channel: "error3", Asset: asset.All}}.ExpandTemplates(e) + _, err = List{{Channel: "error2", Asset: asset.All}}.ExpandTemplates(e) assert.ErrorIs(t, err, errAssetRecords, "Should error correctly when invalid number of asset entries") - _, err = List{{Channel: "error4", Asset: asset.Spot}}.ExpandTemplates(e) + _, err = List{{Channel: "error3", Asset: asset.Spot}}.ExpandTemplates(e) assert.ErrorIs(t, err, errPairRecords, "Should error correctly when invalid number of pair entries") - _, err = List{{Channel: "error4", Asset: asset.Margin}}.ExpandTemplates(e) - assert.ErrorIs(t, err, asset.ErrInvalidAsset, "Should error correctly when invalid asset") + _, err = List{{Channel: "error4", Asset: asset.Spot}}.ExpandTemplates(e) + assert.ErrorIs(t, err, errTooManyBatchSizePerAsset, "Should error correctly when too many BatchSize directives") + _, err = List{{Channel: "error5", Asset: asset.Spot}}.ExpandTemplates(e) + assert.ErrorIs(t, err, common.ErrTypeAssertFailure, "Should error correctly when batch size isn't an int") + + e.tpl = "parse-error.tmpl" e.tpl = "parse-error.tmpl" _, err = l.ExpandTemplates(e) assert.ErrorContains(t, err, "function \"explode\" not defined", "Should error correctly on unparsable template") @@ -90,10 +125,13 @@ func TestExpandTemplates(t *testing.T) { _, err = l.ExpandTemplates(e) assert.ErrorIs(t, err, e.errPairs, "Should error correctly on GetEnabledPairs") - l = List{{Channel: "feature1", QualifiedChannel: "already happy"}} + l = List{{Channel: "single-channel", QualifiedChannel: "already happy"}} got, err = l.ExpandTemplates(e) require.NoError(t, err) require.Len(t, got, 1, "Must get back the one sub") assert.Equal(t, "already happy", l[0].QualifiedChannel, "Should get back the one sub") assert.NotSame(t, got, l, "Should get back a different actual list") + + _, err = List{{Channel: "nil"}}.ExpandTemplates(e) + assert.ErrorIs(t, err, errInvalidTemplate, "Should get correct error on nil template") } diff --git a/exchanges/subscription/testdata/errors.tmpl b/exchanges/subscription/testdata/errors.tmpl index a086e0e9..3705d6c8 100644 --- a/exchanges/subscription/testdata/errors.tmpl +++ b/exchanges/subscription/testdata/errors.tmpl @@ -1,13 +1,36 @@ {{- if eq .S.Channel "error1" }} - {{/* Error 1: Expand pairs but not assets, without specific asset */}} - {{- .PairSeparator -}} + {{/* Runtime error from executing */}} + {{ .S.String 42 }} {{- else if eq .S.Channel "error2" }} - {{/* Error 2: Runtime error from executing */}} - {{ .S.String 42 }} + {{/* Incorrect number of asset entries */}} + {{- .AssetSeparator -}} + {{- .AssetSeparator -}} + {{- .AssetSeparator -}} {{- else if eq .S.Channel "error3" }} - {{/* Error 3: Incorrect number of asset entries */}} - {{- .AssetSeparator }} + {{/* Incorrect number of pair entries */}} + {{- .PairSeparator -}} + {{- .PairSeparator -}} + {{- .PairSeparator -}} {{- else if eq .S.Channel "error4" }} - {{/* Error 3: Incorrect number of pair entries */}} - {{- .PairSeparator }} + {{/* Too many BatchSize commands */}} + {{- range $asset, $pairs := $.AssetPairs }} + {{- $pairs.Join -}} + {{- $.BatchSize -}}1 + {{- $.BatchSize -}}2 + {{- $.AssetSeparator -}} + {{- end -}} +{{- else if eq .S.Channel "error5" }} + {{/* BatchSize without number */}} + {{- range $asset, $pairs := $.AssetPairs }} + {{- $pairs.Join -}} + {{- $.BatchSize -}} + {{- $.AssetSeparator -}} + {{- end -}} +{{- else if eq .S.Channel "empty-content" }} + {{/* Empty response for the pair */}} + {{- range $asset, $pairs := $.AssetPairs }} + {{- range $pair := $pairs -}} + {{- $.PairSeparator -}} + {{- end -}} + {{- end -}} {{- end -}} diff --git a/exchanges/subscription/testdata/subscriptions.tmpl b/exchanges/subscription/testdata/subscriptions.tmpl index b223c146..50a7345d 100644 --- a/exchanges/subscription/testdata/subscriptions.tmpl +++ b/exchanges/subscription/testdata/subscriptions.tmpl @@ -1,21 +1,36 @@ -{{- if eq $.S.Channel "feature1" -}} - {{/* Case 1: One channel to rule them all */}} - feature1 -{{- else if eq $.S.Channel "feature2" -}} - {{/* Case 2: One channel per asset */}} - {{- range $asset, $pairs := $.AssetPairs }} - {{ assetName $asset }}-feature2@ {{- $.S.Interval.Short }} - {{- $.AssetSeparator }} - {{- end }} -{{- else if eq $.S.Channel "feature3" }} - {{/* Case 3: One channel per pair per asset */}} - {{- range $asset, $pairs := $.AssetPairs }} - {{- range $pair := $pairs -}} - {{ assetName $asset }}-{{ $pair.Swap.String -}} -feature3@ {{- $.S.Levels }} - {{- $.PairSeparator -}} +{{- if eq $.S.Channel "single-channel" -}} + single-channel + {{- if $.S.Authenticated -}} + -authed {{- end -}} - {{- $.AssetSeparator -}} - {{- end -}} -{{- else if eq $.S.Channel "feature4" }} - feature4-authed +{{- else if eq $.S.Channel "expand-assets" -}} + {{- range $asset, $pairs := $.AssetPairs }} + {{ assetName $asset }}-expand-assets@ {{- $.S.Interval.Short }} + {{- $.AssetSeparator }} + {{- end }} +{{- else if eq $.S.Channel "expand-pairs" }} + {{- range $asset, $pairs := $.AssetPairs }} + {{- range $pair := $pairs -}} + {{ assetName $asset }}-{{ $pair.Swap.String -}} -expand-pairs@ {{- $.S.Levels }} + {{- $.PairSeparator -}} + {{- end -}} + {{- $.AssetSeparator -}} + {{- end -}} +{{- else if eq $.S.Channel "update-asset-pairs" }} + {{- updateAssetPairs $.AssetPairs -}} + spot-btcusdt-update-asset-pairs + {{- $.PairSeparator -}} + {{- $.AssetSeparator -}} + {{/* futures doesn't output anything, but we need an asset separator, so this previous one must not be stripped */}} + {{- $.AssetSeparator -}} +{{- else if eq $.S.Channel "batching" }} + {{- range $asset, $pairs := $.AssetPairs }} + {{- if eq $asset.String "spot" }} + {{- range $batch := batch $pairs 3 -}} + {{ assetName $asset }}-{{ $batch.Join -}} -batching + {{- $.PairSeparator -}} + {{- end -}} + {{- $.BatchSize -}} 3 + {{- end }} + {{- end -}} {{- end -}}