Okx: Integrate websocket book5 processing and add configurable max subscriptions per connection (#1275)

* okx: books 5 (cherry-pick)

* okx: shift types to types file, remove commented code and updated field name to better reflect pushed type

* linter: fix

* remove slowness

* * Introduce function checksubscriptions and shift check of subscriptions to internal websocket package
* Shift Max websocket connection int to Websocket setup (temp) for this use case only.

* glorious: nits

* linter: fix

* websocket: don't try and subscribed with nothing to subscribe to.

* Update exchanges/stream/websocket_test.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* glorious: nits

---------

Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
Co-authored-by: Scott <gloriousCode@users.noreply.github.com>
This commit is contained in:
Ryan O'Hara-Reid
2023-10-13 15:54:49 +11:00
committed by GitHub
parent 859c4512fb
commit 773441d5a7
9 changed files with 274 additions and 108 deletions

View File

@@ -106,54 +106,47 @@ func (m *WebsocketRoutineManager) websocketRoutine() {
if err != nil {
log.Errorf(log.WebsocketMgr, "websocket routine manager cannot get exchanges: %v", err)
}
wg := sync.WaitGroup{}
wg.Add(len(exchanges))
for i := range exchanges {
go func(i int) {
defer wg.Done()
if exchanges[i].SupportsWebsocket() {
if m.verbose {
log.Debugf(log.WebsocketMgr,
"Exchange %s websocket support: Yes Enabled: %v",
exchanges[i].GetName(),
common.IsEnabled(exchanges[i].IsWebsocketEnabled()),
)
}
ws, err := exchanges[i].GetWebsocket()
if err != nil {
log.Errorf(
log.WebsocketMgr,
"Exchange %s GetWebsocket error: %s",
exchanges[i].GetName(),
err,
)
return
}
if ws.IsEnabled() {
err = ws.Connect()
if err != nil {
log.Errorf(log.WebsocketMgr, "%v", err)
}
err = m.websocketDataReceiver(ws)
if err != nil {
log.Errorf(log.WebsocketMgr, "%v", err)
}
err = ws.FlushChannels()
if err != nil {
log.Errorf(log.WebsocketMgr, "Failed to subscribe: %v", err)
}
}
} else if m.verbose {
log.Debugf(log.WebsocketMgr,
"Exchange %s websocket support: No",
exchanges[i].GetName(),
)
var wg sync.WaitGroup
for _, exch := range exchanges {
if !exch.SupportsWebsocket() {
if m.verbose {
log.Debugf(log.WebsocketMgr, "Exchange %s websocket support: No",
exch.GetName())
}
}(i)
continue
}
if m.verbose {
log.Debugf(log.WebsocketMgr, "Exchange %s websocket support: Yes Enabled: %v",
exch.GetName(),
common.IsEnabled(exch.IsWebsocketEnabled()))
}
ws, err := exch.GetWebsocket()
if err != nil {
log.Errorf(log.WebsocketMgr, "Exchange %s GetWebsocket error: %s",
exch.GetName(),
err)
continue
}
if !ws.IsEnabled() {
continue
}
wg.Add(1)
go func() {
defer wg.Done()
err = ws.Connect()
if err != nil {
log.Errorf(log.WebsocketMgr, "%v", err)
}
err = m.websocketDataReceiver(ws)
if err != nil {
log.Errorf(log.WebsocketMgr, "%v", err)
}
}()
}
wg.Wait()
}

View File

@@ -3646,3 +3646,42 @@ func TestGetFuturesContractDetails(t *testing.T) {
t.Error(err)
}
}
func TestWsProcessOrderbook5(t *testing.T) {
t.Parallel()
var ob5payload = []byte(`{"arg":{"channel":"books5","instId":"OKB-USDT"},"data":[{"asks":[["0.0000007465","2290075956","0","4"],["0.0000007466","1747284705","0","4"],["0.0000007467","1338861655","0","3"],["0.0000007468","1661668387","0","6"],["0.0000007469","2715477116","0","5"]],"bids":[["0.0000007464","15693119","0","1"],["0.0000007463","2330835024","0","4"],["0.0000007462","1182926517","0","2"],["0.0000007461","3818684357","0","4"],["0.000000746","6021641435","0","7"]],"instId":"OKB-USDT","ts":"1695864901807","seqId":4826378794}]}`)
err := ok.wsProcessOrderbook5(ob5payload)
if err != nil {
t.Error(err)
}
required := currency.NewPairWithDelimiter("OKB", "USDT", "-")
got, err := orderbook.Get("okx", required, asset.Spot)
if err != nil {
t.Fatal(err)
}
if len(got.Asks) != 5 {
t.Errorf("expected %v, received %v", 5, len(got.Asks))
}
if len(got.Bids) != 5 {
t.Errorf("expected %v, received %v", 5, len(got.Bids))
}
// Book replicated to margin
got, err = orderbook.Get("okx", required, asset.Margin)
if err != nil {
t.Fatal(err)
}
if len(got.Asks) != 5 {
t.Errorf("expected %v, received %v", 5, len(got.Asks))
}
if len(got.Bids) != 5 {
t.Errorf("expected %v, received %v", 5, len(got.Bids))
}
}

View File

@@ -6,7 +6,6 @@ import (
"strings"
"time"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
)
@@ -41,22 +40,6 @@ func (a *okxNumericalValue) Float64() float64 { return float64(*a) }
type okxUnixMilliTime int64
type okxAssetType struct {
asset.Item
}
// UnmarshalJSON deserializes JSON, and timestamp information.
func (a *okxAssetType) UnmarshalJSON(data []byte) error {
var t string
err := json.Unmarshal(data, &t)
if err != nil {
return err
}
a.Item = GetAssetTypeFromInstrumentType(strings.ToUpper(t))
return nil
}
// UnmarshalJSON deserializes byte data to okxunixMilliTime instance.
func (a *okxUnixMilliTime) UnmarshalJSON(data []byte) error {
var num string

View File

@@ -3189,3 +3189,21 @@ type wsSubscriptionParameters struct {
Underlying bool
Currency bool
}
// WsOrderbook5 stores the orderbook data for orderbook 5 websocket
type WsOrderbook5 struct {
Argument struct {
Channel string `json:"channel"`
InstrumentID string `json:"instId"`
} `json:"arg"`
Data []Book5Data `json:"data"`
}
// Book5Data stores the orderbook data for orderbook 5 websocket
type Book5Data struct {
Asks [][4]string `json:"asks"`
Bids [][4]string `json:"bids"`
InstrumentID string `json:"instId"`
TimestampMilli int64 `json:"ts,string"`
SequenceID int64 `json:"seqId"`
}

View File

@@ -359,15 +359,8 @@ func (ok *Okx) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) e
// handleSubscription sends a subscription and unsubscription information thought the websocket endpoint.
// as of the okx, exchange this endpoint sends subscription and unsubscription messages but with a list of json objects.
func (ok *Okx) handleSubscription(operation string, subscriptions []stream.ChannelSubscription) error {
request := WSSubscriptionInformationList{
Operation: operation,
Arguments: []SubscriptionInfo{},
}
authRequests := WSSubscriptionInformationList{
Operation: operation,
Arguments: []SubscriptionInfo{},
}
request := WSSubscriptionInformationList{Operation: operation}
authRequests := WSSubscriptionInformationList{Operation: operation}
ok.WsRequestSemaphore <- 1
defer func() { <-ok.WsRequestSemaphore }()
var channels []stream.ChannelSubscription
@@ -650,8 +643,9 @@ func (ok *Okx) WsHandleData(respRaw []byte) error {
okxChannelPriceLimit:
var response WsMarkPrice
return ok.wsProcessPushData(respRaw, &response)
case okxChannelOrderBooks5:
return ok.wsProcessOrderbook5(respRaw)
case okxChannelOrderBooks,
okxChannelOrderBooks5,
okxChannelOrderBooks50TBT,
okxChannelBBOTBT,
okxChannelOrderBooksTBT:
@@ -738,11 +732,74 @@ func (ok *Okx) wsProcessIndexCandles(respRaw []byte) error {
return nil
}
// wsProcessOrderbook5 processes orderbook data
func (ok *Okx) wsProcessOrderbook5(data []byte) error {
var resp WsOrderbook5
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
if len(resp.Data) != 1 {
return fmt.Errorf("%s - no data returned", ok.Name)
}
assets, err := ok.GetAssetsFromInstrumentTypeOrID("", resp.Argument.InstrumentID)
if err != nil {
return err
}
pair, err := ok.GetPairFromInstrumentID(resp.Argument.InstrumentID)
if err != nil {
return err
}
asks := make([]orderbook.Item, len(resp.Data[0].Asks))
for x := range resp.Data[0].Asks {
asks[x].Price, err = strconv.ParseFloat(resp.Data[0].Asks[x][0], 64)
if err != nil {
return err
}
asks[x].Amount, err = strconv.ParseFloat(resp.Data[0].Asks[x][1], 64)
if err != nil {
return err
}
}
bids := make([]orderbook.Item, len(resp.Data[0].Bids))
for x := range resp.Data[0].Bids {
bids[x].Price, err = strconv.ParseFloat(resp.Data[0].Bids[x][0], 64)
if err != nil {
return err
}
bids[x].Amount, err = strconv.ParseFloat(resp.Data[0].Bids[x][1], 64)
if err != nil {
return err
}
}
for x := range assets {
err = ok.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{
Asset: assets[x],
Asks: asks,
Bids: bids,
LastUpdated: time.UnixMilli(resp.Data[0].TimestampMilli),
Pair: pair,
Exchange: ok.Name,
VerifyOrderbook: ok.CanVerifyOrderbook})
if err != nil {
return err
}
}
return nil
}
// wsProcessOrderBooks processes "snapshot" and "update" order book
func (ok *Okx) wsProcessOrderBooks(data []byte) error {
var response WsOrderBook
var err error
err = json.Unmarshal(data, &response)
err := json.Unmarshal(data, &response)
if err != nil {
return err
}
@@ -1277,10 +1334,6 @@ func (ok *Okx) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, err
})
}
}
if len(subscriptions) >= 240 {
log.Warnf(log.WebsocketMgr, "OKx has 240 subscription limit, only subscribing within limit. Requested %v", len(subscriptions))
subscriptions = subscriptions[:239]
}
return subscriptions, nil
}

