Files
gocryptotrader/exchanges/kucoin/kucoin_websocket.go
Gareth Kirwan b41fe27684 Kucoin: Add subscription templating and various fixes (#1579)
* Currency: Variadic Pairs.Add

This version of Pairs.Add is simpler and [more
performant](https://gist.github.com/gbjk/06a1fc1832d04ee41213ca518938cf74)

Behavioural difference: If there's nothing to add, the same slice is
returned unaltered. This seems like good sauce

* Currency: Variadic Remove

* Common: Add Batch function

* Common: Add common.SortStrings for stringers

* Subscriptions: Add batching to templates

* Subscriptions: Sort list of pairs

* Kucoin: Switch to sub templating

* Kucoin: Simplify channel prefix usage

* Kucoin: Fix race on fetchedFuturesOrderbook

* Subscriptions: Filter AssetPairs

Now only the assetPairs relevant to the subscription are in the context

* Subscriptions: Respect subscription Pairs

* Subscriptions: Trim AssetSeparator early

We want to trim before checking for "AssetSeparator vs All" because a
template should be allowed to reuse a range template and generate just one trailing AssetSeparator
whilst using a specific Asset

* Kucoin: Fix empty margin asset added

* Kucoin: Add Subscription batching

Turns out that, contrary to the documentation, kucoin supports batching of
all symbols and currencies

* Kucoin: Fix checkSubscriptions and coverage

* Subscriptions: Simplify error checking

This reduces the complexity of error checking to just be "do we get the
correct numbers".

Fixes Asset.All with only one asset erroring on xpandPairs, because we
trimmed the only asset separator, and then errored that we're not
xpanding Assets and the asset on the sub is asset.All

This use-case conflicted with commit 6bbd546d74, which required:
```
Subscriptions: Trim AssetSeparator early

We want to trim before checking for "AssetSeparator vs All" because a
template should be allowed to reuse a range template and generate just one trailing AssetSeparator
whilst using a specific Asset
```

Now we set up the assets earlier, and we remove the check for xpandAssets, since the number of asset lines matching is all that matters.

I've removed the asset tests for this, but they were correctly erroring
on the number of asset lines instead.

Everything hits coverage, as well.

* Kucoin: Remove deprecated fundingBook endpoint

* BTCMarkets: Use common.Batch
2024-08-09 12:33:15 +10:00

1738 lines
51 KiB
Go

package kucoin
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"slices"
"strconv"
"strings"
"sync"
"text/template"
"time"
"github.com/buger/jsonparser"
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
"github.com/thrasher-corp/gocryptotrader/log"
)
// fetchedFuturesOrderbookMutex guards every read and write of fetchedFuturesOrderbook.
var fetchedFuturesOrderbookMutex sync.Mutex
// fetchedFuturesOrderbook is keyed by futures symbol and flags the symbols
// already handled by ensureFuturesOrderbookSnapshotLoaded.
// It is replaced with an empty map each time WsConnect runs.
var fetchedFuturesOrderbook map[string]bool
// REST paths used to obtain websocket tokens, followed by the websocket topic
// prefixes handled in this file. Where a trailing comment is present it shows
// the full topic format, i.e. the prefix followed by ':' and its arguments.
const (
// token ("bullet") endpoints, appended to the REST endpoint URL
publicBullets = "/v1/bullet-public"
privateBullets = "/v1/bullet-private"
// spot channels
marketTickerChannel = "/market/ticker" // /market/ticker:{symbol},...
marketSnapshotChannel = "/market/snapshot" // /market/snapshot:{symbol},...
marketOrderbookChannel = "/market/level2" // /market/level2:{symbol},...
marketOrderbookDepth5Channel = "/spotMarket/level2Depth5" // /spotMarket/level2Depth5:{symbol},...
marketOrderbookDepth50Channel = "/spotMarket/level2Depth50" // /spotMarket/level2Depth50:{symbol},...
marketCandlesChannel = "/market/candles" // /market/candles:{symbol}_{interval},...
marketMatchChannel = "/market/match" // /market/match:{symbol},...
indexPriceIndicatorChannel = "/indicator/index" // /indicator/index:{symbol},...
markPriceIndicatorChannel = "/indicator/markPrice" // /indicator/markPrice:{symbol},...
// Private channels
privateSpotTradeOrders = "/spotMarket/tradeOrders"
accountBalanceChannel = "/account/balance"
marginPositionChannel = "/margin/position"
marginLoanChannel = "/margin/loan" // /margin/loan:{currency}
spotMarketAdvancedChannel = "/spotMarket/advancedOrders"
// futures channels
futuresTickerChannel = "/contractMarket/tickerV2" // /contractMarket/tickerV2:{symbol},...
futuresOrderbookChannel = "/contractMarket/level2" // /contractMarket/level2:{symbol},...
futuresOrderbookDepth5Channel = "/contractMarket/level2Depth5" // /contractMarket/level2Depth5:{symbol},...
futuresOrderbookDepth50Channel = "/contractMarket/level2Depth50" // /contractMarket/level2Depth50:{symbol},...
futuresExecutionDataChannel = "/contractMarket/execution" // /contractMarket/execution:{symbol},...
futuresContractMarketDataChannel = "/contract/instrument" // /contract/instrument:{symbol},...
futuresSystemAnnouncementChannel = "/contract/announcement"
futuresTrasactionStatisticsTimerEventChannel = "/contractMarket/snapshot" // /contractMarket/snapshot:{symbol},...
// futures private channels
futuresTradeOrderChannel = "/contractMarket/tradeOrders" // /contractMarket/tradeOrders:{symbol},...
futuresPositionChangeEventChannel = "/contract/position" // /contract/position:{symbol},...
futuresStopOrdersLifecycleEventChannel = "/contractMarket/advancedOrders"
futuresAccountBalanceEventChannel = "/contractAccount/wallet"
)
// Tuning values for the websocket orderbook synchronisation machinery.
// NOTE(review): declared as variables rather than constants, presumably so
// tests can override them — confirm before converting to const.
var (
// maxWSUpdateBuffer defines max websocket updates to apply when an
// orderbook is initially fetched
maxWSUpdateBuffer = 150
// maxWSOrderbookJobs defines max websocket orderbook jobs in queue to fetch
// an orderbook snapshot via REST
maxWSOrderbookJobs = 2000
// maxWSOrderbookWorkers defines a max amount of workers allowed to execute
// jobs from the job channel
maxWSOrderbookWorkers = 10
)
// defaultSubscriptions is the built-in list of enabled websocket
// subscriptions: public ticker, orderbook and trade feeds, plus the
// authenticated futures, margin and account balance channels.
// NOTE(review): presumably installed as the exchange's default
// Features.Subscriptions — confirm against the exchange setup code.
var defaultSubscriptions = subscription.List{
{Enabled: true, Asset: asset.All, Channel: subscription.TickerChannel},
{Enabled: true, Asset: asset.All, Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds},
{Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel},
{Enabled: true, Asset: asset.Margin, Channel: subscription.AllTradesChannel},
{Enabled: true, Asset: asset.Futures, Channel: futuresTradeOrderChannel, Authenticated: true},
{Enabled: true, Asset: asset.Futures, Channel: futuresStopOrdersLifecycleEventChannel, Authenticated: true},
{Enabled: true, Asset: asset.Futures, Channel: futuresAccountBalanceEventChannel, Authenticated: true},
{Enabled: true, Asset: asset.Margin, Channel: marginPositionChannel, Authenticated: true},
{Enabled: true, Asset: asset.Margin, Channel: marginLoanChannel, Authenticated: true},
// No asset is set for the account balance channel.
{Enabled: true, Channel: accountBalanceChannel, Authenticated: true},
}
// WsConnect creates a new websocket connection.
//
// It resets the futures orderbook snapshot tracking, obtains an instance
// server list and token (authenticated when credentials are usable, otherwise
// public), dials the first server, starts the reader goroutine, installs the
// ping handler and finally sets up the orderbook manager.
//
// Returns stream.ErrWebsocketNotEnabled when the websocket or exchange is
// disabled, and an error when no instance server can be obtained or the dial
// fails. The dial error wraps the underlying cause.
func (ku *Kucoin) WsConnect() error {
	if !ku.Websocket.IsEnabled() || !ku.IsEnabled() {
		return stream.ErrWebsocketNotEnabled
	}

	fetchedFuturesOrderbookMutex.Lock()
	fetchedFuturesOrderbook = map[string]bool{}
	fetchedFuturesOrderbookMutex.Unlock()

	var dialer websocket.Dialer
	dialer.HandshakeTimeout = ku.Config.HTTPTimeout
	dialer.Proxy = http.ProxyFromEnvironment

	var instances *WSInstanceServers
	_, err := ku.GetCredentials(context.Background())
	if err != nil {
		ku.Websocket.SetCanUseAuthenticatedEndpoints(false)
	}
	if ku.Websocket.CanUseAuthenticatedEndpoints() {
		instances, err = ku.GetAuthenticatedInstanceServers(context.Background())
		if err != nil {
			// Report the failure but carry on with a public connection.
			ku.Websocket.DataHandler <- err
			ku.Websocket.SetCanUseAuthenticatedEndpoints(false)
			// A value returned together with an error must not be used;
			// clearing it guarantees the public fallback below is taken.
			instances = nil
		}
	}
	if instances == nil {
		instances, err = ku.GetInstanceServers(context.Background())
		if err != nil {
			return err
		}
	}
	if len(instances.InstanceServers) == 0 {
		return errors.New("no websocket instance server found")
	}

	ku.Websocket.Conn.SetURL(instances.InstanceServers[0].Endpoint + "?token=" + instances.Token)
	err = ku.Websocket.Conn.Dial(&dialer, http.Header{})
	if err != nil {
		// %w keeps the message identical while letting callers inspect the cause.
		return fmt.Errorf("%v - Unable to connect to Websocket. Error: %w", ku.Name, err)
	}

	ku.Websocket.Wg.Add(1)
	go ku.wsReadData()

	ku.Websocket.Conn.SetupPingHandler(stream.PingHandler{
		// The server supplied value is interpreted as milliseconds.
		Delay:       time.Millisecond * time.Duration(instances.InstanceServers[0].PingTimeout),
		Message:     []byte(`{"type":"ping"}`),
		MessageType: websocket.TextMessage,
	})

	ku.setupOrderbookManager()
	return nil
}
// GetInstanceServers retrieves the server list and temporary public token.
//
// It POSTs to the public bullet path on the spot REST endpoint as an
// unauthenticated request. The returned pointer is never nil; it refers to
// the decoded "data" object, which is only meaningful when the error is nil.
func (ku *Kucoin) GetInstanceServers(ctx context.Context) (*WSInstanceServers, error) {
	var response struct {
		Data WSInstanceServers `json:"data"`
		Error
	}
	// buildRequest is invoked by SendPayload to construct the request item.
	buildRequest := func() (*request.Item, error) {
		endpointPath, err := ku.API.Endpoints.GetURL(exchange.RestSpot)
		if err != nil {
			return nil, err
		}
		item := &request.Item{
			Method:        http.MethodPost,
			Path:          endpointPath + publicBullets,
			Result:        &response,
			Verbose:       ku.Verbose,
			HTTPDebugging: ku.HTTPDebugging,
			HTTPRecording: ku.HTTPRecording,
		}
		return item, nil
	}
	err := ku.SendPayload(ctx, request.Unset, buildRequest, request.UnauthenticatedRequest)
	return &response.Data, err
}
// GetAuthenticatedInstanceServers retrieves server instances for authenticated users.
//
// The private bullet path is first requested on the spot REST endpoint. When
// that fails with an error whose text contains "400003" the same request is
// retried against the futures REST endpoint (presumably this code means the
// API key is not valid for spot — confirm against the exchange error codes).
//
// Returns the decoded "data" object together with the error of the last
// request made.
func (ku *Kucoin) GetAuthenticatedInstanceServers(ctx context.Context) (*WSInstanceServers, error) {
	response := struct {
		Data *WSInstanceServers `json:"data"`
		Error
	}{}
	err := ku.SendAuthHTTPRequest(ctx, exchange.RestSpot, defaultSpotEPL, http.MethodPost, privateBullets, nil, &response)
	if err != nil && strings.Contains(err.Error(), "400003") {
		// Finish the retry before reading response.Data: the order in which a
		// return statement evaluates a variable and a function call that
		// writes to it is not specified by the language.
		err = ku.SendAuthHTTPRequest(ctx, exchange.RestFutures, defaultFuturesEPL, http.MethodPost, privateBullets, nil, &response)
	}
	return response.Data, err
}
// wsReadData receives and passes on websocket messages for processing.
// It loops until a read yields no raw payload, forwards handler errors to the
// data handler, and marks the websocket wait group as done on exit.
func (ku *Kucoin) wsReadData() {
	defer ku.Websocket.Wg.Done()
	for {
		message := ku.Websocket.Conn.ReadMessage()
		if message.Raw == nil {
			return
		}
		if err := ku.wsHandleData(message.Raw); err != nil {
			ku.Websocket.DataHandler <- err
		}
	}
}
// wsHandleData routes a single raw websocket message to its handler.
//
// "pong" and "welcome" messages are ignored. Messages carrying an ID are
// handed to the response matcher. Everything else is dispatched on the
// channel part of the topic (the text before the first ':'); channels that
// address a symbol also use the part after the colon.
//
// Returns errMalformedData (wrapped) when a symbol-addressed channel arrives
// without the symbol part, an error for unhandled channels, or whatever the
// selected handler returns. Known channels with an unrecognised subject are
// ignored and yield nil.
func (ku *Kucoin) wsHandleData(respData []byte) error {
	resp := WsPushData{}
	if err := json.Unmarshal(respData, &resp); err != nil {
		return err
	}
	if resp.Type == "pong" || resp.Type == "welcome" {
		return nil
	}
	if resp.ID != "" {
		if !ku.Websocket.Match.IncomingWithData("msgID:"+resp.ID, respData) {
			return fmt.Errorf("message listener not found: %s", resp.ID)
		}
		return nil
	}

	// strings.Split always yields at least one element, so index 0 is safe.
	// Index 1 only exists when the topic contains a ':'.
	topicInfo := strings.Split(resp.Topic, ":")
	channel := topicInfo[0]

	// Guard the channels below that read topicInfo[1] so a topic without a
	// symbol is reported as malformed instead of panicking.
	switch channel {
	case marketTickerChannel,
		marketOrderbookChannel,
		marketOrderbookDepth5Channel, marketOrderbookDepth50Channel,
		marketCandlesChannel,
		marketMatchChannel,
		futuresOrderbookChannel,
		futuresOrderbookDepth5Channel, futuresOrderbookDepth50Channel,
		futuresContractMarketDataChannel,
		futuresTrasactionStatisticsTimerEventChannel:
		if len(topicInfo) < 2 {
			return fmt.Errorf("%w: topic %q has no symbol", errMalformedData, resp.Topic)
		}
	}

	switch channel {
	case marketTickerChannel:
		// For the "all" topic the instrument is carried in the subject.
		instruments := topicInfo[1]
		if instruments == "all" {
			instruments = resp.Subject
		}
		return ku.processTicker(resp.Data, instruments, channel)
	case marketSnapshotChannel:
		return ku.processMarketSnapshot(resp.Data, channel)
	case marketOrderbookChannel:
		// This handler receives the whole message, not just the data field.
		return ku.processOrderbookWithDepth(respData, topicInfo[1], channel)
	case marketOrderbookDepth5Channel, marketOrderbookDepth50Channel:
		return ku.processOrderbook(resp.Data, topicInfo[1], channel)
	case marketCandlesChannel:
		symbolAndInterval := strings.Split(topicInfo[1], currency.UnderscoreDelimiter)
		if len(symbolAndInterval) != 2 {
			return errMalformedData
		}
		return ku.processCandlesticks(resp.Data, symbolAndInterval[0], symbolAndInterval[1], channel)
	case marketMatchChannel:
		return ku.processTradeData(resp.Data, topicInfo[1], channel)
	case indexPriceIndicatorChannel, markPriceIndicatorChannel:
		var response WsPriceIndicator
		return ku.processData(resp.Data, &response)
	case privateSpotTradeOrders:
		return ku.processOrderChangeEvent(resp.Data, channel)
	case accountBalanceChannel:
		return ku.processAccountBalanceChange(resp.Data)
	case marginPositionChannel:
		if resp.Subject == "debt.ratio" {
			var response WsDebtRatioChange
			return ku.processData(resp.Data, &response)
		}
		var response WsPositionStatus
		return ku.processData(resp.Data, &response)
	case marginLoanChannel:
		if resp.Subject == "order.done" {
			var response WsMarginTradeOrderDoneEvent
			return ku.processData(resp.Data, &response)
		}
		return ku.processMarginLendingTradeOrderEvent(resp.Data)
	case spotMarketAdvancedChannel:
		return ku.processStopOrderEvent(resp.Data)
	case futuresTickerChannel:
		return ku.processFuturesTickerV2(resp.Data)
	case futuresExecutionDataChannel:
		var response WsFuturesExecutionData
		return ku.processData(resp.Data, &response)
	case futuresOrderbookChannel:
		if err := ku.ensureFuturesOrderbookSnapshotLoaded(topicInfo[1]); err != nil {
			return err
		}
		return ku.processFuturesOrderbookLevel2(resp.Data, topicInfo[1])
	case futuresOrderbookDepth5Channel, futuresOrderbookDepth50Channel:
		if err := ku.ensureFuturesOrderbookSnapshotLoaded(topicInfo[1]); err != nil {
			return err
		}
		return ku.processFuturesOrderbookSnapshot(resp.Data, topicInfo[1])
	case futuresContractMarketDataChannel:
		switch resp.Subject {
		case "mark.index.price":
			return ku.processFuturesMarkPriceAndIndexPrice(resp.Data, topicInfo[1])
		case "funding.rate":
			return ku.processFuturesFundingData(resp.Data, topicInfo[1])
		}
	case futuresSystemAnnouncementChannel:
		return ku.processFuturesSystemAnnouncement(resp.Data, resp.Subject)
	case futuresTrasactionStatisticsTimerEventChannel:
		return ku.processFuturesTransactionStatistics(resp.Data, topicInfo[1])
	case futuresTradeOrderChannel:
		return ku.processFuturesPrivateTradeOrders(resp.Data)
	case futuresStopOrdersLifecycleEventChannel:
		return ku.processFuturesStopOrderLifecycleEvent(resp.Data)
	case futuresAccountBalanceEventChannel:
		switch resp.Subject {
		case "orderMargin.change":
			var response WsFuturesOrderMarginEvent
			return ku.processData(resp.Data, &response)
		case "availableBalance.change":
			return ku.processFuturesAccountBalanceEvent(resp.Data)
		case "withdrawHold.change":
			var response WsFuturesWithdrawalAmountAndTransferOutAmountEvent
			return ku.processData(resp.Data, &response)
		}
	case futuresPositionChangeEventChannel:
		switch resp.Subject {
		case "position.change":
			if resp.ChannelType == "private" {
				var response WsFuturesPosition
				return ku.processData(resp.Data, &response)
			}
			var response WsFuturesMarkPricePositionChanges
			return ku.processData(resp.Data, &response)
		case "position.settlement":
			var response WsFuturesPositionFundingSettlement
			return ku.processData(resp.Data, &response)
		}
	default:
		ku.Websocket.DataHandler <- stream.UnhandledMessageWarning{
			Message: ku.Name + stream.UnhandledMessage + string(respData),
		}
		return errors.New("push data not handled")
	}
	// Reached only for a known channel whose subject is not handled above.
	return nil
}
// processData decodes respData into resp and forwards resp to the data handler.
//
// resp must be a non-nil pointer to the destination value. It is passed to
// json.Unmarshal as-is rather than by address: unmarshalling through a pointer
// to the interface would replace the caller's pointer with nil for a JSON
// null payload, and a nil would then be pushed to the data handler.
func (ku *Kucoin) processData(respData []byte, resp interface{}) error {
	if err := json.Unmarshal(respData, resp); err != nil {
		return err
	}
	ku.Websocket.DataHandler <- resp
	return nil
}
// processFuturesAccountBalanceEvent decodes a futures available balance push
// and forwards it to the data handler as an account.Change for the futures asset.
func (ku *Kucoin) processFuturesAccountBalanceEvent(respData []byte) error {
	var balance WsFuturesAvailableBalance
	if err := json.Unmarshal(respData, &balance); err != nil {
		return err
	}
	change := account.Change{
		Exchange: ku.Name,
		Currency: currency.NewCode(balance.Currency),
		Asset:    asset.Futures,
		Amount:   balance.AvailableBalance,
	}
	ku.Websocket.DataHandler <- change
	return nil
}
// processFuturesStopOrderLifecycleEvent decodes a futures stop order lifecycle
// push, resolves its symbol against the enabled futures pairs and forwards the
// result to the data handler as an *order.Detail.
func (ku *Kucoin) processFuturesStopOrderLifecycleEvent(respData []byte) error {
	var event WsStopOrderLifecycleEvent
	if err := json.Unmarshal(respData, &event); err != nil {
		return err
	}
	futuresPairs, err := ku.GetEnabledPairs(asset.Futures)
	if err != nil {
		return err
	}
	cp, err := futuresPairs.DeriveFrom(event.Symbol)
	if err != nil {
		return err
	}
	orderType, err := order.StringToOrderType(event.OrderType)
	if err != nil {
		return err
	}
	orderSide, err := order.StringToOrderSide(event.Side)
	if err != nil {
		return err
	}
	detail := &order.Detail{
		Price:        event.OrderPrice,
		TriggerPrice: event.StopPrice,
		Amount:       event.Size,
		Exchange:     ku.Name,
		OrderID:      event.OrderID,
		Type:         orderType,
		Side:         orderSide,
		AssetType:    asset.Futures,
		Date:         event.CreatedAt.Time(),
		LastUpdated:  event.Timestamp.Time(),
		Pair:         cp,
	}
	ku.Websocket.DataHandler <- detail
	return nil
}
// processFuturesPrivateTradeOrders decodes a private futures trade order push
// and forwards it to the data handler as an *order.Detail.
// NOTE(review): OrderID is populated from the payload's TradeID field —
// confirm this is the intended identifier.
func (ku *Kucoin) processFuturesPrivateTradeOrders(respData []byte) error {
	var tradeOrder WsFuturesTradeOrder
	if err := json.Unmarshal(respData, &tradeOrder); err != nil {
		return err
	}
	orderType, err := order.StringToOrderType(tradeOrder.OrderType)
	if err != nil {
		return err
	}
	status, err := ku.stringToOrderStatus(tradeOrder.Status)
	if err != nil {
		return err
	}
	futuresPairs, err := ku.GetEnabledPairs(asset.Futures)
	if err != nil {
		return err
	}
	cp, err := futuresPairs.DeriveFrom(tradeOrder.Symbol)
	if err != nil {
		return err
	}
	orderSide, err := order.StringToOrderSide(tradeOrder.Side)
	if err != nil {
		return err
	}
	detail := &order.Detail{
		Type:            orderType,
		Status:          status,
		Pair:            cp,
		Side:            orderSide,
		Amount:          tradeOrder.OrderSize,
		Price:           tradeOrder.OrderPrice,
		Exchange:        ku.Name,
		ExecutedAmount:  tradeOrder.FilledSize,
		RemainingAmount: tradeOrder.RemainSize,
		ClientOrderID:   tradeOrder.ClientOid,
		OrderID:         tradeOrder.TradeID,
		AssetType:       asset.Futures,
		LastUpdated:     tradeOrder.OrderTime.Time(),
	}
	ku.Websocket.DataHandler <- detail
	return nil
}
// processFuturesTransactionStatistics decodes a futures transaction
// statistics push, tags it with the instrument taken from the topic and
// forwards it to the data handler.
//
// The decoded value used to be discarded after the symbol was set; it is now
// delivered in the same way as the sibling funding rate and mark price
// handlers deliver theirs.
func (ku *Kucoin) processFuturesTransactionStatistics(respData []byte, instrument string) error {
	resp := WsFuturesTransactionStatisticsTimeEvent{}
	if err := json.Unmarshal(respData, &resp); err != nil {
		return err
	}
	resp.Symbol = instrument
	ku.Websocket.DataHandler <- &resp
	return nil
}
// processFuturesSystemAnnouncement decodes a futures system announcement,
// records the message subject on it and forwards a pointer to the data handler.
func (ku *Kucoin) processFuturesSystemAnnouncement(respData []byte, subject string) error {
	announcement := new(WsFuturesFundingBegin)
	err := json.Unmarshal(respData, announcement)
	if err != nil {
		return err
	}
	announcement.Subject = subject
	ku.Websocket.DataHandler <- announcement
	return nil
}
// processFuturesFundingData decodes a futures funding rate push, sets its
// symbol from the topic and forwards a pointer to the data handler.
func (ku *Kucoin) processFuturesFundingData(respData []byte, instrument string) error {
	fundingRate := new(WsFundingRate)
	err := json.Unmarshal(respData, fundingRate)
	if err != nil {
		return err
	}
	fundingRate.Symbol = instrument
	ku.Websocket.DataHandler <- fundingRate
	return nil
}
// processFuturesMarkPriceAndIndexPrice decodes a futures mark/index price
// push, sets its symbol from the topic and forwards a pointer to the data handler.
func (ku *Kucoin) processFuturesMarkPriceAndIndexPrice(respData []byte, instrument string) error {
	prices := new(WsFuturesMarkPriceAndIndexPrice)
	err := json.Unmarshal(respData, prices)
	if err != nil {
		return err
	}
	prices.Symbol = instrument
	ku.Websocket.DataHandler <- prices
	return nil
}
// ensureFuturesOrderbookSnapshotLoaded makes sure an initial futures orderbook snapshot is loaded.
//
// The first call for a symbol fetches the orderbook and loads it as the
// websocket orderbook snapshot; later calls for the same symbol return nil
// immediately. The tracking mutex is held for the whole call, including the fetch.
//
// NOTE(review): the symbol is flagged before the fetch, so a failed fetch is
// not retried on the next message — confirm that this is intended.
func (ku *Kucoin) ensureFuturesOrderbookSnapshotLoaded(symbol string) error {
	fetchedFuturesOrderbookMutex.Lock()
	defer fetchedFuturesOrderbookMutex.Unlock()
	if fetchedFuturesOrderbook[symbol] {
		return nil
	}
	// The package-level map is only created by WsConnect; writing to a nil
	// map panics, so create it on demand when this runs first.
	if fetchedFuturesOrderbook == nil {
		fetchedFuturesOrderbook = make(map[string]bool)
	}
	fetchedFuturesOrderbook[symbol] = true
	enabledPairs, err := ku.GetEnabledPairs(asset.Futures)
	if err != nil {
		return err
	}
	cp, err := enabledPairs.DeriveFrom(symbol)
	if err != nil {
		return err
	}
	orderbooks, err := ku.FetchOrderbook(context.Background(), cp, asset.Futures)
	if err != nil {
		return err
	}
	return ku.Websocket.Orderbook.LoadSnapshot(orderbooks)
}
// processFuturesOrderbookSnapshot decodes a futures depth push, resolves the
// instrument against the enabled futures pairs and applies the extracted
// bids and asks to the websocket orderbook as an update.
func (ku *Kucoin) processFuturesOrderbookSnapshot(respData []byte, instrument string) error {
	var raw WsOrderbookLevel5Response
	if err := json.Unmarshal(respData, &raw); err != nil {
		return err
	}
	book := raw.ExtractOrderbookItems()
	futuresPairs, err := ku.GetEnabledPairs(asset.Futures)
	if err != nil {
		return err
	}
	pair, err := futuresPairs.DeriveFrom(instrument)
	if err != nil {
		return err
	}
	update := &orderbook.Update{
		UpdateID:   book.Sequence,
		UpdateTime: book.Timestamp.Time(),
		Asset:      asset.Futures,
		Bids:       book.Bids,
		Asks:       book.Asks,
		Pair:       pair,
	}
	return ku.Websocket.Orderbook.Update(update)
}
// processFuturesOrderbookLevel2 handles a futures level 2 push by fetching the
// part orderbook for the instrument over REST and applying its bids and asks
// to the websocket orderbook as an update.
// NOTE(review): the pushed payload is only checked for being decodable; none
// of its fields are used — confirm this is intended.
func (ku *Kucoin) processFuturesOrderbookLevel2(respData []byte, instrument string) error {
	var pushed WsFuturesOrderbookInfo
	if err := json.Unmarshal(respData, &pushed); err != nil {
		return err
	}
	restBook, err := ku.GetFuturesPartOrderbook100(context.Background(), instrument)
	if err != nil {
		return err
	}
	futuresPairs, err := ku.GetEnabledPairs(asset.Futures)
	if err != nil {
		return err
	}
	cp, err := futuresPairs.DeriveFrom(instrument)
	if err != nil {
		return err
	}
	return ku.Websocket.Orderbook.Update(&orderbook.Update{
		UpdateTime: restBook.Time,
		Pair:       cp,
		Asset:      asset.Futures,
		Asks:       restBook.Asks,
		Bids:       restBook.Bids,
	})
}
// processFuturesTickerV2 decodes a futures ticker push, resolves its symbol
// against the enabled futures pairs and forwards a *ticker.Price to the data handler.
func (ku *Kucoin) processFuturesTickerV2(respData []byte) error {
	var tick WsFuturesTicker
	if err := json.Unmarshal(respData, &tick); err != nil {
		return err
	}
	futuresPairs, err := ku.GetEnabledPairs(asset.Futures)
	if err != nil {
		return err
	}
	cp, err := futuresPairs.DeriveFrom(tick.Symbol)
	if err != nil {
		return err
	}
	price := &ticker.Price{
		AssetType:    asset.Futures,
		Last:         tick.FilledPrice,
		Volume:       tick.FilledSize,
		LastUpdated:  tick.FilledTime.Time(),
		ExchangeName: ku.Name,
		Pair:         cp,
		Ask:          tick.BestAskPrice.Float64(),
		Bid:          tick.BestBidPrice.Float64(),
		AskSize:      tick.BestAskSize,
		BidSize:      tick.BestBidSize,
	}
	ku.Websocket.DataHandler <- price
	return nil
}
// processStopOrderEvent decodes a spot advanced (stop) order push and forwards
// it to the data handler as an *order.Detail with the spot asset type.
func (ku *Kucoin) processStopOrderEvent(respData []byte) error {
	var stopOrder WsStopOrder
	if err := json.Unmarshal(respData, &stopOrder); err != nil {
		return err
	}
	cp, err := currency.NewPairFromString(stopOrder.Symbol)
	if err != nil {
		return err
	}
	orderType, err := order.StringToOrderType(stopOrder.OrderType)
	if err != nil {
		return err
	}
	orderSide, err := order.StringToOrderSide(stopOrder.Side)
	if err != nil {
		return err
	}
	detail := &order.Detail{
		Price:        stopOrder.OrderPrice,
		TriggerPrice: stopOrder.StopPrice,
		Amount:       stopOrder.Size,
		Exchange:     ku.Name,
		OrderID:      stopOrder.OrderID,
		Type:         orderType,
		Side:         orderSide,
		AssetType:    asset.Spot,
		Date:         stopOrder.CreatedAt.Time(),
		LastUpdated:  stopOrder.Timestamp.Time(),
		Pair:         cp,
	}
	ku.Websocket.DataHandler <- detail
	return nil
}
// processMarginLendingTradeOrderEvent decodes a margin loan order push and
// forwards the decoded value (not a pointer) to the data handler.
func (ku *Kucoin) processMarginLendingTradeOrderEvent(respData []byte) error {
	var event WsMarginTradeOrderEntersEvent
	err := json.Unmarshal(respData, &event)
	if err != nil {
		return err
	}
	ku.Websocket.DataHandler <- event
	return nil
}
// processAccountBalanceChange decodes an account balance push and forwards it
// to the data handler as an account.Change.
// NOTE(review): the change is tagged asset.Futures although the account
// balance channel is not one of the futures channels — confirm the asset type.
func (ku *Kucoin) processAccountBalanceChange(respData []byte) error {
	var balance WsAccountBalance
	if err := json.Unmarshal(respData, &balance); err != nil {
		return err
	}
	change := account.Change{
		Exchange: ku.Name,
		Currency: currency.NewCode(balance.Currency),
		Asset:    asset.Futures,
		Amount:   balance.Available,
	}
	ku.Websocket.DataHandler <- change
	return nil
}
// processOrderChangeEvent decodes a private spot trade order push and forwards
// one *order.Detail to the data handler for every asset that CalculateAssets
// returns for the topic and pair.
func (ku *Kucoin) processOrderChangeEvent(respData []byte, topic string) error {
	var tradeOrder WsTradeOrder
	if err := json.Unmarshal(respData, &tradeOrder); err != nil {
		return err
	}
	orderType, err := order.StringToOrderType(tradeOrder.OrderType)
	if err != nil {
		return err
	}
	status, err := ku.stringToOrderStatus(tradeOrder.Status)
	if err != nil {
		return err
	}
	cp, err := currency.NewPairFromString(tradeOrder.Symbol)
	if err != nil {
		return err
	}
	orderSide, err := order.StringToOrderSide(tradeOrder.Side)
	if err != nil {
		return err
	}
	// TODO: should amend this function as we need to know the order asset type when we call it
	assets, err := ku.CalculateAssets(topic, cp)
	if err != nil {
		return err
	}
	for _, assetType := range assets {
		ku.Websocket.DataHandler <- &order.Detail{
			Price:           tradeOrder.Price,
			Amount:          tradeOrder.Size,
			ExecutedAmount:  tradeOrder.FilledSize,
			RemainingAmount: tradeOrder.RemainSize,
			Exchange:        ku.Name,
			OrderID:         tradeOrder.OrderID,
			ClientOrderID:   tradeOrder.ClientOid,
			Type:            orderType,
			Side:            orderSide,
			Status:          status,
			AssetType:       assetType,
			Date:            tradeOrder.OrderTime.Time(),
			LastUpdated:     tradeOrder.Timestamp.Time(),
			Pair:            cp,
		}
	}
	return nil
}
// processTradeData decodes a public match (trade) push and records it through
// the websocket trade handler once per asset returned by CalculateAssets.
// Nothing is recorded when neither trade saving nor the trade feed is enabled.
func (ku *Kucoin) processTradeData(respData []byte, instrument, topic string) error {
	var match WsTrade
	if err := json.Unmarshal(respData, &match); err != nil {
		return err
	}
	persist := ku.IsSaveTradeDataEnabled()
	if !(persist || ku.IsTradeFeedEnabled()) {
		return nil
	}
	cp, err := currency.NewPairFromString(instrument)
	if err != nil {
		return err
	}
	tradeSide, err := order.StringToOrderSide(match.Side)
	if err != nil {
		return err
	}
	assets, err := ku.CalculateAssets(topic, cp)
	if err != nil {
		return err
	}
	for _, assetType := range assets {
		data := trade.Data{
			CurrencyPair: cp,
			Timestamp:    match.Time.Time(),
			Price:        match.Price,
			Amount:       match.Size,
			Side:         tradeSide,
			Exchange:     ku.Name,
			TID:          match.TradeID,
			AssetType:    assetType,
		}
		if err := ku.Websocket.Trade.Update(persist, data); err != nil {
			return err
		}
	}
	return nil
}
// processTicker decodes a spot ticker push and forwards a *ticker.Price to the
// data handler for each asset returned by CalculateAssets that has websocket support.
func (ku *Kucoin) processTicker(respData []byte, instrument, topic string) error {
	var tick WsTicker
	if err := json.Unmarshal(respData, &tick); err != nil {
		return err
	}
	cp, err := currency.NewPairFromString(instrument)
	if err != nil {
		return err
	}
	assets, err := ku.CalculateAssets(topic, cp)
	if err != nil {
		return err
	}
	for _, assetType := range assets {
		if ku.AssetWebsocketSupport.IsAssetWebsocketSupported(assetType) {
			ku.Websocket.DataHandler <- &ticker.Price{
				AssetType:    assetType,
				Last:         tick.Price,
				LastUpdated:  tick.Timestamp.Time(),
				ExchangeName: ku.Name,
				Pair:         cp,
				Ask:          tick.BestAsk,
				Bid:          tick.BestBid,
				AskSize:      tick.BestAskSize,
				BidSize:      tick.BestBidSize,
				Volume:       tick.Size,
			}
		}
	}
	return nil
}
// processCandlesticks decodes a candles push and forwards a *stream.KlineData
// to the data handler for each asset returned by CalculateAssets that has
// websocket support. intervalString is passed through unchanged as the interval.
func (ku *Kucoin) processCandlesticks(respData []byte, instrument, intervalString, topic string) error {
	cp, err := currency.NewPairFromString(instrument)
	if err != nil {
		return err
	}
	var raw WsCandlestickData
	if err = json.Unmarshal(respData, &raw); err != nil {
		return err
	}
	parsed, err := raw.getCandlestickData()
	if err != nil {
		return err
	}
	assets, err := ku.CalculateAssets(topic, cp)
	if err != nil {
		return err
	}
	for _, assetType := range assets {
		if ku.AssetWebsocketSupport.IsAssetWebsocketSupported(assetType) {
			ku.Websocket.DataHandler <- &stream.KlineData{
				Timestamp:  raw.Time.Time(),
				Pair:       cp,
				AssetType:  assetType,
				Exchange:   ku.Name,
				StartTime:  parsed.Candles.StartTime,
				Interval:   intervalString,
				OpenPrice:  parsed.Candles.OpenPrice,
				ClosePrice: parsed.Candles.ClosePrice,
				HighPrice:  parsed.Candles.HighPrice,
				LowPrice:   parsed.Candles.LowPrice,
				Volume:     parsed.Candles.TransactionVolume,
			}
		}
	}
	return nil
}
// processOrderbookWithDepth handles a spot level 2 push. respData is the whole
// websocket message; its "data" field is staged through UpdateLocalBuffer for
// every asset that CalculateAssets returns for the topic and pair.
//
// An error raised while the book is still in its initial sync ends processing
// and returns nil. Any other failure is returned with the asset type added
// and the cause wrapped. A message without a "data" object is rejected.
func (ku *Kucoin) processOrderbookWithDepth(respData []byte, instrument, topic string) error {
	pair, err := currency.NewPairFromString(instrument)
	if err != nil {
		return err
	}
	result := struct {
		Result *WsOrderbook `json:"data"`
	}{}
	if err = json.Unmarshal(respData, &result); err != nil {
		return err
	}
	if result.Result == nil {
		// A missing or null "data" leaves the pointer nil; passing it on
		// would end in a nil dereference.
		return fmt.Errorf("%v - orderbook update for %v has no data", ku.Name, instrument)
	}
	assets, err := ku.CalculateAssets(topic, pair)
	if err != nil {
		return err
	}
	for x := range assets {
		initialSync, err := ku.UpdateLocalBuffer(result.Result, assets[x])
		if err == nil {
			continue
		}
		if initialSync {
			return nil
		}
		return fmt.Errorf("%v - UpdateLocalCache for asset type: %v error: %w", ku.Name, assets[x], err)
	}
	return nil
}
// UpdateLocalBuffer stages a websocket orderbook update in the local buffer
// and then applies the buffered updates to the orderbook.
//
// The returned bool is only meaningful when staging fails: it then carries the
// result of checkIsInitialSync for the pair and asset. In every other case it
// is false. When applying the buffer fails, the buffer for the pair and asset
// is flushed and cleaned up before the error is returned. A nil wsdp is
// rejected with an error.
func (ku *Kucoin) UpdateLocalBuffer(wsdp *WsOrderbook, assetType asset.Item) (bool, error) {
	if wsdp == nil {
		// wsdp.Symbol is read below; fail instead of panicking.
		return false, errors.New("websocket orderbook update is nil")
	}
	enabledPairs, err := ku.GetEnabledPairs(assetType)
	if err != nil {
		return false, err
	}
	format, err := ku.GetPairFormat(assetType, true)
	if err != nil {
		return false, err
	}
	currencyPair, err := currency.NewPairFromFormattedPairs(wsdp.Symbol, enabledPairs, format)
	if err != nil {
		return false, err
	}
	if err = ku.obm.stageWsUpdate(wsdp, currencyPair, assetType); err != nil {
		init, err2 := ku.obm.checkIsInitialSync(currencyPair, assetType)
		if err2 != nil {
			return false, err2
		}
		return init, err
	}
	if err = ku.applyBufferUpdate(currencyPair, assetType); err != nil {
		ku.flushAndCleanup(currencyPair, assetType)
	}
	return false, err
}
// processOrderbook decodes a spot depth push and loads it as a full orderbook
// snapshot for every asset returned by CalculateAssets for the topic and pair.
// Each level is read as price at index 0 and amount at index 1.
func (ku *Kucoin) processOrderbook(respData []byte, symbol, topic string) error {
	var depth Level2Depth5Or20
	if err := json.Unmarshal(respData, &depth); err != nil {
		return err
	}
	cp, err := currency.NewPairFromString(symbol)
	if err != nil {
		return err
	}
	askLevels := make([]orderbook.Tranche, len(depth.Asks))
	for i := range depth.Asks {
		askLevels[i] = orderbook.Tranche{
			Price:  depth.Asks[i][0].Float64(),
			Amount: depth.Asks[i][1].Float64(),
		}
	}
	bidLevels := make([]orderbook.Tranche, len(depth.Bids))
	for i := range depth.Bids {
		bidLevels[i] = orderbook.Tranche{
			Price:  depth.Bids[i][0].Float64(),
			Amount: depth.Bids[i][1].Float64(),
		}
	}
	assets, err := ku.CalculateAssets(topic, cp)
	if err != nil {
		return err
	}
	// The payload timestamp is interpreted as Unix milliseconds.
	updatedAt := time.UnixMilli(depth.Timestamp)
	for _, assetType := range assets {
		snapshot := &orderbook.Base{
			Exchange:    ku.Name,
			Asks:        askLevels,
			Bids:        bidLevels,
			Pair:        cp,
			Asset:       assetType,
			LastUpdated: updatedAt,
		}
		if err := ku.Websocket.Orderbook.LoadSnapshot(snapshot); err != nil {
			return err
		}
	}
	return nil
}
// processMarketSnapshot decodes a market snapshot push and forwards a
// *ticker.Price to the data handler for each asset returned by
// CalculateAssets that has websocket support.
func (ku *Kucoin) processMarketSnapshot(respData []byte, topic string) error {
	var snapshot WsSnapshot
	if err := json.Unmarshal(respData, &snapshot); err != nil {
		return err
	}
	cp, err := currency.NewPairFromString(snapshot.Data.Symbol)
	if err != nil {
		return err
	}
	assets, err := ku.CalculateAssets(topic, cp)
	if err != nil {
		return err
	}
	for _, assetType := range assets {
		if ku.AssetWebsocketSupport.IsAssetWebsocketSupported(assetType) {
			ku.Websocket.DataHandler <- &ticker.Price{
				ExchangeName: ku.Name,
				AssetType:    assetType,
				Last:         snapshot.Data.LastTradedPrice,
				Pair:         cp,
				Low:          snapshot.Data.Low,
				High:         snapshot.Data.High,
				QuoteVolume:  snapshot.Data.VolValue,
				Volume:       snapshot.Data.Vol,
				Open:         snapshot.Data.Open,
				Close:        snapshot.Data.Close,
				LastUpdated:  snapshot.Data.Datetime.Time(),
			}
		}
	}
	return nil
}
// Subscribe sends a websocket message to receive data from the channel.
// It delegates to manageSubscriptions with the "subscribe" operation.
func (ku *Kucoin) Subscribe(subscriptions subscription.List) error {
	const operation = "subscribe"
	return ku.manageSubscriptions(subscriptions, operation)
}
// Unsubscribe sends a websocket message to stop receiving data from the channel.
// It delegates to manageSubscriptions with the "unsubscribe" operation.
func (ku *Kucoin) Unsubscribe(subscriptions subscription.List) error {
	const operation = "unsubscribe"
	return ku.manageSubscriptions(subscriptions, operation)
}
// manageSubscriptions sends one request per subscription for the given
// operation ("subscribe" or "unsubscribe") and waits for each response.
//
// A response of type "ack" updates the websocket's subscription store; a
// response of type "error" or any other type is recorded as an error. Failures
// do not stop the loop: all errors are accumulated and returned together.
func (ku *Kucoin) manageSubscriptions(subs subscription.List, operation string) error {
	var errs error
	for _, s := range subs {
		msgID := strconv.FormatInt(ku.Websocket.Conn.GenerateMessageID(false), 10)
		respRaw, err := ku.Websocket.Conn.SendMessageReturnResponse("msgID:"+msgID, WsSubscriptionInput{
			ID:             msgID,
			Type:           operation,
			Topic:          s.QualifiedChannel,
			PrivateChannel: s.Authenticated,
			Response:       true,
		})
		if err != nil {
			errs = common.AppendError(errs, err)
			continue
		}
		rType, err := jsonparser.GetUnsafeString(respRaw, "type")
		if err != nil {
			errs = common.AppendError(errs, err)
			continue
		}
		if rType == "error" {
			// A missing code is tolerated; a missing message gets a placeholder.
			code, _ := jsonparser.GetUnsafeString(respRaw, "code")
			msg, msgErr := jsonparser.GetUnsafeString(respRaw, "data")
			if msgErr != nil {
				msg = "unknown error"
			}
			errs = common.AppendError(errs, fmt.Errorf("%s (%s)", msg, code))
			continue
		}
		if rType != "ack" {
			errs = common.AppendError(errs, fmt.Errorf("%w: %s from %s", errInvalidMsgType, rType, respRaw))
			continue
		}
		var storeErr error
		if operation == "unsubscribe" {
			storeErr = ku.Websocket.RemoveSubscriptions(s)
		} else {
			storeErr = ku.Websocket.AddSuccessfulSubscriptions(s)
			if ku.Verbose {
				log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s", ku.Name, s.Channel)
			}
		}
		if storeErr != nil {
			errs = common.AppendError(errs, storeErr)
		}
	}
	return errs
}
// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature.
// It calls ExpandTemplates on ku.Features.Subscriptions, passing the exchange
// itself as the argument, and returns that call's result unchanged.
func (ku *Kucoin) generateSubscriptions() (subscription.List, error) {
return ku.Features.Subscriptions.ExpandTemplates(ku)
}
// GetSubscriptionTemplate returns a subscription channel template.
// The same template, parsed from subTplText with the helper functions below
// registered, is returned for every subscription; the argument is ignored.
func (ku *Kucoin) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
	// spotFilter hands the currently enabled spot pairs to removeSpotFromMargin.
	// An error from GetEnabledPairs is ignored here.
	spotFilter := func(s *subscription.Subscription, ap map[asset.Item]currency.Pairs) string {
		spotPairs, _ := ku.GetEnabledPairs(asset.Spot)
		return removeSpotFromMargin(s, ap, spotPairs)
	}
	helpers := template.FuncMap{
		"channelName":           channelName,
		"removeSpotFromMargin":  spotFilter,
		"isCurrencyChannel":     isCurrencyChannel,
		"isSymbolChannel":       isSymbolChannel,
		"channelInterval":       channelInterval,
		"assetCurrencies":       assetCurrencies,
		"joinPairsWithInterval": joinPairsWithInterval,
		"batch":                 common.Batch[currency.Pairs],
	}
	return template.New("master.tmpl").Funcs(helpers).Parse(subTplText)
}
// orderbookManager defines a way of managing and maintaining synchronisation
// across connections and assets.
type orderbookManager struct {
// state holds the per-book synchronisation state, keyed by two currency
// codes (presumably base then quote — confirm) and then by asset.
state map[currency.Code]map[currency.Code]map[asset.Item]*update
// The embedded mutex is locked while state entries are modified.
sync.Mutex
// jobs is the channel of pending orderbook fetch jobs.
jobs chan job
}
// update is the synchronisation state tracked for a single orderbook.
// On reconnect setupOrderbookManager sets initialSync and needsFetchingBook
// to true and resets lastUpdateID to zero.
type update struct {
// buffer carries websocket orderbook updates for this book.
buffer chan *WsOrderbook
// fetchingBook presumably marks an in-progress REST fetch — confirm against its users.
fetchingBook bool
// initialSync marks the book as being in its initial synchronisation.
initialSync bool
// needsFetchingBook marks the book as requiring a fresh fetch.
needsFetchingBook bool
// lastUpdateID is presumably the ID of the last applied update — confirm against its users.
lastUpdateID int64
}
// job defines a synchronisation job that tells a go routine to fetch an
// orderbook via the REST protocol
type job struct {
// Pair is the currency pair whose orderbook should be fetched.
Pair currency.Pair
// AssetType is the asset the pair belongs to.
AssetType asset.Item
}
// setupOrderbookManager prepares the orderbook manager for a new connection.
// The first call creates the manager. Later calls keep the existing one and
// mark every tracked book as needing an initial sync and a fresh fetch.
// Every call then invokes SynchroniseWebsocketOrderbook once per configured worker.
func (ku *Kucoin) setupOrderbookManager() {
	locker.Lock()
	defer locker.Unlock()

	if ku.obm != nil {
		// Change state on reconnect for initial sync.
		ku.obm.Mutex.Lock()
		for _, bySecondCode := range ku.obm.state {
			for _, byAsset := range bySecondCode {
				for _, bookState := range byAsset {
					bookState.initialSync = true
					bookState.needsFetchingBook = true
					bookState.lastUpdateID = 0
				}
			}
		}
		ku.obm.Mutex.Unlock()
	} else {
		ku.obm = &orderbookManager{
			state: make(map[currency.Code]map[currency.Code]map[asset.Item]*update),
			jobs:  make(chan job, maxWSOrderbookJobs),
		}
	}

	// 10 workers for synchronising book
	for worker := 0; worker < maxWSOrderbookWorkers; worker++ {
		ku.SynchroniseWebsocketOrderbook()
	}
}
// ProcessUpdate converts the bid and ask changes of a websocket orderbook
// message into tranches and applies them to the stored orderbook for cp and a.
// Each change carries a price at index 0, an amount at index 1 and optionally
// a sequence number at index 2, which is stored as the tranche ID.
// The first parse failure is returned and nothing is applied.
func (ku *Kucoin) ProcessUpdate(cp currency.Pair, a asset.Item, ws *WsOrderbook) error {
	bidChanges := ws.Changes.Bids
	bids := make([]orderbook.Tranche, 0, len(bidChanges))
	for i := range bidChanges {
		level := bidChanges[i]
		price, err := strconv.ParseFloat(level[0], 64)
		if err != nil {
			return err
		}
		amount, err := strconv.ParseFloat(level[1], 64)
		if err != nil {
			return err
		}
		// The sequence is optional; a missing or empty one leaves the ID at zero.
		var sequence int64
		if len(level) > 2 && level[2] != "" {
			if sequence, err = strconv.ParseInt(level[2], 10, 64); err != nil {
				return err
			}
		}
		bids = append(bids, orderbook.Tranche{Price: price, Amount: amount, ID: sequence})
	}

	askChanges := ws.Changes.Asks
	asks := make([]orderbook.Tranche, 0, len(askChanges))
	for i := range askChanges {
		level := askChanges[i]
		price, err := strconv.ParseFloat(level[0], 64)
		if err != nil {
			return err
		}
		amount, err := strconv.ParseFloat(level[1], 64)
		if err != nil {
			return err
		}
		var sequence int64
		if len(level) > 2 && level[2] != "" {
			if sequence, err = strconv.ParseInt(level[2], 10, 64); err != nil {
				return err
			}
		}
		asks = append(asks, orderbook.Tranche{Price: price, Amount: amount, ID: sequence})
	}

	upd := &orderbook.Update{
		Pair:       cp,
		Asset:      a,
		Bids:       bids,
		Asks:       asks,
		UpdateID:   ws.SequenceEnd,
		UpdateTime: ws.TimeMS.Time(),
	}
	return ku.Websocket.Orderbook.Update(upd)
}
// applyBufferUpdate either queues a REST orderbook fetch for the pair or
// applies the buffered websocket updates to the stored orderbook.
// It does nothing while a fetch is already in flight. If applying the buffer
// fails, the failure is logged and the book is flagged for a new REST sync;
// only a failure to set that flag is returned.
func (ku *Kucoin) applyBufferUpdate(pair currency.Pair, assetType asset.Item) error {
	fetching, needsFetching, err := ku.obm.handleFetchingBook(pair, assetType)
	switch {
	case err != nil:
		return err
	case fetching:
		return nil
	case needsFetching:
		if ku.Verbose {
			log.Debugf(log.WebsocketMgr, "%s Orderbook: Fetching via REST\n", ku.Name)
		}
		return ku.obm.fetchBookViaREST(pair, assetType)
	}

	recent, err := ku.Websocket.Orderbook.GetOrderbook(pair, assetType)
	if err != nil {
		// Logged only: whether to continue is decided by recent being non-nil.
		log.Errorf(
			log.WebsocketMgr,
			"%s error fetching recent orderbook when applying updates: %s\n",
			ku.Name,
			err)
	}
	if recent == nil {
		return nil
	}

	procErr := ku.obm.checkAndProcessUpdate(ku.ProcessUpdate, pair, assetType, recent)
	if procErr == nil {
		return nil
	}
	log.Errorf(
		log.WebsocketMgr,
		"%s error processing update - initiating new orderbook sync via REST: %s\n",
		ku.Name,
		procErr)
	return ku.obm.setNeedsFetchingBook(pair, assetType)
}
// setNeedsFetchingBook flags the book for pair and assetType as requiring a
// REST fetch. It returns an error when no state is tracked for them.
func (o *orderbookManager) setNeedsFetchingBook(pair currency.Pair, assetType asset.Item) error {
	o.Lock()
	defer o.Unlock()

	if entry, found := o.state[pair.Base][pair.Quote][assetType]; found {
		entry.needsFetchingBook = true
		return nil
	}
	return fmt.Errorf("could not match pair %s and asset type %s in hash table",
		pair,
		assetType)
}
// SynchroniseWebsocketOrderbook starts one worker goroutine, registered on the
// websocket wait group, that processes queued orderbook fetch jobs. When the
// websocket shutdown channel fires, the worker discards whatever jobs are
// still queued and exits.
func (ku *Kucoin) SynchroniseWebsocketOrderbook() {
	ku.Websocket.Wg.Add(1)
	go func() {
		defer ku.Websocket.Wg.Done()
		for {
			select {
			case pending := <-ku.obm.jobs:
				if err := ku.processJob(pending.Pair, pending.AssetType); err != nil {
					log.Errorf(log.WebsocketMgr,
						"%s processing websocket orderbook error %v",
						ku.Name, err)
				}
			case <-ku.Websocket.ShutdownC:
				// Drain without blocking, then stop.
				for drained := false; !drained; {
					select {
					case <-ku.obm.jobs:
					default:
						drained = true
					}
				}
				return
			}
		}
	}()
}
// SeedLocalCache fetches an orderbook snapshot for p via GetPartOrderbook100
// and loads it into the local websocket orderbook cache for assetType.
// It returns an error wrapping errMissingOrderbookSequence, naming the pair,
// when the snapshot carries no positive sequence number.
func (ku *Kucoin) SeedLocalCache(ctx context.Context, p currency.Pair, assetType asset.Item) error {
	ob, err := ku.GetPartOrderbook100(ctx, p.String())
	if err != nil {
		return err
	}
	if ob.Sequence <= 0 {
		// Previously the format string ended in a literal "p" rather than the pair.
		return fmt.Errorf("%w %s", errMissingOrderbookSequence, p)
	}
	return ku.SeedLocalCacheWithBook(p, ob, assetType)
}
// SeedLocalCacheWithBook copies the price and amount of every bid and ask in
// orderbookNew into a new snapshot for p and assetType, stamped with the
// current time and the book's sequence, and loads it into the websocket
// orderbook cache.
func (ku *Kucoin) SeedLocalCacheWithBook(p currency.Pair, orderbookNew *Orderbook, assetType asset.Item) error {
	bids := make(orderbook.Tranches, len(orderbookNew.Bids))
	for i := range orderbookNew.Bids {
		bids[i].Price = orderbookNew.Bids[i].Price
		bids[i].Amount = orderbookNew.Bids[i].Amount
	}

	asks := make(orderbook.Tranches, len(orderbookNew.Asks))
	for i := range orderbookNew.Asks {
		asks[i].Price = orderbookNew.Asks[i].Price
		asks[i].Amount = orderbookNew.Asks[i].Amount
	}

	return ku.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{
		Exchange:        ku.Name,
		Pair:            p,
		Asset:           assetType,
		Bids:            bids,
		Asks:            asks,
		LastUpdateID:    orderbookNew.Sequence,
		LastUpdated:     time.Now(),
		VerifyOrderbook: ku.CanVerifyOrderbook,
	})
}
// processJob seeds the local orderbook cache for p via REST, clears the
// fetching flag, and then immediately applies any buffered websocket updates.
//
// If seeding fails, the returned error wraps the seeding failure; when clearing
// the fetching flag also fails, both errors are joined. If applying the buffer
// fails, the orderbook is flushed and its sync state cleaned up before the
// error is returned.
func (ku *Kucoin) processJob(p currency.Pair, assetType asset.Item) error {
	if seedErr := ku.SeedLocalCache(context.TODO(), p, assetType); seedErr != nil {
		// Keep the seeding error in its own variable: previously it was
		// overwritten by the stopFetchingBook result, so the message below
		// formatted a nil error and the real cause was lost.
		seedErr = fmt.Errorf("%s %s seeding local cache for orderbook error: %w",
			p, assetType, seedErr)
		if stopErr := ku.obm.stopFetchingBook(p, assetType); stopErr != nil {
			return errors.Join(seedErr, stopErr)
		}
		return seedErr
	}

	if err := ku.obm.stopFetchingBook(p, assetType); err != nil {
		return err
	}

	// Immediately apply the buffer updates so we don't wait for a
	// new update to initiate this.
	if err := ku.applyBufferUpdate(p, assetType); err != nil {
		ku.flushAndCleanup(p, assetType)
		return err
	}
	return nil
}
// flushAndCleanup flushes the websocket orderbook for p and assetType and then
// resets the manager's sync state for it. Failures of either step are logged,
// not returned.
func (ku *Kucoin) flushAndCleanup(p currency.Pair, assetType asset.Item) {
	if err := ku.Websocket.Orderbook.FlushOrderbook(p, assetType); err != nil {
		log.Errorf(log.WebsocketMgr,
			"%s flushing websocket error: %v",
			ku.Name,
			err)
	}
	if err := ku.obm.cleanup(p, assetType); err != nil {
		log.Errorf(log.WebsocketMgr, "%s cleanup websocket error: %v",
			ku.Name,
			err)
	}
}
// stageWsUpdate places a websocket orderbook update into the buffer for pair
// and a, creating the sync state on first sight. It returns an error when the
// update's SequenceStart leaves a gap after the last staged SequenceEnd. It
// also returns an error when the buffer is full, in which case the oldest
// buffered update is dropped to make room for u.
func (o *orderbookManager) stageWsUpdate(u *WsOrderbook, pair currency.Pair, a asset.Item) error {
	o.Lock()
	defer o.Unlock()

	byQuote, found := o.state[pair.Base]
	if !found {
		byQuote = make(map[currency.Code]map[asset.Item]*update)
		o.state[pair.Base] = byQuote
	}
	byAsset, found := byQuote[pair.Quote]
	if !found {
		byAsset = make(map[asset.Item]*update)
		byQuote[pair.Quote] = byAsset
	}
	entry, found := byAsset[a]
	if !found {
		// A new book starts unsynchronised and needs a REST snapshot.
		entry = &update{
			buffer:            make(chan *WsOrderbook, maxWSUpdateBuffer),
			needsFetchingBook: true,
			initialSync:       true,
			fetchingBook:      false,
		}
		byAsset[a] = entry
	}

	// Consecutive updates must satisfy sequenceStart(new) <= sequenceEnd(old)+1;
	// a zero lastUpdateID means there is nothing to compare against yet.
	if last := entry.lastUpdateID; last != 0 && u.SequenceStart > last+1 {
		return fmt.Errorf("websocket orderbook synchronisation failure for pair %s and asset %s", pair, a)
	}
	entry.lastUpdateID = u.SequenceEnd

	select {
	case entry.buffer <- u:
		// Staged for later processing.
		return nil
	default:
	}

	// Buffer full: drop the oldest entry, keep the newest, and report it.
	<-entry.buffer
	entry.buffer <- u
	return fmt.Errorf("channel blockage for %s, asset %s and connection",
		pair, a)
}
// handleFetchingBook reports whether a full book fetch is already in progress
// (fetching) or has just been claimed by this call (needsFetching). When it
// claims a fetch, it clears needsFetchingBook and sets fetchingBook. It
// returns an error when no state is tracked for the pair and asset.
func (o *orderbookManager) handleFetchingBook(pair currency.Pair, assetType asset.Item) (fetching, needsFetching bool, err error) {
	o.Lock()
	defer o.Unlock()

	entry, found := o.state[pair.Base][pair.Quote][assetType]
	switch {
	case !found:
		return false,
			false,
			fmt.Errorf("check is fetching book cannot match currency pair %s asset type %s",
				pair,
				assetType)
	case entry.fetchingBook:
		return true, false, nil
	case entry.needsFetchingBook:
		entry.fetchingBook = true
		entry.needsFetchingBook = false
		return false, true, nil
	default:
		return false, false, nil
	}
}
// stopFetchingBook clears the fetchingBook flag for pair and assetType.
// It returns an error when no state is tracked for them or when the flag is
// already false.
func (o *orderbookManager) stopFetchingBook(pair currency.Pair, assetType asset.Item) error {
	o.Lock()
	defer o.Unlock()

	entry, found := o.state[pair.Base][pair.Quote][assetType]
	switch {
	case !found:
		return fmt.Errorf("could not match pair %s and asset type %s in hash table",
			pair,
			assetType)
	case !entry.fetchingBook:
		return fmt.Errorf("fetching book already set to false for %s %s",
			pair,
			assetType)
	}
	entry.fetchingBook = false
	return nil
}
// completeInitialSync clears the initialSync flag for pair and assetType.
// It returns an error when no state is tracked for them or when the flag is
// already false.
func (o *orderbookManager) completeInitialSync(pair currency.Pair, assetType asset.Item) error {
	o.Lock()
	defer o.Unlock()

	entry, found := o.state[pair.Base][pair.Quote][assetType]
	switch {
	case !found:
		return fmt.Errorf("complete initial sync cannot match currency pair %s asset type %s",
			pair,
			assetType)
	case !entry.initialSync:
		return fmt.Errorf("initial sync already set to false for %s %s",
			pair,
			assetType)
	}
	entry.initialSync = false
	return nil
}
// checkIsInitialSync returns the initialSync flag for pair and assetType, or
// an error when no state is tracked for them.
func (o *orderbookManager) checkIsInitialSync(pair currency.Pair, assetType asset.Item) (bool, error) {
	o.Lock()
	defer o.Unlock()

	if entry, found := o.state[pair.Base][pair.Quote][assetType]; found {
		return entry.initialSync, nil
	}
	return false,
		fmt.Errorf("checkIsInitialSync of orderbook cannot match currency pair %s asset type %s",
			pair,
			assetType)
}
// fetchBookViaREST marks the book as fetching and in initial sync, then queues
// a job asking a worker to fetch the full book over REST so buffered updates
// can be applied to it. The send is non-blocking: an error is returned when
// the job queue is full or when no state is tracked for the pair and asset.
func (o *orderbookManager) fetchBookViaREST(pair currency.Pair, assetType asset.Item) error {
	o.Lock()
	defer o.Unlock()

	entry, found := o.state[pair.Base][pair.Quote][assetType]
	if !found {
		return fmt.Errorf("fetch book via rest cannot match currency pair %s asset type %s",
			pair,
			assetType)
	}
	entry.fetchingBook = true
	entry.initialSync = true

	request := job{Pair: pair, AssetType: assetType}
	select {
	case o.jobs <- request:
		return nil
	default:
	}
	return fmt.Errorf("%s %s book synchronisation channel blocked up",
		pair,
		assetType)
}
// checkAndProcessUpdate drains the update buffer for pair and assetType,
// validating each update against recent and passing the ones that should be
// applied to processor. It stops at the first validation or processing error,
// and returns nil once the buffer is empty.
func (o *orderbookManager) checkAndProcessUpdate(processor func(currency.Pair, asset.Item, *WsOrderbook) error, pair currency.Pair, assetType asset.Item, recent *orderbook.Base) error {
	o.Lock()
	defer o.Unlock()

	entry, found := o.state[pair.Base][pair.Quote][assetType]
	if !found {
		return fmt.Errorf("could not match pair [%s] asset type [%s] in hash table to process websocket orderbook update",
			pair, assetType)
	}

	for {
		select {
		case pending := <-entry.buffer:
			apply, err := entry.validate(pending, recent)
			if err != nil {
				return err
			}
			if !apply {
				// The update is not newer than the stored book; skip it.
				continue
			}
			if err := processor(pair, assetType, pending); err != nil {
				return fmt.Errorf("%s %s processing update error: %w",
					pair, assetType, err)
			}
		default:
			// Nothing left buffered.
			return nil
		}
	}
}
// validate reports whether updt should be applied on top of recent.
// Updates whose SequenceEnd is not beyond recent.LastUpdateID are skipped.
// While the book is in its initial sync, the first applicable update must span
// LastUpdateID+1; otherwise an error is returned. A successful check ends the
// initial sync.
func (u *update) validate(updt *WsOrderbook, recent *orderbook.Base) (bool, error) {
	snapshotID := recent.LastUpdateID
	if updt.SequenceEnd <= snapshotID {
		// Already covered by the stored book.
		return false, nil
	}
	if !u.initialSync {
		return true, nil
	}

	next := snapshotID + 1
	if spansNext := updt.SequenceStart <= next && updt.SequenceEnd >= next; !spansNext {
		return false, fmt.Errorf("initial websocket orderbook sync failure for pair %s and asset %s",
			recent.Pair,
			recent.Asset)
	}
	u.initialSync = false
	return true, nil
}
// cleanup discards every buffered update for pair and assetType and then
// clears the fetching, initial sync and needs-fetching flags. It returns an
// error only when no state is tracked for them.
func (o *orderbookManager) cleanup(pair currency.Pair, assetType asset.Item) error {
	// The lock is held only while draining, because the flag helpers below
	// take the same lock themselves.
	drain := func() error {
		o.Lock()
		defer o.Unlock()
		entry, found := o.state[pair.Base][pair.Quote][assetType]
		if !found {
			return fmt.Errorf("cleanup cannot match %s %s to hash table",
				pair,
				assetType)
		}
		for {
			select {
			case <-entry.buffer:
				// bleed and discard buffer
			default:
				return nil
			}
		}
	}
	if err := drain(); err != nil {
		return err
	}

	// Each helper errors when its flag is already false, which is not a
	// failure here, so the results are intentionally ignored.
	_ = o.stopFetchingBook(pair, assetType)
	_ = o.completeInitialSync(pair, assetType)
	_ = o.stopNeedsFetchingBook(pair, assetType)
	return nil
}
// stopNeedsFetchingBook clears the needsFetchingBook flag for pair and
// assetType. It returns an error when no state is tracked for them or when
// the flag is already false.
func (o *orderbookManager) stopNeedsFetchingBook(pair currency.Pair, assetType asset.Item) error {
	o.Lock()
	defer o.Unlock()

	entry, found := o.state[pair.Base][pair.Quote][assetType]
	switch {
	case !found:
		return fmt.Errorf("could not match pair %s and asset type %s in hash table",
			pair,
			assetType)
	case !entry.needsFetchingBook:
		return fmt.Errorf("needs fetching book already set to false for %s %s",
			pair,
			assetType)
	}
	entry.needsFetchingBook = false
	return nil
}
// CalculateAssets returns the asset types a websocket message for cp on topic
// applies to.
//
//   - A USDTM quote or a "/contract" topic maps to Futures.
//   - A "/margin" or "/index" topic maps to Margin.
//   - Anything else maps to whichever of Spot and Margin have cp enabled.
//
// For the first two cases an asset that is not supported yields a nil result
// with no error. For the last case a pair that is not found for an asset is
// treated as not enabled rather than as an error.
func (ku *Kucoin) CalculateAssets(topic string, cp currency.Pair) ([]asset.Item, error) {
	switch {
	case cp.Quote.Equal(currency.USDTM), strings.HasPrefix(topic, "/contract"):
		if err := ku.CurrencyPairs.IsAssetEnabled(asset.Futures); err != nil {
			if !errors.Is(err, asset.ErrNotSupported) {
				return nil, err
			}
			return nil, nil
		}
		return []asset.Item{asset.Futures}, nil
	case strings.HasPrefix(topic, "/margin"), strings.HasPrefix(topic, "/index"):
		if err := ku.CurrencyPairs.IsAssetEnabled(asset.Margin); err != nil {
			if !errors.Is(err, asset.ErrNotSupported) {
				return nil, err
			}
			return nil, nil
		}
		return []asset.Item{asset.Margin}, nil
	default:
		resp := make([]asset.Item, 0, 2)
		// errors.Is takes (err, target). The arguments used to be swapped,
		// which only matched an unwrapped ErrCurrencyNotFound.
		spotEnabled, err := ku.IsPairEnabled(cp, asset.Spot)
		if err != nil && !errors.Is(err, currency.ErrCurrencyNotFound) {
			return nil, err
		}
		if spotEnabled {
			resp = append(resp, asset.Spot)
		}
		marginEnabled, err := ku.IsPairEnabled(cp, asset.Margin)
		if err != nil && !errors.Is(err, currency.ErrCurrencyNotFound) {
			return nil, err
		}
		if marginEnabled {
			resp = append(resp, asset.Margin)
		}
		return resp, nil
	}
}
// checkSubscriptions upgrades configured subscriptions that have no asset set.
// It trims a trailing ":%s" from the channel and assigns the asset implied by
// the channel. For the all-trades channel it appends the matching default
// subscriptions, and the asset-less original is removed afterwards. Channels
// that have been replaced or removed are deleted from the config. If anything
// was upgraded, the enabled config subscriptions replace
// ku.Features.Subscriptions.
// This should be unnecessary and removable by 2025
func (ku *Kucoin) checkSubscriptions() {
	var upgraded bool
	for _, sub := range ku.Config.Features.Subscriptions {
		if sub.Asset == asset.Empty {
			upgraded = true
			sub.Channel = strings.TrimSuffix(sub.Channel, ":%s")
			switch sub.Channel {
			case futuresTradeOrderChannel, futuresStopOrdersLifecycleEventChannel, futuresAccountBalanceEventChannel:
				sub.Asset = asset.Futures
			case marginPositionChannel, marginLoanChannel:
				sub.Asset = asset.Margin
			case subscription.TickerChannel, subscription.OrderbookChannel:
				sub.Asset = asset.All
			case subscription.AllTradesChannel:
				// Appended entries are not visited by this loop: range
				// evaluated the slice before iterating.
				for _, def := range defaultSubscriptions {
					if def.Channel == sub.Channel {
						ku.Config.Features.Subscriptions = append(ku.Config.Features.Subscriptions, def)
					}
				}
			}
		}
	}

	obsolete := func(s *subscription.Subscription) bool {
		if s.Channel == subscription.AllTradesChannel {
			// Only the asset-less legacy entry is dropped.
			return s.Asset == asset.Empty
		}
		return s.Channel == "/contractMarket/level2Depth50" || // Replaced by subscription.Orderbook for asset.All
			s.Channel == "/contractMarket/tickerV2" || // Replaced by subscription.Ticker for asset.All
			s.Channel == "/margin/fundingBook" // Deprecated and removed
	}
	ku.Config.Features.Subscriptions = slices.DeleteFunc(ku.Config.Features.Subscriptions, obsolete)

	if !upgraded {
		return
	}
	enabled := subscription.List{}
	for _, sub := range ku.Config.Features.Subscriptions {
		if sub.Enabled {
			enabled = append(enabled, sub)
		}
	}
	ku.Features.Subscriptions = enabled
}
// channelName maps a generic subscription channel to the exchange channel name
// for asset a. Ticker and orderbook have Futures-specific variants; candles
// and all-trades always map to the market channels. Any other channel is
// returned unchanged.
func channelName(s *subscription.Subscription, a asset.Item) string {
	futures := a == asset.Futures
	switch ch := s.Channel; {
	case ch == subscription.TickerChannel && futures:
		return futuresTickerChannel
	case ch == subscription.TickerChannel:
		return marketTickerChannel
	case ch == subscription.OrderbookChannel && futures:
		return futuresOrderbookDepth5Channel
	case ch == subscription.OrderbookChannel:
		// This does not require a REST request to get the orderbook.
		return marketOrderbookDepth5Channel
	case ch == subscription.CandlesChannel:
		// No support in GCT yet for Futures candles
		return marketCandlesChannel
	case ch == subscription.AllTradesChannel:
		// No support in GCT yet for Futures all trades
		return marketMatchChannel
	default:
		return ch
	}
}
// removeSpotFromMargin removes spotPairs from the Margin entry of ap, in
// place, unless the subscription channel starts with "/margin". It always
// returns an empty string so it can be called from a template without
// producing output.
func removeSpotFromMargin(s *subscription.Subscription, ap map[asset.Item]currency.Pairs, spotPairs currency.Pairs) string {
	if !strings.HasPrefix(s.Channel, "/margin") {
		if marginPairs, found := ap[asset.Margin]; found {
			ap[asset.Margin] = marginPairs.Remove(spotPairs...)
		}
	}
	return ""
}
// isSymbolChannel reports whether the channel expects to receive a symbol.
// It is false only for the listed account, order and announcement channels.
func isSymbolChannel(s *subscription.Subscription) bool {
	switch name := channelName(s, s.Asset); name {
	case privateSpotTradeOrders, accountBalanceChannel, marginPositionChannel, spotMarketAdvancedChannel:
		return false
	case futuresSystemAnnouncementChannel, futuresTradeOrderChannel,
		futuresStopOrdersLifecycleEventChannel, futuresAccountBalanceEventChannel:
		return false
	default:
		return true
	}
}
// isCurrencyChannel reports whether the channel expects to receive a currency;
// only the margin loan channel does.
func isCurrencyChannel(s *subscription.Subscription) bool {
	switch s.Channel {
	case marginLoanChannel:
		return true
	default:
		return false
	}
}
// channelInterval returns the subscription interval as a string for the market
// candles channel. It returns an empty string for any other channel or when
// the interval cannot be converted.
func channelInterval(s *subscription.Subscription) string {
	if channelName(s, s.Asset) != marketCandlesChannel {
		return ""
	}
	interval, err := intervalToString(s.Interval)
	if err != nil {
		return ""
	}
	return interval
}
// assetCurrencies returns the sorted currencies of all pairs for the
// subscription's asset.
// As a side effect it replaces that asset's entry in ap with one pair per
// currency, holding the currency as Base, for expandTemplates to see.
func assetCurrencies(s *subscription.Subscription, ap map[asset.Item]currency.Pairs) currency.Currencies {
	codes := common.SortStrings(ap[s.Asset].GetCurrencies())
	basesOnly := make(currency.Pairs, len(codes))
	for i, code := range codes {
		basesOnly[i].Base = code
	}
	ap[s.Asset] = basesOnly
	return codes
}
// joinPairsWithInterval joins the symbols of the pairs in b with commas.
// When the subscription interval converts to a string, "_<interval>" is
// appended to every symbol.
func joinPairsWithInterval(b currency.Pairs, s *subscription.Subscription) string {
	suffix, err := intervalToString(s.Interval)
	if err == nil {
		suffix = "_" + suffix
	}
	symbols := make([]string, 0, len(b))
	for _, pair := range b {
		symbols = append(symbols, pair.String()+suffix)
	}
	return strings.Join(symbols, ",")
}
// subTplText is the template source parsed by GetSubscriptionTemplate.
// It first calls removeSpotFromMargin, then emits one of three shapes:
//   - currency channels: the channel name followed by the joined asset currencies
//   - symbol channels: for each asset, either "<name>:all" when the name is
//     "/market/ticker" and there are more than 10 pairs, or the pairs joined
//     in batches of 100
//   - any other channel: just the channel name
//
// BatchSize, PairSeparator and AssetSeparator are read from the template
// context and printed as markers around the generated entries.
const subTplText = `
{{- removeSpotFromMargin $.S $.AssetPairs -}}
{{- if isCurrencyChannel $.S }}
{{ channelName $.S $.S.Asset -}} : {{- (assetCurrencies $.S $.AssetPairs).Join -}}
{{- else if isSymbolChannel $.S }}
{{ range $asset, $pairs := $.AssetPairs }}
{{- with $name := channelName $.S $asset }}
{{- if and (eq $name "/market/ticker") (gt (len $pairs) 10) -}}
{{- $name -}} :all
{{- with $i := channelInterval $.S -}}_{{- $i -}}{{- end -}}
{{- $.BatchSize -}} {{ len $pairs }}
{{- else -}}
{{- range $b := batch $pairs 100 -}}
{{- $name -}} : {{- joinPairsWithInterval $b $.S -}}
{{ $.PairSeparator }}
{{- end -}}
{{- $.BatchSize -}} 100
{{- end }}
{{- end }}
{{ $.AssetSeparator }}
{{- end }}
{{- else -}}
{{ channelName $.S $.S.Asset }}
{{- end }}
`