mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-05-13 15:09:42 +00:00
* Okx:remove * Okx:replace * Okx:ping gws.PingMessage * Okx:ping gws.PingMessage * Okx: add authenticateConnection * Okx: fix pingHandler * Update exchanges/okx/okx_business_websocket.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Update exchanges/okx/okx_business_websocket.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Update exchanges/okx/okx_websocket.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Update exchanges/okx/okx_business_websocket.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Okx:UseMultiConnectionManagement * Okx:rm UseMultiConnectionManagement * Okx:roll back * Okx:apply diff * Okx:make lint fix * Okx:make lint fix * Okx:make lint fix * Okx:fix * Okx:fix name * Okx:fix NilGuard depends on #2076 * Okx:remove comment --------- Co-authored-by: Ryan O'Hara-Reid <oharareid.ryan@gmail.com> Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>
213 lines
6.9 KiB
Go
213 lines
6.9 KiB
Go
package okx
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
gws "github.com/gorilla/websocket"
|
|
"github.com/thrasher-corp/gocryptotrader/common"
|
|
"github.com/thrasher-corp/gocryptotrader/currency"
|
|
"github.com/thrasher-corp/gocryptotrader/encoding/json"
|
|
"github.com/thrasher-corp/gocryptotrader/exchange/websocket"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
|
|
"github.com/thrasher-corp/gocryptotrader/log"
|
|
)
|
|
|
|
const (
|
|
// okxBusinessWebsocketURL
|
|
okxBusinessWebsocketURL = "wss://ws.okx.com:8443/ws/v5/business"
|
|
)
|
|
|
|
var (
|
|
// defaultBusinessSubscribedChannels list of channels which are subscribed by default
|
|
defaultBusinessSubscribedChannels = []string{
|
|
okxSpreadPublicTrades,
|
|
okxSpreadOrderbook,
|
|
okxSpreadPublicTicker,
|
|
|
|
channelPublicStrucBlockTrades,
|
|
channelPublicBlockTrades,
|
|
channelBlockTickers,
|
|
}
|
|
|
|
// defaultBusinessAuthChannels list of authenticated channels
|
|
defaultBusinessAuthChannels = []string{
|
|
okxSpreadOrders,
|
|
okxSpreadTrades,
|
|
}
|
|
)
|
|
|
|
// WsConnectBusiness connects to a business websocket channel.
|
|
func (e *Exchange) WsConnectBusiness(ctx context.Context) error {
|
|
if !e.Websocket.IsEnabled() || !e.IsEnabled() {
|
|
return websocket.ErrWebsocketNotEnabled
|
|
}
|
|
var dialer gws.Dialer
|
|
dialer.ReadBufferSize = 8192
|
|
dialer.WriteBufferSize = 8192
|
|
|
|
e.Websocket.Conn.SetURL(okxBusinessWebsocketURL)
|
|
if err := e.Websocket.Conn.Dial(ctx, &dialer, http.Header{}); err != nil {
|
|
return err
|
|
}
|
|
e.Websocket.Wg.Go(func() { e.wsReadData(ctx, e.Websocket.Conn) })
|
|
|
|
if e.Verbose {
|
|
log.Debugf(log.ExchangeSys, "Successful connection to %v", e.Websocket.GetWebsocketURL())
|
|
}
|
|
e.Websocket.Conn.SetupPingHandler(request.UnAuth, websocket.PingHandler{
|
|
MessageType: gws.TextMessage,
|
|
Message: pingMsg,
|
|
Delay: time.Second * 20,
|
|
})
|
|
if e.Websocket.CanUseAuthenticatedEndpoints() {
|
|
if err := e.authenticateConnection(ctx, e.Websocket.Conn); err != nil { // business WS uses same conn for public and private
|
|
log.Errorf(log.ExchangeSys, "Error authenticating business websocket: %s", err)
|
|
e.Websocket.SetCanUseAuthenticatedEndpoints(false)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GenerateDefaultBusinessSubscriptions returns a list of default subscriptions to business websocket.
|
|
func (e *Exchange) GenerateDefaultBusinessSubscriptions() ([]subscription.Subscription, error) {
|
|
var subs []string
|
|
var subscriptions []subscription.Subscription
|
|
subs = append(subs, defaultBusinessSubscribedChannels...)
|
|
if e.Websocket.CanUseAuthenticatedEndpoints() {
|
|
subs = append(subs, defaultBusinessAuthChannels...)
|
|
}
|
|
for c := range subs {
|
|
switch subs[c] {
|
|
case okxSpreadOrders,
|
|
okxSpreadTrades,
|
|
okxSpreadOrderbookLevel1,
|
|
okxSpreadOrderbook,
|
|
okxSpreadPublicTrades,
|
|
okxSpreadPublicTicker:
|
|
pairs, err := e.GetEnabledPairs(asset.Spread)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for p := range pairs {
|
|
subscriptions = append(subscriptions, subscription.Subscription{
|
|
Channel: subs[c],
|
|
Asset: asset.Spread,
|
|
Pairs: []currency.Pair{pairs[p]},
|
|
})
|
|
}
|
|
case channelPublicBlockTrades,
|
|
channelBlockTickers:
|
|
pairs, err := e.GetEnabledPairs(asset.PerpetualSwap)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for p := range pairs {
|
|
subscriptions = append(subscriptions, subscription.Subscription{
|
|
Channel: subs[c],
|
|
Asset: asset.PerpetualSwap,
|
|
Pairs: []currency.Pair{pairs[p]},
|
|
})
|
|
}
|
|
default:
|
|
subscriptions = append(subscriptions, subscription.Subscription{
|
|
Channel: subs[c],
|
|
})
|
|
}
|
|
}
|
|
return subscriptions, nil
|
|
}
|
|
|
|
// BusinessSubscribe sends a websocket subscription request to several channels to receive data.
|
|
func (e *Exchange) BusinessSubscribe(ctx context.Context, channelsToSubscribe subscription.List) error {
|
|
return e.handleBusinessSubscription(ctx, operationSubscribe, channelsToSubscribe)
|
|
}
|
|
|
|
// BusinessUnsubscribe sends a websocket unsubscription request to several channels to receive data.
|
|
func (e *Exchange) BusinessUnsubscribe(ctx context.Context, channelsToUnsubscribe subscription.List) error {
|
|
return e.handleBusinessSubscription(ctx, operationUnsubscribe, channelsToUnsubscribe)
|
|
}
|
|
|
|
// handleBusinessSubscription sends a subscription and unsubscription information thought the business websocket endpoint.
|
|
// as of the okx, exchange this endpoint sends subscription and unsubscription messages but with a list of json objects.
|
|
func (e *Exchange) handleBusinessSubscription(ctx context.Context, operation string, subscriptions subscription.List) error {
|
|
wsSubscriptionReq := WSSubscriptionInformationList{Operation: operation}
|
|
var channels subscription.List
|
|
var authChannels subscription.List
|
|
var err error
|
|
for i := 0; i < len(subscriptions); i++ {
|
|
arg := SubscriptionInfo{
|
|
Channel: subscriptions[i].Channel,
|
|
}
|
|
|
|
switch arg.Channel {
|
|
case okxSpreadOrders, okxSpreadTrades, okxSpreadOrderbookLevel1, okxSpreadOrderbook, okxSpreadPublicTrades, okxSpreadPublicTicker:
|
|
if len(subscriptions[i].Pairs) != 1 {
|
|
return currency.ErrCurrencyPairEmpty
|
|
}
|
|
arg.SpreadID = subscriptions[i].Pairs[0].String()
|
|
case channelPublicBlockTrades, channelBlockTickers:
|
|
if len(subscriptions[i].Pairs) != 1 {
|
|
return currency.ErrCurrencyPairEmpty
|
|
}
|
|
arg.InstrumentID = subscriptions[i].Pairs[0]
|
|
}
|
|
|
|
if strings.HasPrefix(arg.Channel, candle) || strings.HasPrefix(arg.Channel, indexCandlestick) || strings.HasPrefix(arg.Channel, markPrice) {
|
|
if len(subscriptions[i].Pairs) != 1 {
|
|
return currency.ErrCurrencyPairEmpty
|
|
}
|
|
arg.InstrumentID = subscriptions[i].Pairs[0]
|
|
}
|
|
|
|
if ifAny, ok := subscriptions[i].Params["instFamily"]; ok {
|
|
if arg.InstrumentFamily, ok = ifAny.(string); !ok {
|
|
return common.GetTypeAssertError("string", ifAny, "instFamily")
|
|
}
|
|
}
|
|
|
|
var chunk []byte
|
|
channels = append(channels, subscriptions[i])
|
|
wsSubscriptionReq.Arguments = append(wsSubscriptionReq.Arguments, arg)
|
|
chunk, err = json.Marshal(wsSubscriptionReq)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(chunk) > maxConnByteLen {
|
|
i--
|
|
err = e.Websocket.Conn.SendJSONMessage(ctx, request.UnAuth, wsSubscriptionReq)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if operation == operationUnsubscribe {
|
|
err = e.Websocket.RemoveSubscriptions(e.Websocket.Conn, channels...)
|
|
} else {
|
|
err = e.Websocket.AddSuccessfulSubscriptions(e.Websocket.Conn, channels...)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
channels = subscription.List{}
|
|
wsSubscriptionReq.Arguments = []SubscriptionInfo{}
|
|
continue
|
|
}
|
|
}
|
|
err = e.Websocket.Conn.SendJSONMessage(ctx, request.UnAuth, wsSubscriptionReq)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if operation == operationUnsubscribe {
|
|
channels = append(channels, authChannels...)
|
|
err = e.Websocket.RemoveSubscriptions(e.Websocket.Conn, channels...)
|
|
} else {
|
|
channels = append(channels, authChannels...)
|
|
err = e.Websocket.AddSuccessfulSubscriptions(e.Websocket.Conn, channels...)
|
|
}
|
|
return err
|
|
}
|