diff --git a/engine/websocketroutine_manager.go b/engine/websocketroutine_manager.go index 55bb7442..6494b5e1 100644 --- a/engine/websocketroutine_manager.go +++ b/engine/websocketroutine_manager.go @@ -106,54 +106,47 @@ func (m *WebsocketRoutineManager) websocketRoutine() { if err != nil { log.Errorf(log.WebsocketMgr, "websocket routine manager cannot get exchanges: %v", err) } - wg := sync.WaitGroup{} - wg.Add(len(exchanges)) - for i := range exchanges { - go func(i int) { - defer wg.Done() - if exchanges[i].SupportsWebsocket() { - if m.verbose { - log.Debugf(log.WebsocketMgr, - "Exchange %s websocket support: Yes Enabled: %v", - exchanges[i].GetName(), - common.IsEnabled(exchanges[i].IsWebsocketEnabled()), - ) - } - - ws, err := exchanges[i].GetWebsocket() - if err != nil { - log.Errorf( - log.WebsocketMgr, - "Exchange %s GetWebsocket error: %s", - exchanges[i].GetName(), - err, - ) - return - } - - if ws.IsEnabled() { - err = ws.Connect() - if err != nil { - log.Errorf(log.WebsocketMgr, "%v", err) - } - - err = m.websocketDataReceiver(ws) - if err != nil { - log.Errorf(log.WebsocketMgr, "%v", err) - } - - err = ws.FlushChannels() - if err != nil { - log.Errorf(log.WebsocketMgr, "Failed to subscribe: %v", err) - } - } - } else if m.verbose { - log.Debugf(log.WebsocketMgr, - "Exchange %s websocket support: No", - exchanges[i].GetName(), - ) + var wg sync.WaitGroup + for _, exch := range exchanges { + if !exch.SupportsWebsocket() { + if m.verbose { + log.Debugf(log.WebsocketMgr, "Exchange %s websocket support: No", + exch.GetName()) } - }(i) + continue + } + + if m.verbose { + log.Debugf(log.WebsocketMgr, "Exchange %s websocket support: Yes Enabled: %v", + exch.GetName(), + common.IsEnabled(exch.IsWebsocketEnabled())) + } + + ws, err := exch.GetWebsocket() + if err != nil { + log.Errorf(log.WebsocketMgr, "Exchange %s GetWebsocket error: %s", + exch.GetName(), + err) + continue + } + + if !ws.IsEnabled() { + continue + } + + wg.Add(1) + go func() { + defer wg.Done() + err = ws.Connect() + if err != nil { + log.Errorf(log.WebsocketMgr, "%v", err) + } + + err = m.websocketDataReceiver(ws) + if err != nil { + log.Errorf(log.WebsocketMgr, "%v", err) + } + }() } wg.Wait() } diff --git a/exchanges/okx/okx_test.go b/exchanges/okx/okx_test.go index e2eb5d24..6e9f452d 100644 --- a/exchanges/okx/okx_test.go +++ b/exchanges/okx/okx_test.go @@ -3646,3 +3646,42 @@ func TestGetFuturesContractDetails(t *testing.T) { t.Error(err) } } + +func TestWsProcessOrderbook5(t *testing.T) { + t.Parallel() + + var ob5payload = []byte(`{"arg":{"channel":"books5","instId":"OKB-USDT"},"data":[{"asks":[["0.0000007465","2290075956","0","4"],["0.0000007466","1747284705","0","4"],["0.0000007467","1338861655","0","3"],["0.0000007468","1661668387","0","6"],["0.0000007469","2715477116","0","5"]],"bids":[["0.0000007464","15693119","0","1"],["0.0000007463","2330835024","0","4"],["0.0000007462","1182926517","0","2"],["0.0000007461","3818684357","0","4"],["0.000000746","6021641435","0","7"]],"instId":"OKB-USDT","ts":"1695864901807","seqId":4826378794}]}`) + err := ok.wsProcessOrderbook5(ob5payload) + if err != nil { + t.Error(err) + } + + required := currency.NewPairWithDelimiter("OKB", "USDT", "-") + + got, err := orderbook.Get("okx", required, asset.Spot) + if err != nil { + t.Fatal(err) + } + + if len(got.Asks) != 5 { + t.Errorf("expected %v, received %v", 5, len(got.Asks)) + } + + if len(got.Bids) != 5 { + t.Errorf("expected %v, received %v", 5, len(got.Bids)) + } + + // Book replicated to margin + got, err = orderbook.Get("okx", required, asset.Margin) + if err != nil { + t.Fatal(err) + } + + if len(got.Asks) != 5 { + t.Errorf("expected %v, received %v", 5, len(got.Asks)) + } + + if len(got.Bids) != 5 { + t.Errorf("expected %v, received %v", 5, len(got.Bids)) + } +} diff --git a/exchanges/okx/okx_type_convert.go b/exchanges/okx/okx_type_convert.go index 4d911327..89c5db9f 100644 --- a/exchanges/okx/okx_type_convert.go +++ b/exchanges/okx/okx_type_convert.go @@ -6,7 +6,6 @@ import ( "strings" "time" - "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" ) @@ -41,22 +40,6 @@ func (a *okxNumericalValue) Float64() float64 { return float64(*a) } type okxUnixMilliTime int64 -type okxAssetType struct { - asset.Item -} - -// UnmarshalJSON deserializes JSON, and timestamp information. -func (a *okxAssetType) UnmarshalJSON(data []byte) error { - var t string - err := json.Unmarshal(data, &t) - if err != nil { - return err - } - - a.Item = GetAssetTypeFromInstrumentType(strings.ToUpper(t)) - return nil -} - // UnmarshalJSON deserializes byte data to okxunixMilliTime instance. func (a *okxUnixMilliTime) UnmarshalJSON(data []byte) error { var num string diff --git a/exchanges/okx/okx_types.go b/exchanges/okx/okx_types.go index 54b4aa2c..47509dcf 100644 --- a/exchanges/okx/okx_types.go +++ b/exchanges/okx/okx_types.go @@ -3189,3 +3189,21 @@ type wsSubscriptionParameters struct { Underlying bool Currency bool } + +// WsOrderbook5 stores the orderbook data for orderbook 5 websocket +type WsOrderbook5 struct { + Argument struct { + Channel string `json:"channel"` + InstrumentID string `json:"instId"` + } `json:"arg"` + Data []Book5Data `json:"data"` +} + +// Book5Data stores the orderbook data for orderbook 5 websocket +type Book5Data struct { + Asks [][4]string `json:"asks"` + Bids [][4]string `json:"bids"` + InstrumentID string `json:"instId"` + TimestampMilli int64 `json:"ts,string"` + SequenceID int64 `json:"seqId"` +} diff --git a/exchanges/okx/okx_websocket.go b/exchanges/okx/okx_websocket.go index 4a1eb77c..dee1cd88 100644 --- a/exchanges/okx/okx_websocket.go +++ b/exchanges/okx/okx_websocket.go @@ -359,15 +359,8 @@ func (ok *Okx) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) e // handleSubscription sends a subscription and unsubscription information thought the websocket endpoint. // as of the okx, exchange this endpoint sends subscription and unsubscription messages but with a list of json objects. func (ok *Okx) handleSubscription(operation string, subscriptions []stream.ChannelSubscription) error { - request := WSSubscriptionInformationList{ - Operation: operation, - Arguments: []SubscriptionInfo{}, - } - - authRequests := WSSubscriptionInformationList{ - Operation: operation, - Arguments: []SubscriptionInfo{}, - } + request := WSSubscriptionInformationList{Operation: operation} + authRequests := WSSubscriptionInformationList{Operation: operation} ok.WsRequestSemaphore <- 1 defer func() { <-ok.WsRequestSemaphore }() var channels []stream.ChannelSubscription @@ -650,8 +643,9 @@ func (ok *Okx) WsHandleData(respRaw []byte) error { okxChannelPriceLimit: var response WsMarkPrice return ok.wsProcessPushData(respRaw, &response) + case okxChannelOrderBooks5: + return ok.wsProcessOrderbook5(respRaw) case okxChannelOrderBooks, - okxChannelOrderBooks5, okxChannelOrderBooks50TBT, okxChannelBBOTBT, okxChannelOrderBooksTBT: @@ -738,11 +732,74 @@ func (ok *Okx) wsProcessIndexCandles(respRaw []byte) error { return nil } +// wsProcessOrderbook5 processes orderbook data +func (ok *Okx) wsProcessOrderbook5(data []byte) error { + var resp WsOrderbook5 + err := json.Unmarshal(data, &resp) + if err != nil { + return err + } + + if len(resp.Data) != 1 { + return fmt.Errorf("%s - no data returned", ok.Name) + } + + assets, err := ok.GetAssetsFromInstrumentTypeOrID("", resp.Argument.InstrumentID) + if err != nil { + return err + } + + pair, err := ok.GetPairFromInstrumentID(resp.Argument.InstrumentID) + if err != nil { + return err + } + + asks := make([]orderbook.Item, len(resp.Data[0].Asks)) + for x := range resp.Data[0].Asks { + asks[x].Price, err = strconv.ParseFloat(resp.Data[0].Asks[x][0], 64) + if err != nil { + return err + } + + asks[x].Amount, err = strconv.ParseFloat(resp.Data[0].Asks[x][1], 64) + if err != nil { + return err + } + } + + bids := make([]orderbook.Item, len(resp.Data[0].Bids)) + for x := range resp.Data[0].Bids { + bids[x].Price, err = strconv.ParseFloat(resp.Data[0].Bids[x][0], 64) + if err != nil { + return err + } + + bids[x].Amount, err = strconv.ParseFloat(resp.Data[0].Bids[x][1], 64) + if err != nil { + return err + } + } + + for x := range assets { + err = ok.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{ + Asset: assets[x], + Asks: asks, + Bids: bids, + LastUpdated: time.UnixMilli(resp.Data[0].TimestampMilli), + Pair: pair, + Exchange: ok.Name, + VerifyOrderbook: ok.CanVerifyOrderbook}) + if err != nil { + return err + } + } + return nil +} + // wsProcessOrderBooks processes "snapshot" and "update" order book func (ok *Okx) wsProcessOrderBooks(data []byte) error { var response WsOrderBook - var err error - err = json.Unmarshal(data, &response) + err := json.Unmarshal(data, &response) if err != nil { return err } @@ -1277,10 +1334,6 @@ func (ok *Okx) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, err }) } } - if len(subscriptions) >= 240 { - log.Warnf(log.WebsocketMgr, "OKx has 240 subscription limit, only subscribing within limit. Requested %v", len(subscriptions)) - subscriptions = subscriptions[:239] - } return subscriptions, nil } diff --git a/exchanges/okx/okx_wrapper.go b/exchanges/okx/okx_wrapper.go index 034d611e..da66dad4 100644 --- a/exchanges/okx/okx_wrapper.go +++ b/exchanges/okx/okx_wrapper.go @@ -214,14 +214,15 @@ func (ok *Okx) Setup(exch *config.Exchange) error { return err } if err := ok.Websocket.Setup(&stream.WebsocketSetup{ - ExchangeConfig: exch, - DefaultURL: okxAPIWebsocketPublicURL, - RunningURL: wsRunningEndpoint, - Connector: ok.WsConnect, - Subscriber: ok.Subscribe, - Unsubscriber: ok.Unsubscribe, - GenerateSubscriptions: ok.GenerateDefaultSubscriptions, - Features: &ok.Features.Supports.WebsocketCapabilities, + ExchangeConfig: exch, + DefaultURL: okxAPIWebsocketPublicURL, + RunningURL: wsRunningEndpoint, + Connector: ok.WsConnect, + Subscriber: ok.Subscribe, + Unsubscriber: ok.Unsubscribe, + GenerateSubscriptions: ok.GenerateDefaultSubscriptions, + Features: &ok.Features.Supports.WebsocketCapabilities, + MaxWebsocketSubscriptionsPerConnection: 240, OrderbookBufferConfig: buffer.Config{ Checksum: ok.CalculateUpdateOrderbookChecksum, }, diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index 4a6a318a..11f692a1 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -10,6 +10,7 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/config" "github.com/thrasher-corp/gocryptotrader/log" ) @@ -48,6 +49,10 @@ var ( errWebsocketConnectorUnset = errors.New("websocket connector function not set") errWebsocketSubscriptionsGeneratorUnset = errors.New("websocket subscriptions generator function needs to be set") errClosedConnection = errors.New("use of closed network connection") + errSubscriptionsExceedsLimit = errors.New("subscriptions exceeds limit") + errInvalidMaxSubscriptions = errors.New("max subscriptions cannot be less than 0") + errNoSubscriptionsSupplied = errors.New("no subscriptions supplied") + errChannelSubscriptionAlreadySubscribed = errors.New("channel subscription already subscribed") ) var globalReporter Reporter @@ -167,6 +172,11 @@ func (w *Websocket) Setup(s *WebsocketSetup) error { w.Trade.Setup(w.exchangeName, s.TradeFeed, w.DataHandler) w.Fills.Setup(s.FillsFeed, w.DataHandler) + + if s.MaxWebsocketSubscriptionsPerConnection < 0 { + return fmt.Errorf("%s %w", w.exchangeName, errInvalidMaxSubscriptions) + } + w.MaxSubscriptionsPerConnection = s.MaxWebsocketSubscriptionsPerConnection return nil } @@ -275,11 +285,18 @@ func (w *Websocket) Connect() error { subs, err := w.GenerateSubs() // regenerate state on new connection if err != nil { - return fmt.Errorf("%v %w: %v", w.exchangeName, ErrSubscriptionFailure, err) + return fmt.Errorf("%s websocket: %w", w.exchangeName, common.AppendError(ErrSubscriptionFailure, err)) + } + if len(subs) == 0 { + return nil + } + err = w.checkSubscriptions(subs) + if err != nil { + return fmt.Errorf("%s websocket: %w", w.exchangeName, common.AppendError(ErrSubscriptionFailure, err)) } err = w.Subscriber(subs) if err != nil { - return fmt.Errorf("%v %w: %v", w.exchangeName, ErrSubscriptionFailure, err) + return fmt.Errorf("%s websocket: %w", w.exchangeName, common.AppendError(ErrSubscriptionFailure, err)) } return nil } @@ -905,24 +922,13 @@ func (w *Websocket) ResubscribeToChannel(subscribedChannel *ChannelSubscription) // SubscribeToChannels appends supplied channels to channelsToSubscribe func (w *Websocket) SubscribeToChannels(channels []ChannelSubscription) error { - if len(channels) == 0 { - return fmt.Errorf("%s websocket: cannot subscribe no channels supplied", - w.exchangeName) + err := w.checkSubscriptions(channels) + if err != nil { + return fmt.Errorf("%s websocket: %w", w.exchangeName, common.AppendError(ErrSubscriptionFailure, err)) } - w.subscriptionMutex.Lock() - for x := range channels { - for y := range w.subscriptions { - if channels[x].Equal(&w.subscriptions[y]) { - w.subscriptionMutex.Unlock() - return fmt.Errorf("%s websocket: %v already subscribed", - w.exchangeName, - channels[x]) - } - } - } - w.subscriptionMutex.Unlock() - if err := w.Subscriber(channels); err != nil { - return fmt.Errorf("%v %w: %v", w.exchangeName, ErrSubscriptionFailure, err) + err = w.Subscriber(channels) + if err != nil { + return fmt.Errorf("%s websocket: %w", w.exchangeName, common.AppendError(ErrSubscriptionFailure, err)) } return nil } @@ -1004,3 +1010,31 @@ func checkWebsocketURL(s string) error { } return nil } + +// checkSubscriptions checks subscriptions against the max subscription limit +// and if the subscription already exists. +func (w *Websocket) checkSubscriptions(subs []ChannelSubscription) error { + if len(subs) == 0 { + return errNoSubscriptionsSupplied + } + + w.subscriptionMutex.Lock() + defer w.subscriptionMutex.Unlock() + + if w.MaxSubscriptionsPerConnection > 0 && len(w.subscriptions)+len(subs) > w.MaxSubscriptionsPerConnection { + return fmt.Errorf("%w: current subscriptions: %v, incoming subscriptions: %v, max subscriptions per connection: %v - please reduce enabled pairs", + errSubscriptionsExceedsLimit, + len(w.subscriptions), + len(subs), + w.MaxSubscriptionsPerConnection) + } + + for x := range subs { + for y := range w.subscriptions { + if subs[x].Equal(&w.subscriptions[y]) { + return fmt.Errorf("%w for %+v", errChannelSubscriptionAlreadySubscribed, subs[x]) + } + } + } + return nil +} diff --git a/exchanges/stream/websocket_test.go b/exchanges/stream/websocket_test.go index a5651d0f..2f68e5ba 100644 --- a/exchanges/stream/websocket_test.go +++ b/exchanges/stream/websocket_test.go @@ -554,7 +554,15 @@ func TestSubscribeUnsubscribe(t *testing.T) { func TestResubscribe(t *testing.T) { t.Parallel() ws := *New() - err := ws.Setup(defaultSetup) + + wackedOutSetup := *defaultSetup + wackedOutSetup.MaxWebsocketSubscriptionsPerConnection = -1 + err := ws.Setup(&wackedOutSetup) + if !errors.Is(err, errInvalidMaxSubscriptions) { + t.Fatalf("received: '%v' but expected: '%v'", err, errInvalidMaxSubscriptions) + } + + err = ws.Setup(defaultSetup) if err != nil { t.Fatal(err) } @@ -1390,3 +1398,32 @@ func TestLatency(t *testing.T) { t.Errorf("expected %v, got %v", exch, r.name) } } + +func TestCheckSubscriptions(t *testing.T) { + t.Parallel() + ws := Websocket{} + err := ws.checkSubscriptions(nil) + if !errors.Is(err, errNoSubscriptionsSupplied) { + t.Fatalf("received: %v, but expected: %v", err, errNoSubscriptionsSupplied) + } + + ws.MaxSubscriptionsPerConnection = 1 + + err = ws.checkSubscriptions([]ChannelSubscription{{}, {}}) + if !errors.Is(err, errSubscriptionsExceedsLimit) { + t.Fatalf("received: %v, but expected: %v", err, errSubscriptionsExceedsLimit) + } + + ws.MaxSubscriptionsPerConnection = 2 + + ws.subscriptions = []ChannelSubscription{{Channel: "test"}} + err = ws.checkSubscriptions([]ChannelSubscription{{Channel: "test"}}) + if !errors.Is(err, errChannelSubscriptionAlreadySubscribed) { + t.Fatalf("received: %v, but expected: %v", err, errChannelSubscriptionAlreadySubscribed) + } + + err = ws.checkSubscriptions([]ChannelSubscription{{}}) + if !errors.Is(err, nil) { + t.Fatalf("received: %v, but expected: %v", err, nil) + } +} diff --git a/exchanges/stream/websocket_types.go b/exchanges/stream/websocket_types.go index 2db60384..640fb74a 100644 --- a/exchanges/stream/websocket_types.go +++ b/exchanges/stream/websocket_types.go @@ -93,6 +93,10 @@ type Websocket struct { // Latency reporter ExchangeLevelReporter Reporter + + // MaxSubScriptionsPerConnection defines the maximum number of + // subscriptions per connection that is allowed by the exchange. + MaxSubscriptionsPerConnection int } // WebsocketSetup defines variables for setting up a websocket connection @@ -114,6 +118,10 @@ type WebsocketSetup struct { // Fill data config values FillsFeed bool + + // MaxWebsocketSubscriptionsPerConnection defines the maximum number of + // subscriptions per connection that is allowed by the exchange. + MaxWebsocketSubscriptionsPerConnection int } // WebsocketConnection contains all the data needed to send a message to a WS