Kucoin: Change default websocket subscription channel for tickers and orderbooks (#1371)

* kucoin: quick batching support for ticker/trades and orderbooks

* fix test

* kucoin: move pieces add commentry

* kucoin: optimise listOfAssetsCurrencyPairEnabledFor and refactor implementations, address specific orderbook channel subscription handling

* glorious: nits

* thx @thrasher-: nits addressed

* rm types and tests that are not needed

* rm subs checking code, and convert to types.Number

* not needed anymore

* fix tests

* set up reader routine to process updates before init a potential slow websocket subscriber

* implement glorious suggestion

* glorious: nitters

---------

Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
This commit is contained in:
Ryan O'Hara-Reid
2024-03-04 12:05:26 +11:00
committed by GitHub
parent 954aa0239e
commit 32a35b3f52
7 changed files with 197 additions and 119 deletions

View File

@@ -137,12 +137,12 @@ func (m *WebsocketRoutineManager) websocketRoutine() {
wg.Add(1)
go func() {
defer wg.Done()
err = ws.Connect()
err = m.websocketDataReceiver(ws)
if err != nil {
log.Errorf(log.WebsocketMgr, "%v", err)
}
err = m.websocketDataReceiver(ws)
err = ws.Connect()
if err != nil {
log.Errorf(log.WebsocketMgr, "%v", err)
}

View File

@@ -516,9 +516,6 @@ func (g *Gateio) FetchTradablePairs(ctx context.Context, a asset.Item) (currency
return nil, err
}
cp.Quote = currency.NewCode(strings.ReplaceAll(cp.Quote.String(), currency.UnderscoreDelimiter, currency.DashDelimiter))
if err != nil {
return nil, err
}
pairs = append(pairs, cp)
}
}

View File

@@ -170,7 +170,7 @@ func (ku *Kucoin) GetFuturesOrderbook(ctx context.Context, symbol string) (*Orde
if err != nil {
return nil, err
}
return constructFuturesOrderbook(&o)
return constructFuturesOrderbook(&o), nil
}
// GetFuturesPartOrderbook20 gets orderbook for a specified symbol with depth 20
@@ -185,7 +185,7 @@ func (ku *Kucoin) GetFuturesPartOrderbook20(ctx context.Context, symbol string)
if err != nil {
return nil, err
}
return constructFuturesOrderbook(&o)
return constructFuturesOrderbook(&o), nil
}
// GetFuturesPartOrderbook100 gets orderbook for a specified symbol with depth 100
@@ -200,7 +200,7 @@ func (ku *Kucoin) GetFuturesPartOrderbook100(ctx context.Context, symbol string)
if err != nil {
return nil, err
}
return constructFuturesOrderbook(&o)
return constructFuturesOrderbook(&o), nil
}
// GetFuturesTradeHistory get last 100 trades for symbol
@@ -863,20 +863,11 @@ func processFuturesOB(ob [][2]float64) []orderbook.Item {
return o
}
func constructFuturesOrderbook(o *futuresOrderbookResponse) (*Orderbook, error) {
var (
s Orderbook
err error
)
s.Bids = processFuturesOB(o.Bids)
if err != nil {
return nil, err
func constructFuturesOrderbook(o *futuresOrderbookResponse) *Orderbook {
return &Orderbook{
Bids: processFuturesOB(o.Bids),
Asks: processFuturesOB(o.Asks),
Sequence: o.Sequence,
Time: o.Time.Time(),
}
s.Asks = processFuturesOB(o.Asks)
if err != nil {
return nil, err
}
s.Sequence = o.Sequence
s.Time = o.Time.Time()
return &s, err
}

View File

@@ -2027,13 +2027,19 @@ func TestGenerateDefaultSubscriptions(t *testing.T) {
t.Parallel()
subs, err := ku.GenerateDefaultSubscriptions()
assert.NoError(t, err, "GenerateDefaultSubscriptions should not error")
assert.Len(t, subs, 12, "Should generate the correct number of subs when not logged in")
for _, p := range []string{"ticker", "match", "level2"} {
verifySubs(t, subs, asset.Spot, "/market/"+p+":", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC")
verifySubs(t, subs, asset.Margin, "/market/"+p+":", "SOL-USDC", "TRX-BTC")
}
assert.Len(t, subs, 11, "Should generate the correct number of subs when not logged in")
verifySubs(t, subs, asset.Spot, "/market/ticker:all") // This takes care of margin as well.
verifySubs(t, subs, asset.Spot, "/market/match:", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC")
verifySubs(t, subs, asset.Margin, "/market/match:", "SOL-USDC", "TRX-BTC")
verifySubs(t, subs, asset.Spot, "/spotMarket/level2Depth5:", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC")
verifySubs(t, subs, asset.Margin, "/spotMarket/level2Depth5:", "SOL-USDC", "TRX-BTC")
for _, c := range []string{"ETHUSDCM", "XBTUSDCM", "SOLUSDTM"} {
verifySubs(t, subs, asset.Futures, "/contractMarket/tickerV2:", c)
verifySubs(t, subs, asset.Futures, "/contractMarket/level2Depth50:", c)
@@ -2052,11 +2058,16 @@ func TestGenerateAuthSubscriptions(t *testing.T) {
subs, err := nu.GenerateDefaultSubscriptions()
assert.NoError(t, err, "GenerateDefaultSubscriptions with Auth should not error")
assert.Len(t, subs, 25, "Should generate the correct number of subs when logged in")
for _, p := range []string{"ticker", "match", "level2"} {
verifySubs(t, subs, asset.Spot, "/market/"+p+":", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC")
verifySubs(t, subs, asset.Margin, "/market/"+p+":", "SOL-USDC", "TRX-BTC")
}
assert.Len(t, subs, 24, "Should generate the correct number of subs when logged in")
verifySubs(t, subs, asset.Spot, "/market/ticker:all") // This takes care of margin as well.
verifySubs(t, subs, asset.Spot, "/market/match:", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC")
verifySubs(t, subs, asset.Margin, "/market/match:", "SOL-USDC", "TRX-BTC")
verifySubs(t, subs, asset.Spot, "/spotMarket/level2Depth5:", "BTC-USDT", "ETH-USDT", "LTC-USDT", "ETH-BTC")
verifySubs(t, subs, asset.Margin, "/spotMarket/level2Depth5:", "SOL-USDC", "TRX-BTC")
for _, c := range []string{"ETHUSDCM", "XBTUSDCM", "SOLUSDTM"} {
verifySubs(t, subs, asset.Futures, "/contractMarket/tickerV2:", c)
verifySubs(t, subs, asset.Futures, "/contractMarket/level2Depth50:", c)
@@ -2472,7 +2483,7 @@ func TestProcessOrderbook(t *testing.T) {
if err != nil {
t.Error(err)
}
err = ku.processOrderbook([]byte(orderbookLevel5PushData), "BTC-USDT")
err = ku.processOrderbook([]byte(orderbookLevel5PushData), "BTC-USDT", "")
if err != nil {
t.Error(err)
}

View File

@@ -1501,3 +1501,11 @@ type MarginOrderParam struct {
VisibleSize float64 `json:"visibleSize,omitempty,string"`
Funds float64 `json:"funds,string,omitempty"`
}
// Level2Depth5Or20 stores the orderbook data for the level 5 or level 20
// orderbook
type Level2Depth5Or20 struct {
Asks [][2]types.Number `json:"asks"`
Bids [][2]types.Number `json:"bids"`
Timestamp int64 `json:"timestamp"`
}

View File

@@ -75,8 +75,8 @@ const (
)
var subscriptionNames = map[string]string{
subscription.TickerChannel: marketTickerChannel,
subscription.OrderbookChannel: marketOrderbookLevel2Channels,
subscription.TickerChannel: marketAllTickersChannel, // This allows more subscriptions on the orderbook channel for this specific connection.
subscription.OrderbookChannel: marketOrderbookLevel2to5Channel, // This does not require a REST request to get the orderbook.
subscription.CandlesChannel: marketCandlesChannel,
subscription.AllTradesChannel: marketMatchChannel,
// No equivalents for: AllOrders, MyTrades, MyOrders
@@ -131,9 +131,6 @@ func (ku *Kucoin) WsConnect() error {
}
ku.Websocket.Wg.Add(1)
go ku.wsReadData()
if err != nil {
return err
}
ku.Websocket.Conn.SetupPingHandler(stream.PingHandler{
Delay: time.Millisecond * time.Duration(instances.InstanceServers[0].PingTimeout),
Message: []byte(`{"type":"ping"}`),
@@ -218,22 +215,22 @@ func (ku *Kucoin) wsHandleData(respData []byte) error {
} else {
instruments = topicInfo[1]
}
return ku.processTicker(resp.Data, instruments)
return ku.processTicker(resp.Data, instruments, topicInfo[0])
case strings.HasPrefix(marketSymbolSnapshotChannel, topicInfo[0]):
return ku.processMarketSnapshot(resp.Data)
return ku.processMarketSnapshot(resp.Data, topicInfo[0])
case strings.HasPrefix(marketOrderbookLevel2Channels, topicInfo[0]):
return ku.processOrderbookWithDepth(respData, topicInfo[1])
return ku.processOrderbookWithDepth(respData, topicInfo[1], topicInfo[0])
case strings.HasPrefix(marketOrderbookLevel2to5Channel, topicInfo[0]),
strings.HasPrefix(marketOrderbokLevel2To50Channel, topicInfo[0]):
return ku.processOrderbook(resp.Data, topicInfo[1])
return ku.processOrderbook(resp.Data, topicInfo[1], topicInfo[0])
case strings.HasPrefix(marketCandlesChannel, topicInfo[0]):
symbolAndInterval := strings.Split(topicInfo[1], currency.UnderscoreDelimiter)
if len(symbolAndInterval) != 2 {
return errMalformedData
}
return ku.processCandlesticks(resp.Data, symbolAndInterval[0], symbolAndInterval[1])
return ku.processCandlesticks(resp.Data, symbolAndInterval[0], symbolAndInterval[1], topicInfo[0])
case strings.HasPrefix(marketMatchChannel, topicInfo[0]):
return ku.processTradeData(resp.Data, topicInfo[1])
return ku.processTradeData(resp.Data, topicInfo[1], topicInfo[0])
case strings.HasPrefix(indexPriceIndicatorChannel, topicInfo[0]):
var response WsPriceIndicator
return ku.processData(resp.Data, &response)
@@ -244,7 +241,7 @@ func (ku *Kucoin) wsHandleData(respData []byte) error {
var response WsMarginFundingBook
return ku.processData(resp.Data, &response)
case strings.HasPrefix(privateSpotTradeOrders, topicInfo[0]):
return ku.processOrderChangeEvent(resp.Data)
return ku.processOrderChangeEvent(resp.Data, topicInfo[0])
case strings.HasPrefix(accountBalanceChannel, topicInfo[0]):
return ku.processAccountBalanceChange(resp.Data)
case strings.HasPrefix(marginPositionChannel, topicInfo[0]):
@@ -643,7 +640,7 @@ func (ku *Kucoin) processAccountBalanceChange(respData []byte) error {
return nil
}
func (ku *Kucoin) processOrderChangeEvent(respData []byte) error {
func (ku *Kucoin) processOrderChangeEvent(respData []byte, topic string) error {
response := WsTradeOrder{}
err := json.Unmarshal(respData, &response)
if err != nil {
@@ -665,8 +662,12 @@ func (ku *Kucoin) processOrderChangeEvent(respData []byte) error {
if err != nil {
return err
}
// TODO should amend this function as we need to know the order asset type when we call it
for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) {
// TODO: should amend this function as we need to know the order asset type when we call it
assets, err := ku.CalculateAssets(topic, pair)
if err != nil {
return err
}
for x := range assets {
ku.Websocket.DataHandler <- &order.Detail{
Price: response.Price,
Amount: response.Size,
@@ -678,7 +679,7 @@ func (ku *Kucoin) processOrderChangeEvent(respData []byte) error {
Type: oType,
Side: side,
Status: oStatus,
AssetType: assetType,
AssetType: assets[x],
Date: response.OrderTime.Time(),
LastUpdated: response.Timestamp.Time(),
Pair: pair,
@@ -687,7 +688,7 @@ func (ku *Kucoin) processOrderChangeEvent(respData []byte) error {
return nil
}
func (ku *Kucoin) processTradeData(respData []byte, instrument string) error {
func (ku *Kucoin) processTradeData(respData []byte, instrument, topic string) error {
response := WsTrade{}
err := json.Unmarshal(respData, &response)
if err != nil {
@@ -706,7 +707,11 @@ func (ku *Kucoin) processTradeData(respData []byte, instrument string) error {
if err != nil {
return err
}
for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) {
assets, err := ku.CalculateAssets(topic, pair)
if err != nil {
return err
}
for x := range assets {
err = ku.Websocket.Trade.Update(saveTradeData, trade.Data{
CurrencyPair: pair,
Timestamp: response.Time.Time(),
@@ -715,7 +720,7 @@ func (ku *Kucoin) processTradeData(respData []byte, instrument string) error {
Side: side,
Exchange: ku.Name,
TID: response.TradeID,
AssetType: assetType,
AssetType: assets[x],
})
if err != nil {
return err
@@ -724,7 +729,7 @@ func (ku *Kucoin) processTradeData(respData []byte, instrument string) error {
return nil
}
func (ku *Kucoin) processTicker(respData []byte, instrument string) error {
func (ku *Kucoin) processTicker(respData []byte, instrument, topic string) error {
response := WsTicker{}
err := json.Unmarshal(respData, &response)
if err != nil {
@@ -734,9 +739,16 @@ func (ku *Kucoin) processTicker(respData []byte, instrument string) error {
if err != nil {
return err
}
for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) {
assets, err := ku.CalculateAssets(topic, pair)
if err != nil {
return err
}
for x := range assets {
if !ku.AssetWebsocketSupport.IsAssetWebsocketSupported(assets[x]) {
continue
}
ku.Websocket.DataHandler <- &ticker.Price{
AssetType: assetType,
AssetType: assets[x],
Last: response.Price,
LastUpdated: response.Timestamp.Time(),
ExchangeName: ku.Name,
@@ -751,7 +763,7 @@ func (ku *Kucoin) processTicker(respData []byte, instrument string) error {
return nil
}
func (ku *Kucoin) processCandlesticks(respData []byte, instrument, intervalString string) error {
func (ku *Kucoin) processCandlesticks(respData []byte, instrument, intervalString, topic string) error {
pair, err := currency.NewPairFromString(instrument)
if err != nil {
return err
@@ -765,11 +777,18 @@ func (ku *Kucoin) processCandlesticks(respData []byte, instrument, intervalStrin
if err != nil {
return err
}
for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) {
ku.Websocket.DataHandler <- stream.KlineData{
assets, err := ku.CalculateAssets(topic, pair)
if err != nil {
return err
}
for x := range assets {
if !ku.AssetWebsocketSupport.IsAssetWebsocketSupported(assets[x]) {
continue
}
ku.Websocket.DataHandler <- &stream.KlineData{
Timestamp: response.Time.Time(),
Pair: pair,
AssetType: assetType,
AssetType: assets[x],
Exchange: ku.Name,
StartTime: resp.Candles.StartTime,
Interval: intervalString,
@@ -783,7 +802,7 @@ func (ku *Kucoin) processCandlesticks(respData []byte, instrument, intervalStrin
return nil
}
func (ku *Kucoin) processOrderbookWithDepth(respData []byte, instrument string) error {
func (ku *Kucoin) processOrderbookWithDepth(respData []byte, instrument, topic string) error {
pair, err := currency.NewPairFromString(instrument)
if err != nil {
return err
@@ -795,17 +814,18 @@ func (ku *Kucoin) processOrderbookWithDepth(respData []byte, instrument string)
if err != nil {
return err
}
var init bool
for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) {
init, err = ku.UpdateLocalBuffer(result.Result, assetType)
assets, err := ku.CalculateAssets(topic, pair)
if err != nil {
return err
}
for x := range assets {
var init bool
init, err = ku.UpdateLocalBuffer(result.Result, assets[x])
if err != nil {
if init {
return nil
}
return fmt.Errorf("%v - UpdateLocalCache for asset type: %v error: %s",
ku.Name,
assetType,
err)
return fmt.Errorf("%v - UpdateLocalCache for asset type: %v error: %s", ku.Name, assets[x], err)
}
}
return nil
@@ -847,35 +867,54 @@ func (ku *Kucoin) UpdateLocalBuffer(wsdp *WsOrderbook, assetType asset.Item) (bo
return false, err
}
func (ku *Kucoin) processOrderbook(respData []byte, symbol string) error {
response := &WsOrderbook{}
func (ku *Kucoin) processOrderbook(respData []byte, symbol, topic string) error {
var response Level2Depth5Or20
err := json.Unmarshal(respData, &response)
if err != nil {
return err
}
response.Symbol = symbol
var pair currency.Pair
pair, err = currency.NewPairFromString(symbol)
pair, err := currency.NewPairFromString(symbol)
if err != nil {
return err
}
var init bool
for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) {
init, err = ku.UpdateLocalBuffer(response, assetType)
asks := make([]orderbook.Item, len(response.Asks))
for x := range response.Asks {
asks[x].Price = response.Asks[x][0].Float64()
asks[x].Amount = response.Asks[x][1].Float64()
}
bids := make([]orderbook.Item, len(response.Bids))
for x := range response.Bids {
bids[x].Price = response.Bids[x][0].Float64()
bids[x].Amount = response.Bids[x][1].Float64()
}
assets, err := ku.CalculateAssets(topic, pair)
if err != nil {
return err
}
lastUpdated := time.UnixMilli(response.Timestamp)
for x := range assets {
err = ku.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{
Exchange: ku.Name,
Asks: asks,
Bids: bids,
Pair: pair,
Asset: assets[x],
LastUpdated: lastUpdated,
})
if err != nil {
if init {
return nil
}
return fmt.Errorf("%v - UpdateLocalCache for asset type %v error: %s",
ku.Name,
assetType,
err)
return err
}
}
return nil
}
func (ku *Kucoin) processMarketSnapshot(respData []byte) error {
func (ku *Kucoin) processMarketSnapshot(respData []byte, topic string) error {
response := WsSnapshot{}
err := json.Unmarshal(respData, &response)
if err != nil {
@@ -885,10 +924,17 @@ func (ku *Kucoin) processMarketSnapshot(respData []byte) error {
if err != nil {
return err
}
for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) {
assets, err := ku.CalculateAssets(topic, pair)
if err != nil {
return err
}
for x := range assets {
if !ku.AssetWebsocketSupport.IsAssetWebsocketSupported(assets[x]) {
continue
}
ku.Websocket.DataHandler <- &ticker.Price{
ExchangeName: ku.Name,
AssetType: assetType,
AssetType: assets[x],
Last: response.Data.LastTradedPrice,
Pair: pair,
Low: response.Data.Low,
@@ -1024,6 +1070,11 @@ func (ku *Kucoin) expandSubscription(baseSub *subscription.Subscription, assetPa
if !s.Asset.IsValid() {
s.Asset = getChannelsAssetType(s.Channel)
}
if len(assetPairs[s.Asset]) == 0 {
return nil, nil
}
switch {
case s.Channel == marginLoanChannel:
for _, c := range assetPairs[asset.Margin].GetCurrencies() {
@@ -1678,13 +1729,41 @@ func (o *orderbookManager) stopNeedsFetchingBook(pair currency.Pair, assetType a
return nil
}
func (ku *Kucoin) listOfAssetsCurrencyPairEnabledFor(cp currency.Pair) []asset.Item {
assets := []asset.Item{}
for _, a := range ku.CurrencyPairs.GetAssetTypes(true) {
pairs, err := ku.GetEnabledPairs(a)
if err == nil && pairs.Contains(cp, true) {
assets = append(assets, a)
// CalculateAssets returns the available asset types for a currency pair
func (ku *Kucoin) CalculateAssets(topic string, cp currency.Pair) ([]asset.Item, error) {
switch {
case cp.Quote.Equal(currency.USDTM), strings.HasPrefix(topic, "/contract"):
if err := ku.CurrencyPairs.IsAssetEnabled(asset.Futures); err != nil {
if !errors.Is(err, asset.ErrNotSupported) {
return nil, err
}
return nil, nil
}
return []asset.Item{asset.Futures}, nil
case strings.HasPrefix(topic, "/margin"), strings.HasPrefix(topic, "/index"):
if err := ku.CurrencyPairs.IsAssetEnabled(asset.Margin); err != nil {
if !errors.Is(err, asset.ErrNotSupported) {
return nil, err
}
return nil, nil
}
return []asset.Item{asset.Margin}, nil
default:
resp := make([]asset.Item, 0, 2)
spotEnabled, err := ku.IsPairEnabled(cp, asset.Spot)
if err != nil && !errors.Is(currency.ErrCurrencyNotFound, err) {
return nil, err
}
if spotEnabled {
resp = append(resp, asset.Spot)
}
marginEnabled, err := ku.IsPairEnabled(cp, asset.Margin)
if err != nil && !errors.Is(currency.ErrCurrencyNotFound, err) {
return nil, err
}
if marginEnabled {
resp = append(resp, asset.Margin)
}
return resp, nil
}
return assets
}

View File

@@ -369,34 +369,29 @@ func (ku *Kucoin) UpdateTickers(ctx context.Context, assetType asset.Item) error
if err != nil {
return err
}
pairs, err := ku.GetEnabledPairs(assetType)
if err != nil {
return err
}
for t := range ticks.Tickers {
pair, err := currency.NewPairFromString(ticks.Tickers[t].Symbol)
if err != nil {
pair, enabled, err := ku.MatchSymbolCheckEnabled(ticks.Tickers[t].Symbol, assetType, true)
if err != nil && !errors.Is(err, currency.ErrPairNotFound) {
return err
}
if !pairs.Contains(pair, true) {
if !enabled {
continue
}
for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) {
err = ticker.ProcessTicker(&ticker.Price{
Last: ticks.Tickers[t].Last,
High: ticks.Tickers[t].High,
Low: ticks.Tickers[t].Low,
Volume: ticks.Tickers[t].Volume,
Ask: ticks.Tickers[t].Sell,
Bid: ticks.Tickers[t].Buy,
Pair: pair,
ExchangeName: ku.Name,
AssetType: assetType,
LastUpdated: ticks.Time.Time(),
})
if err != nil {
errs = common.AppendError(errs, err)
}
err = ticker.ProcessTicker(&ticker.Price{
Last: ticks.Tickers[t].Last,
High: ticks.Tickers[t].High,
Low: ticks.Tickers[t].Low,
Volume: ticks.Tickers[t].Volume,
Ask: ticks.Tickers[t].Sell,
Bid: ticks.Tickers[t].Buy,
Pair: pair,
ExchangeName: ku.Name,
AssetType: assetType,
LastUpdated: ticks.Time.Time(),
})
if err != nil {
errs = common.AppendError(errs, err)
}
}
default:
@@ -1128,9 +1123,6 @@ func (ku *Kucoin) GetActiveOrders(ctx context.Context, getOrdersRequest *order.M
if err != nil {
return nil, err
}
if err != nil {
return nil, err
}
for x := range spotOrders.Items {
if !spotOrders.Items[x].IsActive {
continue