View File

@@ -214,14 +214,15 @@ func (ok *Okx) Setup(exch *config.Exchange) error {
return err
}
if err := ok.Websocket.Setup(&stream.WebsocketSetup{
ExchangeConfig: exch,
DefaultURL: okxAPIWebsocketPublicURL,
RunningURL: wsRunningEndpoint,
Connector: ok.WsConnect,
Subscriber: ok.Subscribe,
Unsubscriber: ok.Unsubscribe,
GenerateSubscriptions: ok.GenerateDefaultSubscriptions,
Features: &ok.Features.Supports.WebsocketCapabilities,
ExchangeConfig: exch,
DefaultURL: okxAPIWebsocketPublicURL,
RunningURL: wsRunningEndpoint,
Connector: ok.WsConnect,
Subscriber: ok.Subscribe,
Unsubscriber: ok.Unsubscribe,
GenerateSubscriptions: ok.GenerateDefaultSubscriptions,
Features: &ok.Features.Supports.WebsocketCapabilities,
MaxWebsocketSubscriptionsPerConnection: 240,
OrderbookBufferConfig: buffer.Config{
Checksum: ok.CalculateUpdateOrderbookChecksum,
},

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/config"
"github.com/thrasher-corp/gocryptotrader/log"
)
@@ -48,6 +49,10 @@ var (
errWebsocketConnectorUnset = errors.New("websocket connector function not set")
errWebsocketSubscriptionsGeneratorUnset = errors.New("websocket subscriptions generator function needs to be set")
errClosedConnection = errors.New("use of closed network connection")
errSubscriptionsExceedsLimit = errors.New("subscriptions exceeds limit")
errInvalidMaxSubscriptions = errors.New("max subscriptions cannot be less than 0")
errNoSubscriptionsSupplied = errors.New("no subscriptions supplied")
errChannelSubscriptionAlreadySubscribed = errors.New("channel subscription already subscribed")
)
var globalReporter Reporter
@@ -167,6 +172,11 @@ func (w *Websocket) Setup(s *WebsocketSetup) error {
w.Trade.Setup(w.exchangeName, s.TradeFeed, w.DataHandler)
w.Fills.Setup(s.FillsFeed, w.DataHandler)
if s.MaxWebsocketSubscriptionsPerConnection < 0 {
return fmt.Errorf("%s %w", w.exchangeName, errInvalidMaxSubscriptions)
}
w.MaxSubscriptionsPerConnection = s.MaxWebsocketSubscriptionsPerConnection
return nil
}
@@ -275,11 +285,18 @@ func (w *Websocket) Connect() error {
subs, err := w.GenerateSubs() // regenerate state on new connection
if err != nil {
return fmt.Errorf("%v %w: %v", w.exchangeName, ErrSubscriptionFailure, err)
return fmt.Errorf("%s websocket: %w", w.exchangeName, common.AppendError(ErrSubscriptionFailure, err))
}
if len(subs) == 0 {
return nil
}
err = w.checkSubscriptions(subs)
if err != nil {
return fmt.Errorf("%s websocket: %w", w.exchangeName, common.AppendError(ErrSubscriptionFailure, err))
}
err = w.Subscriber(subs)
if err != nil {
return fmt.Errorf("%v %w: %v", w.exchangeName, ErrSubscriptionFailure, err)
return fmt.Errorf("%s websocket: %w", w.exchangeName, common.AppendError(ErrSubscriptionFailure, err))
}
return nil
}
@@ -905,24 +922,13 @@ func (w *Websocket) ResubscribeToChannel(subscribedChannel *ChannelSubscription)
// SubscribeToChannels appends supplied channels to channelsToSubscribe
func (w *Websocket) SubscribeToChannels(channels []ChannelSubscription) error {
if len(channels) == 0 {
return fmt.Errorf("%s websocket: cannot subscribe no channels supplied",
w.exchangeName)
err := w.checkSubscriptions(channels)
if err != nil {
return fmt.Errorf("%s websocket: %w", w.exchangeName, common.AppendError(ErrSubscriptionFailure, err))
}
w.subscriptionMutex.Lock()
for x := range channels {
for y := range w.subscriptions {
if channels[x].Equal(&w.subscriptions[y]) {
w.subscriptionMutex.Unlock()
return fmt.Errorf("%s websocket: %v already subscribed",
w.exchangeName,
channels[x])
}
}
}
w.subscriptionMutex.Unlock()
if err := w.Subscriber(channels); err != nil {
return fmt.Errorf("%v %w: %v", w.exchangeName, ErrSubscriptionFailure, err)
err = w.Subscriber(channels)
if err != nil {
return fmt.Errorf("%s websocket: %w", w.exchangeName, common.AppendError(ErrSubscriptionFailure, err))
}
return nil
}
@@ -1004,3 +1010,31 @@ func checkWebsocketURL(s string) error {
}
return nil
}
// checkSubscriptions checks subscriptions against the max subscription limit
// and if the subscription already exists.
func (w *Websocket) checkSubscriptions(subs []ChannelSubscription) error {
if len(subs) == 0 {
return errNoSubscriptionsSupplied
}
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
if w.MaxSubscriptionsPerConnection > 0 && len(w.subscriptions)+len(subs) > w.MaxSubscriptionsPerConnection {
return fmt.Errorf("%w: current subscriptions: %v, incoming subscriptions: %v, max subscriptions per connection: %v - please reduce enabled pairs",
errSubscriptionsExceedsLimit,
len(w.subscriptions),
len(subs),
w.MaxSubscriptionsPerConnection)
}
for x := range subs {
for y := range w.subscriptions {
if subs[x].Equal(&w.subscriptions[y]) {
return fmt.Errorf("%w for %+v", errChannelSubscriptionAlreadySubscribed, subs[x])
}
}
}
return nil
}

View File

@@ -554,7 +554,15 @@ func TestSubscribeUnsubscribe(t *testing.T) {
func TestResubscribe(t *testing.T) {
t.Parallel()
ws := *New()
err := ws.Setup(defaultSetup)
wackedOutSetup := *defaultSetup
wackedOutSetup.MaxWebsocketSubscriptionsPerConnection = -1
err := ws.Setup(&wackedOutSetup)
if !errors.Is(err, errInvalidMaxSubscriptions) {
t.Fatalf("received: '%v' but expected: '%v'", err, errInvalidMaxSubscriptions)
}
err = ws.Setup(defaultSetup)
if err != nil {
t.Fatal(err)
}
@@ -1390,3 +1398,32 @@ func TestLatency(t *testing.T) {
t.Errorf("expected %v, got %v", exch, r.name)
}
}
func TestCheckSubscriptions(t *testing.T) {
t.Parallel()
ws := Websocket{}
err := ws.checkSubscriptions(nil)
if !errors.Is(err, errNoSubscriptionsSupplied) {
t.Fatalf("received: %v, but expected: %v", err, errNoSubscriptionsSupplied)
}
ws.MaxSubscriptionsPerConnection = 1
err = ws.checkSubscriptions([]ChannelSubscription{{}, {}})
if !errors.Is(err, errSubscriptionsExceedsLimit) {
t.Fatalf("received: %v, but expected: %v", err, errSubscriptionsExceedsLimit)
}
ws.MaxSubscriptionsPerConnection = 2
ws.subscriptions = []ChannelSubscription{{Channel: "test"}}
err = ws.checkSubscriptions([]ChannelSubscription{{Channel: "test"}})
if !errors.Is(err, errChannelSubscriptionAlreadySubscribed) {
t.Fatalf("received: %v, but expected: %v", err, errChannelSubscriptionAlreadySubscribed)
}
err = ws.checkSubscriptions([]ChannelSubscription{{}})
if !errors.Is(err, nil) {
t.Fatalf("received: %v, but expected: %v", err, nil)
}
}

View File

@@ -93,6 +93,10 @@ type Websocket struct {
// Latency reporter
ExchangeLevelReporter Reporter
// MaxSubScriptionsPerConnection defines the maximum number of
// subscriptions per connection that is allowed by the exchange.
MaxSubscriptionsPerConnection int
}
// WebsocketSetup defines variables for setting up a websocket connection
@@ -114,6 +118,10 @@ type WebsocketSetup struct {
// Fill data config values
FillsFeed bool
// MaxWebsocketSubscriptionsPerConnection defines the maximum number of
// subscriptions per connection that is allowed by the exchange.
MaxWebsocketSubscriptionsPerConnection int
}
// WebsocketConnection contains all the data needed to send a message to a WS