From b461c32a5ecc1f5d9c73751592c1ace81a3f2553 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Fri, 13 Sep 2024 13:56:46 +1000 Subject: [PATCH] stream/okx: allow rate limit definitions to be used by the stream package (#1641) * stream: rate limiter definitions * Update exchanges/request/request_test.go Co-authored-by: Scott --------- Co-authored-by: Ryan O'Hara-Reid Co-authored-by: Scott --- exchanges/binance/binance_websocket.go | 5 +- exchanges/binanceus/binanceus_websocket.go | 11 +-- exchanges/bitfinex/bitfinex_websocket.go | 40 +++++------ exchanges/bithumb/bithumb_websocket.go | 3 +- exchanges/bitmex/bitmex_websocket.go | 7 +- exchanges/bitstamp/bitstamp_websocket.go | 7 +- exchanges/btcmarkets/btcmarkets_websocket.go | 5 +- exchanges/btse/btse_websocket.go | 9 +-- exchanges/bybit/bybit_inverse_websocket.go | 5 +- exchanges/bybit/bybit_linear_websocket.go | 5 +- exchanges/bybit/bybit_options_websocket.go | 5 +- exchanges/bybit/bybit_websocket.go | 11 +-- .../coinbasepro/coinbasepro_websocket.go | 5 +- exchanges/coinut/coinut_websocket.go | 41 ++++++----- exchanges/deribit/deribit_websocket.go | 11 +-- exchanges/deribit/deribit_websocket_eps.go | 2 +- exchanges/gateio/gateio_websocket.go | 5 +- .../gateio/gateio_ws_delivery_futures.go | 6 +- exchanges/gateio/gateio_ws_futures.go | 6 +- exchanges/gateio/gateio_ws_option.go | 5 +- exchanges/gemini/gemini_websocket.go | 3 +- exchanges/hitbtc/hitbtc_websocket.go | 41 +++++------ exchanges/huobi/huobi_websocket.go | 45 ++++++------ exchanges/kraken/kraken_websocket.go | 31 ++++---- exchanges/kucoin/kucoin_websocket.go | 4 +- exchanges/okx/okx_websocket.go | 45 ++++++------ exchanges/okx/okx_wrapper.go | 1 + exchanges/okx/ratelimit.go | 12 ++-- exchanges/poloniex/poloniex_websocket.go | 7 +- exchanges/request/limit.go | 9 +++ exchanges/request/request_test.go | 9 +++ exchanges/stream/stream_types.go | 21 +++--- exchanges/stream/websocket.go | 2 + exchanges/stream/websocket_connection.go | 71 +++++++++++-------- exchanges/stream/websocket_test.go | 46 +++++++++--- exchanges/stream/websocket_types.go | 18 ++++- 36 files changed, 328 insertions(+), 231 deletions(-) diff --git a/exchanges/binance/binance_websocket.go b/exchanges/binance/binance_websocket.go index 7c920ea9..25df2c58 100644 --- a/exchanges/binance/binance_websocket.go +++ b/exchanges/binance/binance_websocket.go @@ -18,6 +18,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" @@ -88,7 +89,7 @@ func (b *Binance) WsConnect() error { go b.KeepAuthKeyAlive() } - b.Websocket.Conn.SetupPingHandler(stream.PingHandler{ + b.Websocket.Conn.SetupPingHandler(request.Unset, stream.PingHandler{ UseGorillaHandler: true, MessageType: websocket.PongMessage, Delay: pingDelay, @@ -577,7 +578,7 @@ func (b *Binance) manageSubs(op string, subs subscription.List) error { Params: subs.QualifiedChannels(), } - respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(context.TODO(), req.ID, req) + respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, req.ID, req) if err == nil { if v, d, _, rErr := jsonparser.Get(respRaw, "result"); rErr != nil { err = rErr diff --git a/exchanges/binanceus/binanceus_websocket.go b/exchanges/binanceus/binanceus_websocket.go index a7c18899..859736cb 100644 --- a/exchanges/binanceus/binanceus_websocket.go +++ b/exchanges/binanceus/binanceus_websocket.go @@ -15,6 +15,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" @@ -81,7 +82,7 @@ func (bi *Binanceus) WsConnect() error { go bi.KeepAuthKeyAlive() } - bi.Websocket.Conn.SetupPingHandler(stream.PingHandler{ + bi.Websocket.Conn.SetupPingHandler(request.Unset, stream.PingHandler{ UseGorillaHandler: true, MessageType: websocket.PongMessage, Delay: pingDelay, @@ -576,7 +577,7 @@ func (bi *Binanceus) Subscribe(channelsToSubscribe subscription.List) error { for i := range channelsToSubscribe { payload.Params = append(payload.Params, channelsToSubscribe[i].Channel) if i%50 == 0 && i != 0 { - err := bi.Websocket.Conn.SendJSONMessage(context.TODO(), payload) + err := bi.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, payload) if err != nil { return err } @@ -584,7 +585,7 @@ func (bi *Binanceus) Subscribe(channelsToSubscribe subscription.List) error { } } if len(payload.Params) > 0 { - err := bi.Websocket.Conn.SendJSONMessage(context.TODO(), payload) + err := bi.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, payload) if err != nil { return err } @@ -600,7 +601,7 @@ func (bi *Binanceus) Unsubscribe(channelsToUnsubscribe subscription.List) error for i := range channelsToUnsubscribe { payload.Params = append(payload.Params, channelsToUnsubscribe[i].Channel) if i%50 == 0 && i != 0 { - err := bi.Websocket.Conn.SendJSONMessage(context.TODO(), payload) + err := bi.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, payload) if err != nil { return err } @@ -608,7 +609,7 @@ func (bi *Binanceus) Unsubscribe(channelsToUnsubscribe subscription.List) error } } if len(payload.Params) > 0 { - err := bi.Websocket.Conn.SendJSONMessage(context.TODO(), payload) + err := bi.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, payload) if err != nil { return err } diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index 1a50702a..7cfa71b6 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -22,6 +22,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" @@ -1710,7 +1711,7 @@ func (b *Bitfinex) GenerateDefaultSubscriptions() (subscription.List, error) { // ConfigureWS to send checksums and sequence numbers func (b *Bitfinex) ConfigureWS() error { - return b.Websocket.Conn.SendJSONMessage(context.TODO(), map[string]interface{}{ + return b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, map[string]interface{}{ "event": "conf", "flags": bitfinexChecksumFlag + bitfinexWsSequenceFlag, }) @@ -1756,7 +1757,7 @@ func (b *Bitfinex) subscribeToChan(chans subscription.List) error { _ = b.Websocket.RemoveSubscriptions(c) }() - respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(context.TODO(), "subscribe:"+subID, req) + respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, "subscribe:"+subID, req) if err != nil { return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pairs) } @@ -1849,7 +1850,7 @@ func (b *Bitfinex) unsubscribeFromChan(chans subscription.List) error { "chanId": chanID, } - respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(context.TODO(), "unsubscribe:"+strconv.Itoa(chanID), req) + respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, "unsubscribe:"+strconv.Itoa(chanID), req) if err != nil { return err } @@ -1906,15 +1907,14 @@ func (b *Bitfinex) WsSendAuth(ctx context.Context) error { if err != nil { return err } - request := WsAuthRequest{ + err = b.Websocket.AuthConn.SendJSONMessage(ctx, request.Unset, WsAuthRequest{ Event: "auth", APIKey: creds.Key, AuthPayload: payload, AuthSig: crypto.HexEncodeToString(hmac), AuthNonce: nonce, DeadManSwitch: 0, - } - err = b.Websocket.AuthConn.SendJSONMessage(ctx, request) + }) if err != nil { b.Websocket.SetCanUseAuthenticatedEndpoints(false) return err @@ -1925,8 +1925,8 @@ func (b *Bitfinex) WsSendAuth(ctx context.Context) error { // WsNewOrder authenticated new order request func (b *Bitfinex) WsNewOrder(data *WsNewOrderRequest) (string, error) { data.CustomID = b.Websocket.AuthConn.GenerateMessageID(false) - request := makeRequestInterface(wsOrderNew, data) - resp, err := b.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), data.CustomID, request) + req := makeRequestInterface(wsOrderNew, data) + resp, err := b.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.Unset, data.CustomID, req) if err != nil { return "", err } @@ -1982,8 +1982,8 @@ func (b *Bitfinex) WsNewOrder(data *WsNewOrderRequest) (string, error) { // WsModifyOrder authenticated modify order request func (b *Bitfinex) WsModifyOrder(data *WsUpdateOrderRequest) error { - request := makeRequestInterface(wsOrderUpdate, data) - resp, err := b.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), data.OrderID, request) + req := makeRequestInterface(wsOrderUpdate, data) + resp, err := b.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.Unset, data.OrderID, req) if err != nil { return err } @@ -2027,8 +2027,8 @@ func (b *Bitfinex) WsCancelMultiOrders(orderIDs []int64) error { cancel := WsCancelGroupOrdersRequest{ OrderID: orderIDs, } - request := makeRequestInterface(wsCancelMultipleOrders, cancel) - return b.Websocket.AuthConn.SendJSONMessage(context.TODO(), request) + req := makeRequestInterface(wsCancelMultipleOrders, cancel) + return b.Websocket.AuthConn.SendJSONMessage(context.TODO(), request.Unset, req) } // WsCancelOrder authenticated cancel order request @@ -2036,8 +2036,8 @@ func (b *Bitfinex) WsCancelOrder(orderID int64) error { cancel := WsCancelOrderRequest{ OrderID: orderID, } - request := makeRequestInterface(wsOrderCancel, cancel) - resp, err := b.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), orderID, request) + req := makeRequestInterface(wsOrderCancel, cancel) + resp, err := b.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.Unset, orderID, req) if err != nil { return err } @@ -2078,14 +2078,14 @@ func (b *Bitfinex) WsCancelOrder(orderID int64) error { // WsCancelAllOrders authenticated cancel all orders request func (b *Bitfinex) WsCancelAllOrders() error { cancelAll := WsCancelAllOrdersRequest{All: 1} - request := makeRequestInterface(wsCancelMultipleOrders, cancelAll) - return b.Websocket.AuthConn.SendJSONMessage(context.TODO(), request) + req := makeRequestInterface(wsCancelMultipleOrders, cancelAll) + return b.Websocket.AuthConn.SendJSONMessage(context.TODO(), request.Unset, req) } // WsNewOffer authenticated new offer request func (b *Bitfinex) WsNewOffer(data *WsNewOfferRequest) error { - request := makeRequestInterface(wsFundingOfferNew, data) - return b.Websocket.AuthConn.SendJSONMessage(context.TODO(), request) + req := makeRequestInterface(wsFundingOfferNew, data) + return b.Websocket.AuthConn.SendJSONMessage(context.TODO(), request.Unset, req) } // WsCancelOffer authenticated cancel offer request @@ -2093,8 +2093,8 @@ func (b *Bitfinex) WsCancelOffer(orderID int64) error { cancel := WsCancelOrderRequest{ OrderID: orderID, } - request := makeRequestInterface(wsFundingOfferCancel, cancel) - resp, err := b.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), orderID, request) + req := makeRequestInterface(wsFundingOfferCancel, cancel) + resp, err := b.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.Unset, orderID, req) if err != nil { return err } diff --git a/exchanges/bithumb/bithumb_websocket.go b/exchanges/bithumb/bithumb_websocket.go index b7783dd2..461f6dcb 100644 --- a/exchanges/bithumb/bithumb_websocket.go +++ b/exchanges/bithumb/bithumb_websocket.go @@ -10,6 +10,7 @@ import ( "github.com/gorilla/websocket" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" @@ -204,7 +205,7 @@ func (b *Bithumb) Subscribe(channelsToSubscribe subscription.List) error { if s.Channel == "ticker" { req.TickTypes = wsDefaultTickTypes } - err := b.Websocket.Conn.SendJSONMessage(context.TODO(), req) + err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req) if err == nil { err = b.Websocket.AddSuccessfulSubscriptions(s) } diff --git a/exchanges/bitmex/bitmex_websocket.go b/exchanges/bitmex/bitmex_websocket.go index e604b6ed..8c4dbf16 100644 --- a/exchanges/bitmex/bitmex_websocket.go +++ b/exchanges/bitmex/bitmex_websocket.go @@ -16,6 +16,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" @@ -599,7 +600,7 @@ func (b *Bitmex) Subscribe(subs subscription.List) error { req.Arguments = append(req.Arguments, cName+":"+p.String()) } } - err := b.Websocket.Conn.SendJSONMessage(context.TODO(), req) + err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req) if err == nil { err = b.Websocket.AddSuccessfulSubscriptions(subs...) } @@ -618,7 +619,7 @@ func (b *Bitmex) Unsubscribe(subs subscription.List) error { req.Arguments = append(req.Arguments, cName+":"+p.String()) } } - err := b.Websocket.Conn.SendJSONMessage(context.TODO(), req) + err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req) if err == nil { err = b.Websocket.RemoveSubscriptions(subs...) } @@ -655,7 +656,7 @@ func (b *Bitmex) websocketSendAuth(ctx context.Context) error { sendAuth.Command = "authKeyExpires" sendAuth.Arguments = append(sendAuth.Arguments, creds.Key, timestamp, signature) - err = b.Websocket.Conn.SendJSONMessage(ctx, sendAuth) + err = b.Websocket.Conn.SendJSONMessage(ctx, request.Unset, sendAuth) if err != nil { b.Websocket.SetCanUseAuthenticatedEndpoints(false) return err diff --git a/exchanges/bitstamp/bitstamp_websocket.go b/exchanges/bitstamp/bitstamp_websocket.go index efed7efb..5effb2a6 100644 --- a/exchanges/bitstamp/bitstamp_websocket.go +++ b/exchanges/bitstamp/bitstamp_websocket.go @@ -17,6 +17,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" @@ -55,7 +56,7 @@ func (b *Bitstamp) WsConnect() error { if b.Verbose { log.Debugf(log.ExchangeSys, "%s Connected to Websocket.\n", b.Name) } - b.Websocket.Conn.SetupPingHandler(stream.PingHandler{ + b.Websocket.Conn.SetupPingHandler(request.Unset, stream.PingHandler{ MessageType: websocket.TextMessage, Message: hbMsg, Delay: hbInterval, @@ -292,7 +293,7 @@ func (b *Bitstamp) Subscribe(channelsToSubscribe subscription.List) error { req.Data.Channel = "private-" + req.Data.Channel + "-" + strconv.Itoa(int(auth.UserID)) req.Data.Auth = auth.Token } - err := b.Websocket.Conn.SendJSONMessage(context.TODO(), req) + err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req) if err == nil { err = b.Websocket.AddSuccessfulSubscriptions(s) } @@ -314,7 +315,7 @@ func (b *Bitstamp) Unsubscribe(channelsToUnsubscribe subscription.List) error { Channel: s.Channel, }, } - err := b.Websocket.Conn.SendJSONMessage(context.TODO(), req) + err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req) if err == nil { err = b.Websocket.RemoveSubscriptions(s) } diff --git a/exchanges/btcmarkets/btcmarkets_websocket.go b/exchanges/btcmarkets/btcmarkets_websocket.go index 6e8464d0..26b66e64 100644 --- a/exchanges/btcmarkets/btcmarkets_websocket.go +++ b/exchanges/btcmarkets/btcmarkets_websocket.go @@ -18,6 +18,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" @@ -374,7 +375,7 @@ func (b *BTCMarkets) Subscribe(subs subscription.List) error { r.Channels = []string{s.Channel} r.MarketIDs = s.Pairs.Strings() - err := b.Websocket.Conn.SendJSONMessage(context.TODO(), r) + err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, r) if err == nil { err = b.Websocket.AddSuccessfulSubscriptions(s) } @@ -414,7 +415,7 @@ func (b *BTCMarkets) Unsubscribe(subs subscription.List) error { MarketIDs: s.Pairs.Strings(), } - err := b.Websocket.Conn.SendJSONMessage(context.TODO(), req) + err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req) if err == nil { err = b.Websocket.RemoveSubscriptions(s) } diff --git a/exchanges/btse/btse_websocket.go b/exchanges/btse/btse_websocket.go index 8f682b23..9399c9ce 100644 --- a/exchanges/btse/btse_websocket.go +++ b/exchanges/btse/btse_websocket.go @@ -16,6 +16,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" @@ -37,7 +38,7 @@ func (b *BTSE) WsConnect() error { if err != nil { return err } - b.Websocket.Conn.SetupPingHandler(stream.PingHandler{ + b.Websocket.Conn.SetupPingHandler(request.Unset, stream.PingHandler{ MessageType: websocket.PingMessage, Delay: btseWebsocketTimer, }) @@ -78,7 +79,7 @@ func (b *BTSE) WsAuthenticate(ctx context.Context) error { Operation: "authKeyExpires", Arguments: []string{creds.Key, nonce, sign}, } - return b.Websocket.Conn.SendJSONMessage(ctx, req) + return b.Websocket.Conn.SendJSONMessage(ctx, request.Unset, req) } func stringToOrderStatus(status string) (order.Status, error) { @@ -392,7 +393,7 @@ func (b *BTSE) Subscribe(channelsToSubscribe subscription.List) error { for i := range channelsToSubscribe { sub.Arguments = append(sub.Arguments, channelsToSubscribe[i].Channel) } - err := b.Websocket.Conn.SendJSONMessage(context.TODO(), sub) + err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, sub) if err == nil { err = b.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe...) } @@ -407,7 +408,7 @@ func (b *BTSE) Unsubscribe(channelsToUnsubscribe subscription.List) error { unSub.Arguments = append(unSub.Arguments, channelsToUnsubscribe[i].Channel) } - err := b.Websocket.Conn.SendJSONMessage(context.TODO(), unSub) + err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, unSub) if err == nil { err = b.Websocket.RemoveSubscriptions(channelsToUnsubscribe...) } diff --git a/exchanges/bybit/bybit_inverse_websocket.go b/exchanges/bybit/bybit_inverse_websocket.go index 6ea2d83e..1160fc2d 100644 --- a/exchanges/bybit/bybit_inverse_websocket.go +++ b/exchanges/bybit/bybit_inverse_websocket.go @@ -7,6 +7,7 @@ import ( "github.com/gorilla/websocket" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" ) @@ -22,7 +23,7 @@ func (by *Bybit) WsInverseConnect() error { if err != nil { return err } - by.Websocket.Conn.SetupPingHandler(stream.PingHandler{ + by.Websocket.Conn.SetupPingHandler(request.Unset, stream.PingHandler{ MessageType: websocket.TextMessage, Message: []byte(`{"op": "ping"}`), Delay: bybitWebsocketTimer, @@ -72,7 +73,7 @@ func (by *Bybit) handleInversePayloadSubscription(operation string, channelSubsc for a := range payloads { // The options connection does not send the subscription request id back with the subscription notification payload // therefore the code doesn't wait for the response to check whether the subscription is successful or not. - err = by.Websocket.Conn.SendJSONMessage(context.TODO(), payloads[a]) + err = by.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, payloads[a]) if err != nil { return err } diff --git a/exchanges/bybit/bybit_linear_websocket.go b/exchanges/bybit/bybit_linear_websocket.go index 8ee49abd..303b2f76 100644 --- a/exchanges/bybit/bybit_linear_websocket.go +++ b/exchanges/bybit/bybit_linear_websocket.go @@ -7,6 +7,7 @@ import ( "github.com/gorilla/websocket" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" ) @@ -22,7 +23,7 @@ func (by *Bybit) WsLinearConnect() error { if err != nil { return err } - by.Websocket.Conn.SetupPingHandler(stream.PingHandler{ + by.Websocket.Conn.SetupPingHandler(request.Unset, stream.PingHandler{ MessageType: websocket.TextMessage, Message: []byte(`{"op": "ping"}`), Delay: bybitWebsocketTimer, @@ -90,7 +91,7 @@ func (by *Bybit) handleLinearPayloadSubscription(operation string, channelSubscr for a := range payloads { // The options connection does not send the subscription request id back with the subscription notification payload // therefore the code doesn't wait for the response to check whether the subscription is successful or not. - err = by.Websocket.Conn.SendJSONMessage(context.TODO(), payloads[a]) + err = by.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, payloads[a]) if err != nil { return err } diff --git a/exchanges/bybit/bybit_options_websocket.go b/exchanges/bybit/bybit_options_websocket.go index d387b513..c67a6114 100644 --- a/exchanges/bybit/bybit_options_websocket.go +++ b/exchanges/bybit/bybit_options_websocket.go @@ -9,6 +9,7 @@ import ( "github.com/gorilla/websocket" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" ) @@ -29,7 +30,7 @@ func (by *Bybit) WsOptionsConnect() error { if err != nil { return err } - by.Websocket.Conn.SetupPingHandler(stream.PingHandler{ + by.Websocket.Conn.SetupPingHandler(request.Unset, stream.PingHandler{ MessageType: websocket.TextMessage, Message: pingData, Delay: bybitWebsocketTimer, @@ -79,7 +80,7 @@ func (by *Bybit) handleOptionsPayloadSubscription(operation string, channelSubsc for a := range payloads { // The options connection does not send the subscription request id back with the subscription notification payload // therefore the code doesn't wait for the response to check whether the subscription is successful or not. - err = by.Websocket.Conn.SendJSONMessage(context.TODO(), payloads[a]) + err = by.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, payloads[a]) if err != nil { return err } diff --git a/exchanges/bybit/bybit_websocket.go b/exchanges/bybit/bybit_websocket.go index 9e6308f2..301a2c6c 100644 --- a/exchanges/bybit/bybit_websocket.go +++ b/exchanges/bybit/bybit_websocket.go @@ -18,6 +18,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/kline" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" @@ -64,7 +65,7 @@ func (by *Bybit) WsConnect() error { if err != nil { return err } - by.Websocket.Conn.SetupPingHandler(stream.PingHandler{ + by.Websocket.Conn.SetupPingHandler(request.Unset, stream.PingHandler{ MessageType: websocket.TextMessage, Message: []byte(`{"op": "ping"}`), Delay: bybitWebsocketTimer, @@ -90,7 +91,7 @@ func (by *Bybit) WsAuth(ctx context.Context) error { return err } - by.Websocket.AuthConn.SetupPingHandler(stream.PingHandler{ + by.Websocket.AuthConn.SetupPingHandler(request.Unset, stream.PingHandler{ MessageType: websocket.TextMessage, Message: []byte(`{"op":"ping"}`), Delay: bybitWebsocketTimer, @@ -118,7 +119,7 @@ func (by *Bybit) WsAuth(ctx context.Context) error { Operation: "auth", Args: []interface{}{creds.Key, intNonce, sign}, } - resp, err := by.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), req.RequestID, req) + resp, err := by.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.Unset, req.RequestID, req) if err != nil { return err } @@ -220,12 +221,12 @@ func (by *Bybit) handleSpotSubscription(operation string, channelsToSubscribe su for a := range payloads { var response []byte if payloads[a].auth { - response, err = by.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), payloads[a].RequestID, payloads[a]) + response, err = by.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.Unset, payloads[a].RequestID, payloads[a]) if err != nil { return err } } else { - response, err = by.Websocket.Conn.SendMessageReturnResponse(context.TODO(), payloads[a].RequestID, payloads[a]) + response, err = by.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, payloads[a].RequestID, payloads[a]) if err != nil { return err } diff --git a/exchanges/coinbasepro/coinbasepro_websocket.go b/exchanges/coinbasepro/coinbasepro_websocket.go index 647612b1..22a16cd6 100644 --- a/exchanges/coinbasepro/coinbasepro_websocket.go +++ b/exchanges/coinbasepro/coinbasepro_websocket.go @@ -16,6 +16,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" @@ -423,7 +424,7 @@ func (c *CoinbasePro) Subscribe(subs subscription.List) error { r.Channels = append(r.Channels, s.Channel) } } - err := c.Websocket.Conn.SendJSONMessage(context.TODO(), r) + err := c.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, r) if err == nil { err = c.Websocket.AddSuccessfulSubscriptions(subs...) } @@ -459,7 +460,7 @@ func (c *CoinbasePro) Unsubscribe(subs subscription.List) error { ProductIDs: s.Pairs.Strings(), }) } - err := c.Websocket.Conn.SendJSONMessage(context.TODO(), r) + err := c.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, r) if err == nil { err = c.Websocket.RemoveSubscriptions(subs...) } diff --git a/exchanges/coinut/coinut_websocket.go b/exchanges/coinut/coinut_websocket.go index 10bb52e8..e147cb9e 100644 --- a/exchanges/coinut/coinut_websocket.go +++ b/exchanges/coinut/coinut_websocket.go @@ -18,6 +18,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" @@ -475,12 +476,12 @@ func (c *COINUT) parseOrderContainer(oContainer *wsOrderContainer) (*order.Detai // WsGetInstruments fetches instrument list and propagates a local cache func (c *COINUT) WsGetInstruments() (Instruments, error) { var list Instruments - request := wsRequest{ + req := wsRequest{ Request: "inst_list", SecurityType: strings.ToUpper(asset.Spot.String()), Nonce: getNonce(), } - resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Nonce, request) + resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, req.Nonce, req) if err != nil { return list, err } @@ -618,7 +619,7 @@ func (c *COINUT) Subscribe(subs subscription.List) error { Subscribe: true, Nonce: getNonce(), } - err = c.Websocket.Conn.SendJSONMessage(context.TODO(), subscribe) + err = c.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, subscribe) if err == nil { err = c.Websocket.AddSuccessfulSubscriptions(s) } @@ -648,7 +649,7 @@ func (c *COINUT) Unsubscribe(channelToUnsubscribe subscription.List) error { Subscribe: false, Nonce: getNonce(), } - resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), subscribe.Nonce, subscribe) + resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, subscribe.Nonce, subscribe) if err != nil { errs = common.AppendError(errs, err) continue @@ -691,7 +692,7 @@ func (c *COINUT) wsAuthenticate(ctx context.Context) error { } r.Hmac = crypto.HexEncodeToString(hmac) - resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), r.Nonce, r) + resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, r.Nonce, r) if err != nil { return err } @@ -714,7 +715,7 @@ func (c *COINUT) wsGetAccountBalance() (*UserBalance, error) { Request: "user_balance", Nonce: getNonce(), } - resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), accBalance.Nonce, accBalance) + resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, accBalance.Nonce, accBalance) if err != nil { return nil, err } @@ -750,7 +751,7 @@ func (c *COINUT) wsSubmitOrder(o *WsSubmitOrderParameters) (*order.Detail, error if o.OrderID > 0 { orderSubmissionRequest.OrderID = o.OrderID } - resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), orderSubmissionRequest.Nonce, orderSubmissionRequest) + resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, orderSubmissionRequest.Nonce, orderSubmissionRequest) if err != nil { return nil, err } @@ -793,7 +794,7 @@ func (c *COINUT) wsSubmitOrders(orders []WsSubmitOrderParameters) ([]order.Detai orderRequest.Nonce = getNonce() orderRequest.Request = "new_orders" - resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), orderRequest.Nonce, orderRequest) + resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, orderRequest.Nonce, orderRequest) if err != nil { errs = append(errs, err) return nil, errs @@ -829,7 +830,7 @@ func (c *COINUT) wsGetOpenOrders(curr string) (*WsUserOpenOrdersResponse, error) openOrdersRequest.Nonce = getNonce() openOrdersRequest.InstrumentID = c.instrumentMap.LookupID(curr) - resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), openOrdersRequest.Nonce, openOrdersRequest) + resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, openOrdersRequest.Nonce, openOrdersRequest) if err != nil { return response, err } @@ -862,7 +863,7 @@ func (c *COINUT) wsCancelOrder(cancellation *WsCancelOrderParameters) (*CancelOr cancellationRequest.OrderID = cancellation.OrderID cancellationRequest.Nonce = getNonce() - resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), cancellationRequest.Nonce, cancellationRequest) + resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, cancellationRequest.Nonce, cancellationRequest) if err != nil { return response, err } @@ -903,7 +904,7 @@ func (c *COINUT) wsCancelOrders(cancellations []WsCancelOrderParameters) (*Cance cancelOrderRequest.Request = "cancel_orders" cancelOrderRequest.Nonce = getNonce() - resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), cancelOrderRequest.Nonce, cancelOrderRequest) + resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, cancelOrderRequest.Nonce, cancelOrderRequest) if err != nil { return response, err } @@ -926,14 +927,14 @@ func (c *COINUT) wsGetTradeHistory(p currency.Pair, start, limit int64) (*WsTrad return nil, err } - var request WsTradeHistoryRequest - request.Request = "trade_history" - request.InstID = c.instrumentMap.LookupID(curr.String()) - request.Nonce = getNonce() - request.Start = start - request.Limit = limit + var req WsTradeHistoryRequest + req.Request = "trade_history" + req.InstID = c.instrumentMap.LookupID(curr.String()) + req.Nonce = getNonce() + req.Start = start + req.Limit = limit - resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Nonce, request) + resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, req.Nonce, req) if err != nil { return response, err } @@ -942,9 +943,7 @@ func (c *COINUT) wsGetTradeHistory(p currency.Pair, start, limit int64) (*WsTrad return response, err } if response.Status[0] != "OK" { - return response, fmt.Errorf("%v get trade history failed for %v", - c.Name, - request) + return response, fmt.Errorf("%v get trade history failed for %v", c.Name, req) } return response, nil } diff --git a/exchanges/deribit/deribit_websocket.go b/exchanges/deribit/deribit_websocket.go index bec47df0..d99dc862 100644 --- a/exchanges/deribit/deribit_websocket.go +++ b/exchanges/deribit/deribit_websocket.go @@ -18,6 +18,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/nonce" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" @@ -113,7 +114,7 @@ func (d *Deribit) WsConnect() error { d.Websocket.SetCanUseAuthenticatedEndpoints(false) } } - return d.Websocket.Conn.SendJSONMessage(context.TODO(), setHeartBeatMessage) + return d.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, setHeartBeatMessage) } func (d *Deribit) wsLogin(ctx context.Context) error { @@ -135,7 +136,7 @@ func (d *Deribit) wsLogin(ctx context.Context) error { return err } - request := wsInput{ + req := wsInput{ JSONRPCVersion: rpcVersion, Method: "public/auth", ID: d.Websocket.Conn.GenerateMessageID(false), @@ -147,7 +148,7 @@ func (d *Deribit) wsLogin(ctx context.Context) error { "signature": crypto.HexEncodeToString(hmac), }, } - resp, err := d.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.ID, request) + resp, err := d.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, req.ID, req) if err != nil { d.Websocket.SetCanUseAuthenticatedEndpoints(false) return err @@ -187,7 +188,7 @@ func (d *Deribit) wsHandleData(respRaw []byte) error { return fmt.Errorf("%s - err %s could not parse websocket data: %s", d.Name, err, respRaw) } if response.Method == "heartbeat" { - return d.Websocket.Conn.SendJSONMessage(context.TODO(), pingMessage) + return d.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, pingMessage) } if response.ID > 2 { if !d.Websocket.Match.IncomingWithData(response.ID, respRaw) { @@ -1165,7 +1166,7 @@ func (d *Deribit) handleSubscription(operation string, channels subscription.Lis return err } for x := range payloads { - data, err := d.Websocket.Conn.SendMessageReturnResponse(context.TODO(), payloads[x].ID, payloads[x]) + data, err := d.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, payloads[x].ID, payloads[x]) if err != nil { return err } diff --git a/exchanges/deribit/deribit_websocket_eps.go b/exchanges/deribit/deribit_websocket_eps.go index 4c639cc6..5c761b2a 100644 --- a/exchanges/deribit/deribit_websocket_eps.go +++ b/exchanges/deribit/deribit_websocket_eps.go @@ -2406,7 +2406,7 @@ func (d *Deribit) sendWsPayload(ep request.EndpointLimit, input *WsRequest, resp log.Debugf(log.RequestSys, "%s attempt %d", d.Name, attempt) } var payload []byte - payload, err = d.Websocket.Conn.SendMessageReturnResponse(context.TODO(), input.ID, input) + payload, err = d.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, input.ID, input) if err != nil { return err } diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index a436ede3..68241946 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -22,6 +22,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/kline" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" @@ -74,7 +75,7 @@ func (g *Gateio) WsConnect() error { if err != nil { return err } - g.Websocket.Conn.SetupPingHandler(stream.PingHandler{ + g.Websocket.Conn.SetupPingHandler(request.Unset, stream.PingHandler{ Websocket: true, Delay: time.Second * 15, Message: pingMessage, @@ -700,7 +701,7 @@ func (g *Gateio) handleSubscription(event string, channelsToSubscribe subscripti } var errs error for k := range payloads { - result, err := g.Websocket.Conn.SendMessageReturnResponse(context.TODO(), payloads[k].ID, payloads[k]) + result, err := g.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, payloads[k].ID, payloads[k]) if err != nil { errs = common.AppendError(errs, err) continue diff --git a/exchanges/gateio/gateio_ws_delivery_futures.go b/exchanges/gateio/gateio_ws_delivery_futures.go index a2908c95..fb6c9b7f 100644 --- a/exchanges/gateio/gateio_ws_delivery_futures.go +++ b/exchanges/gateio/gateio_ws_delivery_futures.go @@ -92,7 +92,7 @@ func (g *Gateio) WsDeliveryFuturesConnect() error { if err != nil { return err } - g.Websocket.Conn.SetupPingHandler(stream.PingHandler{ + g.Websocket.Conn.SetupPingHandler(request.Unset, stream.PingHandler{ Websocket: true, Delay: time.Second * 5, MessageType: websocket.PingMessage, @@ -208,9 +208,9 @@ func (g *Gateio) handleDeliveryFuturesSubscription(event string, channelsToSubsc for con, val := range payloads { for k := range val { if con == 0 { - respByte, err = g.Websocket.Conn.SendMessageReturnResponse(context.TODO(), val[k].ID, val[k]) + respByte, err = g.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, val[k].ID, val[k]) } else { - respByte, err = g.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), val[k].ID, val[k]) + respByte, err = g.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.Unset, val[k].ID, val[k]) } if err != nil { errs = common.AppendError(errs, err) diff --git a/exchanges/gateio/gateio_ws_futures.go b/exchanges/gateio/gateio_ws_futures.go index a6f4207f..df3868ad 100644 --- a/exchanges/gateio/gateio_ws_futures.go +++ b/exchanges/gateio/gateio_ws_futures.go @@ -113,7 +113,7 @@ func (g *Gateio) WsFuturesConnect() error { if err != nil { return err } - g.Websocket.Conn.SetupPingHandler(stream.PingHandler{ + g.Websocket.Conn.SetupPingHandler(request.Unset, stream.PingHandler{ Websocket: true, MessageType: websocket.PingMessage, Delay: time.Second * 15, @@ -288,9 +288,9 @@ func (g *Gateio) handleFuturesSubscription(event string, channelsToSubscribe sub for con, val := range payloads { for k := range val { if con == 0 { - respByte, err = g.Websocket.Conn.SendMessageReturnResponse(context.TODO(), val[k].ID, val[k]) + respByte, err = g.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, val[k].ID, val[k]) } else { - respByte, err = g.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), val[k].ID, val[k]) + respByte, err = g.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.Unset, val[k].ID, val[k]) } if err != nil { errs = common.AppendError(errs, err) diff --git a/exchanges/gateio/gateio_ws_option.go b/exchanges/gateio/gateio_ws_option.go index 55b7a720..8c8b8d0f 100644 --- a/exchanges/gateio/gateio_ws_option.go +++ b/exchanges/gateio/gateio_ws_option.go @@ -19,6 +19,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/kline" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" @@ -95,7 +96,7 @@ func (g *Gateio) WsOptionsConnect() error { } g.Websocket.Wg.Add(1) go g.wsReadOptionsConnData() - g.Websocket.Conn.SetupPingHandler(stream.PingHandler{ + g.Websocket.Conn.SetupPingHandler(request.Unset, stream.PingHandler{ Websocket: true, Delay: time.Second * 5, MessageType: websocket.PingMessage, @@ -319,7 +320,7 @@ func (g *Gateio) handleOptionsSubscription(event string, channelsToSubscribe sub } var errs error for k := range payloads { - result, err := g.Websocket.Conn.SendMessageReturnResponse(context.TODO(), payloads[k].ID, payloads[k]) + result, err := g.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, payloads[k].ID, payloads[k]) if err != nil { errs = common.AppendError(errs, err) continue diff --git a/exchanges/gemini/gemini_websocket.go b/exchanges/gemini/gemini_websocket.go index 6c811ccd..27952409 100644 --- a/exchanges/gemini/gemini_websocket.go +++ b/exchanges/gemini/gemini_websocket.go @@ -19,6 +19,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" @@ -112,7 +113,7 @@ func (g *Gemini) manageSubs(subs subscription.List, op wsSubOp) error { }) } - if err := g.Websocket.Conn.SendJSONMessage(context.TODO(), req); err != nil { + if err := g.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req); err != nil { return err } diff --git a/exchanges/hitbtc/hitbtc_websocket.go b/exchanges/hitbtc/hitbtc_websocket.go index ac5a439f..51cc1806 100644 --- a/exchanges/hitbtc/hitbtc_websocket.go +++ b/exchanges/hitbtc/hitbtc_websocket.go @@ -17,6 +17,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" @@ -523,7 +524,7 @@ func (h *HitBTC) Subscribe(channelsToSubscribe subscription.List) error { r.Params.Limit = 100 } - err := h.Websocket.Conn.SendJSONMessage(context.TODO(), r) + err := h.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, r) if err == nil { err = h.Websocket.AddSuccessfulSubscriptions(s) } @@ -559,7 +560,7 @@ func (h *HitBTC) Unsubscribe(subs subscription.List) error { r.Params.Limit = 100 } - err := h.Websocket.Conn.SendJSONMessage(context.TODO(), r) + err := h.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, r) if err == nil { err = h.Websocket.RemoveSubscriptions(s) } @@ -588,7 +589,7 @@ func (h *HitBTC) wsLogin(ctx context.Context) error { return err } - request := WsLoginRequest{ + req := WsLoginRequest{ Method: "login", Params: WsLoginData{ Algo: "HS256", @@ -599,7 +600,7 @@ func (h *HitBTC) wsLogin(ctx context.Context) error { ID: h.Websocket.Conn.GenerateMessageID(false), } - err = h.Websocket.Conn.SendJSONMessage(context.TODO(), request) + err = h.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req) if err != nil { h.Websocket.SetCanUseAuthenticatedEndpoints(false) return err @@ -620,7 +621,7 @@ func (h *HitBTC) wsPlaceOrder(pair currency.Pair, side string, price, quantity f return nil, err } - request := WsSubmitOrderRequest{ + req := WsSubmitOrderRequest{ Method: "newOrder", Params: WsSubmitOrderRequestData{ ClientOrderID: id, @@ -631,7 +632,7 @@ func (h *HitBTC) wsPlaceOrder(pair currency.Pair, side string, price, quantity f }, ID: id, } - resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), id, request) + resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, id, req) if err != nil { return nil, fmt.Errorf("%v %v", h.Name, err) } @@ -651,14 +652,14 @@ func (h *HitBTC) wsCancelOrder(clientOrderID string) (*WsCancelOrderResponse, er if !h.Websocket.CanUseAuthenticatedEndpoints() { return nil, fmt.Errorf("%v not authenticated, cannot place order", h.Name) } - request := WsCancelOrderRequest{ + req := WsCancelOrderRequest{ Method: "cancelOrder", Params: WsCancelOrderRequestData{ ClientOrderID: clientOrderID, }, ID: h.Websocket.Conn.GenerateMessageID(false), } - resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.ID, request) + resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, req.ID, req) if err != nil { return nil, fmt.Errorf("%v %v", h.Name, err) } @@ -678,7 +679,7 @@ func (h *HitBTC) wsReplaceOrder(clientOrderID string, quantity, price float64) ( if !h.Websocket.CanUseAuthenticatedEndpoints() { return nil, fmt.Errorf("%v not authenticated, cannot place order", h.Name) } - request := WsReplaceOrderRequest{ + req := WsReplaceOrderRequest{ Method: "cancelReplaceOrder", Params: WsReplaceOrderRequestData{ ClientOrderID: clientOrderID, @@ -688,7 +689,7 @@ func (h *HitBTC) wsReplaceOrder(clientOrderID string, quantity, price float64) ( }, ID: h.Websocket.Conn.GenerateMessageID(false), } - resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.ID, request) + resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, req.ID, req) if err != nil { return nil, fmt.Errorf("%v %v", h.Name, err) } @@ -708,12 +709,12 @@ func (h *HitBTC) wsGetActiveOrders() (*wsActiveOrdersResponse, error) { if !h.Websocket.CanUseAuthenticatedEndpoints() { return nil, fmt.Errorf("%v not authenticated, cannot get active orders", h.Name) } - request := WsReplaceOrderRequest{ + req := WsReplaceOrderRequest{ Method: "getOrders", Params: WsReplaceOrderRequestData{}, ID: h.Websocket.Conn.GenerateMessageID(false), } - resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.ID, request) + resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, req.ID, req) if err != nil { return nil, fmt.Errorf("%v %v", h.Name, err) } @@ -733,12 +734,12 @@ func (h *HitBTC) wsGetTradingBalance() (*WsGetTradingBalanceResponse, error) { if !h.Websocket.CanUseAuthenticatedEndpoints() { return nil, fmt.Errorf("%v not authenticated, cannot place order", h.Name) } - request := WsReplaceOrderRequest{ + req := WsReplaceOrderRequest{ Method: "getTradingBalance", Params: WsReplaceOrderRequestData{}, ID: h.Websocket.Conn.GenerateMessageID(false), } - resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.ID, request) + resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, req.ID, req) if err != nil { return nil, fmt.Errorf("%v %v", h.Name, err) } @@ -755,14 +756,14 @@ func (h *HitBTC) wsGetTradingBalance() (*WsGetTradingBalanceResponse, error) { // wsGetCurrencies sends a websocket message to get trading balance func (h *HitBTC) wsGetCurrencies(currencyItem currency.Code) (*WsGetCurrenciesResponse, error) { - request := WsGetCurrenciesRequest{ + req := WsGetCurrenciesRequest{ Method: "getCurrency", Params: WsGetCurrenciesRequestParameters{ Currency: currencyItem, }, ID: h.Websocket.Conn.GenerateMessageID(false), } - resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.ID, request) + resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, req.ID, req) if err != nil { return nil, fmt.Errorf("%v %v", h.Name, err) } @@ -784,14 +785,14 @@ func (h *HitBTC) wsGetSymbols(c currency.Pair) (*WsGetSymbolsResponse, error) { return nil, err } - request := WsGetSymbolsRequest{ + req := WsGetSymbolsRequest{ Method: "getSymbol", Params: WsGetSymbolsRequestParameters{ Symbol: fPair.String(), }, ID: h.Websocket.Conn.GenerateMessageID(false), } - resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.ID, request) + resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, req.ID, req) if err != nil { return nil, fmt.Errorf("%v %v", h.Name, err) } @@ -813,7 +814,7 @@ func (h *HitBTC) wsGetTrades(c currency.Pair, limit int64, sort, by string) (*Ws return nil, err } - request := WsGetTradesRequest{ + req := WsGetTradesRequest{ Method: "getTrades", Params: WsGetTradesRequestParameters{ Symbol: fPair.String(), @@ -823,7 +824,7 @@ func (h *HitBTC) wsGetTrades(c currency.Pair, limit int64, sort, by string) (*Ws }, ID: h.Websocket.Conn.GenerateMessageID(false), } - resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.ID, request) + resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, req.ID, req) if err != nil { return nil, fmt.Errorf("%v %v", h.Name, err) } diff --git a/exchanges/huobi/huobi_websocket.go b/exchanges/huobi/huobi_websocket.go index 268c6dd4..83974372 100644 --- a/exchanges/huobi/huobi_websocket.go +++ b/exchanges/huobi/huobi_websocket.go @@ -19,6 +19,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" @@ -224,7 +225,7 @@ func (h *HUOBI) wsHandleData(respRaw []byte) error { OP: "pong", TS: init.TS, } - err := h.Websocket.AuthConn.SendJSONMessage(context.TODO(), authPing) + err := h.Websocket.AuthConn.SendJSONMessage(context.TODO(), request.Unset, authPing) if err != nil { log.Errorln(log.ExchangeSys, err) } @@ -444,7 +445,7 @@ func (h *HUOBI) wsHandleData(respRaw []byte) error { } func (h *HUOBI) sendPingResponse(pong int64) { - err := h.Websocket.Conn.SendJSONMessage(context.TODO(), WsPong{Pong: pong}) + err := h.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, WsPong{Pong: pong}) if err != nil { log.Errorln(log.ExchangeSys, err) } @@ -564,7 +565,7 @@ func (h *HUOBI) Subscribe(channelsToSubscribe subscription.List) error { wsAccountsOrdersEndPoint+channelsToSubscribe[i].Channel, channelsToSubscribe[i].Channel) } else { - err = h.Websocket.Conn.SendJSONMessage(context.TODO(), WsRequest{ + err = h.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, WsRequest{ Subscribe: channelsToSubscribe[i].Channel, }) } @@ -598,7 +599,7 @@ func (h *HUOBI) Unsubscribe(channelsToUnsubscribe subscription.List) error { wsAccountsOrdersEndPoint+channelsToUnsubscribe[i].Channel, channelsToUnsubscribe[i].Channel) } else { - err = h.Websocket.Conn.SendJSONMessage(context.TODO(), WsRequest{ + err = h.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, WsRequest{ Unsubscribe: channelsToUnsubscribe[i].Channel, }) } @@ -635,7 +636,7 @@ func (h *HUOBI) wsLogin(ctx context.Context) error { h.Websocket.SetCanUseAuthenticatedEndpoints(true) timestamp := time.Now().UTC().Format(wsDateTimeFormatting) - request := WsAuthenticationRequest{ + req := WsAuthenticationRequest{ Op: authOp, AccessKeyID: creds.Key, SignatureMethod: signatureMethod, @@ -646,8 +647,8 @@ func (h *HUOBI) wsLogin(ctx context.Context) error { if err != nil { return err } - request.Signature = crypto.Base64Encode(hmac) - err = h.Websocket.AuthConn.SendJSONMessage(context.TODO(), request) + req.Signature = crypto.Base64Encode(hmac) + err = h.Websocket.AuthConn.SendJSONMessage(context.TODO(), request.Unset, req) if err != nil { h.Websocket.SetCanUseAuthenticatedEndpoints(false) return err @@ -659,7 +660,7 @@ func (h *HUOBI) wsLogin(ctx context.Context) error { func (h *HUOBI) wsAuthenticatedSubscribe(creds *account.Credentials, operation, endpoint, topic string) error { timestamp := time.Now().UTC().Format(wsDateTimeFormatting) - request := WsAuthenticatedSubscriptionRequest{ + req := WsAuthenticatedSubscriptionRequest{ Op: operation, AccessKeyID: creds.Key, SignatureMethod: signatureMethod, @@ -671,8 +672,8 @@ func (h *HUOBI) wsAuthenticatedSubscribe(creds *account.Credentials, operation, if err != nil { return err } - request.Signature = crypto.Base64Encode(hmac) - return h.Websocket.AuthConn.SendJSONMessage(context.TODO(), request) + req.Signature = crypto.Base64Encode(hmac) + return h.Websocket.AuthConn.SendJSONMessage(context.TODO(), request.Unset, req) } func (h *HUOBI) wsGetAccountsList(ctx context.Context) (*WsAuthenticatedAccountsListResponse, error) { @@ -685,7 +686,7 @@ func (h *HUOBI) wsGetAccountsList(ctx context.Context) (*WsAuthenticatedAccounts } timestamp := time.Now().UTC().Format(wsDateTimeFormatting) - request := WsAuthenticatedAccountsListRequest{ + req := WsAuthenticatedAccountsListRequest{ Op: requestOp, AccessKeyID: creds.Key, SignatureMethod: signatureMethod, @@ -697,9 +698,9 @@ func (h *HUOBI) wsGetAccountsList(ctx context.Context) (*WsAuthenticatedAccounts if err != nil { return nil, err } - request.Signature = crypto.Base64Encode(hmac) - request.ClientID = h.Websocket.AuthConn.GenerateMessageID(true) - resp, err := h.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.ClientID, request) + req.Signature = crypto.Base64Encode(hmac) + req.ClientID = h.Websocket.AuthConn.GenerateMessageID(true) + resp, err := h.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.Unset, req.ClientID, req) if err != nil { return nil, err } @@ -732,7 +733,7 @@ func (h *HUOBI) wsGetOrdersList(ctx context.Context, accountID int64, pair curre } timestamp := time.Now().UTC().Format(wsDateTimeFormatting) - request := WsAuthenticatedOrdersListRequest{ + req := WsAuthenticatedOrdersListRequest{ Op: requestOp, AccessKeyID: creds.Key, SignatureMethod: signatureMethod, @@ -748,10 +749,10 @@ func (h *HUOBI) wsGetOrdersList(ctx context.Context, accountID int64, pair curre if err != nil { return nil, err } - request.Signature = crypto.Base64Encode(hmac) - request.ClientID = h.Websocket.AuthConn.GenerateMessageID(true) + req.Signature = crypto.Base64Encode(hmac) + req.ClientID = h.Websocket.AuthConn.GenerateMessageID(true) - resp, err := h.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.ClientID, request) + resp, err := h.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.Unset, req.ClientID, req) if err != nil { return nil, err } @@ -778,7 +779,7 @@ func (h *HUOBI) wsGetOrderDetails(ctx context.Context, orderID string) (*WsAuthe return nil, err } timestamp := time.Now().UTC().Format(wsDateTimeFormatting) - request := WsAuthenticatedOrderDetailsRequest{ + req := WsAuthenticatedOrderDetailsRequest{ Op: requestOp, AccessKeyID: creds.Key, SignatureMethod: signatureMethod, @@ -791,9 +792,9 @@ func (h *HUOBI) wsGetOrderDetails(ctx context.Context, orderID string) (*WsAuthe if err != nil { return nil, err } - request.Signature = crypto.Base64Encode(hmac) - request.ClientID = h.Websocket.AuthConn.GenerateMessageID(true) - resp, err := h.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.ClientID, request) + req.Signature = crypto.Base64Encode(hmac) + req.ClientID = h.Websocket.AuthConn.GenerateMessageID(true) + resp, err := h.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.Unset, req.ClientID, req) if err != nil { return nil, err } diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index 6cf4eadf..6ae5cbf9 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -20,6 +20,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" @@ -334,7 +335,7 @@ func (k *Kraken) wsPingHandler() error { if err != nil { return err } - k.Websocket.Conn.SetupPingHandler(stream.PingHandler{ + k.Websocket.Conn.SetupPingHandler(request.Unset, stream.PingHandler{ Message: message, Delay: krakenWsPingDelay, MessageType: websocket.TextMessage, @@ -348,7 +349,7 @@ func (k *Kraken) wsAuthPingHandler() error { if err != nil { return err } - k.Websocket.AuthConn.SetupPingHandler(stream.PingHandler{ + k.Websocket.AuthConn.SetupPingHandler(request.Unset, stream.PingHandler{ Message: message, Delay: krakenWsPingDelay, MessageType: websocket.TextMessage, @@ -1228,9 +1229,9 @@ channels: for i := range *subs { var err error if common.StringSliceContains(authenticatedChannels, (*subs)[i].Subscription.Name) { - _, err = k.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), (*subs)[i].RequestID, (*subs)[i]) + _, err = k.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.Unset, (*subs)[i].RequestID, (*subs)[i]) } else { - _, err = k.Websocket.Conn.SendMessageReturnResponse(context.TODO(), (*subs)[i].RequestID, (*subs)[i]) + _, err = k.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, (*subs)[i].RequestID, (*subs)[i]) } if err == nil { err = k.Websocket.AddSuccessfulSubscriptions((*subs)[i].Channels...) @@ -1287,9 +1288,9 @@ channels: for i := range unsubs { var err error if common.StringSliceContains(authenticatedChannels, unsubs[i].Subscription.Name) { - _, err = k.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), unsubs[i].RequestID, unsubs[i]) + _, err = k.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.Unset, unsubs[i].RequestID, unsubs[i]) } else { - _, err = k.Websocket.Conn.SendMessageReturnResponse(context.TODO(), unsubs[i].RequestID, unsubs[i]) + _, err = k.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, unsubs[i].RequestID, unsubs[i]) } if err == nil { err = k.Websocket.RemoveSubscriptions(unsubs[i].Channels...) @@ -1302,12 +1303,12 @@ channels: } // wsAddOrder creates an order, returned order ID if success -func (k *Kraken) wsAddOrder(request *WsAddOrderRequest) (string, error) { +func (k *Kraken) wsAddOrder(req *WsAddOrderRequest) (string, error) { id := k.Websocket.AuthConn.GenerateMessageID(false) - request.RequestID = id - request.Event = krakenWsAddOrder - request.Token = authToken - jsonResp, err := k.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), id, request) + req.RequestID = id + req.Event = krakenWsAddOrder + req.Token = authToken + jsonResp, err := k.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.Unset, id, req) if err != nil { return "", err } @@ -1339,14 +1340,14 @@ func (k *Kraken) wsCancelOrders(orderIDs []string) error { // wsCancelOrder cancels an open order func (k *Kraken) wsCancelOrder(orderID string) error { id := k.Websocket.AuthConn.GenerateMessageID(false) - request := WsCancelOrderRequest{ + req := WsCancelOrderRequest{ Event: krakenWsCancelOrder, Token: authToken, TransactionIDs: []string{orderID}, RequestID: id, } - resp, err := k.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), id, request) + resp, err := k.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.Unset, id, req) if err != nil { return fmt.Errorf("%w %s: %w", errCancellingOrder, orderID, err) } @@ -1370,13 +1371,13 @@ func (k *Kraken) wsCancelOrder(orderID string) error { // Returns number (count param) of affected orders or 0 if no open orders found func (k *Kraken) wsCancelAllOrders() (*WsCancelOrderResponse, error) { id := k.Websocket.AuthConn.GenerateMessageID(false) - request := WsCancelOrderRequest{ + req := WsCancelOrderRequest{ Event: krakenWsCancelAll, Token: authToken, RequestID: id, } - jsonResp, err := k.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), id, request) + jsonResp, err := k.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.Unset, id, req) if err != nil { return &WsCancelOrderResponse{}, err } diff --git a/exchanges/kucoin/kucoin_websocket.go b/exchanges/kucoin/kucoin_websocket.go index f21a7c73..3a6bb5ce 100644 --- a/exchanges/kucoin/kucoin_websocket.go +++ b/exchanges/kucoin/kucoin_websocket.go @@ -137,7 +137,7 @@ func (ku *Kucoin) WsConnect() error { } ku.Websocket.Wg.Add(1) go ku.wsReadData() - ku.Websocket.Conn.SetupPingHandler(stream.PingHandler{ + ku.Websocket.Conn.SetupPingHandler(request.Unset, stream.PingHandler{ Delay: time.Millisecond * time.Duration(instances.InstanceServers[0].PingTimeout), Message: []byte(`{"type":"ping"}`), MessageType: websocket.TextMessage, @@ -956,7 +956,7 @@ func (ku *Kucoin) manageSubscriptions(subs subscription.List, operation string) PrivateChannel: s.Authenticated, Response: true, } - if respRaw, err := ku.Websocket.Conn.SendMessageReturnResponse(context.TODO(), "msgID:"+msgID, req); err != nil { + if respRaw, err := ku.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, "msgID:"+msgID, req); err != nil { errs = common.AppendError(errs, err) } else { rType, err := jsonparser.GetUnsafeString(respRaw, "type") diff --git a/exchanges/okx/okx_websocket.go b/exchanges/okx/okx_websocket.go index 61544953..5a4d1b87 100644 --- a/exchanges/okx/okx_websocket.go +++ b/exchanges/okx/okx_websocket.go @@ -19,6 +19,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" @@ -232,7 +233,7 @@ func (ok *Okx) WsConnect() error { log.Debugf(log.ExchangeSys, "Successful connection to %v\n", ok.Websocket.GetWebsocketURL()) } - ok.Websocket.Conn.SetupPingHandler(stream.PingHandler{ + ok.Websocket.Conn.SetupPingHandler(request.Unset, stream.PingHandler{ MessageType: websocket.TextMessage, Message: pingMsg, Delay: time.Second * 20, @@ -261,7 +262,7 @@ func (ok *Okx) WsAuth(ctx context.Context, dialer *websocket.Dialer) error { } ok.Websocket.Wg.Add(1) go ok.wsReadData(ok.Websocket.AuthConn) - ok.Websocket.AuthConn.SetupPingHandler(stream.PingHandler{ + ok.Websocket.AuthConn.SetupPingHandler(request.Unset, stream.PingHandler{ MessageType: websocket.TextMessage, Message: pingMsg, Delay: time.Second * 20, @@ -281,7 +282,7 @@ func (ok *Okx) WsAuth(ctx context.Context, dialer *websocket.Dialer) error { return err } base64Sign := crypto.Base64Encode(hmac) - request := WebsocketEventRequest{ + req := WebsocketEventRequest{ Operation: operationLogin, Arguments: []WebsocketLoginData{ { @@ -292,7 +293,7 @@ func (ok *Okx) WsAuth(ctx context.Context, dialer *websocket.Dialer) error { }, }, } - err = ok.Websocket.AuthConn.SendJSONMessage(ctx, request) + err = ok.Websocket.AuthConn.SendJSONMessage(ctx, request.Unset, req) if err != nil { return err } @@ -328,7 +329,7 @@ func (ok *Okx) WsAuth(ctx context.Context, dialer *websocket.Dialer) error { timer.Stop() return fmt.Errorf("%s websocket connection: timeout waiting for response with an operation: %v", ok.Name, - request.Operation) + req.Operation) } } } @@ -360,7 +361,7 @@ func (ok *Okx) Unsubscribe(channelsToUnsubscribe subscription.List) error { // handleSubscription sends a subscription and unsubscription information thought the websocket endpoint. // as of the okx, exchange this endpoint sends subscription and unsubscription messages but with a list of json objects. func (ok *Okx) handleSubscription(operation string, subscriptions subscription.List) error { - request := WSSubscriptionInformationList{Operation: operation} + reqs := WSSubscriptionInformationList{Operation: operation} authRequests := WSSubscriptionInformationList{Operation: operation} ok.WsRequestSemaphore <- 1 defer func() { <-ok.WsRequestSemaphore }() @@ -481,7 +482,7 @@ func (ok *Okx) handleSubscription(operation string, subscriptions subscription.L if len(authChunk) > maxConnByteLen { authRequests.Arguments = authRequests.Arguments[:len(authRequests.Arguments)-1] i-- - err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), authRequests) + err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), request.Unset, authRequests) if err != nil { return err } @@ -498,14 +499,14 @@ func (ok *Okx) handleSubscription(operation string, subscriptions subscription.L } } else { channels = append(channels, s) - request.Arguments = append(request.Arguments, arg) - chunk, err := json.Marshal(request) + reqs.Arguments = append(reqs.Arguments, arg) + chunk, err := json.Marshal(reqs) if err != nil { return err } if len(chunk) > maxConnByteLen { i-- - err = ok.Websocket.Conn.SendJSONMessage(context.TODO(), request) + err = ok.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, reqs) if err != nil { return err } @@ -518,20 +519,20 @@ func (ok *Okx) handleSubscription(operation string, subscriptions subscription.L return err } channels = subscription.List{} - request.Arguments = []SubscriptionInfo{} + reqs.Arguments = []SubscriptionInfo{} continue } } } - if len(request.Arguments) > 0 { - if err := ok.Websocket.Conn.SendJSONMessage(context.TODO(), request); err != nil { + if len(reqs.Arguments) > 0 { + if err := ok.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, reqs); err != nil { return err } } if len(authRequests.Arguments) > 0 && ok.Websocket.CanUseAuthenticatedEndpoints() { - if err := ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), authRequests); err != nil { + if err := ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), request.Unset, authRequests); err != nil { return err } } @@ -1369,7 +1370,7 @@ func (ok *Okx) WsPlaceOrder(arg *PlaceOrderRequestParam) (*OrderData, error) { Arguments: []PlaceOrderRequestParam{*arg}, Operation: okxOpOrder, } - err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), input) + err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), placeOrderEPL, input) if err != nil { return nil, err } @@ -1428,7 +1429,7 @@ func (ok *Okx) WsPlaceMultipleOrder(args []PlaceOrderRequestParam) ([]OrderData, Arguments: args, Operation: okxOpBatchOrders, } - err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), input) + err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), placeMultipleOrdersEPL, input) if err != nil { return nil, err } @@ -1498,7 +1499,7 @@ func (ok *Okx) WsCancelOrder(arg CancelOrderRequestParam) (*OrderData, error) { Arguments: []CancelOrderRequestParam{arg}, Operation: okxOpCancelOrder, } - err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), input) + err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), cancelOrderEPL, input) if err != nil { return nil, err } @@ -1558,7 +1559,7 @@ func (ok *Okx) WsCancelMultipleOrder(args []CancelOrderRequestParam) ([]OrderDat Arguments: args, Operation: okxOpBatchCancelOrders, } - err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), input) + err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), cancelMultipleOrdersEPL, input) if err != nil { return nil, err } @@ -1634,7 +1635,7 @@ func (ok *Okx) WsAmendOrder(arg *AmendOrderRequestParams) (*OrderData, error) { Operation: okxOpAmendOrder, Arguments: []AmendOrderRequestParams{*arg}, } - err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), input) + err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), amendOrderEPL, input) if err != nil { return nil, err } @@ -1696,7 +1697,7 @@ func (ok *Okx) WsAmendMultipleOrders(args []AmendOrderRequestParams) ([]OrderDat Operation: okxOpBatchAmendOrders, Arguments: args, } - err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), input) + err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), amendMultipleOrdersEPL, input) if err != nil { return nil, err } @@ -1849,7 +1850,7 @@ func (ok *Okx) wsChannelSubscription(operation, channel string, assetType asset. } ok.WsRequestSemaphore <- 1 defer func() { <-ok.WsRequestSemaphore }() - return ok.Websocket.Conn.SendJSONMessage(context.TODO(), input) + return ok.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, input) } // Private Channel Websocket methods @@ -1915,7 +1916,7 @@ func (ok *Okx) wsAuthChannelSubscription(operation, channel string, assetType as } ok.WsRequestSemaphore <- 1 defer func() { <-ok.WsRequestSemaphore }() - return ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), input) + return ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), request.Unset, input) } // WsAccountSubscription retrieve account information. Data will be pushed when triggered by diff --git a/exchanges/okx/okx_wrapper.go b/exchanges/okx/okx_wrapper.go index 13c3d87e..971e19d5 100644 --- a/exchanges/okx/okx_wrapper.go +++ b/exchanges/okx/okx_wrapper.go @@ -217,6 +217,7 @@ func (ok *Okx) Setup(exch *config.Exchange) error { OrderbookBufferConfig: buffer.Config{ Checksum: ok.CalculateUpdateOrderbookChecksum, }, + RateLimitDefinitions: ok.Requester.GetRateLimiterDefinitions(), }); err != nil { return err } diff --git a/exchanges/okx/ratelimit.go b/exchanges/okx/ratelimit.go index bc1fd23d..ca0ec471 100644 --- a/exchanges/okx/ratelimit.go +++ b/exchanges/okx/ratelimit.go @@ -202,12 +202,12 @@ const ( ) const ( - placeOrderEPL request.EndpointLimit = iota - placeMultipleOrdersEPL - cancelOrderEPL - cancelMultipleOrdersEPL - amendOrderEPL - amendMultipleOrdersEPL + placeOrderEPL request.EndpointLimit = iota + 1 // This endpoint limit is shared with `Place order` Websocket API endpoints + placeMultipleOrdersEPL // This endpoint limit is shared with `Place multiple orders` Websocket API endpoints + cancelOrderEPL // This endpoint limit is shared with `Cancel order` Websocket API endpoints + cancelMultipleOrdersEPL // This endpoint limit is shared with `Cancel multiple orders` Websocket API endpoints + amendOrderEPL // This endpoint limit is shared with `Amend order` Websocket API endpoints + amendMultipleOrdersEPL // This endpoint limit is shared with `Amend multiple orders` Websocket API endpoints closePositionEPL getOrderDetEPL getOrderListEPL diff --git a/exchanges/poloniex/poloniex_websocket.go b/exchanges/poloniex/poloniex_websocket.go index 65ca4330..04eb08d3 100644 --- a/exchanges/poloniex/poloniex_websocket.go +++ b/exchanges/poloniex/poloniex_websocket.go @@ -18,6 +18,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" @@ -604,7 +605,7 @@ func (p *Poloniex) manageSubs(subs subscription.List, op wsOp) error { } req.Channel = s.Pairs[0].String() } - err = p.Websocket.Conn.SendJSONMessage(context.TODO(), req) + err = p.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req) } if err == nil { if op == wsSubscribeOp { @@ -628,14 +629,14 @@ func (p *Poloniex) wsSendAuthorisedCommand(secret, key string, op wsOp) error { if err != nil { return err } - request := wsAuthorisationRequest{ + req := wsAuthorisationRequest{ Command: op, Channel: 1000, Sign: crypto.HexEncodeToString(hmac), Key: key, Payload: nonce, } - return p.Websocket.Conn.SendJSONMessage(context.TODO(), request) + return p.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req) } func (p *Poloniex) processAccountMarginPosition(notification []interface{}) error { diff --git a/exchanges/request/limit.go b/exchanges/request/limit.go index bdb09649..72e2815a 100644 --- a/exchanges/request/limit.go +++ b/exchanges/request/limit.go @@ -121,6 +121,15 @@ func (r *Requester) InitiateRateLimit(ctx context.Context, e EndpointLimit) erro return nil } +// GetRateLimiterDefinitions returns the rate limiter definitions for the +// requester +func (r *Requester) GetRateLimiterDefinitions() RateLimitDefinitions { + if r == nil { + return nil + } + return r.limiter +} + // RateLimit is a function that will rate limit a request based on the rate // limiter provided. It will return an error if the context is cancelled or // deadline exceeded. diff --git a/exchanges/request/request_test.go b/exchanges/request/request_test.go index fdd13801..5793ef66 100644 --- a/exchanges/request/request_test.go +++ b/exchanges/request/request_test.go @@ -707,3 +707,12 @@ func TestContextVerbosity(t *testing.T) { require.False(t, IsVerbose(context.WithValue(context.Background(), contextVerboseFlag, "bruh"), false)) require.True(t, IsVerbose(context.WithValue(context.Background(), contextVerboseFlag, true), false)) } + +func TestGetRateLimiterDefinitions(t *testing.T) { + t.Parallel() + require.Equal(t, RateLimitDefinitions(nil), (*Requester)(nil).GetRateLimiterDefinitions()) + r, err := New("test", new(http.Client), WithLimiter(globalshell)) + require.NoError(t, err) + require.NotEmpty(t, r.GetRateLimiterDefinitions()) + assert.Equal(t, globalshell, r.GetRateLimiterDefinitions()) +} diff --git a/exchanges/stream/stream_types.go b/exchanges/stream/stream_types.go index b7f16a6a..22963e25 100644 --- a/exchanges/stream/stream_types.go +++ b/exchanges/stream/stream_types.go @@ -16,16 +16,19 @@ import ( type Connection interface { Dial(*websocket.Dialer, http.Header) error ReadMessage() Response - SendJSONMessage(ctx context.Context, payload any) error - SetupPingHandler(PingHandler) - // GenerateMessageID generates a message ID for the individual connection. - // If a bespoke function is set (by using SetupNewConnection) it will use - // that, otherwise it will use the defaultGenerateMessageID function defined - // in websocket_connection.go. + SetupPingHandler(request.EndpointLimit, PingHandler) + // GenerateMessageID generates a message ID for the individual connection. If a bespoke function is set + // (by using SetupNewConnection) it will use that, otherwise it will use the defaultGenerateMessageID function + // defined in websocket_connection.go. GenerateMessageID(highPrecision bool) int64 - SendMessageReturnResponse(ctx context.Context, signature any, request any) ([]byte, error) - SendMessageReturnResponses(ctx context.Context, signature any, request any, expected int) ([][]byte, error) - SendRawMessage(ctx context.Context, messageType int, message []byte) error + // SendMessageReturnResponse will send a WS message to the connection and wait for response + SendMessageReturnResponse(ctx context.Context, epl request.EndpointLimit, signature any, request any) ([]byte, error) + // SendMessageReturnResponses will send a WS message to the connection and wait for N responses + SendMessageReturnResponses(ctx context.Context, epl request.EndpointLimit, signature any, request any, expected int) ([][]byte, error) + // SendRawMessage sends a message over the connection without JSON encoding it + SendRawMessage(ctx context.Context, epl request.EndpointLimit, messageType int, message []byte) error + // SendJSONMessage sends a JSON encoded message over the connection + SendJSONMessage(ctx context.Context, epl request.EndpointLimit, payload any) error SetURL(string) SetProxy(string) GetURL() string diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index 4fa512e0..b4a6ba63 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -196,6 +196,7 @@ func (w *Websocket) Setup(s *WebsocketSetup) error { w.MaxSubscriptionsPerConnection = s.MaxWebsocketSubscriptionsPerConnection w.setState(disconnectedState) + w.rateLimitDefinitions = s.RateLimitDefinitions return nil } @@ -253,6 +254,7 @@ func (w *Websocket) SetupNewConnection(c ConnectionSetup) error { RateLimit: c.RateLimit, Reporter: c.ConnectionLevelReporter, bespokeGenerateMessageID: c.BespokeGenerateMessageID, + RateLimitDefinitions: w.rateLimitDefinitions, } if c.Authenticated { diff --git a/exchanges/stream/websocket_connection.go b/exchanges/stream/websocket_connection.go index 22bbde84..a98ccee0 100644 --- a/exchanges/stream/websocket_connection.go +++ b/exchanges/stream/websocket_connection.go @@ -7,6 +7,7 @@ import ( "context" "crypto/rand" "encoding/json" + "errors" "fmt" "io" "math/big" @@ -22,6 +23,11 @@ import ( "github.com/thrasher-corp/gocryptotrader/log" ) +var ( + errWebsocketIsDisconnected = errors.New("websocket connection is disconnected") + errRateLimitNotFound = errors.New("rate limit definition not found") +) + // Dial sets proxy urls and then connects to the websocket func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header) error { if w.ProxyURL != "" { @@ -56,8 +62,8 @@ func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header } // SendJSONMessage sends a JSON encoded message over the connection -func (w *WebsocketConnection) SendJSONMessage(ctx context.Context, data interface{}) error { - return w.writeToConn(ctx, func() error { +func (w *WebsocketConnection) SendJSONMessage(ctx context.Context, epl request.EndpointLimit, data any) error { + return w.writeToConn(ctx, epl, func() error { if request.IsVerbose(ctx, w.Verbose) { if msg, err := json.Marshal(data); err == nil { // WriteJSON will error for us anyway log.Debugf(log.WebsocketMgr, "%v %v: Sending message: %v", w.ExchangeName, removeURLQueryString(w.URL), string(msg)) @@ -68,8 +74,8 @@ func (w *WebsocketConnection) SendJSONMessage(ctx context.Context, data interfac } // SendRawMessage sends a message over the connection without JSON encoding it -func (w *WebsocketConnection) SendRawMessage(ctx context.Context, messageType int, message []byte) error { - return w.writeToConn(ctx, func() error { +func (w *WebsocketConnection) SendRawMessage(ctx context.Context, epl request.EndpointLimit, messageType int, message []byte) error { + return w.writeToConn(ctx, epl, func() error { if request.IsVerbose(ctx, w.Verbose) { log.Debugf(log.WebsocketMgr, "%v %v: Sending message: %v", w.ExchangeName, removeURLQueryString(w.URL), string(message)) } @@ -77,43 +83,51 @@ func (w *WebsocketConnection) SendRawMessage(ctx context.Context, messageType in }) } -func (w *WebsocketConnection) writeToConn(ctx context.Context, writeConn func() error) error { +func (w *WebsocketConnection) writeToConn(ctx context.Context, epl request.EndpointLimit, writeConn func() error) error { if !w.IsConnected() { - return fmt.Errorf("%v websocket connection: cannot send message to a disconnected websocket", w.ExchangeName) + return fmt.Errorf("%v websocket connection: cannot send message %w", w.ExchangeName, errWebsocketIsDisconnected) } - if w.RateLimit != nil { - err := request.RateLimit(ctx, w.RateLimit) - if err != nil { + + var rl *request.RateLimiterWithWeight + if w.RateLimitDefinitions != nil { + var ok bool + if rl, ok = w.RateLimitDefinitions[epl]; !ok && w.RateLimit == nil { + // Return an error if no specific connection rate limit is found for the endpoint but a global rate limit is + // set. This ensures the system attempts to apply rate limiting, prioritizing endpoint-specific limits + // if they are defined. + return fmt.Errorf("%s websocket connection: %w for %v", w.ExchangeName, errRateLimitNotFound, epl) + } + } + + if rl == nil { + // If a global rate limit definition is not found, use the connection rate limit as a fallback. + rl = w.RateLimit + } + + if rl != nil { + if err := request.RateLimit(ctx, rl); err != nil { return fmt.Errorf("%s websocket connection: rate limit error: %w", w.ExchangeName, err) } } // This lock acts as a rolling gate to prevent WriteMessage panics. Acquire after rate limit check. w.writeControl.Lock() defer w.writeControl.Unlock() - // NOTE: Secondary check to ensure the connection is still active after - // semacquire and potential rate limit. - if !w.IsConnected() { - return fmt.Errorf("%v websocket connection: cannot send message to a disconnected websocket", w.ExchangeName) - } return writeConn() } // SetupPingHandler will automatically send ping or pong messages based on // WebsocketPingHandler configuration -func (w *WebsocketConnection) SetupPingHandler(handler PingHandler) { +func (w *WebsocketConnection) SetupPingHandler(epl request.EndpointLimit, handler PingHandler) { if handler.UseGorillaHandler { - h := func(msg string) error { - err := w.Connection.WriteControl(handler.MessageType, - []byte(msg), - time.Now().Add(handler.Delay)) + w.Connection.SetPingHandler(func(msg string) error { + err := w.Connection.WriteControl(handler.MessageType, []byte(msg), time.Now().Add(handler.Delay)) if err == websocket.ErrCloseSent { return nil } else if e, ok := err.(net.Error); ok && e.Timeout() { return nil } return err - } - w.Connection.SetPingHandler(h) + }) return } w.Wg.Add(1) @@ -126,12 +140,9 @@ func (w *WebsocketConnection) SetupPingHandler(handler PingHandler) { ticker.Stop() return case <-ticker.C: - err := w.SendRawMessage(context.TODO(), handler.MessageType, handler.Message) + err := w.SendRawMessage(context.TODO(), epl, handler.MessageType, handler.Message) if err != nil { - log.Errorf(log.WebsocketMgr, - "%v websocket connection: ping handler failed to send message [%s]", - w.ExchangeName, - handler.Message) + log.Errorf(log.WebsocketMgr, "%v websocket connection: ping handler failed to send message [%s]", w.ExchangeName, handler.Message) return } } @@ -272,8 +283,8 @@ func (w *WebsocketConnection) GetURL() string { } // SendMessageReturnResponse will send a WS message to the connection and wait for response -func (w *WebsocketConnection) SendMessageReturnResponse(ctx context.Context, signature, request any) ([]byte, error) { - resps, err := w.SendMessageReturnResponses(ctx, signature, request, 1) +func (w *WebsocketConnection) SendMessageReturnResponse(ctx context.Context, epl request.EndpointLimit, signature, request any) ([]byte, error) { + resps, err := w.SendMessageReturnResponses(ctx, epl, signature, request, 1) if err != nil { return nil, err } @@ -282,7 +293,7 @@ func (w *WebsocketConnection) SendMessageReturnResponse(ctx context.Context, sig // SendMessageReturnResponses will send a WS message to the connection and wait for N responses // An error of ErrSignatureTimeout can be ignored if individual responses are being otherwise tracked -func (w *WebsocketConnection) SendMessageReturnResponses(ctx context.Context, signature, payload any, expected int) ([][]byte, error) { +func (w *WebsocketConnection) SendMessageReturnResponses(ctx context.Context, epl request.EndpointLimit, signature, payload any, expected int) ([][]byte, error) { outbound, err := json.Marshal(payload) if err != nil { return nil, fmt.Errorf("error marshaling json for %s: %w", signature, err) @@ -294,7 +305,7 @@ func (w *WebsocketConnection) SendMessageReturnResponses(ctx context.Context, si } start := time.Now() - err = w.SendRawMessage(ctx, websocket.TextMessage, outbound) + err = w.SendRawMessage(ctx, epl, websocket.TextMessage, outbound) if err != nil { return nil, err } diff --git a/exchanges/stream/websocket_test.go b/exchanges/stream/websocket_test.go index c6aacfcf..ecd32646 100644 --- a/exchanges/stream/websocket_test.go +++ b/exchanges/stream/websocket_test.go @@ -714,11 +714,11 @@ func TestSendMessage(t *testing.T) { } t.Fatal(err) } - err = testData.WC.SendJSONMessage(context.Background(), Ping) + err = testData.WC.SendJSONMessage(context.Background(), request.Unset, Ping) if err != nil { t.Error(err) } - err = testData.WC.SendRawMessage(context.Background(), websocket.TextMessage, []byte(Ping)) + err = testData.WC.SendRawMessage(context.Background(), request.Unset, websocket.TextMessage, []byte(Ping)) if err != nil { t.Error(err) } @@ -745,7 +745,7 @@ func TestSendMessageReturnResponse(t *testing.T) { go readMessages(t, wc) - request := testRequest{ + req := testRequest{ Event: "subscribe", Pairs: []string{currency.NewPairWithDelimiter("XBT", "USD", "/").String()}, Subscription: testRequestData{ @@ -754,19 +754,19 @@ func TestSendMessageReturnResponse(t *testing.T) { RequestID: wc.GenerateMessageID(false), } - _, err = wc.SendMessageReturnResponse(context.Background(), request.RequestID, request) + _, err = wc.SendMessageReturnResponse(context.Background(), request.Unset, req.RequestID, req) if err != nil { t.Error(err) } cancelledCtx, fn := context.WithDeadline(context.Background(), time.Now()) fn() - _, err = wc.SendMessageReturnResponse(cancelledCtx, "123", request) + _, err = wc.SendMessageReturnResponse(cancelledCtx, request.Unset, "123", req) assert.ErrorIs(t, err, context.DeadlineExceeded) // with timeout wc.ResponseMaxLimit = 1 - _, err = wc.SendMessageReturnResponse(context.Background(), "123", request) + _, err = wc.SendMessageReturnResponse(context.Background(), request.Unset, "123", req) assert.ErrorIs(t, err, ErrSignatureTimeout, "SendMessageReturnResponse should error when request ID not found") } @@ -829,7 +829,7 @@ func TestSetupPingHandler(t *testing.T) { t.Fatal(err) } - wc.SetupPingHandler(PingHandler{ + wc.SetupPingHandler(request.Unset, PingHandler{ UseGorillaHandler: true, MessageType: websocket.PingMessage, Delay: 100, @@ -844,7 +844,7 @@ func TestSetupPingHandler(t *testing.T) { if err != nil { t.Fatal(err) } - wc.SetupPingHandler(PingHandler{ + wc.SetupPingHandler(request.Unset, PingHandler{ MessageType: websocket.TextMessage, Message: []byte(Ping), Delay: 200, @@ -1187,7 +1187,7 @@ func TestLatency(t *testing.T) { go readMessages(t, wc) - request := testRequest{ + req := testRequest{ Event: "subscribe", Pairs: []string{currency.NewPairWithDelimiter("XBT", "USD", "/").String()}, Subscription: testRequestData{ @@ -1196,7 +1196,7 @@ func TestLatency(t *testing.T) { RequestID: wc.GenerateMessageID(false), } - _, err = wc.SendMessageReturnResponse(context.Background(), request.RequestID, request) + _, err = wc.SendMessageReturnResponse(context.Background(), request.Unset, req.RequestID, req) if err != nil { t.Error(err) } @@ -1248,3 +1248,29 @@ func TestRemoveURLQueryString(t *testing.T) { assert.Equal(t, "https://www.google.com", removeURLQueryString("https://www.google.com"), "removeURLQueryString should not change URL") assert.Equal(t, "", removeURLQueryString(""), "removeURLQueryString should be equal") } + +func TestWriteToConn(t *testing.T) { + t.Parallel() + wc := WebsocketConnection{} + require.ErrorIs(t, wc.writeToConn(context.Background(), request.Unset, func() error { return nil }), errWebsocketIsDisconnected) + wc.setConnectedStatus(true) + // No rate limits set + require.NoError(t, wc.writeToConn(context.Background(), request.Unset, func() error { return nil })) + // connection rate limit set + wc.RateLimit = request.NewWeightedRateLimitByDuration(time.Millisecond) + require.NoError(t, wc.writeToConn(context.Background(), request.Unset, func() error { return nil })) + // context cancelled + ctx, cancel := context.WithCancel(context.Background()) + cancel() + require.ErrorIs(t, wc.writeToConn(ctx, request.Unset, func() error { return nil }), context.Canceled) + // definitions set but with fallover + wc.RateLimitDefinitions = request.RateLimitDefinitions{ + request.Auth: request.NewWeightedRateLimitByDuration(time.Millisecond), + } + require.NoError(t, wc.writeToConn(context.Background(), request.Unset, func() error { return nil })) + // match with global rate limit + require.NoError(t, wc.writeToConn(context.Background(), request.Auth, func() error { return nil })) + // definitions set but connection rate limiter not set + wc.RateLimit = nil + require.ErrorIs(t, wc.writeToConn(ctx, request.Unset, func() error { return nil }), errRateLimitNotFound) +} diff --git a/exchanges/stream/websocket_types.go b/exchanges/stream/websocket_types.go index fd872ab7..54d73fcb 100644 --- a/exchanges/stream/websocket_types.go +++ b/exchanges/stream/websocket_types.go @@ -96,6 +96,10 @@ type Websocket struct { // MaxSubScriptionsPerConnection defines the maximum number of // subscriptions per connection that is allowed by the exchange. MaxSubscriptionsPerConnection int + + // rateLimitDefinitions contains the rate limiters shared between Websocket and REST connections for all potential + // endpoints. + rateLimitDefinitions request.RateLimitDefinitions } // WebsocketSetup defines variables for setting up a websocket connection @@ -121,6 +125,13 @@ type WebsocketSetup struct { // MaxWebsocketSubscriptionsPerConnection defines the maximum number of // subscriptions per connection that is allowed by the exchange. MaxWebsocketSubscriptionsPerConnection int + + // RateLimitDefinitions contains the rate limiters shared between WebSocket and REST connections for all endpoints. + // These rate limits take precedence over any rate limits specified in individual connection configurations. + // If no connection-specific rate limit is provided and the endpoint does not match any of these definitions, + // an error will be returned. However, if a connection configuration includes its own rate limit, + // it will fall back to that configuration’s rate limit without raising an error. + RateLimitDefinitions request.RateLimitDefinitions } // WebsocketConnection contains all the data needed to send a message to a WS @@ -133,7 +144,12 @@ type WebsocketConnection struct { // writes methods writeControl sync.Mutex - RateLimit *request.RateLimiterWithWeight + // RateLimit is a rate limiter for the connection itself + RateLimit *request.RateLimiterWithWeight + // RateLimitDefinitions contains the rate limiters shared between WebSocket and REST connections for all + // potential endpoints. + RateLimitDefinitions request.RateLimitDefinitions + ExchangeName string URL string ProxyURL string