diff --git a/README.md b/README.md index 5b885954..be393b5a 100644 --- a/README.md +++ b/README.md @@ -142,12 +142,12 @@ Binaries will be published once the codebase reaches a stable condition. |User|Contribution Amount| |--|--| -| [thrasher-](https://github.com/thrasher-) | 690 | -| [shazbert](https://github.com/shazbert) | 330 | -| [dependabot[bot]](https://github.com/apps/dependabot) | 287 | +| [thrasher-](https://github.com/thrasher-) | 692 | +| [shazbert](https://github.com/shazbert) | 333 | +| [dependabot[bot]](https://github.com/apps/dependabot) | 293 | | [gloriousCode](https://github.com/gloriousCode) | 234 | | [dependabot-preview[bot]](https://github.com/apps/dependabot-preview) | 88 | -| [gbjk](https://github.com/gbjk) | 76 | +| [gbjk](https://github.com/gbjk) | 80 | | [xtda](https://github.com/xtda) | 47 | | [lrascao](https://github.com/lrascao) | 27 | | [Beadko](https://github.com/Beadko) | 17 | @@ -162,8 +162,8 @@ Binaries will be published once the codebase reaches a stable condition. | [marcofranssen](https://github.com/marcofranssen) | 8 | | [140am](https://github.com/140am) | 8 | | [TaltaM](https://github.com/TaltaM) | 6 | +| [cranktakular](https://github.com/cranktakular) | 6 | | [dackroyd](https://github.com/dackroyd) | 5 | -| [cranktakular](https://github.com/cranktakular) | 5 | | [khcchiu](https://github.com/khcchiu) | 5 | | [yangrq1018](https://github.com/yangrq1018) | 4 | | [woshidama323](https://github.com/woshidama323) | 3 | diff --git a/cmd/documentation/exchanges_templates/kucoin.tmpl b/cmd/documentation/exchanges_templates/kucoin.tmpl new file mode 100644 index 00000000..1eb9ca85 --- /dev/null +++ b/cmd/documentation/exchanges_templates/kucoin.tmpl @@ -0,0 +1,40 @@ +{{define "exchanges kucoin" -}} +{{template "header" .}} +## Kucoin Exchange + +### Current Features + ++ REST Support ++ Websocket Support + +### Subscriptions + +Default Public Subscriptions: +- Ticker for spot, margin and futures +- Orderbook for spot, margin and futures +- All trades for spot and margin + +Default Authenticated Subscriptions: +- All trades for futures +- Stop Order Lifecycle events for futures +- Account Balance events for spot, margin and futures +- Margin Position updates +- Margin Loan updates + +Subscriptions are subject to enabled assets and pairs. + +Limitations: +- 100 symbols per subscription +- 300 symbols per connection + +Due to these limitations, if more than 10 symbols are enabled, ticker will subscribe to ticker:all. + +Unimplemented subscriptions: +- Candles for Futures +- Market snapshot for currency + +### Please click GoDocs chevron above to view current GoDoc information for this package + +{{template "contributions"}} +{{template "donations" .}} +{{end}} diff --git a/cmd/documentation/exchanges_templates/subscription.tmpl b/cmd/documentation/exchanges_templates/subscription.tmpl index b54b7cd5..4df4403a 100644 --- a/cmd/documentation/exchanges_templates/subscription.tmpl +++ b/cmd/documentation/exchanges_templates/subscription.tmpl @@ -20,12 +20,30 @@ The template is provided with a single context structure: AssetPairs map[asset.Item]currency.Pairs AssetSeparator string PairSeparator string + BatchSize string ``` Subscriptions may fan out many channels for assets and pairs, to support exchanges which require individual subscriptions. -To allow the template to communicate how to handle its output it should use the provided separators: +To allow the template to communicate how to handle its output it should use the provided directives: - AssetSeparator should be added at the end of each section related to assets - PairSeparator should be added at the end of each pair +- BatchSize should be added with a number directly before AssetSeparator to indicate pairs have been batched + +Example: +```{{` +{{- range $asset, $pairs := $.AssetPairs }} + {{- range $b := batch $pairs 30 -}} + {{- $.S.Channel -}} : {{- $b.Join -}} + {{ $.PairSeparator }} + {{- end -}} + {{- $.BatchSize -}} 30 + {{- $.AssetSeparator }} +{{- end }} +`}}``` + +Assets and pairs should be output in the sequence in AssetPairs since text/template range function uses an sorted order for map keys. + +Template functions may modify AssetPairs to update the subscription's pairs, e.g. Filtering out margin pairs already in spot subscription We use separators like this because it allows mono-templates to decide at runtime whether to fan out. diff --git a/common/common.go b/common/common.go index 9b8ec990..2aec8fc8 100644 --- a/common/common.go +++ b/common/common.go @@ -15,6 +15,7 @@ import ( "path/filepath" "reflect" "regexp" + "slices" "strconv" "strings" "sync" @@ -388,20 +389,6 @@ func ChangePermission(directory string) error { }) } -// SplitStringSliceByLimit splits a slice of strings into slices by input limit and returns a slice of slice of strings -func SplitStringSliceByLimit(in []string, limit uint) [][]string { - var stringSlice []string - sliceSlice := make([][]string, 0, len(in)/int(limit)+1) - for len(in) >= int(limit) { - stringSlice, in = in[:limit], in[limit:] - sliceSlice = append(sliceSlice, stringSlice) - } - if len(in) > 0 { - sliceSlice = append(sliceSlice, in) - } - return sliceSlice -} - // AddPaddingOnUpperCase adds padding to a string when detecting an upper case letter. If // there are multiple upper case items like `ThisIsHTTPExample`, it will only // pad between like this `This Is HTTP Example`. @@ -653,3 +640,34 @@ func GetTypeAssertError(required string, received interface{}, fieldDescription } return fmt.Errorf("%w from %T to %s%s", ErrTypeAssertFailure, received, required, description) } + +// Batch takes a slice type and converts it into a slice of containing slices of length batchSize, and any remainder in the final batch +// batchSize <= 0 will return the entire input slice in one batch +func Batch[S ~[]E, E any](blobs S, batchSize int) []S { + if len(blobs) == 0 { + return []S{} + } + blobs = slices.Clone(blobs) + if batchSize <= 0 { + return []S{blobs} + } + i := 0 + batches := make([]S, (len(blobs)+batchSize-1)/batchSize) + for batchSize < len(blobs) { + blobs, batches[i] = blobs[batchSize:], blobs[:batchSize:batchSize] + i++ + } + if len(blobs) > 0 { + batches[i] = blobs + } + return batches +} + +// SortStrings takes a slice of fmt.Stringer implementers and returns a new ascending sorted slice +func SortStrings[S ~[]E, E fmt.Stringer](x S) S { + n := slices.Clone(x) + slices.SortFunc(n, func(a, b E) int { + return strings.Compare(a.String(), b.String()) + }) + return n +} diff --git a/common/common_test.go b/common/common_test.go index a468392b..93ebfe7a 100644 --- a/common/common_test.go +++ b/common/common_test.go @@ -565,26 +565,6 @@ func initStringSlice(size int) (out []string) { return } -func TestSplitStringSliceByLimit(t *testing.T) { - t.Parallel() - slice50 := initStringSlice(50) - out := SplitStringSliceByLimit(slice50, 20) - if len(out) != 3 { - t.Errorf("expected len() to be 3 instead received: %v", len(out)) - } - if len(out[0]) != 20 { - t.Errorf("expected len() to be 20 instead received: %v", len(out[0])) - } - - out = SplitStringSliceByLimit(slice50, 50) - if len(out) != 1 { - t.Errorf("expected len() to be 3 instead received: %v", len(out)) - } - if len(out[0]) != 50 { - t.Errorf("expected len() to be 20 instead received: %v", len(out[0])) - } -} - func TestAddPaddingOnUpperCase(t *testing.T) { t.Parallel() @@ -856,3 +836,36 @@ func TestErrorCollector(t *testing.T) { require.True(t, ok, "Must return a multiError") assert.Len(t, errs.Unwrap(), 2, "Should have 2 errors") } + +// TestBatch ensures the Batch function does not regress into common behavioural faults if implementation changes +func TestBatch(t *testing.T) { + s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + b := Batch(s, 3) + require.Len(t, b, 4) + assert.Len(t, b[0], 3) + assert.Len(t, b[3], 1) + + b[0][0] = 42 + assert.Equal(t, 1, s[0], "Changing the batches must not change the source") + + require.NotPanics(t, func() { Batch(s, -1) }, "Must not panic on negative batch size") + done := make(chan any, 1) + go func() { done <- Batch(s, 0) }() + require.Eventually(t, func() bool { return len(done) > 0 }, time.Second, time.Millisecond, "Batch 0 must not hang") + + for _, i := range []int{-1, 0, 50} { + b = Batch(s, i) + require.Lenf(t, b, 1, "A batch size of %v should produce a single batch", i) + assert.Lenf(t, b[0], len(s), "A batch size of %v should produce a single batch", i) + } +} + +type A int + +func (a A) String() string { + return strconv.Itoa(int(a)) +} + +func TestSortStrings(t *testing.T) { + assert.Equal(t, []A{1, 2, 5, 6}, SortStrings([]A{6, 2, 5, 1})) +} diff --git a/currency/manager.go b/currency/manager.go index d7639f2c..8486aa78 100644 --- a/currency/manager.go +++ b/currency/manager.go @@ -289,11 +289,14 @@ func (p *PairsManager) DisablePair(a asset.Item, pair Pair) error { return err } - enabled, err := pairStore.Enabled.Remove(pair) - if err != nil { - return err + enabledLen := len(pairStore.Enabled) + + pairStore.Enabled = pairStore.Enabled.Remove(pair) + + if enabledLen == len(pairStore.Enabled) { + return fmt.Errorf("%w %s", ErrPairNotFound, pair) } - pairStore.Enabled = enabled + return nil } diff --git a/currency/manager_test.go b/currency/manager_test.go index 4f50b929..a7064846 100644 --- a/currency/manager_test.go +++ b/currency/manager_test.go @@ -394,42 +394,30 @@ func TestDisablePair(t *testing.T) { t.Parallel() p := initTest(t) - if err := p.DisablePair(asset.Empty, EMPTYPAIR); !errors.Is(err, asset.ErrNotSupported) { - t.Fatalf("received: '%v' but expected: '%v'", err, asset.ErrNotSupported) - } + err := p.DisablePair(asset.Empty, EMPTYPAIR) + assert.ErrorIs(t, err, asset.ErrNotSupported, "Empty asset should error") - if err := p.DisablePair(asset.Spot, EMPTYPAIR); !errors.Is(err, ErrCurrencyPairEmpty) { - t.Fatalf("received: '%v' but expected: '%v'", err, ErrCurrencyPairEmpty) - } + err = p.DisablePair(asset.Spot, EMPTYPAIR) + assert.ErrorIs(t, err, ErrCurrencyPairEmpty, "Empty pair should error") p.Pairs = nil - // Test disabling a pair when the pair manager is not initialised - if err := p.DisablePair(asset.Spot, NewPair(BTC, USD)); err == nil { - t.Error("unexpected result") - } + err = p.DisablePair(asset.Spot, NewPair(BTC, USD)) + assert.ErrorIs(t, err, ErrPairManagerNotInitialised, "Uninitialised PairManager should error") - // Test asset type which doesn't exist p = initTest(t) - if err := p.DisablePair(asset.Futures, EMPTYPAIR); err == nil { - t.Error("unexpected result") - } + err = p.DisablePair(asset.CoinMarginedFutures, EMPTYPAIR) + assert.ErrorIs(t, err, ErrCurrencyPairEmpty, "Non-existent asset type should error") - // Test asset type which has an empty pair store p.Pairs[asset.Spot] = nil - if err := p.DisablePair(asset.Spot, EMPTYPAIR); err == nil { - t.Error("unexpected result") - } + err = p.DisablePair(asset.Spot, EMPTYPAIR) + assert.ErrorIs(t, err, ErrCurrencyPairEmpty, "Empty pair store should error") - // Test disabling a pair which isn't enabled p = initTest(t) - if err := p.DisablePair(asset.Spot, NewPair(LTC, USD)); err == nil { - t.Error("unexpected result") - } + err = p.DisablePair(asset.Spot, NewPair(LTC, USD)) + assert.ErrorIs(t, err, ErrPairNotFound, "Not Enabled pair should error") - // Test disabling a valid pair and ensure nil is empty - if err := p.DisablePair(asset.Spot, NewPair(BTC, USD)); err != nil { - t.Error("unexpected result") - } + err = p.DisablePair(asset.Spot, NewPair(BTC, USD)) + assert.NoError(t, err, "DisablePair should not error") } func TestEnablePair(t *testing.T) { diff --git a/currency/pairs.go b/currency/pairs.go index aea9f168..1d9dc884 100644 --- a/currency/pairs.go +++ b/currency/pairs.go @@ -199,28 +199,26 @@ func (p Pairs) GetPairsByCurrencies(currencies Currencies) Pairs { return pairs } -// Remove removes the specified pair from the list of pairs if it exists -func (p Pairs) Remove(pair Pair) (Pairs, error) { - pairs := slices.Clone(p) - for x := range p { - if p[x].Equal(pair) { - return append(pairs[:x], pairs[x+1:]...), nil +// Remove removes the specified pairs from the list of pairs if they exist +func (p Pairs) Remove(rem ...Pair) Pairs { + n := make(Pairs, 0, len(p)) + for _, pN := range p { + if !slices.ContainsFunc(rem, func(pX Pair) bool { return pX.Equal(pN) }) { + n = append(n, pN) } } - return nil, fmt.Errorf("%s %w", pair, ErrPairNotFound) + return slices.Clip(n) } -// Add adds a specified pair to the list of pairs if it doesn't exist +// Add adds pairs to the list of pairs ignoring duplicates func (p Pairs) Add(pairs ...Pair) Pairs { - merge := append(slices.Clone(p), pairs...) - var filterInt int - for x := len(p); x < len(merge); x++ { - if !merge[:len(p)+filterInt].Contains(merge[x], true) { - merge[len(p)+filterInt] = merge[x] - filterInt++ + n := slices.Clone(p) + for _, a := range pairs { + if !n.Contains(a, true) { + n = append(n, a) } } - return merge[:len(p)+filterInt] + return n } // GetMatch returns either the pair that is equal including the reciprocal for diff --git a/currency/pairs_test.go b/currency/pairs_test.go index f305f846..96191748 100644 --- a/currency/pairs_test.go +++ b/currency/pairs_test.go @@ -3,6 +3,7 @@ package currency import ( "encoding/json" "errors" + "slices" "testing" "github.com/stretchr/testify/assert" @@ -202,108 +203,40 @@ func TestRemove(t *testing.T) { NewPair(LTC, USDT), } - compare := make(Pairs, len(oldPairs)) - copy(compare, oldPairs) + compare := slices.Clone(oldPairs) - p := NewPair(BTC, USD) - newPairs, err := oldPairs.Remove(p) - if !errors.Is(err, nil) { - t.Fatalf("received: '%v' but expected '%v'", err, nil) - } + newPairs := oldPairs.Remove(oldPairs[:2]...) - err = compare.ContainsAll(oldPairs, true) - if err != nil { - t.Fatal(err) - } + err := compare.ContainsAll(oldPairs, true) + assert.NoError(t, err, "Remove should not affect the original pairs") - if newPairs.Contains(p, true) || len(newPairs) != 2 { - t.Error("TestRemove unexpected result") - } + require.Len(t, newPairs, 1, "Remove should remove a pair") + require.Equal(t, oldPairs[2], newPairs[0], "Remove should leave the final pair") - _, err = newPairs.Remove(p) - if !errors.Is(err, ErrPairNotFound) { - t.Fatalf("received: '%v' but expected '%v'", err, ErrPairNotFound) - } - - newPairs, err = oldPairs.Remove(p) - if !errors.Is(err, nil) { - t.Fatalf("received: '%v' but expected '%v'", err, nil) - } - - newPairs, err = newPairs.Remove(NewPair(LTC, USD)) - if !errors.Is(err, nil) { - t.Fatalf("received: '%v' but expected '%v'", err, nil) - } - - err = compare.ContainsAll(oldPairs, true) - if err != nil { - t.Fatal(err) - } - - _, err = newPairs.Remove(NewPair(LTC, USD)) - if !errors.Is(err, ErrPairNotFound) { - t.Fatalf("received: '%v' but expected '%v'", err, ErrPairNotFound) - } - - newPairs, err = newPairs.Remove(NewPair(LTC, USDT)) - if !errors.Is(err, nil) { - t.Fatalf("received: '%v' but expected '%v'", err, nil) - } - - if len(newPairs) != 0 { - t.Error("unexpected value") - } - - _, err = newPairs.Remove(NewPair(LTC, USDT)) - if !errors.Is(err, ErrPairNotFound) { - t.Fatalf("received: '%v' but expected '%v'", err, ErrPairNotFound) - } + newPairs = newPairs.Remove(oldPairs[0]) + assert.Len(t, newPairs, 1, newPairs, "Remove have no effect on non-included pairs") } func TestAdd(t *testing.T) { t.Parallel() - var pairs = Pairs{ - NewPair(BTC, USD), - NewPair(LTC, USD), - NewPair(LTC, USDT), - } - // Test adding a new pair to the list of pairs - p := NewPair(BTC, USDT) - pairs = pairs.Add(p) - if !pairs.Contains(p, true) || len(pairs) != 4 { - t.Error("TestAdd unexpected result") - } - // Now test adding a pair which already exists - pairs = pairs.Add(p) - if len(pairs) != 4 { - t.Error("TestAdd unexpected result") - } - // Test adding multiple pairs - pairs = pairs.Add(NewPair(BTC, LTC), NewPair(ETH, USD)) - if len(pairs) != 6 { - t.Error("TestAdd unexpected result") - } - // Test adding multiple duplicate pairs - pairs = pairs.Add(NewPair(ETH, USDT), NewPair(ETH, USDT)) - if len(pairs) != 7 { - t.Error("TestAdd unexpected result") - } - // Test whether the original pairs have been modified - pairsWithExtraBaggage := make(Pairs, 0, len(pairs)+3) - pairsWithExtraBaggage = append(pairsWithExtraBaggage, pairs...) - brain := NewPair(BRAIN, USD) - withBrain := pairsWithExtraBaggage.Add(NewPair(BTC, LTC), brain) - if len(pairs) != 7 { - t.Error("TestAdd unexpected result") - } - assert.Equal(t, brain, withBrain[len(withBrain)-1]) - badger := NewPair(BADGER, USD) - withBadger := pairsWithExtraBaggage.Add(NewPair(BTC, LTC), badger) - if len(pairs) != 7 { - t.Error("TestAdd unexpected result") - } - assert.Equal(t, badger, withBadger[len(withBadger)-1]) - assert.Equal(t, brain, withBrain[len(withBrain)-1]) + orig := Pairs{NewPair(BTC, USD), NewPair(LTC, USD), NewPair(LTC, USDT)} + p := slices.Clone(orig) + p2 := Pairs{NewPair(BTC, USDT), NewPair(ETH, USD), NewPair(BTC, ETH)} + + pT := p.Add(p...) + assert.Equal(t, pT.Join(), orig.Join(), "Adding only existing pairs should return same Pairs") + assert.Equal(t, p.Join(), orig.Join(), "Should not effect original") + + pT = p.Add(p2...) + assert.Equal(t, pT.Join(), append(orig, p2...).Join(), "Adding new pairs should return correct Pairs") + assert.Equal(t, p.Join(), orig.Join(), "Should not effect original") + + p = slices.Grow(slices.Clone(orig), len(p2)) // Grow so that append doesn't alloc + pT1 := p.Add(p2[0]) + pT2 := p.Add(p2[1]) + pT1[3] = p2[2] // If Add doesn't allocate an new underlying array, this would affect PT2 as well + assert.Equal(t, p.Join(), orig.Join(), "Pairs underlying array should not be shared with original") + assert.Equal(t, pT2.Join(), append(orig, p2[1]).Join(), "Pairs underlying array should not be shared with siblings") } func TestContains(t *testing.T) { diff --git a/exchanges/asset/asset.go b/exchanges/asset/asset.go index 52f13cf4..e229dffe 100644 --- a/exchanges/asset/asset.go +++ b/exchanges/asset/asset.go @@ -153,8 +153,7 @@ func (a Items) JoinToString(separator string) string { return strings.Join(a.Strings(), separator) } -// IsValid returns whether or not the supplied asset type is valid or -// not +// IsValid returns whether or not the supplied asset type is valid or not func (a Item) IsValid() bool { return a != Empty && supportedFlag&a == a } diff --git a/exchanges/btcmarkets/btcmarkets_wrapper.go b/exchanges/btcmarkets/btcmarkets_wrapper.go index ece27598..e189ddab 100644 --- a/exchanges/btcmarkets/btcmarkets_wrapper.go +++ b/exchanges/btcmarkets/btcmarkets_wrapper.go @@ -613,7 +613,7 @@ func (b *BTCMarkets) CancelBatchOrders(ctx context.Context, o []order.Cancel) (* // CancelAllOrders cancels all orders associated with a currency pair func (b *BTCMarkets) CancelAllOrders(ctx context.Context, _ *order.Cancel) (order.CancelAllResponse, error) { - var resp order.CancelAllResponse + resp := order.CancelAllResponse{Status: map[string]string{}} orders, err := b.GetOrders(ctx, "", -1, -1, -1, true) if err != nil { return resp, err @@ -623,21 +623,18 @@ func (b *BTCMarkets) CancelAllOrders(ctx context.Context, _ *order.Cancel) (orde for x := range orders { orderIDs[x] = orders[x].OrderID } - splitOrders := common.SplitStringSliceByLimit(orderIDs, 20) - tempMap := make(map[string]string) - for z := range splitOrders { - tempResp, err := b.CancelBatch(ctx, splitOrders[z]) + for _, batch := range common.Batch(orderIDs, 20) { + cancelResp, err := b.CancelBatch(ctx, batch) if err != nil { return resp, err } - for y := range tempResp.CancelOrders { - tempMap[tempResp.CancelOrders[y].OrderID] = "Success" + for _, r := range cancelResp.CancelOrders { + resp.Status[r.OrderID] = "Success" } - for z := range tempResp.UnprocessedRequests { - tempMap[tempResp.UnprocessedRequests[z].RequestID] = "Cancellation Failed" + for _, r := range cancelResp.UnprocessedRequests { + resp.Status[r.RequestID] = "Cancellation Failed" } } - resp.Status = tempMap return resp, nil } @@ -883,9 +880,8 @@ func (b *BTCMarkets) GetOrderHistory(ctx context.Context, req *order.MultiOrderR tempArray = append(tempArray, orders[z].OrderID) } } - splitOrders := common.SplitStringSliceByLimit(tempArray, 50) - for x := range splitOrders { - tempData, err := b.GetBatchTrades(ctx, splitOrders[x]) + for _, batch := range common.Batch(tempArray, 50) { + tempData, err := b.GetBatchTrades(ctx, batch) if err != nil { return resp, err } diff --git a/exchanges/exchange.go b/exchanges/exchange.go index 318f2d0c..17646350 100644 --- a/exchanges/exchange.go +++ b/exchanges/exchange.go @@ -776,10 +776,7 @@ func (b *Base) UpdatePairs(incoming currency.Pairs, a asset.Item, enabled, force if err != nil { continue } - diff.Remove, err = diff.Remove.Remove(enabledPairs[x]) - if err != nil { - return err - } + diff.Remove = diff.Remove.Remove(enabledPairs[x]) enabledPairs[target] = match.Format(pFmt) } target++ @@ -1823,19 +1820,14 @@ func (b *Base) ParallelChanOp(channels subscription.List, m func(subscription.Li return errBatchSizeZero } - var j int - for i := 0; i < len(channels); i += batchSize { - j += batchSize - if j >= len(channels) { - j = len(channels) - } + for _, b := range common.Batch(channels, batchSize) { wg.Add(1) go func(c subscription.List) { defer wg.Done() if err := m(c); err != nil { errC <- err } - }(channels[i:j]) + }(b) } wg.Wait() diff --git a/exchanges/kucoin/README.md b/exchanges/kucoin/README.md index ab42b51c..3e122a4d 100644 --- a/exchanges/kucoin/README.md +++ b/exchanges/kucoin/README.md @@ -25,101 +25,35 @@ Join our slack to discuss all things related to GoCryptoTrader! [GoCryptoTrader + REST Support + Websocket Support -### How to enable +### Subscriptions -+ [Enable via configuration](https://github.com/thrasher-corp/gocryptotrader/tree/master/config#enable-exchange-via-config-example) +Default Public Subscriptions: +- Ticker for spot, margin and futures +- Orderbook for spot, margin and futures +- All trades for spot and margin -+ Individual package example below: +Default Authenticated Subscriptions: +- All trades for futures +- Stop Order Lifecycle events for futures +- Account Balance events for spot, margin and futures +- Margin Position updates +- Margin Loan updates -```go - // Exchanges will be abstracted out in further updates and examples will be - // supplied then -``` +Subscriptions are subject to enabled assets and pairs. -### How to do REST public/private calls +Limitations: +- 100 symbols per subscription +- 300 symbols per connection -+ If enabled via "configuration".json file the exchange will be added to the -IBotExchange array in the ```go var bot Bot``` and you will only be able to use -the wrapper interface functions for accessing exchange data. View routines.go -for an example of integration usage with GoCryptoTrader. Rudimentary example -below: +Due to these limitations, if more than 10 symbols are enabled, ticker will subscribe to ticker:all. -main.go -```go -var b exchange.IBotExchange - -for i := range bot.Exchanges { - if bot.Exchanges[i].GetName() == "Kucoin" { - b = bot.Exchanges[i] - } -} - -// Public calls - wrapper functions - -// Fetches current ticker information -tick, err := b.FetchTicker() -if err != nil { - // Handle error -} - -// Fetches current orderbook information -ob, err := b.FetchOrderbook() -if err != nil { - // Handle error -} - -// Private calls - wrapper functions - make sure your APIKEY and APISECRET are -// set and AuthenticatedAPISupport is set to true - -// Fetches current account information -accountInfo, err := b.GetAccountInfo() -if err != nil { - // Handle error -} -``` - -+ If enabled via individually importing package, rudimentary example below: - -```go -// Public calls - -// Fetches current ticker information -ticker, err := b.GetTicker() -if err != nil { - // Handle error -} - -// Fetches current orderbook information -ob, err := b.GetOrderBook() -if err != nil { - // Handle error -} - -// Private calls - make sure your APIKEY and APISECRET are set and -// AuthenticatedAPISupport is set to true - -// GetUserInfo returns account info -accountInfo, err := b.GetUserInfo(...) -if err != nil { - // Handle error -} - -// Submits an order and the exchange and returns its tradeID -tradeID, err := b.Trade(...) -if err != nil { - // Handle error -} -``` - -### How to do Websocket public/private calls - -```go - // Exchanges will be abstracted out in further updates and examples will be - // supplied then -``` +Unimplemented subscriptions: +- Candles for Futures +- Market snapshot for currency ### Please click GoDocs chevron above to view current GoDoc information for this package + ## Contribution Please feel free to submit any pull requests or suggest any desired features to be added. diff --git a/exchanges/kucoin/kucoin.go b/exchanges/kucoin/kucoin.go index 4543679c..53a13894 100644 --- a/exchanges/kucoin/kucoin.go +++ b/exchanges/kucoin/kucoin.go @@ -1764,7 +1764,7 @@ func (ku *Kucoin) SendAuthHTTPRequest(ctx context.Context, ePath exchange.URL, e return resp.GetError() } -func (ku *Kucoin) intervalToString(interval kline.Interval) (string, error) { +func intervalToString(interval kline.Interval) (string, error) { switch interval { case kline.OneMin: return "1min", nil diff --git a/exchanges/kucoin/kucoin_test.go b/exchanges/kucoin/kucoin_test.go index 5b88b128..863c239c 100644 --- a/exchanges/kucoin/kucoin_test.go +++ b/exchanges/kucoin/kucoin_test.go @@ -6,7 +6,6 @@ import ( "errors" "log" "os" - "strings" "testing" "time" @@ -15,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/key" + "github.com/thrasher-corp/gocryptotrader/config" "github.com/thrasher-corp/gocryptotrader/core" "github.com/thrasher-corp/gocryptotrader/currency" exchange "github.com/thrasher-corp/gocryptotrader/exchanges" @@ -28,6 +28,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" + testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions" "github.com/thrasher-corp/gocryptotrader/portfolio/withdraw" ) @@ -58,7 +59,7 @@ func TestMain(m *testing.M) { getFirstTradablePairOfAssets() ku.setupOrderbookManager() - fetchedFuturesSnapshotOrderbook = map[string]bool{} + fetchedFuturesOrderbook = map[string]bool{} os.Exit(m.Run()) } @@ -1973,130 +1974,141 @@ func TestPushData(t *testing.T) { testexch.FixtureToDataHandler(t, "testdata/wsHandleData.json", ku.wsHandleData) } -func verifySubs(tb testing.TB, subs subscription.List, a asset.Item, prefix string, expected ...string) { - tb.Helper() - var sub *subscription.Subscription - for i, s := range subs { - if s.Asset == a && strings.HasPrefix(s.Channel, prefix) { - if len(expected) == 1 && !strings.Contains(s.Channel, expected[0]) { - continue - } - if sub != nil { - assert.Failf(tb, "Too many subs with prefix", "Asset %s; Prefix %s", a.String(), prefix) - return - } - sub = subs[i] - } - } - if assert.NotNil(tb, sub, "Should find a sub for asset %s with prefix %s for %s", a.String(), prefix, strings.Join(expected, ", ")) { - suffix := strings.TrimPrefix(sub.Channel, prefix) - if len(expected) == 0 { - assert.Empty(tb, suffix, "Sub for asset %s with prefix %s should have no symbol suffix", a.String(), prefix) - } else { - currs := strings.Split(suffix, ",") - assert.ElementsMatch(tb, currs, expected, "Currencies should match in sub for asset %s with prefix %s", a.String(), prefix) - } - } -} - -// Pairs for Subscription tests: -// Only in Spot: BTC-USDT, ETH-USDT -// In Both: ETH-BTC, LTC-USDT -// Only in Margin: TRX-BTC, SOL-USDC - func TestGenerateSubscriptions(t *testing.T) { t.Parallel() + ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + + // Pairs overlap for spot/margin tests: + // Only in Spot: BTC-USDT, ETH-USDT + // In Both: ETH-BTC, LTC-USDT + // Only in Margin: TRX-BTC, SOL-USDC + subPairs := currency.Pairs{} + for _, pp := range [][]string{ + {"BTC", "USDT", "-"}, {"ETH", "BTC", "-"}, {"ETH", "USDT", "-"}, {"LTC", "USDT", "-"}, // Spot + {"ETH", "BTC", "-"}, {"LTC", "USDT", "-"}, {"SOL", "USDC", "-"}, {"TRX", "BTC", "-"}, // Margin + {"ETH", "USDCM", ""}, {"SOL", "USDTM", ""}, {"XBT", "USDCM", ""}, // Futures + } { + subPairs = append(subPairs, currency.NewPairWithDelimiter(pp[0], pp[1], pp[2])) + } + + exp := subscription.List{ + {Channel: subscription.TickerChannel, Asset: asset.Spot, Pairs: subPairs[0:4], QualifiedChannel: "/market/ticker:" + subPairs[0:4].Join()}, + {Channel: subscription.TickerChannel, Asset: asset.Margin, Pairs: subPairs[6:8], QualifiedChannel: "/market/ticker:" + subPairs[6:8].Join()}, + {Channel: subscription.TickerChannel, Asset: asset.Futures, Pairs: subPairs[8:], QualifiedChannel: "/contractMarket/tickerV2:" + subPairs[8:].Join()}, + {Channel: subscription.OrderbookChannel, Asset: asset.Spot, Pairs: subPairs[0:4], QualifiedChannel: "/spotMarket/level2Depth5:" + subPairs[0:4].Join(), + Interval: kline.HundredMilliseconds}, + {Channel: subscription.OrderbookChannel, Asset: asset.Margin, Pairs: subPairs[6:8], QualifiedChannel: "/spotMarket/level2Depth5:" + subPairs[6:8].Join(), + Interval: kline.HundredMilliseconds}, + {Channel: subscription.OrderbookChannel, Asset: asset.Futures, Pairs: subPairs[8:], QualifiedChannel: "/contractMarket/level2Depth5:" + subPairs[8:].Join(), + Interval: kline.HundredMilliseconds}, + {Channel: subscription.AllTradesChannel, Asset: asset.Spot, Pairs: subPairs[0:4], QualifiedChannel: "/market/match:" + subPairs[0:4].Join()}, + {Channel: subscription.AllTradesChannel, Asset: asset.Margin, Pairs: subPairs[6:8], QualifiedChannel: "/market/match:" + subPairs[6:8].Join()}, + } + subs, err := ku.generateSubscriptions() require.NoError(t, err, "generateSubscriptions must not error") + testsubs.EqualLists(t, exp, subs) - assert.Len(t, subs, 11, "Should generate the correct number of subs when not logged in") - - verifySubs(t, subs, asset.Spot, "/market/ticker:all") // This takes care of margin as well. - - verifySubs(t, subs, asset.Spot, "/market/match:", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC") - verifySubs(t, subs, asset.Margin, "/market/match:", "SOL-USDC", "TRX-BTC") - - verifySubs(t, subs, asset.Spot, "/spotMarket/level2Depth5:", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC") - verifySubs(t, subs, asset.Margin, "/spotMarket/level2Depth5:", "SOL-USDC", "TRX-BTC") - - for _, c := range []string{"ETHUSDCM", "XBTUSDCM", "SOLUSDTM"} { - verifySubs(t, subs, asset.Futures, "/contractMarket/tickerV2:", c) - verifySubs(t, subs, asset.Futures, "/contractMarket/level2Depth50:", c) - } -} - -func TestGenerateAuthSubscriptions(t *testing.T) { - t.Parallel() - - ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes ku.Websocket.SetCanUseAuthenticatedEndpoints(true) + var loanPairs currency.Pairs + loanCurrs := common.SortStrings(subPairs[0:8].GetCurrencies()) + for _, c := range loanCurrs { + loanPairs = append(loanPairs, currency.Pair{Base: c}) + } + + exp = append(exp, subscription.List{ + {Asset: asset.Futures, Channel: futuresTradeOrderChannel, QualifiedChannel: "/contractMarket/tradeOrders", Pairs: subPairs[8:]}, + {Asset: asset.Futures, Channel: futuresStopOrdersLifecycleEventChannel, QualifiedChannel: "/contractMarket/advancedOrders", Pairs: subPairs[8:]}, + {Asset: asset.Futures, Channel: futuresAccountBalanceEventChannel, QualifiedChannel: "/contractAccount/wallet", Pairs: subPairs[8:]}, + {Asset: asset.Margin, Channel: marginPositionChannel, QualifiedChannel: "/margin/position", Pairs: subPairs[4:8]}, + {Asset: asset.Margin, Channel: marginLoanChannel, QualifiedChannel: "/margin/loan:" + loanCurrs.Join(), Pairs: loanPairs}, + {Channel: accountBalanceChannel, QualifiedChannel: "/account/balance"}, + }...) + + subs, err = ku.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions with Auth must not error") + testsubs.EqualLists(t, exp, subs) +} + +func TestGenerateTickerAllSub(t *testing.T) { + t.Parallel() + + ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + avail, err := ku.GetAvailablePairs(asset.Spot) + require.NoError(t, err, "GetAvailablePairs must not error") + for i := 0; i <= 10; i++ { + err = ku.CurrencyPairs.EnablePair(asset.Spot, avail[i]) + require.NoError(t, common.ExcludeError(err, currency.ErrPairAlreadyEnabled), "EnablePair must not error") + } + + enabled, err := ku.GetEnabledPairs(asset.Spot) + require.NoError(t, err, "GetEnabledPairs must not error") + + ku.Features.Subscriptions = subscription.List{{Channel: subscription.TickerChannel, Asset: asset.Spot}} + exp := subscription.List{ + {Channel: subscription.TickerChannel, Asset: asset.Spot, QualifiedChannel: "/market/ticker:all", Pairs: enabled}, + } subs, err := ku.generateSubscriptions() require.NoError(t, err, "generateSubscriptions with Auth must not error") - assert.Len(t, subs, 24, "Should generate the correct number of subs when logged in") - - verifySubs(t, subs, asset.Spot, "/market/ticker:all") // This takes care of margin as well. - - verifySubs(t, subs, asset.Spot, "/market/match:", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC") - verifySubs(t, subs, asset.Margin, "/market/match:", "SOL-USDC", "TRX-BTC") - - verifySubs(t, subs, asset.Spot, "/spotMarket/level2Depth5:", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC") - verifySubs(t, subs, asset.Margin, "/spotMarket/level2Depth5:", "SOL-USDC", "TRX-BTC") - - for _, c := range []string{"ETHUSDCM", "XBTUSDCM", "SOLUSDTM"} { - verifySubs(t, subs, asset.Futures, "/contractMarket/tickerV2:", c) - verifySubs(t, subs, asset.Futures, "/contractMarket/level2Depth50:", c) - } - for _, c := range []string{"SOL", "BTC", "TRX", "LTC", "USDC", "USDT", "ETH"} { - verifySubs(t, subs, asset.Margin, "/margin/loan:", c) - } - verifySubs(t, subs, asset.Spot, "/account/balance") - verifySubs(t, subs, asset.Margin, "/margin/position") - verifySubs(t, subs, asset.Margin, "/margin/fundingBook:", "SOL", "BTC", "TRX", "LTC", "USDT", "USDC", "ETH") - verifySubs(t, subs, asset.Futures, "/contractAccount/wallet") - verifySubs(t, subs, asset.Futures, "/contractMarket/advancedOrders") - verifySubs(t, subs, asset.Futures, "/contractMarket/tradeOrders") + testsubs.EqualLists(t, exp, subs) } -func TestGenerateCandleSubscription(t *testing.T) { +// TestGenerateOtherSubscriptions exercises non-default subscriptions +func TestGenerateOtherSubscriptions(t *testing.T) { t.Parallel() ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes - ku.Features.Subscriptions = subscription.List{ - {Channel: subscription.CandlesChannel, Interval: kline.FourHour}, + + subs := subscription.List{ + {Channel: subscription.CandlesChannel, Asset: asset.Spot, Interval: kline.FourHour}, + {Channel: marketSnapshotChannel, Asset: asset.Spot}, } - subs, err := ku.generateSubscriptions() - assert.NoError(t, err, "generateSubscriptions with Candles should not error") - - assert.Len(t, subs, 6, "Should generate the correct number of subs for candles") - for _, c := range []string{"BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC"} { - verifySubs(t, subs, asset.Spot, "/market/candles:", c+"_4hour") - } - for _, c := range []string{"SOL-USDC", "TRX-BTC"} { - verifySubs(t, subs, asset.Margin, "/market/candles:", c+"_4hour") + for _, s := range subs { + ku.Features.Subscriptions = subscription.List{s} + got, err := ku.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions should not error") + require.Len(t, got, 1, "Should generate just one sub") + assert.NotEmpty(t, got[0].QualifiedChannel, "Qualified Channel should not be empty") + if got[0].Channel == subscription.CandlesChannel { + assert.Equal(t, "/market/candles:BTC-USDT_4hour,ETH-BTC_4hour,ETH-USDT_4hour,LTC-USDT_4hour", got[0].QualifiedChannel, "QualifiedChannel should be correct") + } } } -func TestGenerateMarketSubscription(t *testing.T) { +// TestCheckSubscriptions ensures checkSubscriptions upgrades user config correctly +func TestCheckSubscriptions(t *testing.T) { t.Parallel() - ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes - ku.Features.Subscriptions = subscription.List{ - {Channel: marketSnapshotChannel}, + ku := &Kucoin{ //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + Base: exchange.Base{ + Config: &config.Exchange{ + Features: &config.FeaturesConfig{ + Subscriptions: subscription.List{ + {Enabled: true, Channel: "ticker"}, + {Enabled: true, Channel: "allTrades"}, + {Enabled: true, Channel: "orderbook", Interval: kline.HundredMilliseconds}, + {Enabled: true, Channel: "/contractMarket/tickerV2:%s"}, + {Enabled: true, Channel: "/contractMarket/level2Depth50:%s"}, + {Enabled: true, Channel: "/margin/fundingBook:%s", Authenticated: true}, + {Enabled: true, Channel: "/account/balance", Authenticated: true}, + {Enabled: true, Channel: "/margin/position", Authenticated: true}, + {Enabled: true, Channel: "/margin/loan:%s", Authenticated: true}, + {Enabled: true, Channel: "/contractMarket/tradeOrders", Authenticated: true}, + {Enabled: true, Channel: "/contractMarket/advancedOrders", Authenticated: true}, + {Enabled: true, Channel: "/contractAccount/wallet", Authenticated: true}, + }, + }, + }, + Features: exchange.Features{}, + }, } - subs, err := ku.generateSubscriptions() - assert.NoError(t, err, "generateSubscriptions with MarketSnapshot should not error") - - assert.Len(t, subs, 7, "Should generate the correct number of subs for snapshot") - for _, c := range []string{"BTC", "ETH", "LTC", "USDT"} { - verifySubs(t, subs, asset.Spot, "/market/snapshot:", c) - } - for _, c := range []string{"SOL", "USDC", "TRX"} { - verifySubs(t, subs, asset.Margin, "/market/snapshot:", c) - } + ku.checkSubscriptions() + testsubs.EqualLists(t, defaultSubscriptions, ku.Features.Subscriptions) + testsubs.EqualLists(t, defaultSubscriptions, ku.Config.Features.Subscriptions) } func TestGetAvailableTransferChains(t *testing.T) { @@ -2490,14 +2502,90 @@ func TestProcessMarketSnapshot(t *testing.T) { } } -func TestSubscribeMarketSnapshot(t *testing.T) { +// TestSubscribeBatches ensures that endpoints support batching, contrary to kucoin api docs +func TestSubscribeBatches(t *testing.T) { t.Parallel() ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + ku.Features.Subscriptions = subscription.List{} testexch.SetupWs(t, ku) - err := ku.Subscribe(subscription.List{{Channel: marketSymbolSnapshotChannel, Pairs: currency.Pairs{currency.Pair{Base: currency.BTC}}}}) - assert.NoError(t, err, "Subscribe to MarketSnapshot should not error") + ku.Features.Subscriptions = subscription.List{ + {Asset: asset.Spot, Channel: subscription.CandlesChannel, Interval: kline.OneMin}, + {Asset: asset.Futures, Channel: subscription.TickerChannel}, + {Asset: asset.Spot, Channel: marketSnapshotChannel}, + } + + subs, err := ku.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions must not error") + require.Len(t, subs, len(ku.Features.Subscriptions), "Must generate batched subscriptions") + + err = ku.Subscribe(subs) + require.NoError(t, err, "Subscribe to small batches should not error") +} + +// TestSubscribeTickerAll ensures that ticker subscriptions switch to using all and it works + +// TestSubscribeBatchLimit exercises the kucoin batch limits of 300 per connection +// Ensures batching of 100 pairs and the connection symbol limit is still 300 at Kucoin's end +func TestSubscribeBatchLimit(t *testing.T) { + t.Parallel() + + ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + ku.Features.Subscriptions = subscription.List{} + testexch.SetupWs(t, ku) + + avail, err := ku.GetAvailablePairs(asset.Spot) + require.NoError(t, err, "GetAvailablePairs must not error") + + err = ku.CurrencyPairs.StorePairs(asset.Spot, avail[:299], true) + require.NoError(t, err, "StorePairs must not error") + + ku.Features.Subscriptions = subscription.List{{Asset: asset.Spot, Channel: subscription.AllTradesChannel}} + subs, err := ku.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions must not error") + require.Len(t, subs, 3, "Must get 3 subs") + + err = ku.Subscribe(subs) + require.NoError(t, err, "Subscribe must not error") + + err = ku.Unsubscribe(subs) + require.NoError(t, err, "Unsubscribe must not error") + + err = ku.CurrencyPairs.StorePairs(asset.Spot, avail[:320], true) + require.NoError(t, err, "StorePairs must not error") + + ku.Features.Subscriptions = subscription.List{{Asset: asset.Spot, Channel: subscription.AllTradesChannel}} + subs, err = ku.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions must not error") + require.Len(t, subs, 4, "Must get 4 subs") + + err = ku.Subscribe(subs) + require.ErrorContains(t, err, "exceed max subscription count limitation of 300 per session", "Subscribe to MarketSnapshot must error above connection symbol limit") +} + +func TestSubscribeTickerAll(t *testing.T) { + t.Parallel() + + ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + ku.Features.Subscriptions = subscription.List{} + testexch.SetupWs(t, ku) + + avail, err := ku.GetAvailablePairs(asset.Spot) + require.NoError(t, err, "GetAvailablePairs must not error") + + err = ku.CurrencyPairs.StorePairs(asset.Spot, avail[:500], true) + require.NoError(t, err, "StorePairs must not error") + + ku.Features.Subscriptions = subscription.List{{Asset: asset.Spot, Channel: subscription.TickerChannel}} + + subs, err := ku.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions must not error") + require.Len(t, subs, 1, "Must generate one subscription") + require.Equal(t, "/market/ticker:all", subs[0].QualifiedChannel, "QualifiedChannel must be correct") + + err = ku.Subscribe(subs) + require.NoError(t, err, "Subscribe to must not error") } func TestSeedLocalCache(t *testing.T) { diff --git a/exchanges/kucoin/kucoin_types.go b/exchanges/kucoin/kucoin_types.go index c3ed39ba..1d710c09 100644 --- a/exchanges/kucoin/kucoin_types.go +++ b/exchanges/kucoin/kucoin_types.go @@ -35,7 +35,6 @@ var ( errInvalidLeverage = errors.New("invalid leverage value") errInvalidClientOrderID = errors.New("no client order ID supplied, this endpoint requires a UUID or similar string") errInvalidMsgType = errors.New("message type field not valid") - errSubscriptionPairRequired = errors.New("pair required for manual subscriptions") subAccountRegExp = regexp.MustCompile("^[a-zA-Z0-9]{7-32}$") subAccountPassphraseRegExp = regexp.MustCompile("^[a-zA-Z0-9]{7-24}$") @@ -956,19 +955,6 @@ type WsPriceIndicator struct { Value float64 `json:"value"` } -// WsMarginFundingBook represents order book changes on margin. -type WsMarginFundingBook struct { - Sequence int64 `json:"sequence"` - Currency string `json:"currency"` - DailyInterestRate float64 `json:"dailyIntRate"` - AnnualInterestRate float64 `json:"annualIntRate"` - Term int64 `json:"term"` - Size float64 `json:"size"` - Side string `json:"side"` - Timestamp convert.ExchangeTime `json:"ts"` // In Nanosecond - -} - // WsTradeOrder represents a private trade order push data. type WsTradeOrder struct { Symbol string `json:"symbol"` @@ -1079,8 +1065,8 @@ type WsFuturesTicker struct { FilledTime convert.ExchangeTime `json:"ts"` } -// WsFuturesOrderbokInfo represents Level 2 order book information. -type WsFuturesOrderbokInfo struct { +// WsFuturesOrderbookInfo represents Level 2 order book information. +type WsFuturesOrderbookInfo struct { Sequence int64 `json:"sequence"` Change string `json:"change"` Timestamp convert.ExchangeTime `json:"timestamp"` diff --git a/exchanges/kucoin/kucoin_websocket.go b/exchanges/kucoin/kucoin_websocket.go index 5b7c14bd..35738bfe 100644 --- a/exchanges/kucoin/kucoin_websocket.go +++ b/exchanges/kucoin/kucoin_websocket.go @@ -6,9 +6,11 @@ import ( "errors" "fmt" "net/http" + "slices" "strconv" "strings" "sync" + "text/template" "time" "github.com/buger/jsonparser" @@ -18,6 +20,7 @@ import ( exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/account" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/kline" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" "github.com/thrasher-corp/gocryptotrader/exchanges/request" @@ -28,60 +31,48 @@ import ( "github.com/thrasher-corp/gocryptotrader/log" ) -var fetchedFuturesSnapshotOrderbook map[string]bool +var fetchedFuturesOrderbookMutex sync.Mutex +var fetchedFuturesOrderbook map[string]bool const ( publicBullets = "/v1/bullet-public" privateBullets = "/v1/bullet-private" // spot channels - marketAllTickersChannel = "/market/ticker:all" - marketTickerChannel = "/market/ticker:%s" // /market/ticker:{symbol},{symbol}... - marketSymbolSnapshotChannel = "/market/snapshot:%s" // /market/snapshot:{symbol} - marketSnapshotChannel = "/market/snapshot:%v" // /market/snapshot:{market} <--- market represents a currency - marketOrderbookLevel2Channels = "/market/level2:%s" // /market/level2:{pair},{pair}... - marketOrderbookLevel2to5Channel = "/spotMarket/level2Depth5:%s" // /spotMarket/level2Depth5:{symbol},{symbol}... - marketOrderbokLevel2To50Channel = "/spotMarket/level2Depth50:%s" // /spotMarket/level2Depth50:{symbol},{symbol}... - marketCandlesChannel = "/market/candles:%s_%s" // /market/candles:{symbol}_{interval} - marketMatchChannel = "/market/match:%s" // /market/match:{symbol},{symbol}... - indexPriceIndicatorChannel = "/indicator/index:%s" // /indicator/index:{symbol0},{symbol1}.. - markPriceIndicatorChannel = "/indicator/markPrice:%s" // /indicator/markPrice:{symbol0},{symbol1}... - marginFundingbookChangeChannel = "/margin/fundingBook:%s" // /margin/fundingBook:{currency0},{currency1}... + marketTickerChannel = "/market/ticker" // /market/ticker:{symbol},... + marketSnapshotChannel = "/market/snapshot" // /market/snapshot:{symbol},... + marketOrderbookChannel = "/market/level2" // /market/level2:{symbol},... + marketOrderbookDepth5Channel = "/spotMarket/level2Depth5" // /spotMarket/level2Depth5:{symbol},... + marketOrderbookDepth50Channel = "/spotMarket/level2Depth50" // /spotMarket/level2Depth50:{symbol},... + marketCandlesChannel = "/market/candles" // /market/candles:{symbol}_{interval},... + marketMatchChannel = "/market/match" // /market/match:{symbol},... + indexPriceIndicatorChannel = "/indicator/index" // /indicator/index:{symbol},... + markPriceIndicatorChannel = "/indicator/markPrice" // /indicator/markPrice:{symbol},... // Private channels privateSpotTradeOrders = "/spotMarket/tradeOrders" accountBalanceChannel = "/account/balance" marginPositionChannel = "/margin/position" - marginLoanChannel = "/margin/loan:%s" // /margin/loan:{currency} + marginLoanChannel = "/margin/loan" // /margin/loan:{currency} spotMarketAdvancedChannel = "/spotMarket/advancedOrders" // futures channels - futuresTickerV2Channel = "/contractMarket/tickerV2:%s" // /contractMarket/tickerV2:{symbol} - futuresTickerChannel = "/contractMarket/ticker:%s" // /contractMarket/ticker:{symbol} - futuresOrderbookLevel2Channel = "/contractMarket/level2:%s" // /contractMarket/level2:{symbol} - futuresExecutionDataChannel = "/contractMarket/execution:%s" // /contractMarket/execution:{symbol} - futuresOrderbookLevel2Depth5Channel = "/contractMarket/level2Depth5:%s" // /contractMarket/level2Depth5:{symbol} - futuresOrderbookLevel2Depth50Channel = "/contractMarket/level2Depth50:%s" // /contractMarket/level2Depth50:{symbol} - futuresContractMarketDataChannel = "/contract/instrument:%s" // /contract/instrument:{symbol} + futuresTickerChannel = "/contractMarket/tickerV2" // /contractMarket/tickerV2:{symbol},... + futuresOrderbookChannel = "/contractMarket/level2" // /contractMarket/level2:{symbol},... + futuresOrderbookDepth5Channel = "/contractMarket/level2Depth5" // /contractMarket/level2Depth5:{symbol},... + futuresOrderbookDepth50Channel = "/contractMarket/level2Depth50" // /contractMarket/level2Depth50:{symbol},... + futuresExecutionDataChannel = "/contractMarket/execution" // /contractMarket/execution:{symbol},... + futuresContractMarketDataChannel = "/contract/instrument" // /contract/instrument:{symbol},... futuresSystemAnnouncementChannel = "/contract/announcement" - futuresTrasactionStatisticsTimerEventChannel = "/contractMarket/snapshot:%s" // /contractMarket/snapshot:{symbol} + futuresTrasactionStatisticsTimerEventChannel = "/contractMarket/snapshot" // /contractMarket/snapshot:{symbol},... // futures private channels - futuresTradeOrdersBySymbolChannel = "/contractMarket/tradeOrders:%s" // /contractMarket/tradeOrders:{symbol} - futuresTradeOrderChannel = "/contractMarket/tradeOrders" + futuresTradeOrderChannel = "/contractMarket/tradeOrders" // /contractMarket/tradeOrders:{symbol},... + futuresPositionChangeEventChannel = "/contract/position" // /contract/position:{symbol},... futuresStopOrdersLifecycleEventChannel = "/contractMarket/advancedOrders" futuresAccountBalanceEventChannel = "/contractAccount/wallet" - futuresPositionChangeEventChannel = "/contract/position:%s" // /contract/position:{symbol} ) -var subscriptionNames = map[string]string{ - subscription.TickerChannel: marketAllTickersChannel, // This allows more subscriptions on the orderbook channel for this specific connection. - subscription.OrderbookChannel: marketOrderbookLevel2to5Channel, // This does not require a REST request to get the orderbook. - subscription.CandlesChannel: marketCandlesChannel, - subscription.AllTradesChannel: marketMatchChannel, - // No equivalents for: AllOrders, MyTrades, MyOrders -} - var ( // maxWSUpdateBuffer defines max websocket updates to apply when an // orderbook is initially fetched @@ -94,12 +85,27 @@ var ( maxWSOrderbookWorkers = 10 ) +var defaultSubscriptions = subscription.List{ + {Enabled: true, Asset: asset.All, Channel: subscription.TickerChannel}, + {Enabled: true, Asset: asset.All, Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds}, + {Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel}, + {Enabled: true, Asset: asset.Margin, Channel: subscription.AllTradesChannel}, + {Enabled: true, Asset: asset.Futures, Channel: futuresTradeOrderChannel, Authenticated: true}, + {Enabled: true, Asset: asset.Futures, Channel: futuresStopOrdersLifecycleEventChannel, Authenticated: true}, + {Enabled: true, Asset: asset.Futures, Channel: futuresAccountBalanceEventChannel, Authenticated: true}, + {Enabled: true, Asset: asset.Margin, Channel: marginPositionChannel, Authenticated: true}, + {Enabled: true, Asset: asset.Margin, Channel: marginLoanChannel, Authenticated: true}, + {Enabled: true, Channel: accountBalanceChannel, Authenticated: true}, +} + // WsConnect creates a new websocket connection. func (ku *Kucoin) WsConnect() error { if !ku.Websocket.IsEnabled() || !ku.IsEnabled() { return stream.ErrWebsocketNotEnabled } - fetchedFuturesSnapshotOrderbook = map[string]bool{} + fetchedFuturesOrderbookMutex.Lock() + fetchedFuturesOrderbook = map[string]bool{} + fetchedFuturesOrderbookMutex.Unlock() var dialer websocket.Dialer dialer.HandshakeTimeout = ku.Config.HTTPTimeout dialer.Proxy = http.ProxyFromEnvironment @@ -206,9 +212,8 @@ func (ku *Kucoin) wsHandleData(respData []byte) error { return nil } topicInfo := strings.Split(resp.Topic, ":") - switch { - case strings.HasPrefix(marketAllTickersChannel, topicInfo[0]), - strings.HasPrefix(marketTickerChannel, topicInfo[0]): + switch topicInfo[0] { + case marketTickerChannel: var instruments string if topicInfo[1] == "all" { instruments = resp.Subject @@ -216,117 +221,74 @@ func (ku *Kucoin) wsHandleData(respData []byte) error { instruments = topicInfo[1] } return ku.processTicker(resp.Data, instruments, topicInfo[0]) - case strings.HasPrefix(marketSymbolSnapshotChannel, topicInfo[0]): + case marketSnapshotChannel: return ku.processMarketSnapshot(resp.Data, topicInfo[0]) - case strings.HasPrefix(marketOrderbookLevel2Channels, topicInfo[0]): + case marketOrderbookChannel: return ku.processOrderbookWithDepth(respData, topicInfo[1], topicInfo[0]) - case strings.HasPrefix(marketOrderbookLevel2to5Channel, topicInfo[0]), - strings.HasPrefix(marketOrderbokLevel2To50Channel, topicInfo[0]): + case marketOrderbookDepth5Channel, marketOrderbookDepth50Channel: return ku.processOrderbook(resp.Data, topicInfo[1], topicInfo[0]) - case strings.HasPrefix(marketCandlesChannel, topicInfo[0]): + case marketCandlesChannel: symbolAndInterval := strings.Split(topicInfo[1], currency.UnderscoreDelimiter) if len(symbolAndInterval) != 2 { return errMalformedData } return ku.processCandlesticks(resp.Data, symbolAndInterval[0], symbolAndInterval[1], topicInfo[0]) - case strings.HasPrefix(marketMatchChannel, topicInfo[0]): + case marketMatchChannel: return ku.processTradeData(resp.Data, topicInfo[1], topicInfo[0]) - case strings.HasPrefix(indexPriceIndicatorChannel, topicInfo[0]): + case indexPriceIndicatorChannel, markPriceIndicatorChannel: var response WsPriceIndicator return ku.processData(resp.Data, &response) - case strings.HasPrefix(markPriceIndicatorChannel, topicInfo[0]): - var response WsPriceIndicator - return ku.processData(resp.Data, &response) - case strings.HasPrefix(marginFundingbookChangeChannel, topicInfo[0]): - var response WsMarginFundingBook - return ku.processData(resp.Data, &response) - case strings.HasPrefix(privateSpotTradeOrders, topicInfo[0]): + case privateSpotTradeOrders: return ku.processOrderChangeEvent(resp.Data, topicInfo[0]) - case strings.HasPrefix(accountBalanceChannel, topicInfo[0]): + case accountBalanceChannel: return ku.processAccountBalanceChange(resp.Data) - case strings.HasPrefix(marginPositionChannel, topicInfo[0]): + case marginPositionChannel: if resp.Subject == "debt.ratio" { var response WsDebtRatioChange return ku.processData(resp.Data, &response) } var response WsPositionStatus return ku.processData(resp.Data, &response) - case strings.HasPrefix(marginLoanChannel, topicInfo[0]) && resp.Subject == "order.done": - var response WsMarginTradeOrderDoneEvent - return ku.processData(resp.Data, &response) - case strings.HasPrefix(marginLoanChannel, topicInfo[0]): - return ku.processMarginLendingTradeOrderEvent(resp.Data) - case strings.HasPrefix(spotMarketAdvancedChannel, topicInfo[0]): - return ku.processStopOrderEvent(resp.Data) - case strings.HasPrefix(futuresTickerV2Channel, topicInfo[0]), - strings.HasPrefix(futuresTickerChannel, topicInfo[0]): - return ku.processFuturesTickerV2(resp.Data) - case strings.HasPrefix(futuresOrderbookLevel2Channel, topicInfo[0]): - if !fetchedFuturesSnapshotOrderbook[topicInfo[1]] { - fetchedFuturesSnapshotOrderbook[topicInfo[1]] = true - var enabledPairs currency.Pairs - enabledPairs, err = ku.GetEnabledPairs(asset.Futures) - if err != nil { - return err - } - var cp currency.Pair - cp, err = enabledPairs.DeriveFrom(topicInfo[1]) - if err != nil { - return err - } - var orderbooks *orderbook.Base - orderbooks, err = ku.FetchOrderbook(context.Background(), cp, asset.Futures) - if err != nil { - return err - } - err = ku.Websocket.Orderbook.LoadSnapshot(orderbooks) - if err != nil { - return err - } + case marginLoanChannel: + if resp.Subject == "order.done" { + var response WsMarginTradeOrderDoneEvent + return ku.processData(resp.Data, &response) + } else { + return ku.processMarginLendingTradeOrderEvent(resp.Data) } - return ku.processFuturesOrderbookLevel2(resp.Data, topicInfo[1]) - case strings.HasPrefix(futuresExecutionDataChannel, topicInfo[0]): + case spotMarketAdvancedChannel: + return ku.processStopOrderEvent(resp.Data) + case futuresTickerChannel: + return ku.processFuturesTickerV2(resp.Data) + case futuresExecutionDataChannel: var response WsFuturesExecutionData return ku.processData(resp.Data, &response) - case strings.HasPrefix(futuresOrderbookLevel2Depth5Channel, topicInfo[0]), - strings.HasPrefix(futuresOrderbookLevel2Depth50Channel, topicInfo[0]): - if !fetchedFuturesSnapshotOrderbook[topicInfo[1]] { - fetchedFuturesSnapshotOrderbook[topicInfo[1]] = true - var enabledPairs currency.Pairs - enabledPairs, err = ku.GetEnabledPairs(asset.Futures) - if err != nil { - return err - } - cp, err := enabledPairs.DeriveFrom(topicInfo[1]) - if err != nil { - return err - } - orderbooks, err := ku.FetchOrderbook(context.Background(), cp, asset.Futures) - if err != nil { - return err - } - err = ku.Websocket.Orderbook.LoadSnapshot(orderbooks) - if err != nil { - return err - } + case futuresOrderbookChannel: + if err := ku.ensureFuturesOrderbookSnapshotLoaded(topicInfo[1]); err != nil { + return err } - return ku.processFuturesOrderbookLevel5(resp.Data, topicInfo[1]) - case strings.HasPrefix(futuresContractMarketDataChannel, topicInfo[0]): - if resp.Subject == "mark.index.price" { + return ku.processFuturesOrderbookLevel2(resp.Data, topicInfo[1]) + case futuresOrderbookDepth5Channel, futuresOrderbookDepth50Channel: + if err := ku.ensureFuturesOrderbookSnapshotLoaded(topicInfo[1]); err != nil { + return err + } + return ku.processFuturesOrderbookSnapshot(resp.Data, topicInfo[1]) + case futuresContractMarketDataChannel: + switch resp.Subject { + case "mark.index.price": return ku.processFuturesMarkPriceAndIndexPrice(resp.Data, topicInfo[1]) - } else if resp.Subject == "funding.rate" { + case "funding.rate": return ku.processFuturesFundingData(resp.Data, topicInfo[1]) } - case strings.HasPrefix(futuresSystemAnnouncementChannel, topicInfo[0]): + case futuresSystemAnnouncementChannel: return ku.processFuturesSystemAnnouncement(resp.Data, resp.Subject) - case strings.HasPrefix(futuresTrasactionStatisticsTimerEventChannel, topicInfo[0]): + case futuresTrasactionStatisticsTimerEventChannel: return ku.processFuturesTransactionStatistics(resp.Data, topicInfo[1]) - case strings.HasPrefix(futuresTradeOrdersBySymbolChannel, topicInfo[0]), - strings.HasPrefix(futuresTradeOrderChannel, topicInfo[0]): + case futuresTradeOrderChannel: return ku.processFuturesPrivateTradeOrders(resp.Data) - case strings.HasPrefix(futuresStopOrdersLifecycleEventChannel, topicInfo[0]): + case futuresStopOrdersLifecycleEventChannel: return ku.processFuturesStopOrderLifecycleEvent(resp.Data) - case strings.HasPrefix(futuresAccountBalanceEventChannel, topicInfo[0]): + case futuresAccountBalanceEventChannel: switch resp.Subject { case "orderMargin.change": var response WsFuturesOrderMarginEvent @@ -337,15 +299,16 @@ func (ku *Kucoin) wsHandleData(respData []byte) error { var response WsFuturesWithdrawalAmountAndTransferOutAmountEvent return ku.processData(resp.Data, &response) } - case strings.HasPrefix(futuresPositionChangeEventChannel, topicInfo[0]): - if resp.Subject == "position.change" { + case futuresPositionChangeEventChannel: + switch resp.Subject { + case "position.change": if resp.ChannelType == "private" { var response WsFuturesPosition return ku.processData(resp.Data, &response) } var response WsFuturesMarkPricePositionChanges return ku.processData(resp.Data, &response) - } else if resp.Subject == "position.settlement" { + case "position.settlement": var response WsFuturesPositionFundingSettlement return ku.processData(resp.Data, &response) } @@ -502,7 +465,30 @@ func (ku *Kucoin) processFuturesMarkPriceAndIndexPrice(respData []byte, instrume return nil } -func (ku *Kucoin) processFuturesOrderbookLevel5(respData []byte, instrument string) error { +// ensureFuturesOrderbookSnapshotLoaded makes sure an initial futures orderbook snapshot is loaded +func (ku *Kucoin) ensureFuturesOrderbookSnapshotLoaded(symbol string) error { + fetchedFuturesOrderbookMutex.Lock() + defer fetchedFuturesOrderbookMutex.Unlock() + if fetchedFuturesOrderbook[symbol] { + return nil + } + fetchedFuturesOrderbook[symbol] = true + enabledPairs, err := ku.GetEnabledPairs(asset.Futures) + if err != nil { + return err + } + cp, err := enabledPairs.DeriveFrom(symbol) + if err != nil { + return err + } + orderbooks, err := ku.FetchOrderbook(context.Background(), cp, asset.Futures) + if err != nil { + return err + } + return ku.Websocket.Orderbook.LoadSnapshot(orderbooks) +} + +func (ku *Kucoin) processFuturesOrderbookSnapshot(respData []byte, instrument string) error { response := WsOrderbookLevel5Response{} if err := json.Unmarshal(respData, &response); err != nil { return err @@ -527,7 +513,7 @@ func (ku *Kucoin) processFuturesOrderbookLevel5(respData []byte, instrument stri } func (ku *Kucoin) processFuturesOrderbookLevel2(respData []byte, instrument string) error { - resp := WsFuturesOrderbokInfo{} + resp := WsFuturesOrderbookInfo{} if err := json.Unmarshal(respData, &resp); err != nil { return err } @@ -959,40 +945,14 @@ func (ku *Kucoin) Unsubscribe(subscriptions subscription.List) error { return ku.manageSubscriptions(subscriptions, "unsubscribe") } -// expandManualSubscription takes a subscription list and expand all the subscriptions across the relevant assets and pairs -func (ku *Kucoin) expandManualSubscriptions(in subscription.List) (subscription.List, error) { - subs := make(subscription.List, 0, len(in)) - for _, s := range in { - if isSymbolChannel(s.Channel) { - if len(s.Pairs) == 0 { - return nil, errSubscriptionPairRequired - } - a := s.Asset - if !a.IsValid() { - a = getChannelsAssetType(s.Channel) - } - assetPairs := map[asset.Item]currency.Pairs{a: s.Pairs} - n, err := ku.expandSubscription(s, assetPairs) - if err != nil { - return nil, err - } - subs = append(subs, n...) - } else { - subs = append(subs, s) - } - } - return subs, nil -} - func (ku *Kucoin) manageSubscriptions(subs subscription.List, operation string) error { var errs error - subs, errs = ku.expandManualSubscriptions(subs) for _, s := range subs { msgID := strconv.FormatInt(ku.Websocket.Conn.GenerateMessageID(false), 10) req := WsSubscriptionInput{ ID: msgID, Type: operation, - Topic: s.Channel, + Topic: s.QualifiedChannel, PrivateChannel: s.Authenticated, Response: true, } @@ -1003,6 +963,13 @@ func (ku *Kucoin) manageSubscriptions(subs subscription.List, operation string) switch { case err != nil: errs = common.AppendError(errs, err) + case rType == "error": + code, _ := jsonparser.GetUnsafeString(respRaw, "code") + msg, msgErr := jsonparser.GetUnsafeString(respRaw, "data") + if msgErr != nil { + msg = "unknown error" + } + errs = common.AppendError(errs, fmt.Errorf("%s (%s)", msg, code)) case rType != "ack": errs = common.AppendError(errs, fmt.Errorf("%w: %s from %s", errInvalidMsgType, rType, respRaw)) default: @@ -1023,192 +990,28 @@ func (ku *Kucoin) manageSubscriptions(subs subscription.List, operation string) return errs } -// getChannelsAssetType returns the asset type to which the subscription channel belongs to or asset.Empty -func getChannelsAssetType(channelName string) asset.Item { - switch channelName { - case futuresTickerV2Channel, futuresTickerChannel, futuresOrderbookLevel2Channel, futuresExecutionDataChannel, futuresOrderbookLevel2Depth5Channel, futuresOrderbookLevel2Depth50Channel, futuresContractMarketDataChannel, futuresSystemAnnouncementChannel, futuresTrasactionStatisticsTimerEventChannel, futuresTradeOrdersBySymbolChannel, futuresTradeOrderChannel, futuresStopOrdersLifecycleEventChannel, futuresAccountBalanceEventChannel, futuresPositionChangeEventChannel: - return asset.Futures - case marketTickerChannel, marketAllTickersChannel, - marketSnapshotChannel, marketSymbolSnapshotChannel, - marketOrderbookLevel2Channels, marketOrderbookLevel2to5Channel, - marketOrderbokLevel2To50Channel, marketCandlesChannel, - marketMatchChannel, indexPriceIndicatorChannel, markPriceIndicatorChannel, - privateSpotTradeOrders, accountBalanceChannel, spotMarketAdvancedChannel: - return asset.Spot - case marginFundingbookChangeChannel, marginPositionChannel, marginLoanChannel: - return asset.Margin - default: - return asset.Empty - } -} - // generateSubscriptions returns a list of subscriptions from the configured subscriptions feature func (ku *Kucoin) generateSubscriptions() (subscription.List, error) { - assetPairs := map[asset.Item]currency.Pairs{} - for _, a := range ku.GetAssetTypes(false) { - if p, err := ku.GetEnabledPairs(a); err == nil { - assetPairs[a] = p - } else { - assetPairs[a] = currency.Pairs{} // err is probably that Asset isn't enabled, but we don't care about errors of any type - } - } - authed := ku.Websocket.CanUseAuthenticatedEndpoints() - subscriptions := subscription.List{} - for _, s := range ku.Features.Subscriptions { - if !authed && s.Authenticated { - continue - } - subs, err := ku.expandSubscription(s, assetPairs) - if err != nil { - return nil, err - } - subscriptions = append(subscriptions, subs...) - } - return subscriptions, nil + return ku.Features.Subscriptions.ExpandTemplates(ku) } -// expandSubscription takes a subscription and expands it across the relevant assets and pairs passed in -func (ku *Kucoin) expandSubscription(baseSub *subscription.Subscription, assetPairs map[asset.Item]currency.Pairs) (subscription.List, error) { - var subscriptions = subscription.List{} - if baseSub == nil { - return nil, common.ErrNilPointer - } - s := baseSub.Clone() - s.Channel = channelName(s.Channel) - if !s.Asset.IsValid() { - s.Asset = getChannelsAssetType(s.Channel) - } - - if len(assetPairs[s.Asset]) == 0 { - return nil, nil - } - - switch { - case s.Channel == marginLoanChannel: - for _, c := range assetPairs[asset.Margin].GetCurrencies() { - i := s.Clone() - i.Channel = fmt.Sprintf(s.Channel, c) - subscriptions = append(subscriptions, i) - } - case s.Channel == marketCandlesChannel: - interval, err := ku.intervalToString(s.Interval) - if err != nil { - return nil, err - } - subs := spotOrMarginPairSubs(assetPairs, s, false, interval) - subscriptions = append(subscriptions, subs...) - case s.Channel == marginFundingbookChangeChannel: - s.Channel = fmt.Sprintf(s.Channel, assetPairs[asset.Margin].GetCurrencies().Join()) - subscriptions = append(subscriptions, s) - case s.Channel == marketSnapshotChannel: - subs, err := spotOrMarginCurrencySubs(assetPairs, s) - if err != nil { - return nil, err - } - subscriptions = append(subscriptions, subs...) - case getChannelsAssetType(s.Channel) == asset.Futures && isSymbolChannel(s.Channel): - for _, p := range assetPairs[asset.Futures] { - c, err := ku.FormatExchangeCurrency(p, asset.Futures) - if err != nil { - continue - } - i := s.Clone() - i.Channel = fmt.Sprintf(s.Channel, c) - subscriptions = append(subscriptions, i) - } - case isSymbolChannel(s.Channel): - // Subscriptions which can use a single comma-separated sub per asset - subs := spotOrMarginPairSubs(assetPairs, s, true) - subscriptions = append(subscriptions, subs...) - default: - subscriptions = append(subscriptions, s) - } - return subscriptions, nil -} - -// isSymbolChannel returns true it this channel path ends in a formatting %s to accept a Symbol -func isSymbolChannel(c string) bool { - return strings.HasSuffix(c, "%s") || strings.HasSuffix(c, "%v") -} - -// channelName converts global channel Names used in config of channel input into kucoin channel names -// returns the name unchanged if no match is found -func channelName(name string) string { - if s, ok := subscriptionNames[name]; ok { - return s - } - return name -} - -// spotOrMarginPairSubs accepts a map of pairs and a template subscription and returns a list of subscriptions for Spot and Margin pairs -// If there's a Spot subscription, it won't be added again as a Margin subscription -// If joined param is true then one subscription per asset type with the currencies comma delimited -func spotOrMarginPairSubs(assetPairs map[asset.Item]currency.Pairs, b *subscription.Subscription, join bool, fmtArgs ...any) subscription.List { - subs := subscription.List{} - add := func(a asset.Item, pairs currency.Pairs) { - if len(pairs) == 0 { - return - } - if join { - f := append([]any{pairs.Join()}, fmtArgs...) - s := b.Clone() - s.Asset = a - s.Channel = fmt.Sprintf(b.Channel, f...) - subs = append(subs, s) - } else { - for i := range pairs { - f := append([]any{pairs[i].String()}, fmtArgs...) - s := b.Clone() - s.Asset = a - s.Channel = fmt.Sprintf(b.Channel, f...) - subs = append(subs, s) - } - } - } - - add(asset.Spot, assetPairs[asset.Spot]) - - marginPairs := currency.Pairs{} - for _, p := range assetPairs[asset.Margin] { - if !assetPairs[asset.Spot].Contains(p, false) { - marginPairs = marginPairs.Add(p) - } - } - add(asset.Margin, marginPairs) - - return subs -} - -// spotOrMarginCurrencySubs accepts a map of pairs and a template subscription and returns a list of subscriptions for every currency in Spot and Margin pairs -// If there's a Spot subscription, it won't be added again as a Margin subscription -func spotOrMarginCurrencySubs(assetPairs map[asset.Item]currency.Pairs, b *subscription.Subscription) (subscription.List, error) { - if b == nil { - return nil, common.ErrNilPointer - } - subs := subscription.List{} - add := func(a asset.Item, currs currency.Currencies) { - if len(currs) == 0 { - return - } - for _, c := range currs { - s := b.Clone() - s.Asset = a - s.Channel = fmt.Sprintf(b.Channel, c) - subs = append(subs, s) - } - } - - add(asset.Spot, assetPairs[asset.Spot].GetCurrencies()) - - marginCurrencies := currency.Currencies{} - for _, c := range assetPairs[asset.Margin].GetCurrencies() { - if !assetPairs[asset.Spot].ContainsCurrency(c) { - marginCurrencies = marginCurrencies.Add(c) - } - } - add(asset.Margin, marginCurrencies) - - return subs, nil +// GetSubscriptionTemplate returns a subscription channel template +func (ku *Kucoin) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) { + return template.New("master.tmpl"). + Funcs(template.FuncMap{ + "channelName": channelName, + "removeSpotFromMargin": func(s *subscription.Subscription, ap map[asset.Item]currency.Pairs) string { + spotPairs, _ := ku.GetEnabledPairs(asset.Spot) + return removeSpotFromMargin(s, ap, spotPairs) + }, + "isCurrencyChannel": isCurrencyChannel, + "isSymbolChannel": isSymbolChannel, + "channelInterval": channelInterval, + "assetCurrencies": assetCurrencies, + "joinPairsWithInterval": joinPairsWithInterval, + "batch": common.Batch[currency.Pairs], + }). + Parse(subTplText) } // orderbookManager defines a way of managing and maintaining synchronisation @@ -1777,3 +1580,158 @@ func (ku *Kucoin) CalculateAssets(topic string, cp currency.Pair) ([]asset.Item, return resp, nil } } + +// checkSubscriptions looks for any backwards incompatibilities with missing assets +// This should be unnecessary and removable by 2025 +func (ku *Kucoin) checkSubscriptions() { + upgraded := false + for _, s := range ku.Config.Features.Subscriptions { + if s.Asset != asset.Empty { + continue + } + upgraded = true + s.Channel = strings.TrimSuffix(s.Channel, ":%s") + switch s.Channel { + case subscription.TickerChannel, subscription.OrderbookChannel: + s.Asset = asset.All + case subscription.AllTradesChannel: + for _, d := range defaultSubscriptions { + if d.Channel == s.Channel { + ku.Config.Features.Subscriptions = append(ku.Config.Features.Subscriptions, d) + } + } + case futuresTradeOrderChannel, futuresStopOrdersLifecycleEventChannel, futuresAccountBalanceEventChannel: + s.Asset = asset.Futures + case marginPositionChannel, marginLoanChannel: + s.Asset = asset.Margin + } + } + ku.Config.Features.Subscriptions = slices.DeleteFunc(ku.Config.Features.Subscriptions, func(s *subscription.Subscription) bool { + switch s.Channel { + case "/contractMarket/level2Depth50", // Replaced by subsctiption.Orderbook for asset.All + "/contractMarket/tickerV2", // Replaced by subscription.Ticker for asset.All + "/margin/fundingBook": // Deprecated and removed + return true + case subscription.AllTradesChannel: + return s.Asset == asset.Empty + } + return false + }) + if upgraded { + ku.Features.Subscriptions = subscription.List{} + for _, s := range ku.Config.Features.Subscriptions { + if s.Enabled { + ku.Features.Subscriptions = append(ku.Features.Subscriptions, s) + } + } + } +} + +// channelName returns the correct channel name for the asset +func channelName(s *subscription.Subscription, a asset.Item) string { + switch s.Channel { + case subscription.TickerChannel: + if a == asset.Futures { + return futuresTickerChannel + } + return marketTickerChannel + case subscription.OrderbookChannel: + if a == asset.Futures { + return futuresOrderbookDepth5Channel + } + return marketOrderbookDepth5Channel // This does not require a REST request to get the orderbook. + case subscription.CandlesChannel: + return marketCandlesChannel // No support in GCT yet for Futures candles + case subscription.AllTradesChannel: + return marketMatchChannel // No support in GCT yet for Futures all trades + } + return s.Channel +} + +// removeSpotFromMargin removes spot pairs from margin pairs in the supplied AssetPairs map for subscriptions to non-margin endpoints +func removeSpotFromMargin(s *subscription.Subscription, ap map[asset.Item]currency.Pairs, spotPairs currency.Pairs) string { + if strings.HasPrefix(s.Channel, "/margin") { + return "" + } + if p, ok := ap[asset.Margin]; ok { + ap[asset.Margin] = p.Remove(spotPairs...) + } + return "" +} + +// isSymbolChannel returns if the channel expects receive a symbol +func isSymbolChannel(s *subscription.Subscription) bool { + switch channelName(s, s.Asset) { + case privateSpotTradeOrders, accountBalanceChannel, marginPositionChannel, spotMarketAdvancedChannel, futuresSystemAnnouncementChannel, + futuresTradeOrderChannel, futuresStopOrdersLifecycleEventChannel, futuresAccountBalanceEventChannel: + return false + } + return true +} + +// isCurrencyChannel returns if the channel expects receive a currency +func isCurrencyChannel(s *subscription.Subscription) bool { + return s.Channel == marginLoanChannel +} + +// channelInterval returns the channel interval if it has one +func channelInterval(s *subscription.Subscription) string { + if channelName(s, s.Asset) == marketCandlesChannel { + if i, err := intervalToString(s.Interval); err == nil { + return i + } + } + return "" +} + +// assetCurrencies returns the currencies from all pairs in an asset +// Updates the AssetPairs map parameter to contain only those currencies as Base items for expandTemplates to see +func assetCurrencies(s *subscription.Subscription, ap map[asset.Item]currency.Pairs) currency.Currencies { + cs := common.SortStrings(ap[s.Asset].GetCurrencies()) + p := currency.Pairs{} + for _, c := range cs { + p = append(p, currency.Pair{Base: c}) + } + ap[s.Asset] = p + return cs +} + +// joinPairsWithInterval returns a list of currency pair symbols joined by comma +// If the subscription has a viable interval it's appended after each symbol +func joinPairsWithInterval(b currency.Pairs, s *subscription.Subscription) string { + out := make([]string, len(b)) + suffix, err := intervalToString(s.Interval) + if err == nil { + suffix = "_" + suffix + } + for i, p := range b { + out[i] = p.String() + suffix + } + return strings.Join(out, ",") +} + +const subTplText = ` +{{- removeSpotFromMargin $.S $.AssetPairs -}} +{{- if isCurrencyChannel $.S }} + {{ channelName $.S $.S.Asset -}} : {{- (assetCurrencies $.S $.AssetPairs).Join -}} +{{- else if isSymbolChannel $.S }} + {{ range $asset, $pairs := $.AssetPairs }} + {{- with $name := channelName $.S $asset }} + {{- if and (eq $name "/market/ticker") (gt (len $pairs) 10) -}} + {{- $name -}} :all + {{- with $i := channelInterval $.S -}}_{{- $i -}}{{- end -}} + {{- $.BatchSize -}} {{ len $pairs }} + {{- else -}} + {{- range $b := batch $pairs 100 -}} + {{- $name -}} : {{- joinPairsWithInterval $b $.S -}} + {{ $.PairSeparator }} + {{- end -}} + {{- $.BatchSize -}} 100 + {{- end }} + {{- end }} + {{ $.AssetSeparator }} + {{- end }} +{{- else -}} + {{ channelName $.S $.S.Asset }} +{{- end }} +` diff --git a/exchanges/kucoin/kucoin_wrapper.go b/exchanges/kucoin/kucoin_wrapper.go index d7cb0299..6cf00ffa 100644 --- a/exchanges/kucoin/kucoin_wrapper.go +++ b/exchanges/kucoin/kucoin_wrapper.go @@ -28,7 +28,6 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" - "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" @@ -144,21 +143,7 @@ func (ku *Kucoin) SetDefaults() { GlobalResultLimit: 1500, }, }, - Subscriptions: subscription.List{ - // Where we can we use generic names - {Enabled: true, Channel: subscription.TickerChannel}, // marketTickerChannel - {Enabled: true, Channel: subscription.AllTradesChannel}, // marketMatchChannel - {Enabled: true, Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds}, // marketOrderbookLevel2Channels - {Enabled: true, Channel: futuresTickerV2Channel}, - {Enabled: true, Channel: futuresOrderbookLevel2Depth50Channel}, - {Enabled: true, Channel: marginFundingbookChangeChannel, Authenticated: true}, - {Enabled: true, Channel: accountBalanceChannel, Authenticated: true}, - {Enabled: true, Channel: marginPositionChannel, Authenticated: true}, - {Enabled: true, Channel: marginLoanChannel, Authenticated: true}, - {Enabled: true, Channel: futuresTradeOrderChannel, Authenticated: true}, - {Enabled: true, Channel: futuresStopOrdersLifecycleEventChannel, Authenticated: true}, - {Enabled: true, Channel: futuresAccountBalanceEventChannel, Authenticated: true}, - }, + Subscriptions: defaultSubscriptions.Clone(), } ku.Requester, err = request.New(ku.Name, common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout), @@ -197,6 +182,8 @@ func (ku *Kucoin) Setup(exch *config.Exchange) error { return err } + ku.checkSubscriptions() + wsRunningEndpoint, err := ku.API.Endpoints.GetURL(exchange.WebsocketSpot) if err != nil { return err @@ -1390,7 +1377,7 @@ func (ku *Kucoin) GetHistoricCandles(ctx context.Context, pair currency.Pair, a }) } case asset.Spot, asset.Margin: - intervalString, err := ku.intervalToString(interval) + intervalString, err := intervalToString(interval) if err != nil { return nil, err } @@ -1442,7 +1429,7 @@ func (ku *Kucoin) GetHistoricCandlesExtended(ctx context.Context, pair currency. } case asset.Spot, asset.Margin: var intervalString string - intervalString, err = ku.intervalToString(interval) + intervalString, err = intervalToString(interval) if err != nil { return nil, err } diff --git a/exchanges/kucoin/testdata/wsHandleData.json b/exchanges/kucoin/testdata/wsHandleData.json index 75f620a4..85d400b1 100644 --- a/exchanges/kucoin/testdata/wsHandleData.json +++ b/exchanges/kucoin/testdata/wsHandleData.json @@ -7,7 +7,6 @@ {"type":"message","topic":"/market/match:BTC-USDT","subject":"trade.l3match","data":{"sequence":"1545896669145","type":"match","symbol":"BTC-USDT","side":"buy","price":"0.08200000000000000000","size":"0.01022222000000000000","tradeId":"5c24c5da03aa673885cd67aa","takerOrderId":"5c24c5d903aa6772d55b371e","makerOrderId":"5c2187d003aa677bd09d5c93","time":"1545913818099033203"}} {"id":"","type":"message","topic":"/indicator/index:USDT-BTC","subject":"tick","data":{"symbol":"USDT-BTC","granularity":5000,"timestamp":1551770400000,"value":0.0001092}} {"type":"message","topic":"/indicator/markPrice:USDT-BTC","subject":"tick","data":{"symbol":"USDT-BTC","granularity":5000,"timestamp":1551770400000,"value":0.0001093}} -{"type":"message","topic":"/margin/fundingBook:USDT","subject":"funding.update","data":{"annualIntRate":0.0547,"currency":"USDT","dailyIntRate":0.00015,"sequence":87611418,"side":"lend","size":25040,"term":7,"ts":1671005721087508700}} {"type":"message","topic":"/spotMarket/tradeOrders","subject":"orderChange","channelType":"private","data":{"symbol":"KCS-USDT","orderType":"limit","side":"buy","orderId":"5efab07953bdea00089965d2","type":"open","orderTime":1593487481683297800,"size":"0.1","filledSize":"0","price":"0.937","clientOid":"1593487481000906","remainSize":"0.1","status":"open","ts":1593487481683297800}} {"type":"message","topic":"/spotMarket/tradeOrders","subject":"orderChange","channelType":"private","data":{"symbol":"KCS-USDT","orderType":"limit","side":"sell","orderId":"5efab07953bdea00089965fa","liquidity":"taker","type":"match","orderTime":1593487482038606000,"size":"0.1","filledSize":"0.1","price":"0.938","matchPrice":"0.96738","matchSize":"0.1","tradeId":"5efab07a4ee4c7000a82d6d9","clientOid":"1593487481000313","remainSize":"0","status":"match","ts":1593487482038606000}} {"type":"message","topic":"/spotMarket/tradeOrders","subject":"orderChange","channelType":"private","data":{"symbol":"KCS-USDT","orderType":"limit","side":"sell","orderId":"5efab07953bdea00089965fa","type":"filled","orderTime":1593487482038606000,"size":"0.1","filledSize":"0.1","price":"0.938","clientOid":"1593487481000313","remainSize":"0","status":"done","ts":1593487482038606000}} @@ -21,7 +20,6 @@ {"type":"message","topic":"/margin/loan:BTC","subject":"order.done","channelType":"private","data":{"currency":"BTC","orderId":"ac928c66ca53498f9c13a127a60e8","reason":"filled","side":"lend","ts":1553846081210005000}} {"type":"message","topic":"/spotMarket/advancedOrders","subject":"stopOrder","channelType":"private","data":{"createdAt":1589789942337,"orderId":"5ec244f6a8a75e0009958237","orderPrice":"0.00062","orderType":"stop","side":"sell","size":"1","stop":"entry","stopPrice":"0.00062","symbol":"KCS-BTC","tradeType":"TRADE","triggerSuccess":true,"ts":1589790121382281200,"type":"triggered"}} {"subject":"tickerV2","topic":"/contractMarket/tickerV2:ETHUSDCM","data":{"symbol":"ETHUSDCM","bestBidSize":795,"bestBidPrice":3200,"bestAskPrice":3600,"bestAskSize":284,"ts":1553846081210005000}} -{"subject":"ticker","topic":"/contractMarket/ticker:ETHUSDCM","data":{"symbol":"ETHUSDCM","sequence":45,"side":"sell","price":3600,"size":16,"tradeId":"5c9dcf4170744d6f5a3d32fb","bestBidSize":795,"bestBidPrice":3200,"bestAskPrice":3600,"bestAskSize":284,"ts":1553846081210005000}} {"subject":"level2","topic":"/contractMarket/level2:ETHUSDCM","type":"message","data":{"sequence":18,"change":"5000.0,sell,83","timestamp":1551770400000}} {"type":"message","topic":"/contractMarket/execution:ETHUSDCM","subject":"match","data":{"makerUserId":"6287c3015c27f000017d0c2f","symbol":"ETHUSDCM","sequence":31443494,"side":"buy","size":35,"price":23083,"takerOrderId":"63f94040839d00000193264b","makerOrderId":"63f94036839d0000019310c3","takerUserId":"6133f817230d8d000607b941","tradeId":"63f940400000650065f4996f","ts":1677279296134648800}} {"type":"message","topic":"/contractMarket/level2Depth5:ETHUSDCM","subject":"level2","data":{"sequence":1672332328701,"asks":[[23149,13703],[23150,1460],[23151,941],[23152,4591],[23153,4107]],"bids":[[23148,22801],[23147,4766],[23146,1388],[23145,2593],[23144,6286]],"ts":1677280435684,"timestamp":1677280435684}} diff --git a/exchanges/orderbook/orderbook.go b/exchanges/orderbook/orderbook.go index 60514d6c..877c25d1 100644 --- a/exchanges/orderbook/orderbook.go +++ b/exchanges/orderbook/orderbook.go @@ -122,8 +122,7 @@ func (s *Service) DeployDepth(exchange string, p currency.Pair, a asset.Item) (* return book, nil } -// GetDepth returns the actual depth struct for potential subsystems and -// strategies to interact with +// GetDepth returns the actual depth struct for potential subsystems and strategies to interact with func (s *Service) GetDepth(exchange string, p currency.Pair, a asset.Item) (*Depth, error) { s.mu.Lock() defer s.mu.Unlock() @@ -139,9 +138,7 @@ func (s *Service) GetDepth(exchange string, p currency.Pair, a asset.Item) (*Dep Asset: a, }] if !ok { - return nil, fmt.Errorf("%w associated with base currency %s", - errCannotFindOrderbook, - p.Quote) + return nil, fmt.Errorf("%w associated with base currency %s", errCannotFindOrderbook, p.Quote) } return book, nil } @@ -160,9 +157,7 @@ func (s *Service) Retrieve(exchange string, p currency.Pair, a asset.Item) (*Bas defer s.mu.Unlock() m1, ok := s.books[strings.ToLower(exchange)] if !ok { - return nil, fmt.Errorf("%w for %s exchange", - errCannotFindOrderbook, - exchange) + return nil, fmt.Errorf("%w for %s exchange", errCannotFindOrderbook, exchange) } book, ok := m1.m[key.PairAsset{ Base: p.Base.Item, @@ -170,9 +165,7 @@ func (s *Service) Retrieve(exchange string, p currency.Pair, a asset.Item) (*Bas Asset: a, }] if !ok { - return nil, fmt.Errorf("%w associated with base currency %s", - errCannotFindOrderbook, - p.Quote) + return nil, fmt.Errorf("%w associated with base currency %s", errCannotFindOrderbook, p.Quote) } return book.Retrieve() } diff --git a/exchanges/subscription/README.md b/exchanges/subscription/README.md index ff4d7a6f..bbc9492c 100644 --- a/exchanges/subscription/README.md +++ b/exchanges/subscription/README.md @@ -38,12 +38,30 @@ The template is provided with a single context structure: AssetPairs map[asset.Item]currency.Pairs AssetSeparator string PairSeparator string + BatchSize string ``` Subscriptions may fan out many channels for assets and pairs, to support exchanges which require individual subscriptions. -To allow the template to communicate how to handle its output it should use the provided separators: +To allow the template to communicate how to handle its output it should use the provided directives: - AssetSeparator should be added at the end of each section related to assets - PairSeparator should be added at the end of each pair +- BatchSize should be added with a number directly before AssetSeparator to indicate pairs have been batched + +Example: +``` +{{- range $asset, $pairs := $.AssetPairs }} + {{- range $b := batch $pairs 30 -}} + {{- $.S.Channel -}} : {{- $b.Join -}} + {{ $.PairSeparator }} + {{- end -}} + {{- $.BatchSize -}} 30 + {{- $.AssetSeparator }} +{{- end }} +``` + +Assets and pairs should be output in the sequence in AssetPairs since text/template range function uses an sorted order for map keys. + +Template functions may modify AssetPairs to update the subscription's pairs, e.g. Filtering out margin pairs already in spot subscription We use separators like this because it allows mono-templates to decide at runtime whether to fan out. diff --git a/exchanges/subscription/fixtures_test.go b/exchanges/subscription/fixtures_test.go index 918c1f81..32683472 100644 --- a/exchanges/subscription/fixtures_test.go +++ b/exchanges/subscription/fixtures_test.go @@ -8,26 +8,46 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" ) type mockEx struct { + pairs currency.Pairs + assets asset.Items tpl string auth bool errPairs error errFormat error } +func newMockEx() *mockEx { + pairs := currency.Pairs{btcusdtPair, ethusdcPair} + for _, b := range []currency.Code{currency.LTC, currency.XRP, currency.TRX} { + for _, q := range []currency.Code{currency.USDT, currency.USDC} { + pairs = append(pairs, currency.NewPair(b, q)) + } + } + + return &mockEx{ + assets: asset.Items{asset.Spot, asset.Futures}, + pairs: pairs, + } +} + func (m *mockEx) GetEnabledPairs(_ asset.Item) (currency.Pairs, error) { - return currency.Pairs{btcusdtPair, ethusdcPair}, m.errPairs + return m.pairs, m.errPairs } func (m *mockEx) GetPairFormat(_ asset.Item, _ bool) (currency.PairFormat, error) { return currency.PairFormat{Uppercase: true}, m.errFormat } -func (m *mockEx) GetSubscriptionTemplate(_ *Subscription) (*template.Template, error) { +func (m *mockEx) GetSubscriptionTemplate(s *Subscription) (*template.Template, error) { + if s.Channel == "nil" { + return nil, nil + } return template.New(m.tpl). Funcs(template.FuncMap{ "assetName": func(a asset.Item) string { @@ -35,11 +55,18 @@ func (m *mockEx) GetSubscriptionTemplate(_ *Subscription) (*template.Template, e return "future" } return a.String() - }}). + }, + "updateAssetPairs": func(ap assetPairs) string { + ap[asset.Futures] = nil + ap[asset.Spot] = ap[asset.Spot][0:1] + return "" + }, + "batch": common.Batch[currency.Pairs], + }). ParseFiles("testdata/" + m.tpl) } -func (m *mockEx) GetAssetTypes(_ bool) asset.Items { return asset.Items{asset.Spot, asset.Futures} } +func (m *mockEx) GetAssetTypes(_ bool) asset.Items { return m.assets } func (m *mockEx) CanUseAuthenticatedWebsocketEndpoints() bool { return m.auth } // equalLists is a utility function to compare subscription lists and show a pretty failure message diff --git a/exchanges/subscription/list.go b/exchanges/subscription/list.go index 51d6edfa..f250010d 100644 --- a/exchanges/subscription/list.go +++ b/exchanges/subscription/list.go @@ -85,7 +85,7 @@ func fillAssetPairs(ap assetPairs, a asset.Item, e iExchange) error { if err != nil { return err } - ap[a] = p.Format(f) + ap[a] = common.SortStrings(p.Format(f)) return nil } diff --git a/exchanges/subscription/list_test.go b/exchanges/subscription/list_test.go index 08eba5da..cd92d171 100644 --- a/exchanges/subscription/list_test.go +++ b/exchanges/subscription/list_test.go @@ -83,13 +83,15 @@ func TestListSetStates(t *testing.T) { // All other code is covered under TestExpandTemplates func TestAssetPairs(t *testing.T) { t.Parallel() - expErr := errors.New("Krypton is gone") for _, a := range []asset.Item{asset.Spot, asset.All} { + e := newMockEx() l := &List{{Channel: CandlesChannel, Asset: a}} - _, err := l.assetPairs(&mockEx{errPairs: expErr}) - assert.ErrorIs(t, err, expErr, "Should error correctly on GetEnabledPairs") - _, err = l.assetPairs(&mockEx{errFormat: expErr}) - assert.ErrorIs(t, err, expErr, "Should error correctly on GetPairFormat") + e.errFormat = errors.New("Krypton is back") + _, err := l.assetPairs(e) + assert.ErrorIs(t, err, e.errFormat, "Should error correctly on GetPairFormat") + e.errPairs = errors.New("Krypton is gone") + _, err = l.assetPairs(e) + assert.ErrorIs(t, err, e.errPairs, "Should error correctly on GetEnabledPairs") } } diff --git a/exchanges/subscription/template.go b/exchanges/subscription/template.go index fc67d2ec..ab0a18b4 100644 --- a/exchanges/subscription/template.go +++ b/exchanges/subscription/template.go @@ -4,25 +4,29 @@ import ( "bytes" "errors" "fmt" + "maps" "slices" + "strconv" "strings" - "github.com/thrasher-corp/gocryptotrader/currency" + "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" ) const ( + deviceControl = "\x11" groupSeparator = "\x1D" recordSeparator = "\x1E" ) var ( - errInvalidAssetExpandPairs = errors.New("subscription template containing PairSeparator with must contain either specific Asset or AssetSeparator") - errAssetRecords = errors.New("subscription template did not generate the expected number of asset records") - errPairRecords = errors.New("subscription template did not generate the expected number of pair records") - errAssetTemplateWithoutAll = errors.New("sub.Asset must be set to All if AssetSeparator is used in Channel template") - errNoTemplateContent = errors.New("subscription template did not generate content") - errInvalidTemplate = errors.New("GetSubscriptionTemplate did not return a template") + errInvalidAssetExpandPairs = errors.New("subscription template containing PairSeparator with must contain either specific Asset or AssetSeparator") + errAssetRecords = errors.New("subscription template did not generate the expected number of asset records") + errPairRecords = errors.New("subscription template did not generate the expected number of pair records") + errTooManyBatchSizePerAsset = errors.New("more than one BatchSize directive inside an AssetSeparator") + errAssetTemplateWithoutAll = errors.New("sub.Asset must be set to All if AssetSeparator is used in Channel template") + errNoTemplateContent = errors.New("subscription template did not generate content") + errInvalidTemplate = errors.New("GetSubscriptionTemplate did not return a template") ) type tplCtx struct { @@ -30,6 +34,7 @@ type tplCtx struct { AssetPairs assetPairs PairSeparator string AssetSeparator string + BatchSize string } // ExpandTemplates returns a list of Subscriptions with Template expanded @@ -63,86 +68,132 @@ func (l List) ExpandTemplates(e iExchange) (List, error) { assets = append(assets, k) } slices.Sort(assets) // text/template ranges maps in sorted order + + subs := List{} + for _, s := range l { + expanded, err2 := expandTemplate(e, s, maps.Clone(ap), assets) + if err2 != nil { + err = common.AppendError(err, fmt.Errorf("%s: %w", s, err2)) + } else { + subs = append(subs, expanded...) + } + } + + return subs, err +} + +func expandTemplate(e iExchange, s *Subscription, ap assetPairs, assets asset.Items) (List, error) { + if s.QualifiedChannel != "" { + return List{s}, nil + } + + t, err := e.GetSubscriptionTemplate(s) + if err != nil { + return nil, err + } + if t == nil { + return nil, errInvalidTemplate + } + + subCtx := &tplCtx{ + S: s, + PairSeparator: recordSeparator, + AssetSeparator: groupSeparator, + BatchSize: deviceControl + "BS", + } + subs := List{} - for _, s := range l { - if s.QualifiedChannel != "" { - subs = append(subs, s) + switch s.Asset { + case asset.All: + subCtx.AssetPairs = ap + default: + // This deliberately includes asset.Empty to harmonise handling + subCtx.AssetPairs = assetPairs{ + s.Asset: ap[s.Asset], + } + assets = asset.Items{s.Asset} + if s.Asset != asset.Empty && len(ap[s.Asset]) == 0 { + return List{}, nil // Nothing is enabled for this sub asset + } + } + + if len(s.Pairs) != 0 { + for a, pairs := range subCtx.AssetPairs { + if err := pairs.ContainsAll(s.Pairs, true); err != nil { //nolint:govet // Shadow, or gocritic will complain sloppyReassign + return nil, err + } + subCtx.AssetPairs[a] = s.Pairs + } + } + + buf := &bytes.Buffer{} + if err := t.Execute(buf, subCtx); err != nil { //nolint:govet // Shadow, or gocritic will complain sloppyReassign + return nil, err + } + + out := strings.TrimSpace(buf.String()) + + // Remove a single trailing AssetSeparator; don't use a cutset to avoid removing 2 or more + out = strings.TrimSpace(strings.TrimSuffix(out, subCtx.AssetSeparator)) + + assetRecords := strings.Split(out, subCtx.AssetSeparator) + if len(assetRecords) != len(assets) { + return nil, fmt.Errorf("%w: Got %d; Expected %d", errAssetRecords, len(assetRecords), len(assets)) + } + + for i, assetChannels := range assetRecords { + a := assets[i] + pairs := subCtx.AssetPairs[a] + + xpandPairs := strings.Contains(assetChannels, subCtx.PairSeparator) + + /* Batching: + - We start by assuming we'll get 1 batch sized to contain all pairs. Maybe a comma-separated list, or just the asset name + - If a BatchSize directive is found, we expect it to come right at the end, and be followed by the batch size as a number + - We'll then split into N batches of that size + - If no batchSize was declared, but we saw a PairSeparator, then we expect to see one line per pair, so batchSize is 1 + */ + batchSize := len(pairs) + if b := strings.Split(assetChannels, subCtx.BatchSize); len(b) > 2 { + return nil, fmt.Errorf("%w for %s", errTooManyBatchSizePerAsset, a) + } else if len(b) == 2 { + assetChannels = b[0] + if batchSize, err = strconv.Atoi(strings.TrimSpace(b[1])); err != nil { + return nil, fmt.Errorf("%s: %w", s, common.GetTypeAssertError("int", b[1], "batchSize")) + } + } else if xpandPairs { + batchSize = 1 + } + + // Trim space, then only one pair separator, then any more space. + assetChannels = strings.TrimSpace(strings.TrimSuffix(strings.TrimSpace(assetChannels), subCtx.PairSeparator)) + + if assetChannels == "" { continue } - subCtx := &tplCtx{ - S: s, - AssetPairs: ap, - PairSeparator: recordSeparator, - AssetSeparator: groupSeparator, + batches := common.Batch(pairs, batchSize) + + pairLines := strings.Split(assetChannels, subCtx.PairSeparator) + + if s.Asset != asset.Empty && len(pairLines) != len(batches) { + // The number of lines we get generated must match the number of pair batches we expect + return nil, fmt.Errorf("%w for %s: Got %d; Expected %d", errPairRecords, a, len(pairLines), len(batches)) } - t, err := e.GetSubscriptionTemplate(s) - if err != nil { - return nil, err - } - if t == nil { - return nil, errInvalidTemplate - } - - buf := &bytes.Buffer{} - if err := t.Execute(buf, subCtx); err != nil { - return nil, err - } - - out := buf.String() - - subAssets := assets - xpandPairs := strings.Contains(out, subCtx.PairSeparator) - if xpandAssets := strings.Contains(out, subCtx.AssetSeparator); xpandAssets { - if s.Asset != asset.All { - return nil, errAssetTemplateWithoutAll + for j, channel := range pairLines { + c := s.Clone() + c.Asset = a + channel = strings.TrimSpace(channel) + if channel == "" { + return nil, fmt.Errorf("%w for %s: %s", errNoTemplateContent, a, s) } - } else { - if xpandPairs && (s.Asset == asset.All || s.Asset == asset.Empty) { - // We don't currently support expanding Pairs without expanding Assets for All or Empty assets, but we could; waiting for a use-case - return nil, errInvalidAssetExpandPairs - } - // No expansion so update expected Assets for consistent behaviour below - subAssets = []asset.Item{s.Asset} - } - - out = strings.TrimRight(out, " \n\r\t"+subCtx.PairSeparator+subCtx.AssetSeparator) - - assetRecords := strings.Split(out, subCtx.AssetSeparator) - if len(assetRecords) != len(subAssets) { - return nil, fmt.Errorf("%w: Got %d; Expected %d", errAssetRecords, len(assetRecords), len(subAssets)) - } - - for i, assetChannels := range assetRecords { - a := subAssets[i] - assetChannels = strings.TrimRight(assetChannels, " \n\r\t"+recordSeparator) - pairLines := strings.Split(assetChannels, subCtx.PairSeparator) - pairs, ok := ap[a] - if xpandPairs { - if !ok { - return nil, fmt.Errorf("%w: %s", asset.ErrInvalidAsset, a) - } - if len(pairLines) != len(pairs) { - return nil, fmt.Errorf("%w: Got %d; Expected %d", errPairRecords, len(pairLines), len(pairs)) - } - } - for j, channel := range pairLines { - c := s.Clone() - c.Asset = a - channel = strings.TrimSpace(channel) - if channel == "" { - return nil, fmt.Errorf("%w: %s", errNoTemplateContent, s) - } - c.QualifiedChannel = strings.TrimSpace(channel) - if xpandPairs { - c.Pairs = currency.Pairs{pairs[j]} - } else { - c.Pairs = pairs - } - subs = append(subs, c) + c.QualifiedChannel = channel + if s.Asset != asset.Empty { + c.Pairs = batches[j] } + subs = append(subs, c) } } diff --git a/exchanges/subscription/template_test.go b/exchanges/subscription/template_test.go index 9df65b81..f14e851e 100644 --- a/exchanges/subscription/template_test.go +++ b/exchanges/subscription/template_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/kline" @@ -15,29 +16,41 @@ import ( func TestExpandTemplates(t *testing.T) { t.Parallel() - e := &mockEx{ - tpl: "subscriptions.tmpl", - } + e := newMockEx() + e.tpl = "subscriptions.tmpl" // Functionality tests l := List{ - {Channel: "feature1"}, - {Channel: "feature2", Asset: asset.All, Pairs: currency.Pairs{btcusdtPair, ethusdcPair}, Interval: kline.FifteenMin}, - {Channel: "feature3", Asset: asset.All, Pairs: currency.Pairs{btcusdtPair, ethusdcPair}, Levels: 100}, - {Channel: "feature4", Authenticated: true}, - {Channel: "feature1", QualifiedChannel: "just one sub already processed"}, + {Channel: "single-channel"}, + {Channel: "expand-assets", Asset: asset.All, Interval: kline.FifteenMin}, + {Channel: "expand-pairs", Asset: asset.All, Levels: 1}, + {Channel: "expand-pairs", Asset: asset.Spot, Levels: 2}, + {Channel: "single-channel", QualifiedChannel: "just one sub already processed"}, + {Channel: "update-asset-pairs", Asset: asset.All}, + {Channel: "expand-pairs", Asset: asset.Spot, Pairs: e.pairs[0:2], Levels: 3}, + {Channel: "batching", Asset: asset.Spot}, + {Channel: "single-channel", Authenticated: true}, } got, err := l.ExpandTemplates(e) require.NoError(t, err, "ExpandTemplates must not error") exp := List{ - {Channel: "feature1", QualifiedChannel: "feature1"}, - {Channel: "feature2", QualifiedChannel: "spot-feature2@15m", Asset: asset.Spot, Pairs: currency.Pairs{btcusdtPair, ethusdcPair}, Interval: kline.FifteenMin}, - {Channel: "feature2", QualifiedChannel: "future-feature2@15m", Asset: asset.Futures, Pairs: currency.Pairs{btcusdtPair, ethusdcPair}, Interval: kline.FifteenMin}, - {Channel: "feature3", QualifiedChannel: "spot-USDTBTC-feature3@100", Asset: asset.Spot, Pairs: currency.Pairs{btcusdtPair}, Levels: 100}, - {Channel: "feature3", QualifiedChannel: "spot-USDCETH-feature3@100", Asset: asset.Spot, Pairs: currency.Pairs{ethusdcPair}, Levels: 100}, - {Channel: "feature3", QualifiedChannel: "future-USDTBTC-feature3@100", Asset: asset.Futures, Pairs: currency.Pairs{btcusdtPair}, Levels: 100}, - {Channel: "feature3", QualifiedChannel: "future-USDCETH-feature3@100", Asset: asset.Futures, Pairs: currency.Pairs{ethusdcPair}, Levels: 100}, - {Channel: "feature1", QualifiedChannel: "just one sub already processed"}, + {Channel: "single-channel", QualifiedChannel: "single-channel"}, + {Channel: "expand-assets", QualifiedChannel: "spot-expand-assets@15m", Asset: asset.Spot, Pairs: e.pairs, Interval: kline.FifteenMin}, + {Channel: "expand-assets", QualifiedChannel: "future-expand-assets@15m", Asset: asset.Futures, Pairs: e.pairs, Interval: kline.FifteenMin}, + {Channel: "single-channel", QualifiedChannel: "just one sub already processed"}, + {Channel: "update-asset-pairs", QualifiedChannel: "spot-btcusdt-update-asset-pairs", Asset: asset.Spot, Pairs: currency.Pairs{btcusdtPair}}, + {Channel: "expand-pairs", QualifiedChannel: "spot-USDTBTC-expand-pairs@3", Asset: asset.Spot, Pairs: e.pairs[:1], Levels: 3}, + {Channel: "expand-pairs", QualifiedChannel: "spot-USDCETH-expand-pairs@3", Asset: asset.Spot, Pairs: e.pairs[1:2], Levels: 3}, + } + for _, p := range e.pairs { + exp = append(exp, List{ + {Channel: "expand-pairs", QualifiedChannel: "spot-" + p.Swap().String() + "-expand-pairs@1", Asset: asset.Spot, Pairs: currency.Pairs{p}, Levels: 1}, + {Channel: "expand-pairs", QualifiedChannel: "future-" + p.Swap().String() + "-expand-pairs@1", Asset: asset.Futures, Pairs: currency.Pairs{p}, Levels: 1}, + {Channel: "expand-pairs", QualifiedChannel: "spot-" + p.Swap().String() + "-expand-pairs@2", Asset: asset.Spot, Pairs: currency.Pairs{p}, Levels: 2}, + }...) + } + for _, b := range common.Batch(common.SortStrings(e.pairs), 3) { + exp = append(exp, &Subscription{Channel: "batching", QualifiedChannel: "spot-" + b.Join() + "-batching", Asset: asset.Spot, Pairs: b}) } if !equalLists(t, exp, got) { @@ -48,36 +61,58 @@ func TestExpandTemplates(t *testing.T) { got, err = l.ExpandTemplates(e) require.NoError(t, err, "ExpandTemplates must not error") exp = append(exp, - &Subscription{Channel: "feature4", QualifiedChannel: "feature4-authed"}, + &Subscription{Channel: "single-channel", QualifiedChannel: "single-channel-authed"}, ) equalLists(t, exp, got) - _, err = List{{Channel: "feature2", Asset: asset.Spot}}.ExpandTemplates(e) - assert.ErrorIs(t, err, errAssetTemplateWithoutAll, "Should error correctly on xpand assets without All") + // Test with just one asset to ensure asset.All works, and disabled assets don't error + e.assets = e.assets[:1] + l = List{ + {Channel: "expand-assets", Asset: asset.All, Interval: kline.OneHour}, + {Channel: "expand-pairs", Asset: asset.All, Levels: 4}, + {Channel: "single-channel", Asset: asset.Futures}, + } + got, err = l.ExpandTemplates(e) + require.NoError(t, err, "ExpandTemplates must not error") + exp = List{ + {Channel: "expand-assets", QualifiedChannel: "spot-expand-assets@1h", Asset: asset.Spot, Pairs: e.pairs, Interval: kline.OneHour}, + } + for _, p := range e.pairs { + exp = append(exp, List{ + {Channel: "expand-pairs", QualifiedChannel: "spot-" + p.Swap().String() + "-expand-pairs@4", Asset: asset.Spot, Pairs: currency.Pairs{p}, Levels: 4}, + }...) + } + equalLists(t, exp, got) + + // Error cases + _, err = List{{Channel: "nil"}}.ExpandTemplates(e) + assert.ErrorIs(t, err, errInvalidTemplate, "Should get correct error on nil template") + + _, err = List{{Channel: "single-channel", Asset: asset.Spot, Pairs: currency.Pairs{currency.NewPairWithDelimiter("NOPE", "POPE", "🐰")}}}.ExpandTemplates(e) + assert.ErrorIs(t, err, currency.ErrPairNotContainedInAvailablePairs, "Should error correctly when pair not available") e.tpl = "errors.tmpl" - _, err = List{{Channel: "error1", Asset: asset.All}}.ExpandTemplates(e) - assert.ErrorIs(t, err, errInvalidAssetExpandPairs, "Should error correctly on xpand pairs but not assets") _, err = List{{Channel: "error1"}}.ExpandTemplates(e) - assert.ErrorIs(t, err, errInvalidAssetExpandPairs, "Should error correctly on xpand pairs but not assets") - - _, err = List{{Channel: "error2"}}.ExpandTemplates(e) assert.ErrorContains(t, err, "wrong number of args for String", "Should error correctly with execution error") - _, err = List{{Channel: "non-existent"}}.ExpandTemplates(e) + _, err = List{{Channel: "empty-content", Asset: asset.Spot}}.ExpandTemplates(e) assert.ErrorIs(t, err, errNoTemplateContent, "Should error correctly when no content generated") - assert.ErrorContains(t, err, "non-existent", "Should error correctly when no content generated") + assert.ErrorContains(t, err, "empty-content", "Should error correctly when no content generated") - _, err = List{{Channel: "error3", Asset: asset.All}}.ExpandTemplates(e) + _, err = List{{Channel: "error2", Asset: asset.All}}.ExpandTemplates(e) assert.ErrorIs(t, err, errAssetRecords, "Should error correctly when invalid number of asset entries") - _, err = List{{Channel: "error4", Asset: asset.Spot}}.ExpandTemplates(e) + _, err = List{{Channel: "error3", Asset: asset.Spot}}.ExpandTemplates(e) assert.ErrorIs(t, err, errPairRecords, "Should error correctly when invalid number of pair entries") - _, err = List{{Channel: "error4", Asset: asset.Margin}}.ExpandTemplates(e) - assert.ErrorIs(t, err, asset.ErrInvalidAsset, "Should error correctly when invalid asset") + _, err = List{{Channel: "error4", Asset: asset.Spot}}.ExpandTemplates(e) + assert.ErrorIs(t, err, errTooManyBatchSizePerAsset, "Should error correctly when too many BatchSize directives") + _, err = List{{Channel: "error5", Asset: asset.Spot}}.ExpandTemplates(e) + assert.ErrorIs(t, err, common.ErrTypeAssertFailure, "Should error correctly when batch size isn't an int") + + e.tpl = "parse-error.tmpl" e.tpl = "parse-error.tmpl" _, err = l.ExpandTemplates(e) assert.ErrorContains(t, err, "function \"explode\" not defined", "Should error correctly on unparsable template") @@ -90,10 +125,13 @@ func TestExpandTemplates(t *testing.T) { _, err = l.ExpandTemplates(e) assert.ErrorIs(t, err, e.errPairs, "Should error correctly on GetEnabledPairs") - l = List{{Channel: "feature1", QualifiedChannel: "already happy"}} + l = List{{Channel: "single-channel", QualifiedChannel: "already happy"}} got, err = l.ExpandTemplates(e) require.NoError(t, err) require.Len(t, got, 1, "Must get back the one sub") assert.Equal(t, "already happy", l[0].QualifiedChannel, "Should get back the one sub") assert.NotSame(t, got, l, "Should get back a different actual list") + + _, err = List{{Channel: "nil"}}.ExpandTemplates(e) + assert.ErrorIs(t, err, errInvalidTemplate, "Should get correct error on nil template") } diff --git a/exchanges/subscription/testdata/errors.tmpl b/exchanges/subscription/testdata/errors.tmpl index a086e0e9..3705d6c8 100644 --- a/exchanges/subscription/testdata/errors.tmpl +++ b/exchanges/subscription/testdata/errors.tmpl @@ -1,13 +1,36 @@ {{- if eq .S.Channel "error1" }} - {{/* Error 1: Expand pairs but not assets, without specific asset */}} - {{- .PairSeparator -}} + {{/* Runtime error from executing */}} + {{ .S.String 42 }} {{- else if eq .S.Channel "error2" }} - {{/* Error 2: Runtime error from executing */}} - {{ .S.String 42 }} + {{/* Incorrect number of asset entries */}} + {{- .AssetSeparator -}} + {{- .AssetSeparator -}} + {{- .AssetSeparator -}} {{- else if eq .S.Channel "error3" }} - {{/* Error 3: Incorrect number of asset entries */}} - {{- .AssetSeparator }} + {{/* Incorrect number of pair entries */}} + {{- .PairSeparator -}} + {{- .PairSeparator -}} + {{- .PairSeparator -}} {{- else if eq .S.Channel "error4" }} - {{/* Error 3: Incorrect number of pair entries */}} - {{- .PairSeparator }} + {{/* Too many BatchSize commands */}} + {{- range $asset, $pairs := $.AssetPairs }} + {{- $pairs.Join -}} + {{- $.BatchSize -}}1 + {{- $.BatchSize -}}2 + {{- $.AssetSeparator -}} + {{- end -}} +{{- else if eq .S.Channel "error5" }} + {{/* BatchSize without number */}} + {{- range $asset, $pairs := $.AssetPairs }} + {{- $pairs.Join -}} + {{- $.BatchSize -}} + {{- $.AssetSeparator -}} + {{- end -}} +{{- else if eq .S.Channel "empty-content" }} + {{/* Empty response for the pair */}} + {{- range $asset, $pairs := $.AssetPairs }} + {{- range $pair := $pairs -}} + {{- $.PairSeparator -}} + {{- end -}} + {{- end -}} {{- end -}} diff --git a/exchanges/subscription/testdata/subscriptions.tmpl b/exchanges/subscription/testdata/subscriptions.tmpl index b223c146..50a7345d 100644 --- a/exchanges/subscription/testdata/subscriptions.tmpl +++ b/exchanges/subscription/testdata/subscriptions.tmpl @@ -1,21 +1,36 @@ -{{- if eq $.S.Channel "feature1" -}} - {{/* Case 1: One channel to rule them all */}} - feature1 -{{- else if eq $.S.Channel "feature2" -}} - {{/* Case 2: One channel per asset */}} - {{- range $asset, $pairs := $.AssetPairs }} - {{ assetName $asset }}-feature2@ {{- $.S.Interval.Short }} - {{- $.AssetSeparator }} - {{- end }} -{{- else if eq $.S.Channel "feature3" }} - {{/* Case 3: One channel per pair per asset */}} - {{- range $asset, $pairs := $.AssetPairs }} - {{- range $pair := $pairs -}} - {{ assetName $asset }}-{{ $pair.Swap.String -}} -feature3@ {{- $.S.Levels }} - {{- $.PairSeparator -}} +{{- if eq $.S.Channel "single-channel" -}} + single-channel + {{- if $.S.Authenticated -}} + -authed {{- end -}} - {{- $.AssetSeparator -}} - {{- end -}} -{{- else if eq $.S.Channel "feature4" }} - feature4-authed +{{- else if eq $.S.Channel "expand-assets" -}} + {{- range $asset, $pairs := $.AssetPairs }} + {{ assetName $asset }}-expand-assets@ {{- $.S.Interval.Short }} + {{- $.AssetSeparator }} + {{- end }} +{{- else if eq $.S.Channel "expand-pairs" }} + {{- range $asset, $pairs := $.AssetPairs }} + {{- range $pair := $pairs -}} + {{ assetName $asset }}-{{ $pair.Swap.String -}} -expand-pairs@ {{- $.S.Levels }} + {{- $.PairSeparator -}} + {{- end -}} + {{- $.AssetSeparator -}} + {{- end -}} +{{- else if eq $.S.Channel "update-asset-pairs" }} + {{- updateAssetPairs $.AssetPairs -}} + spot-btcusdt-update-asset-pairs + {{- $.PairSeparator -}} + {{- $.AssetSeparator -}} + {{/* futures doesn't output anything, but we need an asset separator, so this previous one must not be stripped */}} + {{- $.AssetSeparator -}} +{{- else if eq $.S.Channel "batching" }} + {{- range $asset, $pairs := $.AssetPairs }} + {{- if eq $asset.String "spot" }} + {{- range $batch := batch $pairs 3 -}} + {{ assetName $asset }}-{{ $batch.Join -}} -batching + {{- $.PairSeparator -}} + {{- end -}} + {{- $.BatchSize -}} 3 + {{- end }} + {{- end -}} {{- end -}}