Okx: Add subscription configuration (#1605)

This commit is contained in:
Gareth Kirwan
2024-11-12 03:15:02 +01:00
committed by GitHub
parent 72a2a16415
commit 0c0ae7bcaa
3 changed files with 149 additions and 159 deletions

View File

@@ -28,7 +28,9 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)
@@ -3756,3 +3758,67 @@ func TestGetCurrencyTradeURL(t *testing.T) {
assert.NotEmpty(t, resp)
}
}
func TestGenerateSubscriptions(t *testing.T) {
t.Parallel()
ok := new(Okx)
require.NoError(t, testexch.Setup(ok), "Test instance Setup must not error")
ok.Websocket.SetCanUseAuthenticatedEndpoints(true)
subs, err := ok.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions must not error")
exp := subscription.List{
{Channel: subscription.MyAccountChannel, QualifiedChannel: `{"channel":"account"}`, Authenticated: true},
}
for _, s := range ok.Features.Subscriptions {
for _, a := range ok.GetAssetTypes(true) {
if s.Asset != asset.All && s.Asset != a {
continue
}
pairs, err := ok.GetEnabledPairs(a)
require.NoErrorf(t, err, "GetEnabledPairs %s must not error", a)
pairs = common.SortStrings(pairs).Format(currency.PairFormat{Uppercase: true, Delimiter: "-"})
s := s.Clone() //nolint:govet // Intentional lexical scope shadow
s.Asset = a
name := channelName(s)
if isSymbolChannel(s) {
for i, p := range pairs {
s := s.Clone() //nolint:govet // Intentional lexical scope shadow
s.QualifiedChannel = fmt.Sprintf(`{"channel":%q,"instID":%q}`, name, p)
s.Pairs = pairs[i : i+1]
exp = append(exp, s)
}
} else {
s := s.Clone() //nolint:govet // Intentional lexical scope shadow
if isAssetChannel(s) {
s.QualifiedChannel = fmt.Sprintf(`{"channel":%q,"instType":%q}`, name, ok.GetInstrumentTypeFromAssetItem(s.Asset))
} else {
s.QualifiedChannel = `{"channel":"` + name + `"}`
}
s.Pairs = pairs
exp = append(exp, s)
}
}
}
testsubs.EqualLists(t, exp, subs)
}
func TestGenerateGridSubscriptions(t *testing.T) {
t.Parallel()
ok := new(Okx)
require.NoError(t, testexch.Setup(ok), "Test instance Setup must not error")
ok.Features.Subscriptions = subscription.List{{Channel: okxChannelGridPositions, Params: map[string]any{"algoId": "42"}}}
subs, err := ok.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions must not error")
exp := subscription.List{{Channel: okxChannelGridPositions, Params: map[string]any{"algoId": "42"}, QualifiedChannel: `{"channel":"grid-positions","algoId":"42"}`}}
testsubs.EqualLists(t, exp, subs)
ok.Features.Subscriptions = subscription.List{{Channel: okxChannelGridPositions}}
subs, err = ok.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions must not error")
exp = subscription.List{{Channel: okxChannelGridPositions, QualifiedChannel: `{"channel":"grid-positions"}`}}
testsubs.EqualLists(t, exp, subs)
}

View File

