diff --git a/exchanges/okx/okx_test.go b/exchanges/okx/okx_test.go index 7a34c7d4..a5d86fde 100644 --- a/exchanges/okx/okx_test.go +++ b/exchanges/okx/okx_test.go @@ -28,7 +28,9 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" "github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues" + "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" + testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions" "github.com/thrasher-corp/gocryptotrader/portfolio/withdraw" ) @@ -3756,3 +3758,67 @@ func TestGetCurrencyTradeURL(t *testing.T) { assert.NotEmpty(t, resp) } } + +func TestGenerateSubscriptions(t *testing.T) { + t.Parallel() + + ok := new(Okx) + require.NoError(t, testexch.Setup(ok), "Test instance Setup must not error") + + ok.Websocket.SetCanUseAuthenticatedEndpoints(true) + subs, err := ok.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions must not error") + exp := subscription.List{ + {Channel: subscription.MyAccountChannel, QualifiedChannel: `{"channel":"account"}`, Authenticated: true}, + } + for _, s := range ok.Features.Subscriptions { + for _, a := range ok.GetAssetTypes(true) { + if s.Asset != asset.All && s.Asset != a { + continue + } + pairs, err := ok.GetEnabledPairs(a) + require.NoErrorf(t, err, "GetEnabledPairs %s must not error", a) + pairs = common.SortStrings(pairs).Format(currency.PairFormat{Uppercase: true, Delimiter: "-"}) + s := s.Clone() //nolint:govet // Intentional lexical scope shadow + s.Asset = a + name := channelName(s) + if isSymbolChannel(s) { + for i, p := range pairs { + s := s.Clone() //nolint:govet // Intentional lexical scope shadow + s.QualifiedChannel = fmt.Sprintf(`{"channel":%q,"instID":%q}`, name, p) + s.Pairs = pairs[i : i+1] + exp = append(exp, s) + } + } else { + s := s.Clone() //nolint:govet // Intentional lexical scope shadow + if isAssetChannel(s) { + s.QualifiedChannel = fmt.Sprintf(`{"channel":%q,"instType":%q}`, name, ok.GetInstrumentTypeFromAssetItem(s.Asset)) + } else { + s.QualifiedChannel = `{"channel":"` + name + `"}` + } + s.Pairs = pairs + exp = append(exp, s) + } + } + } + testsubs.EqualLists(t, exp, subs) +} + +func TestGenerateGridSubscriptions(t *testing.T) { + t.Parallel() + + ok := new(Okx) + require.NoError(t, testexch.Setup(ok), "Test instance Setup must not error") + + ok.Features.Subscriptions = subscription.List{{Channel: okxChannelGridPositions, Params: map[string]any{"algoId": "42"}}} + subs, err := ok.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions must not error") + exp := subscription.List{{Channel: okxChannelGridPositions, Params: map[string]any{"algoId": "42"}, QualifiedChannel: `{"channel":"grid-positions","algoId":"42"}`}} + testsubs.EqualLists(t, exp, subs) + + ok.Features.Subscriptions = subscription.List{{Channel: okxChannelGridPositions}} + subs, err = ok.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions must not error") + exp = subscription.List{{Channel: okxChannelGridPositions, QualifiedChannel: `{"channel":"grid-positions"}`}} + testsubs.EqualLists(t, exp, subs) +} diff --git a/exchanges/okx/okx_websocket.go b/exchanges/okx/okx_websocket.go index ab53be1f..83034525 100644 --- a/exchanges/okx/okx_websocket.go +++ b/exchanges/okx/okx_websocket.go @@ -10,6 +10,7 @@ import ( "net/http" "strconv" "strings" + "text/template" "time" "github.com/gorilla/websocket" @@ -31,20 +32,6 @@ var ( errInvalidChecksum = errors.New("invalid checksum") ) -var ( - // defaultSubscribedChannels list of channels which are subscribed by default - defaultSubscribedChannels = []string{ - okxChannelTrades, - okxChannelOrderBooks, - okxChannelTickers, - } - // defaultAuthChannels list of channels which are subscribed when authenticated - defaultAuthChannels = []string{ - okxChannelAccount, - okxChannelOrders, - } -) - var ( candlestickChannelsMap = map[string]bool{okxChannelCandle1Y: true, okxChannelCandle6M: true, okxChannelCandle3M: true, okxChannelCandle1M: true, okxChannelCandle1W: true, okxChannelCandle1D: true, okxChannelCandle2D: true, okxChannelCandle3D: true, okxChannelCandle5D: true, okxChannelCandle12H: true, okxChannelCandle6H: true, okxChannelCandle4H: true, okxChannelCandle2H: true, okxChannelCandle1H: true, okxChannelCandle30m: true, okxChannelCandle15m: true, okxChannelCandle5m: true, okxChannelCandle3m: true, okxChannelCandle1m: true, okxChannelCandle1Yutc: true, okxChannelCandle3Mutc: true, okxChannelCandle1Mutc: true, okxChannelCandle1Wutc: true, okxChannelCandle1Dutc: true, okxChannelCandle2Dutc: true, okxChannelCandle3Dutc: true, okxChannelCandle5Dutc: true, okxChannelCandle12Hutc: true, okxChannelCandle6Hutc: true} candlesticksMarkPriceMap = map[string]bool{okxChannelMarkPriceCandle1Y: true, okxChannelMarkPriceCandle6M: true, okxChannelMarkPriceCandle3M: true, okxChannelMarkPriceCandle1M: true, okxChannelMarkPriceCandle1W: true, okxChannelMarkPriceCandle1D: true, okxChannelMarkPriceCandle2D: true, okxChannelMarkPriceCandle3D: true, okxChannelMarkPriceCandle5D: true, okxChannelMarkPriceCandle12H: true, okxChannelMarkPriceCandle6H: true, okxChannelMarkPriceCandle4H: true, okxChannelMarkPriceCandle2H: true, okxChannelMarkPriceCandle1H: true, okxChannelMarkPriceCandle30m: true, okxChannelMarkPriceCandle15m: true, okxChannelMarkPriceCandle5m: true, okxChannelMarkPriceCandle3m: true, okxChannelMarkPriceCandle1m: true, okxChannelMarkPriceCandle1Yutc: true, okxChannelMarkPriceCandle3Mutc: true, okxChannelMarkPriceCandle1Mutc: true, okxChannelMarkPriceCandle1Wutc: true, okxChannelMarkPriceCandle1Dutc: true, okxChannelMarkPriceCandle2Dutc: true, okxChannelMarkPriceCandle3Dutc: true, okxChannelMarkPriceCandle5Dutc: true, okxChannelMarkPriceCandle12Hutc: true, okxChannelMarkPriceCandle6Hutc: true} @@ -214,6 +201,22 @@ const ( okxChannelMarkPriceCandle6Hutc = markPrice + okxChannelCandle6Hutc ) +var defaultSubscriptions = subscription.List{ + {Enabled: true, Asset: asset.All, Channel: subscription.AllTradesChannel}, + {Enabled: true, Asset: asset.All, Channel: subscription.OrderbookChannel}, + {Enabled: true, Asset: asset.All, Channel: subscription.TickerChannel}, + {Enabled: true, Asset: asset.All, Channel: subscription.MyOrdersChannel, Authenticated: true}, + {Enabled: true, Channel: subscription.MyAccountChannel, Authenticated: true}, +} + +var subscriptionNames = map[string]string{ + subscription.AllTradesChannel: okxChannelTrades, + subscription.OrderbookChannel: okxChannelOrderBooks, + subscription.TickerChannel: okxChannelTickers, + subscription.MyAccountChannel: okxChannelAccount, + subscription.MyOrdersChannel: okxChannelOrders, +} + // WsConnect initiates a websocket connection func (ok *Okx) WsConnect() error { if !ok.Websocket.IsEnabled() || !ok.IsEnabled() { @@ -360,119 +363,23 @@ func (ok *Okx) Unsubscribe(channelsToUnsubscribe subscription.List) error { // handleSubscription sends a subscription and unsubscription information thought the websocket endpoint. // as of the okx, exchange this endpoint sends subscription and unsubscription messages but with a list of json objects. -func (ok *Okx) handleSubscription(operation string, subscriptions subscription.List) error { +func (ok *Okx) handleSubscription(operation string, subs subscription.List) error { reqs := WSSubscriptionInformationList{Operation: operation} authRequests := WSSubscriptionInformationList{Operation: operation} ok.WsRequestSemaphore <- 1 defer func() { <-ok.WsRequestSemaphore }() var channels subscription.List var authChannels subscription.List - for i := 0; i < len(subscriptions); i++ { - s := subscriptions[i] - if len(s.Pairs) > 1 { - return subscription.ErrNotSinglePair - } - arg := SubscriptionInfo{ - Channel: s.Channel, - } - var instrumentID string - var underlying string - var okay bool - var instrumentType string - var authSubscription bool - var algoID string - var uid string - - switch arg.Channel { - case okxChannelAccount, - okxChannelPositions, - okxChannelBalanceAndPosition, - okxChannelOrders, - okxChannelAlgoOrders, - okxChannelAlgoAdvance, - okxChannelLiquidationWarning, - okxChannelAccountGreeks, - okxChannelRfqs, - okxChannelQuotes, - okxChannelStructureBlockTrades, - okxChannelSpotGridOrder, - okxChannelGridOrdersContract, - okxChannelGridPositions, - okcChannelGridSubOrders: - authSubscription = true + var errs error + for i := 0; i < len(subs); i++ { + s := subs[i] + var arg SubscriptionInfo + if err := json.Unmarshal([]byte(s.QualifiedChannel), &arg); err != nil { + errs = common.AppendError(errs, err) + continue } - if arg.Channel == okxChannelGridPositions { - algoID, _ = s.Params["algoId"].(string) - } - - if arg.Channel == okcChannelGridSubOrders || - arg.Channel == okxChannelGridPositions { - uid, _ = s.Params["uid"].(string) - } - - if strings.HasPrefix(arg.Channel, "candle") || - arg.Channel == okxChannelTickers || - arg.Channel == okxChannelOrderBooks || - arg.Channel == okxChannelOrderBooks5 || - arg.Channel == okxChannelOrderBooks50TBT || - arg.Channel == okxChannelOrderBooksTBT || - arg.Channel == okxChannelFundingRate || - arg.Channel == okxChannelTrades { - if s.Params["instId"] != "" { - instrumentID, okay = s.Params["instId"].(string) - if !okay { - instrumentID = "" - } - } else if s.Params["instrumentID"] != "" { - instrumentID, okay = s.Params["instrumentID"].(string) - if !okay { - instrumentID = "" - } - } - if instrumentID == "" { - if len(s.Pairs) != 1 { - return subscription.ErrNotSinglePair - } - format, err := ok.GetPairFormat(s.Asset, false) - if err != nil { - return err - } - p := s.Pairs[0] - if p.Base.String() == "" || p.Quote.String() == "" { - return errIncompleteCurrencyPair - } - instrumentID = format.Format(p) - } - } - if arg.Channel == okxChannelInstruments || - arg.Channel == okxChannelPositions || - arg.Channel == okxChannelOrders || - arg.Channel == okxChannelAlgoOrders || - arg.Channel == okxChannelAlgoAdvance || - arg.Channel == okxChannelLiquidationWarning || - arg.Channel == okxChannelSpotGridOrder || - arg.Channel == okxChannelGridOrdersContract || - arg.Channel == okxChannelEstimatedPrice { - instrumentType = ok.GetInstrumentTypeFromAssetItem(s.Asset) - } - - if arg.Channel == okxChannelPositions || - arg.Channel == okxChannelOrders || - arg.Channel == okxChannelAlgoOrders || - arg.Channel == okxChannelEstimatedPrice || - arg.Channel == okxChannelOptSummary { - if len(s.Pairs) == 1 { - underlying, _ = ok.GetUnderlying(s.Pairs[0], s.Asset) - } - } - arg.InstrumentID = instrumentID - arg.Underlying = underlying - arg.InstrumentType = instrumentType - arg.UID = uid - arg.AlgoID = algoID - - if authSubscription { + if s.Authenticated { authChannels = append(authChannels, s) authRequests.Arguments = append(authRequests.Arguments, arg) authChunk, err := json.Marshal(authRequests) @@ -1300,45 +1207,19 @@ func (ok *Okx) wsProcessTickers(data []byte) error { return nil } -// GenerateDefaultSubscriptions returns a list of default subscription message. -func (ok *Okx) GenerateDefaultSubscriptions() (subscription.List, error) { - var subscriptions subscription.List - assets := ok.GetAssetTypes(true) - subs := make([]string, 0, len(defaultSubscribedChannels)+len(defaultAuthChannels)) - subs = append(subs, defaultSubscribedChannels...) - if ok.Websocket.CanUseAuthenticatedEndpoints() { - subs = append(subs, defaultAuthChannels...) - } - for c := range subs { - switch subs[c] { - case okxChannelOrders: - for x := range assets { - subscriptions = append(subscriptions, &subscription.Subscription{ - Channel: subs[c], - Asset: assets[x], - }) - } - case okxChannelCandle5m, okxChannelTickers, okxChannelOrderBooks, okxChannelFundingRate, okxChannelOrderBooks5, okxChannelOrderBooks50TBT, okxChannelOrderBooksTBT, okxChannelTrades: - for x := range assets { - pairs, err := ok.GetEnabledPairs(assets[x]) - if err != nil { - return nil, err - } - for p := range pairs { - subscriptions = append(subscriptions, &subscription.Subscription{ - Channel: subs[c], - Asset: assets[x], - Pairs: currency.Pairs{pairs[p]}, - }) - } - } - default: - subscriptions = append(subscriptions, &subscription.Subscription{ - Channel: subs[c], - }) - } - } - return subscriptions, nil +// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature +func (ok *Okx) generateSubscriptions() (subscription.List, error) { + return ok.Features.Subscriptions.ExpandTemplates(ok) +} + +// GetSubscriptionTemplate returns a subscription channel template +func (ok *Okx) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) { + return template.New("master.tmpl").Funcs(template.FuncMap{ + "channelName": channelName, + "isSymbolChannel": isSymbolChannel, + "isAssetChannel": isAssetChannel, + "instType": ok.GetInstrumentTypeFromAssetItem, + }).Parse(subTplText) } // wsProcessPushData processes push data coming through the websocket channel @@ -2111,3 +1992,45 @@ func (ok *Okx) PublicStructureBlockTradesSubscription(operation string, assetTyp func (ok *Okx) BlockTickerSubscription(operation string, assetType asset.Item, pair currency.Pair) error { return ok.wsChannelSubscription(operation, okxChannelBlockTickers, assetType, pair, false, true, false) } + +// channelName converts global subscription channel names to exchange specific names +func channelName(s *subscription.Subscription) string { + if s, ok := subscriptionNames[s.Channel]; ok { + return s + } + return s.Channel +} + +// isAssetChannel returns if the channel expects one Asset per subscription +func isAssetChannel(s *subscription.Subscription) bool { + return s.Channel == subscription.MyOrdersChannel +} + +// isSymbolChannel returns if the channel expects one Symbol per subscription +func isSymbolChannel(s *subscription.Subscription) bool { + switch s.Channel { + case subscription.CandlesChannel, subscription.TickerChannel, subscription.OrderbookChannel, subscription.AllTradesChannel, okxChannelFundingRate: + return true + } + return false +} + +const subTplText = ` +{{- with $name := channelName $.S }} + {{- range $asset, $pairs := $.AssetPairs }} + {{- if isAssetChannel $.S -}} + {"channel":"{{ $name }}","instType":"{{ instType $asset }}"} + {{- else if isSymbolChannel $.S }} + {{- range $p := $pairs -}} + {"channel":"{{ $name }}","instID":"{{ $p }}"} + {{ $.PairSeparator }} + {{- end -}} + {{- else }} + {"channel":"{{ $name }}" + {{- with $algoId := index $.S.Params "algoId" -}} ,"algoId":"{{ $algoId }}" {{- end -}} + } + {{- end }} + {{- $.AssetSeparator }} + {{- end }} +{{- end }} +` diff --git a/exchanges/okx/okx_wrapper.go b/exchanges/okx/okx_wrapper.go index debfb054..344fa4ba 100644 --- a/exchanges/okx/okx_wrapper.go +++ b/exchanges/okx/okx_wrapper.go @@ -156,6 +156,7 @@ func (ok *Okx) SetDefaults() { GlobalResultLimit: 100, // Reference: https://www.okx.com/docs-v5/en/#rest-api-market-data-get-candlesticks-history }, }, + Subscriptions: defaultSubscriptions.Clone(), } ok.Requester, err = request.New(ok.Name, common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout), @@ -211,7 +212,7 @@ func (ok *Okx) Setup(exch *config.Exchange) error { Connector: ok.WsConnect, Subscriber: ok.Subscribe, Unsubscriber: ok.Unsubscribe, - GenerateSubscriptions: ok.GenerateDefaultSubscriptions, + GenerateSubscriptions: ok.generateSubscriptions, Features: &ok.Features.Supports.WebsocketCapabilities, MaxWebsocketSubscriptionsPerConnection: 240, OrderbookBufferConfig: buffer.Config{