Files
gocryptotrader/exchanges/okx/okx_business_websocket.go
dazi005 cce28f9d2b OKX: Fix authenticated websocket login (#2051)
* Okx:remove

* Okx:replace

* Okx:ping gws.PingMessage

* Okx:ping gws.PingMessage

* Okx: add authenticateConnection

* Okx: fix pingHandler

* Update exchanges/okx/okx_business_websocket.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/okx/okx_business_websocket.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/okx/okx_websocket.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/okx/okx_business_websocket.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Okx:UseMultiConnectionManagement

* Okx:rm UseMultiConnectionManagement

* Okx:roll back

* Okx:apply diff

* Okx:make lint fix

* Okx:make lint fix

* Okx:make lint fix

* Okx:fix

* Okx:fix name

* Okx:fix NilGuard depends on #2076

* Okx:remove comment

---------

Co-authored-by: Ryan O'Hara-Reid <oharareid.ryan@gmail.com>
Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>
2025-10-10 10:59:08 +11:00

213 lines
6.9 KiB
Go

package okx
import (
"context"
"net/http"
"strings"
"time"
gws "github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/encoding/json"
"github.com/thrasher-corp/gocryptotrader/exchange/websocket"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/log"
)
const (
// okxBusinessWebsocketURL
okxBusinessWebsocketURL = "wss://ws.okx.com:8443/ws/v5/business"
)
var (
// defaultBusinessSubscribedChannels list of channels which are subscribed by default
defaultBusinessSubscribedChannels = []string{
okxSpreadPublicTrades,
okxSpreadOrderbook,
okxSpreadPublicTicker,
channelPublicStrucBlockTrades,
channelPublicBlockTrades,
channelBlockTickers,
}
// defaultBusinessAuthChannels list of authenticated channels
defaultBusinessAuthChannels = []string{
okxSpreadOrders,
okxSpreadTrades,
}
)
// WsConnectBusiness connects to a business websocket channel.
func (e *Exchange) WsConnectBusiness(ctx context.Context) error {
if !e.Websocket.IsEnabled() || !e.IsEnabled() {
return websocket.ErrWebsocketNotEnabled
}
var dialer gws.Dialer
dialer.ReadBufferSize = 8192
dialer.WriteBufferSize = 8192
e.Websocket.Conn.SetURL(okxBusinessWebsocketURL)
if err := e.Websocket.Conn.Dial(ctx, &dialer, http.Header{}); err != nil {
return err
}
e.Websocket.Wg.Go(func() { e.wsReadData(ctx, e.Websocket.Conn) })
if e.Verbose {
log.Debugf(log.ExchangeSys, "Successful connection to %v", e.Websocket.GetWebsocketURL())
}
e.Websocket.Conn.SetupPingHandler(request.UnAuth, websocket.PingHandler{
MessageType: gws.TextMessage,
Message: pingMsg,
Delay: time.Second * 20,
})
if e.Websocket.CanUseAuthenticatedEndpoints() {
if err := e.authenticateConnection(ctx, e.Websocket.Conn); err != nil { // business WS uses same conn for public and private
log.Errorf(log.ExchangeSys, "Error authenticating business websocket: %s", err)
e.Websocket.SetCanUseAuthenticatedEndpoints(false)
}
}
return nil
}
// GenerateDefaultBusinessSubscriptions returns a list of default subscriptions to business websocket.
func (e *Exchange) GenerateDefaultBusinessSubscriptions() ([]subscription.Subscription, error) {
var subs []string
var subscriptions []subscription.Subscription
subs = append(subs, defaultBusinessSubscribedChannels...)
if e.Websocket.CanUseAuthenticatedEndpoints() {
subs = append(subs, defaultBusinessAuthChannels...)
}
for c := range subs {
switch subs[c] {
case okxSpreadOrders,
okxSpreadTrades,
okxSpreadOrderbookLevel1,
okxSpreadOrderbook,
okxSpreadPublicTrades,
okxSpreadPublicTicker:
pairs, err := e.GetEnabledPairs(asset.Spread)
if err != nil {
return nil, err
}
for p := range pairs {
subscriptions = append(subscriptions, subscription.Subscription{
Channel: subs[c],
Asset: asset.Spread,
Pairs: []currency.Pair{pairs[p]},
})
}
case channelPublicBlockTrades,
channelBlockTickers:
pairs, err := e.GetEnabledPairs(asset.PerpetualSwap)
if err != nil {
return nil, err
}
for p := range pairs {
subscriptions = append(subscriptions, subscription.Subscription{
Channel: subs[c],
Asset: asset.PerpetualSwap,
Pairs: []currency.Pair{pairs[p]},
})
}
default:
subscriptions = append(subscriptions, subscription.Subscription{
Channel: subs[c],
})
}
}
return subscriptions, nil
}
// BusinessSubscribe sends a websocket subscription request to several channels to receive data.
func (e *Exchange) BusinessSubscribe(ctx context.Context, channelsToSubscribe subscription.List) error {
return e.handleBusinessSubscription(ctx, operationSubscribe, channelsToSubscribe)
}
// BusinessUnsubscribe sends a websocket unsubscription request to several channels to receive data.
func (e *Exchange) BusinessUnsubscribe(ctx context.Context, channelsToUnsubscribe subscription.List) error {
return e.handleBusinessSubscription(ctx, operationUnsubscribe, channelsToUnsubscribe)
}
// handleBusinessSubscription sends a subscription and unsubscription information thought the business websocket endpoint.
// as of the okx, exchange this endpoint sends subscription and unsubscription messages but with a list of json objects.
func (e *Exchange) handleBusinessSubscription(ctx context.Context, operation string, subscriptions subscription.List) error {
wsSubscriptionReq := WSSubscriptionInformationList{Operation: operation}
var channels subscription.List
var authChannels subscription.List
var err error
for i := 0; i < len(subscriptions); i++ {
arg := SubscriptionInfo{
Channel: subscriptions[i].Channel,
}
switch arg.Channel {
case okxSpreadOrders, okxSpreadTrades, okxSpreadOrderbookLevel1, okxSpreadOrderbook, okxSpreadPublicTrades, okxSpreadPublicTicker:
if len(subscriptions[i].Pairs) != 1 {
return currency.ErrCurrencyPairEmpty
}
arg.SpreadID = subscriptions[i].Pairs[0].String()
case channelPublicBlockTrades, channelBlockTickers:
if len(subscriptions[i].Pairs) != 1 {
return currency.ErrCurrencyPairEmpty
}
arg.InstrumentID = subscriptions[i].Pairs[0]
}
if strings.HasPrefix(arg.Channel, candle) || strings.HasPrefix(arg.Channel, indexCandlestick) || strings.HasPrefix(arg.Channel, markPrice) {
if len(subscriptions[i].Pairs) != 1 {
return currency.ErrCurrencyPairEmpty
}
arg.InstrumentID = subscriptions[i].Pairs[0]
}
if ifAny, ok := subscriptions[i].Params["instFamily"]; ok {
if arg.InstrumentFamily, ok = ifAny.(string); !ok {
return common.GetTypeAssertError("string", ifAny, "instFamily")
}
}
var chunk []byte
channels = append(channels, subscriptions[i])
wsSubscriptionReq.Arguments = append(wsSubscriptionReq.Arguments, arg)
chunk, err = json.Marshal(wsSubscriptionReq)
if err != nil {
return err
}
if len(chunk) > maxConnByteLen {
i--
err = e.Websocket.Conn.SendJSONMessage(ctx, request.UnAuth, wsSubscriptionReq)
if err != nil {
return err
}
if operation == operationUnsubscribe {
err = e.Websocket.RemoveSubscriptions(e.Websocket.Conn, channels...)
} else {
err = e.Websocket.AddSuccessfulSubscriptions(e.Websocket.Conn, channels...)
}
if err != nil {
return err
}
channels = subscription.List{}
wsSubscriptionReq.Arguments = []SubscriptionInfo{}
continue
}
}
err = e.Websocket.Conn.SendJSONMessage(ctx, request.UnAuth, wsSubscriptionReq)
if err != nil {
return err
}
if operation == operationUnsubscribe {
channels = append(channels, authChannels...)
err = e.Websocket.RemoveSubscriptions(e.Websocket.Conn, channels...)
} else {
channels = append(channels, authChannels...)
err = e.Websocket.AddSuccessfulSubscriptions(e.Websocket.Conn, channels...)
}
return err
}