From 32a35b3f525e924efea6ef58f7f998678be57ae7 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Mon, 4 Mar 2024 12:05:26 +1100 Subject: [PATCH] Kucoin: Change default websocket subscription channel for tickers and orderbooks (#1371) * kucoin: quick batching support for ticker/trades and orderbooks * fix test * kucoin: move pieces add commentry * kucoin: optimise listOfAssetsCurrencyPairEnabledFor and refactor implementations, address specific orderbook channel subscription handling * glorious: nits * thx @thrasher-: nits addressed * rm types and tests that are not needed * rm subs checking code, and convert to types.Number * not needed anymore * fix tests * set up reader routine to process updates before init a potential slow websocket subscriber * implement glorious suggestion * glorious: nitters --------- Co-authored-by: Ryan O'Hara-Reid --- engine/websocketroutine_manager.go | 4 +- exchanges/gateio/gateio_wrapper.go | 3 - exchanges/kucoin/kucoin_futures.go | 27 ++-- exchanges/kucoin/kucoin_test.go | 33 +++-- exchanges/kucoin/kucoin_types.go | 8 ++ exchanges/kucoin/kucoin_websocket.go | 197 +++++++++++++++++++-------- exchanges/kucoin/kucoin_wrapper.go | 44 +++--- 7 files changed, 197 insertions(+), 119 deletions(-) diff --git a/engine/websocketroutine_manager.go b/engine/websocketroutine_manager.go index bec886be..f6412d8b 100644 --- a/engine/websocketroutine_manager.go +++ b/engine/websocketroutine_manager.go @@ -137,12 +137,12 @@ func (m *WebsocketRoutineManager) websocketRoutine() { wg.Add(1) go func() { defer wg.Done() - err = ws.Connect() + err = m.websocketDataReceiver(ws) if err != nil { log.Errorf(log.WebsocketMgr, "%v", err) } - err = m.websocketDataReceiver(ws) + err = ws.Connect() if err != nil { log.Errorf(log.WebsocketMgr, "%v", err) } diff --git a/exchanges/gateio/gateio_wrapper.go b/exchanges/gateio/gateio_wrapper.go index adea4d2f..9dc825cc 100644 --- a/exchanges/gateio/gateio_wrapper.go +++ b/exchanges/gateio/gateio_wrapper.go @@ -516,9 +516,6 @@ func (g *Gateio) FetchTradablePairs(ctx context.Context, a asset.Item) (currency return nil, err } cp.Quote = currency.NewCode(strings.ReplaceAll(cp.Quote.String(), currency.UnderscoreDelimiter, currency.DashDelimiter)) - if err != nil { - return nil, err - } pairs = append(pairs, cp) } } diff --git a/exchanges/kucoin/kucoin_futures.go b/exchanges/kucoin/kucoin_futures.go index d1883d4d..6a782982 100644 --- a/exchanges/kucoin/kucoin_futures.go +++ b/exchanges/kucoin/kucoin_futures.go @@ -170,7 +170,7 @@ func (ku *Kucoin) GetFuturesOrderbook(ctx context.Context, symbol string) (*Orde if err != nil { return nil, err } - return constructFuturesOrderbook(&o) + return constructFuturesOrderbook(&o), nil } // GetFuturesPartOrderbook20 gets orderbook for a specified symbol with depth 20 @@ -185,7 +185,7 @@ func (ku *Kucoin) GetFuturesPartOrderbook20(ctx context.Context, symbol string) if err != nil { return nil, err } - return constructFuturesOrderbook(&o) + return constructFuturesOrderbook(&o), nil } // GetFuturesPartOrderbook100 gets orderbook for a specified symbol with depth 100 @@ -200,7 +200,7 @@ func (ku *Kucoin) GetFuturesPartOrderbook100(ctx context.Context, symbol string) if err != nil { return nil, err } - return constructFuturesOrderbook(&o) + return constructFuturesOrderbook(&o), nil } // GetFuturesTradeHistory get last 100 trades for symbol @@ -863,20 +863,11 @@ func processFuturesOB(ob [][2]float64) []orderbook.Item { return o } -func constructFuturesOrderbook(o *futuresOrderbookResponse) (*Orderbook, error) { - var ( - s Orderbook - err error - ) - s.Bids = processFuturesOB(o.Bids) - if err != nil { - return nil, err +func constructFuturesOrderbook(o *futuresOrderbookResponse) *Orderbook { + return &Orderbook{ + Bids: processFuturesOB(o.Bids), + Asks: processFuturesOB(o.Asks), + Sequence: o.Sequence, + Time: o.Time.Time(), } - s.Asks = processFuturesOB(o.Asks) - if err != nil { - return nil, err - } - s.Sequence = o.Sequence - s.Time = o.Time.Time() - return &s, err } diff --git a/exchanges/kucoin/kucoin_test.go b/exchanges/kucoin/kucoin_test.go index 5d774286..3fba95e8 100644 --- a/exchanges/kucoin/kucoin_test.go +++ b/exchanges/kucoin/kucoin_test.go @@ -2027,13 +2027,19 @@ func TestGenerateDefaultSubscriptions(t *testing.T) { t.Parallel() subs, err := ku.GenerateDefaultSubscriptions() + assert.NoError(t, err, "GenerateDefaultSubscriptions should not error") - assert.Len(t, subs, 12, "Should generate the correct number of subs when not logged in") - for _, p := range []string{"ticker", "match", "level2"} { - verifySubs(t, subs, asset.Spot, "/market/"+p+":", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC") - verifySubs(t, subs, asset.Margin, "/market/"+p+":", "SOL-USDC", "TRX-BTC") - } + assert.Len(t, subs, 11, "Should generate the correct number of subs when not logged in") + + verifySubs(t, subs, asset.Spot, "/market/ticker:all") // This takes care of margin as well. + + verifySubs(t, subs, asset.Spot, "/market/match:", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC") + verifySubs(t, subs, asset.Margin, "/market/match:", "SOL-USDC", "TRX-BTC") + + verifySubs(t, subs, asset.Spot, "/spotMarket/level2Depth5:", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC") + verifySubs(t, subs, asset.Margin, "/spotMarket/level2Depth5:", "SOL-USDC", "TRX-BTC") + for _, c := range []string{"ETHUSDCM", "XBTUSDCM", "SOLUSDTM"} { verifySubs(t, subs, asset.Futures, "/contractMarket/tickerV2:", c) verifySubs(t, subs, asset.Futures, "/contractMarket/level2Depth50:", c) @@ -2052,11 +2058,16 @@ func TestGenerateAuthSubscriptions(t *testing.T) { subs, err := nu.GenerateDefaultSubscriptions() assert.NoError(t, err, "GenerateDefaultSubscriptions with Auth should not error") - assert.Len(t, subs, 25, "Should generate the correct number of subs when logged in") - for _, p := range []string{"ticker", "match", "level2"} { - verifySubs(t, subs, asset.Spot, "/market/"+p+":", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC") - verifySubs(t, subs, asset.Margin, "/market/"+p+":", "SOL-USDC", "TRX-BTC") - } + assert.Len(t, subs, 24, "Should generate the correct number of subs when logged in") + + verifySubs(t, subs, asset.Spot, "/market/ticker:all") // This takes care of margin as well. + + verifySubs(t, subs, asset.Spot, "/market/match:", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC") + verifySubs(t, subs, asset.Margin, "/market/match:", "SOL-USDC", "TRX-BTC") + + verifySubs(t, subs, asset.Spot, "/spotMarket/level2Depth5:", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC") + verifySubs(t, subs, asset.Margin, "/spotMarket/level2Depth5:", "SOL-USDC", "TRX-BTC") + for _, c := range []string{"ETHUSDCM", "XBTUSDCM", "SOLUSDTM"} { verifySubs(t, subs, asset.Futures, "/contractMarket/tickerV2:", c) verifySubs(t, subs, asset.Futures, "/contractMarket/level2Depth50:", c) @@ -2472,7 +2483,7 @@ func TestProcessOrderbook(t *testing.T) { if err != nil { t.Error(err) } - err = ku.processOrderbook([]byte(orderbookLevel5PushData), "BTC-USDT") + err = ku.processOrderbook([]byte(orderbookLevel5PushData), "BTC-USDT", "") if err != nil { t.Error(err) } diff --git a/exchanges/kucoin/kucoin_types.go b/exchanges/kucoin/kucoin_types.go index 1f40c15e..f9471f44 100644 --- a/exchanges/kucoin/kucoin_types.go +++ b/exchanges/kucoin/kucoin_types.go @@ -1501,3 +1501,11 @@ type MarginOrderParam struct { VisibleSize float64 `json:"visibleSize,omitempty,string"` Funds float64 `json:"funds,string,omitempty"` } + +// Level2Depth5Or20 stores the orderbook data for the level 5 or level 20 +// orderbook +type Level2Depth5Or20 struct { + Asks [][2]types.Number `json:"asks"` + Bids [][2]types.Number `json:"bids"` + Timestamp int64 `json:"timestamp"` +} diff --git a/exchanges/kucoin/kucoin_websocket.go b/exchanges/kucoin/kucoin_websocket.go index 917bc1a9..7da0a195 100644 --- a/exchanges/kucoin/kucoin_websocket.go +++ b/exchanges/kucoin/kucoin_websocket.go @@ -75,8 +75,8 @@ const ( ) var subscriptionNames = map[string]string{ - subscription.TickerChannel: marketTickerChannel, - subscription.OrderbookChannel: marketOrderbookLevel2Channels, + subscription.TickerChannel: marketAllTickersChannel, // This allows more subscriptions on the orderbook channel for this specific connection. + subscription.OrderbookChannel: marketOrderbookLevel2to5Channel, // This does not require a REST request to get the orderbook. subscription.CandlesChannel: marketCandlesChannel, subscription.AllTradesChannel: marketMatchChannel, // No equivalents for: AllOrders, MyTrades, MyOrders @@ -131,9 +131,6 @@ func (ku *Kucoin) WsConnect() error { } ku.Websocket.Wg.Add(1) go ku.wsReadData() - if err != nil { - return err - } ku.Websocket.Conn.SetupPingHandler(stream.PingHandler{ Delay: time.Millisecond * time.Duration(instances.InstanceServers[0].PingTimeout), Message: []byte(`{"type":"ping"}`), @@ -218,22 +215,22 @@ func (ku *Kucoin) wsHandleData(respData []byte) error { } else { instruments = topicInfo[1] } - return ku.processTicker(resp.Data, instruments) + return ku.processTicker(resp.Data, instruments, topicInfo[0]) case strings.HasPrefix(marketSymbolSnapshotChannel, topicInfo[0]): - return ku.processMarketSnapshot(resp.Data) + return ku.processMarketSnapshot(resp.Data, topicInfo[0]) case strings.HasPrefix(marketOrderbookLevel2Channels, topicInfo[0]): - return ku.processOrderbookWithDepth(respData, topicInfo[1]) + return ku.processOrderbookWithDepth(respData, topicInfo[1], topicInfo[0]) case strings.HasPrefix(marketOrderbookLevel2to5Channel, topicInfo[0]), strings.HasPrefix(marketOrderbokLevel2To50Channel, topicInfo[0]): - return ku.processOrderbook(resp.Data, topicInfo[1]) + return ku.processOrderbook(resp.Data, topicInfo[1], topicInfo[0]) case strings.HasPrefix(marketCandlesChannel, topicInfo[0]): symbolAndInterval := strings.Split(topicInfo[1], currency.UnderscoreDelimiter) if len(symbolAndInterval) != 2 { return errMalformedData } - return ku.processCandlesticks(resp.Data, symbolAndInterval[0], symbolAndInterval[1]) + return ku.processCandlesticks(resp.Data, symbolAndInterval[0], symbolAndInterval[1], topicInfo[0]) case strings.HasPrefix(marketMatchChannel, topicInfo[0]): - return ku.processTradeData(resp.Data, topicInfo[1]) + return ku.processTradeData(resp.Data, topicInfo[1], topicInfo[0]) case strings.HasPrefix(indexPriceIndicatorChannel, topicInfo[0]): var response WsPriceIndicator return ku.processData(resp.Data, &response) @@ -244,7 +241,7 @@ func (ku *Kucoin) wsHandleData(respData []byte) error { var response WsMarginFundingBook return ku.processData(resp.Data, &response) case strings.HasPrefix(privateSpotTradeOrders, topicInfo[0]): - return ku.processOrderChangeEvent(resp.Data) + return ku.processOrderChangeEvent(resp.Data, topicInfo[0]) case strings.HasPrefix(accountBalanceChannel, topicInfo[0]): return ku.processAccountBalanceChange(resp.Data) case strings.HasPrefix(marginPositionChannel, topicInfo[0]): @@ -643,7 +640,7 @@ func (ku *Kucoin) processAccountBalanceChange(respData []byte) error { return nil } -func (ku *Kucoin) processOrderChangeEvent(respData []byte) error { +func (ku *Kucoin) processOrderChangeEvent(respData []byte, topic string) error { response := WsTradeOrder{} err := json.Unmarshal(respData, &response) if err != nil { @@ -665,8 +662,12 @@ func (ku *Kucoin) processOrderChangeEvent(respData []byte) error { if err != nil { return err } - // TODO should amend this function as we need to know the order asset type when we call it - for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) { + // TODO: should amend this function as we need to know the order asset type when we call it + assets, err := ku.CalculateAssets(topic, pair) + if err != nil { + return err + } + for x := range assets { ku.Websocket.DataHandler <- &order.Detail{ Price: response.Price, Amount: response.Size, @@ -678,7 +679,7 @@ func (ku *Kucoin) processOrderChangeEvent(respData []byte) error { Type: oType, Side: side, Status: oStatus, - AssetType: assetType, + AssetType: assets[x], Date: response.OrderTime.Time(), LastUpdated: response.Timestamp.Time(), Pair: pair, @@ -687,7 +688,7 @@ func (ku *Kucoin) processOrderChangeEvent(respData []byte) error { return nil } -func (ku *Kucoin) processTradeData(respData []byte, instrument string) error { +func (ku *Kucoin) processTradeData(respData []byte, instrument, topic string) error { response := WsTrade{} err := json.Unmarshal(respData, &response) if err != nil { @@ -706,7 +707,11 @@ func (ku *Kucoin) processTradeData(respData []byte, instrument string) error { if err != nil { return err } - for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) { + assets, err := ku.CalculateAssets(topic, pair) + if err != nil { + return err + } + for x := range assets { err = ku.Websocket.Trade.Update(saveTradeData, trade.Data{ CurrencyPair: pair, Timestamp: response.Time.Time(), @@ -715,7 +720,7 @@ func (ku *Kucoin) processTradeData(respData []byte, instrument string) error { Side: side, Exchange: ku.Name, TID: response.TradeID, - AssetType: assetType, + AssetType: assets[x], }) if err != nil { return err @@ -724,7 +729,7 @@ func (ku *Kucoin) processTradeData(respData []byte, instrument string) error { return nil } -func (ku *Kucoin) processTicker(respData []byte, instrument string) error { +func (ku *Kucoin) processTicker(respData []byte, instrument, topic string) error { response := WsTicker{} err := json.Unmarshal(respData, &response) if err != nil { @@ -734,9 +739,16 @@ func (ku *Kucoin) processTicker(respData []byte, instrument string) error { if err != nil { return err } - for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) { + assets, err := ku.CalculateAssets(topic, pair) + if err != nil { + return err + } + for x := range assets { + if !ku.AssetWebsocketSupport.IsAssetWebsocketSupported(assets[x]) { + continue + } ku.Websocket.DataHandler <- &ticker.Price{ - AssetType: assetType, + AssetType: assets[x], Last: response.Price, LastUpdated: response.Timestamp.Time(), ExchangeName: ku.Name, @@ -751,7 +763,7 @@ func (ku *Kucoin) processTicker(respData []byte, instrument string) error { return nil } -func (ku *Kucoin) processCandlesticks(respData []byte, instrument, intervalString string) error { +func (ku *Kucoin) processCandlesticks(respData []byte, instrument, intervalString, topic string) error { pair, err := currency.NewPairFromString(instrument) if err != nil { return err @@ -765,11 +777,18 @@ func (ku *Kucoin) processCandlesticks(respData []byte, instrument, intervalStrin if err != nil { return err } - for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) { - ku.Websocket.DataHandler <- stream.KlineData{ + assets, err := ku.CalculateAssets(topic, pair) + if err != nil { + return err + } + for x := range assets { + if !ku.AssetWebsocketSupport.IsAssetWebsocketSupported(assets[x]) { + continue + } + ku.Websocket.DataHandler <- &stream.KlineData{ Timestamp: response.Time.Time(), Pair: pair, - AssetType: assetType, + AssetType: assets[x], Exchange: ku.Name, StartTime: resp.Candles.StartTime, Interval: intervalString, @@ -783,7 +802,7 @@ func (ku *Kucoin) processCandlesticks(respData []byte, instrument, intervalStrin return nil } -func (ku *Kucoin) processOrderbookWithDepth(respData []byte, instrument string) error { +func (ku *Kucoin) processOrderbookWithDepth(respData []byte, instrument, topic string) error { pair, err := currency.NewPairFromString(instrument) if err != nil { return err @@ -795,17 +814,18 @@ func (ku *Kucoin) processOrderbookWithDepth(respData []byte, instrument string) if err != nil { return err } - var init bool - for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) { - init, err = ku.UpdateLocalBuffer(result.Result, assetType) + assets, err := ku.CalculateAssets(topic, pair) + if err != nil { + return err + } + for x := range assets { + var init bool + init, err = ku.UpdateLocalBuffer(result.Result, assets[x]) if err != nil { if init { return nil } - return fmt.Errorf("%v - UpdateLocalCache for asset type: %v error: %s", - ku.Name, - assetType, - err) + return fmt.Errorf("%v - UpdateLocalCache for asset type: %v error: %s", ku.Name, assets[x], err) } } return nil @@ -847,35 +867,54 @@ func (ku *Kucoin) UpdateLocalBuffer(wsdp *WsOrderbook, assetType asset.Item) (bo return false, err } -func (ku *Kucoin) processOrderbook(respData []byte, symbol string) error { - response := &WsOrderbook{} +func (ku *Kucoin) processOrderbook(respData []byte, symbol, topic string) error { + var response Level2Depth5Or20 err := json.Unmarshal(respData, &response) if err != nil { return err } - response.Symbol = symbol - var pair currency.Pair - pair, err = currency.NewPairFromString(symbol) + + pair, err := currency.NewPairFromString(symbol) if err != nil { return err } - var init bool - for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) { - init, err = ku.UpdateLocalBuffer(response, assetType) + + asks := make([]orderbook.Item, len(response.Asks)) + for x := range response.Asks { + asks[x].Price = response.Asks[x][0].Float64() + asks[x].Amount = response.Asks[x][1].Float64() + } + + bids := make([]orderbook.Item, len(response.Bids)) + for x := range response.Bids { + bids[x].Price = response.Bids[x][0].Float64() + bids[x].Amount = response.Bids[x][1].Float64() + } + + assets, err := ku.CalculateAssets(topic, pair) + if err != nil { + return err + } + + lastUpdated := time.UnixMilli(response.Timestamp) + + for x := range assets { + err = ku.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{ + Exchange: ku.Name, + Asks: asks, + Bids: bids, + Pair: pair, + Asset: assets[x], + LastUpdated: lastUpdated, + }) if err != nil { - if init { - return nil - } - return fmt.Errorf("%v - UpdateLocalCache for asset type %v error: %s", - ku.Name, - assetType, - err) + return err } } return nil } -func (ku *Kucoin) processMarketSnapshot(respData []byte) error { +func (ku *Kucoin) processMarketSnapshot(respData []byte, topic string) error { response := WsSnapshot{} err := json.Unmarshal(respData, &response) if err != nil { @@ -885,10 +924,17 @@ func (ku *Kucoin) processMarketSnapshot(respData []byte) error { if err != nil { return err } - for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) { + assets, err := ku.CalculateAssets(topic, pair) + if err != nil { + return err + } + for x := range assets { + if !ku.AssetWebsocketSupport.IsAssetWebsocketSupported(assets[x]) { + continue + } ku.Websocket.DataHandler <- &ticker.Price{ ExchangeName: ku.Name, - AssetType: assetType, + AssetType: assets[x], Last: response.Data.LastTradedPrice, Pair: pair, Low: response.Data.Low, @@ -1024,6 +1070,11 @@ func (ku *Kucoin) expandSubscription(baseSub *subscription.Subscription, assetPa if !s.Asset.IsValid() { s.Asset = getChannelsAssetType(s.Channel) } + + if len(assetPairs[s.Asset]) == 0 { + return nil, nil + } + switch { case s.Channel == marginLoanChannel: for _, c := range assetPairs[asset.Margin].GetCurrencies() { @@ -1678,13 +1729,41 @@ func (o *orderbookManager) stopNeedsFetchingBook(pair currency.Pair, assetType a return nil } -func (ku *Kucoin) listOfAssetsCurrencyPairEnabledFor(cp currency.Pair) []asset.Item { - assets := []asset.Item{} - for _, a := range ku.CurrencyPairs.GetAssetTypes(true) { - pairs, err := ku.GetEnabledPairs(a) - if err == nil && pairs.Contains(cp, true) { - assets = append(assets, a) +// CalculateAssets returns the available asset types for a currency pair +func (ku *Kucoin) CalculateAssets(topic string, cp currency.Pair) ([]asset.Item, error) { + switch { + case cp.Quote.Equal(currency.USDTM), strings.HasPrefix(topic, "/contract"): + if err := ku.CurrencyPairs.IsAssetEnabled(asset.Futures); err != nil { + if !errors.Is(err, asset.ErrNotSupported) { + return nil, err + } + return nil, nil } + return []asset.Item{asset.Futures}, nil + case strings.HasPrefix(topic, "/margin"), strings.HasPrefix(topic, "/index"): + if err := ku.CurrencyPairs.IsAssetEnabled(asset.Margin); err != nil { + if !errors.Is(err, asset.ErrNotSupported) { + return nil, err + } + return nil, nil + } + return []asset.Item{asset.Margin}, nil + default: + resp := make([]asset.Item, 0, 2) + spotEnabled, err := ku.IsPairEnabled(cp, asset.Spot) + if err != nil && !errors.Is(currency.ErrCurrencyNotFound, err) { + return nil, err + } + if spotEnabled { + resp = append(resp, asset.Spot) + } + marginEnabled, err := ku.IsPairEnabled(cp, asset.Margin) + if err != nil && !errors.Is(currency.ErrCurrencyNotFound, err) { + return nil, err + } + if marginEnabled { + resp = append(resp, asset.Margin) + } + return resp, nil } - return assets } diff --git a/exchanges/kucoin/kucoin_wrapper.go b/exchanges/kucoin/kucoin_wrapper.go index d767a2cf..432fa038 100644 --- a/exchanges/kucoin/kucoin_wrapper.go +++ b/exchanges/kucoin/kucoin_wrapper.go @@ -369,34 +369,29 @@ func (ku *Kucoin) UpdateTickers(ctx context.Context, assetType asset.Item) error if err != nil { return err } - pairs, err := ku.GetEnabledPairs(assetType) - if err != nil { - return err - } for t := range ticks.Tickers { - pair, err := currency.NewPairFromString(ticks.Tickers[t].Symbol) - if err != nil { + pair, enabled, err := ku.MatchSymbolCheckEnabled(ticks.Tickers[t].Symbol, assetType, true) + if err != nil && !errors.Is(err, currency.ErrPairNotFound) { return err } - if !pairs.Contains(pair, true) { + if !enabled { continue } - for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) { - err = ticker.ProcessTicker(&ticker.Price{ - Last: ticks.Tickers[t].Last, - High: ticks.Tickers[t].High, - Low: ticks.Tickers[t].Low, - Volume: ticks.Tickers[t].Volume, - Ask: ticks.Tickers[t].Sell, - Bid: ticks.Tickers[t].Buy, - Pair: pair, - ExchangeName: ku.Name, - AssetType: assetType, - LastUpdated: ticks.Time.Time(), - }) - if err != nil { - errs = common.AppendError(errs, err) - } + + err = ticker.ProcessTicker(&ticker.Price{ + Last: ticks.Tickers[t].Last, + High: ticks.Tickers[t].High, + Low: ticks.Tickers[t].Low, + Volume: ticks.Tickers[t].Volume, + Ask: ticks.Tickers[t].Sell, + Bid: ticks.Tickers[t].Buy, + Pair: pair, + ExchangeName: ku.Name, + AssetType: assetType, + LastUpdated: ticks.Time.Time(), + }) + if err != nil { + errs = common.AppendError(errs, err) } } default: @@ -1128,9 +1123,6 @@ func (ku *Kucoin) GetActiveOrders(ctx context.Context, getOrdersRequest *order.M if err != nil { return nil, err } - if err != nil { - return nil, err - } for x := range spotOrders.Items { if !spotOrders.Items[x].IsActive { continue