@@ -10,6 +10,7 @@ import (
"net/http"
"strconv"
"strings"
"text/template"
"time"
"github.com/gorilla/websocket"
@@ -31,20 +32,6 @@ var (
errInvalidChecksum = errors.New("invalid checksum")
)
var (
// defaultSubscribedChannels list of channels which are subscribed by default
defaultSubscribedChannels = []string{
okxChannelTrades,
okxChannelOrderBooks,
okxChannelTickers,
}
// defaultAuthChannels list of channels which are subscribed when authenticated
defaultAuthChannels = []string{
okxChannelAccount,
okxChannelOrders,
}
)
var (
candlestickChannelsMap = map[string]bool{okxChannelCandle1Y: true, okxChannelCandle6M: true, okxChannelCandle3M: true, okxChannelCandle1M: true, okxChannelCandle1W: true, okxChannelCandle1D: true, okxChannelCandle2D: true, okxChannelCandle3D: true, okxChannelCandle5D: true, okxChannelCandle12H: true, okxChannelCandle6H: true, okxChannelCandle4H: true, okxChannelCandle2H: true, okxChannelCandle1H: true, okxChannelCandle30m: true, okxChannelCandle15m: true, okxChannelCandle5m: true, okxChannelCandle3m: true, okxChannelCandle1m: true, okxChannelCandle1Yutc: true, okxChannelCandle3Mutc: true, okxChannelCandle1Mutc: true, okxChannelCandle1Wutc: true, okxChannelCandle1Dutc: true, okxChannelCandle2Dutc: true, okxChannelCandle3Dutc: true, okxChannelCandle5Dutc: true, okxChannelCandle12Hutc: true, okxChannelCandle6Hutc: true}
candlesticksMarkPriceMap = map[string]bool{okxChannelMarkPriceCandle1Y: true, okxChannelMarkPriceCandle6M: true, okxChannelMarkPriceCandle3M: true, okxChannelMarkPriceCandle1M: true, okxChannelMarkPriceCandle1W: true, okxChannelMarkPriceCandle1D: true, okxChannelMarkPriceCandle2D: true, okxChannelMarkPriceCandle3D: true, okxChannelMarkPriceCandle5D: true, okxChannelMarkPriceCandle12H: true, okxChannelMarkPriceCandle6H: true, okxChannelMarkPriceCandle4H: true, okxChannelMarkPriceCandle2H: true, okxChannelMarkPriceCandle1H: true, okxChannelMarkPriceCandle30m: true, okxChannelMarkPriceCandle15m: true, okxChannelMarkPriceCandle5m: true, okxChannelMarkPriceCandle3m: true, okxChannelMarkPriceCandle1m: true, okxChannelMarkPriceCandle1Yutc: true, okxChannelMarkPriceCandle3Mutc: true, okxChannelMarkPriceCandle1Mutc: true, okxChannelMarkPriceCandle1Wutc: true, okxChannelMarkPriceCandle1Dutc: true, okxChannelMarkPriceCandle2Dutc: true, okxChannelMarkPriceCandle3Dutc: true, okxChannelMarkPriceCandle5Dutc: true, okxChannelMarkPriceCandle12Hutc: true, okxChannelMarkPriceCandle6Hutc: true}
@@ -214,6 +201,22 @@ const (
okxChannelMarkPriceCandle6Hutc = markPrice + okxChannelCandle6Hutc
)
var defaultSubscriptions = subscription.List{
{Enabled: true, Asset: asset.All, Channel: subscription.AllTradesChannel},
{Enabled: true, Asset: asset.All, Channel: subscription.OrderbookChannel},
{Enabled: true, Asset: asset.All, Channel: subscription.TickerChannel},
{Enabled: true, Asset: asset.All, Channel: subscription.MyOrdersChannel, Authenticated: true},
{Enabled: true, Channel: subscription.MyAccountChannel, Authenticated: true},
}
var subscriptionNames = map[string]string{
subscription.AllTradesChannel: okxChannelTrades,
subscription.OrderbookChannel: okxChannelOrderBooks,
subscription.TickerChannel: okxChannelTickers,
subscription.MyAccountChannel: okxChannelAccount,
subscription.MyOrdersChannel: okxChannelOrders,
}
// WsConnect initiates a websocket connection
func (ok *Okx) WsConnect() error {
if !ok.Websocket.IsEnabled() || !ok.IsEnabled() {
@@ -360,119 +363,23 @@ func (ok *Okx) Unsubscribe(channelsToUnsubscribe subscription.List) error {
// handleSubscription sends a subscription and unsubscription information thought the websocket endpoint.
// as of the okx, exchange this endpoint sends subscription and unsubscription messages but with a list of json objects.
func (ok *Okx) handleSubscription(operation string, subscriptions subscription.List) error {
func (ok *Okx) handleSubscription(operation string, subs subscription.List) error {
reqs := WSSubscriptionInformationList{Operation: operation}
authRequests := WSSubscriptionInformationList{Operation: operation}
ok.WsRequestSemaphore <- 1
defer func() { <-ok.WsRequestSemaphore }()
var channels subscription.List
var authChannels subscription.List
for i := 0; i < len(subscriptions); i++ {
s := subscriptions[i]
if len(s.Pairs) > 1 {
return subscription.ErrNotSinglePair
}
arg := SubscriptionInfo{
Channel: s.Channel,
}
var instrumentID string
var underlying string
var okay bool
var instrumentType string
var authSubscription bool
var algoID string
var uid string
switch arg.Channel {
case okxChannelAccount,
okxChannelPositions,
okxChannelBalanceAndPosition,
okxChannelOrders,
okxChannelAlgoOrders,
okxChannelAlgoAdvance,
okxChannelLiquidationWarning,
okxChannelAccountGreeks,
okxChannelRfqs,
okxChannelQuotes,
okxChannelStructureBlockTrades,
okxChannelSpotGridOrder,
okxChannelGridOrdersContract,
okxChannelGridPositions,
okcChannelGridSubOrders:
authSubscription = true
var errs error
for i := 0; i < len(subs); i++ {
s := subs[i]
var arg SubscriptionInfo
if err := json.Unmarshal([]byte(s.QualifiedChannel), &arg); err != nil {
errs = common.AppendError(errs, err)
continue
}
if arg.Channel == okxChannelGridPositions {
algoID, _ = s.Params["algoId"].(string)
}
if arg.Channel == okcChannelGridSubOrders ||
arg.Channel == okxChannelGridPositions {
uid, _ = s.Params["uid"].(string)
}
if strings.HasPrefix(arg.Channel, "candle") ||
arg.Channel == okxChannelTickers ||
arg.Channel == okxChannelOrderBooks ||
arg.Channel == okxChannelOrderBooks5 ||
arg.Channel == okxChannelOrderBooks50TBT ||
arg.Channel == okxChannelOrderBooksTBT ||
arg.Channel == okxChannelFundingRate ||
arg.Channel == okxChannelTrades {
if s.Params["instId"] != "" {
instrumentID, okay = s.Params["instId"].(string)
if !okay {
instrumentID = ""
}
} else if s.Params["instrumentID"] != "" {
instrumentID, okay = s.Params["instrumentID"].(string)
if !okay {
instrumentID = ""
}
}
if instrumentID == "" {
if len(s.Pairs) != 1 {
return subscription.ErrNotSinglePair
}
format, err := ok.GetPairFormat(s.Asset, false)
if err != nil {
return err
}
p := s.Pairs[0]
if p.Base.String() == "" || p.Quote.String() == "" {
return errIncompleteCurrencyPair
}
instrumentID = format.Format(p)
}
}
if arg.Channel == okxChannelInstruments ||
arg.Channel == okxChannelPositions ||
arg.Channel == okxChannelOrders ||
arg.Channel == okxChannelAlgoOrders ||
arg.Channel == okxChannelAlgoAdvance ||
arg.Channel == okxChannelLiquidationWarning ||
arg.Channel == okxChannelSpotGridOrder ||
arg.Channel == okxChannelGridOrdersContract ||
arg.Channel == okxChannelEstimatedPrice {
instrumentType = ok.GetInstrumentTypeFromAssetItem(s.Asset)
}
if arg.Channel == okxChannelPositions ||
arg.Channel == okxChannelOrders ||
arg.Channel == okxChannelAlgoOrders ||
arg.Channel == okxChannelEstimatedPrice ||
arg.Channel == okxChannelOptSummary {
if len(s.Pairs) == 1 {
underlying, _ = ok.GetUnderlying(s.Pairs[0], s.Asset)
}
}
arg.InstrumentID = instrumentID
arg.Underlying = underlying
arg.InstrumentType = instrumentType
arg.UID = uid
arg.AlgoID = algoID
if authSubscription {
if s.Authenticated {
authChannels = append(authChannels, s)
authRequests.Arguments = append(authRequests.Arguments, arg)
authChunk, err := json.Marshal(authRequests)
@@ -1300,45 +1207,19 @@ func (ok *Okx) wsProcessTickers(data []byte) error {
return nil
}
// GenerateDefaultSubscriptions returns a list of default subscription message.
func (ok *Okx) GenerateDefaultSubscriptions() (subscription.List, error) {
var subscriptions subscription.List
assets := ok.GetAssetTypes(true)
subs := make([]string, 0, len(defaultSubscribedChannels)+len(defaultAuthChannels))
subs = append(subs, defaultSubscribedChannels...)
if ok.Websocket.CanUseAuthenticatedEndpoints() {
subs = append(subs, defaultAuthChannels...)
}
for c := range subs {
switch subs[c] {
case okxChannelOrders:
for x := range assets {
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: subs[c],
Asset: assets[x],
})
}
case okxChannelCandle5m, okxChannelTickers, okxChannelOrderBooks, okxChannelFundingRate, okxChannelOrderBooks5, okxChannelOrderBooks50TBT, okxChannelOrderBooksTBT, okxChannelTrades:
for x := range assets {
pairs, err := ok.GetEnabledPairs(assets[x])
if err != nil {
return nil, err
}
for p := range pairs {
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: subs[c],
Asset: assets[x],
Pairs: currency.Pairs{pairs[p]},
})
}
}
default:
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: subs[c],
})
}
}
return subscriptions, nil
// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature
func (ok *Okx) generateSubscriptions() (subscription.List, error) {
return ok.Features.Subscriptions.ExpandTemplates(ok)
}
// GetSubscriptionTemplate returns a subscription channel template
func (ok *Okx) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(template.FuncMap{
"channelName": channelName,
"isSymbolChannel": isSymbolChannel,
"isAssetChannel": isAssetChannel,
"instType": ok.GetInstrumentTypeFromAssetItem,
}).Parse(subTplText)
}
// wsProcessPushData processes push data coming through the websocket channel
@@ -2111,3 +1992,45 @@ func (ok *Okx) PublicStructureBlockTradesSubscription(operation string, assetTyp
func (ok *Okx) BlockTickerSubscription(operation string, assetType asset.Item, pair currency.Pair) error {
return ok.wsChannelSubscription(operation, okxChannelBlockTickers, assetType, pair, false, true, false)
}
// channelName converts global subscription channel names to exchange specific names
func channelName(s *subscription.Subscription) string {
if s, ok := subscriptionNames[s.Channel]; ok {
return s
}
return s.Channel
}
// isAssetChannel returns if the channel expects one Asset per subscription
func isAssetChannel(s *subscription.Subscription) bool {
return s.Channel == subscription.MyOrdersChannel
}
// isSymbolChannel returns if the channel expects one Symbol per subscription
func isSymbolChannel(s *subscription.Subscription) bool {
switch s.Channel {
case subscription.CandlesChannel, subscription.TickerChannel, subscription.OrderbookChannel, subscription.AllTradesChannel, okxChannelFundingRate:
return true
}
return false
}
const subTplText = `
{{- with $name := channelName $.S }}
{{- range $asset, $pairs := $.AssetPairs }}
{{- if isAssetChannel $.S -}}
{"channel":"{{ $name }}","instType":"{{ instType $asset }}"}
{{- else if isSymbolChannel $.S }}
{{- range $p := $pairs -}}
{"channel":"{{ $name }}","instID":"{{ $p }}"}
{{ $.PairSeparator }}
{{- end -}}
{{- else }}
{"channel":"{{ $name }}"
{{- with $algoId := index $.S.Params "algoId" -}} ,"algoId":"{{ $algoId }}" {{- end -}}
}
{{- end }}
{{- $.AssetSeparator }}
{{- end }}
{{- end }}
`

View File

@@ -156,6 +156,7 @@ func (ok *Okx) SetDefaults() {
GlobalResultLimit: 100, // Reference: https://www.okx.com/docs-v5/en/#rest-api-market-data-get-candlesticks-history
},
},
Subscriptions: defaultSubscriptions.Clone(),
}
ok.Requester, err = request.New(ok.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
@@ -211,7 +212,7 @@ func (ok *Okx) Setup(exch *config.Exchange) error {
Connector: ok.WsConnect,
Subscriber: ok.Subscribe,
Unsubscriber: ok.Unsubscribe,
GenerateSubscriptions: ok.GenerateDefaultSubscriptions,
GenerateSubscriptions: ok.generateSubscriptions,
Features: &ok.Features.Supports.WebsocketCapabilities,
MaxWebsocketSubscriptionsPerConnection: 240,
OrderbookBufferConfig: buffer.Config{