From cb6b3421a7432dc24e68d33bdf3314389c7826bd Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Mon, 2 Sep 2024 16:43:05 +1000 Subject: [PATCH] exchanges/websocket: update websocket rate limiting to use requester rate limiting functionality (#1578) * exchanges/websocket: update websocket rate limiting to use requester rate limiting functionality. * glorious: nits * rm unsused * updoo * glorious: purgerino * reduce duplicate code * thrasher: engrish --------- Co-authored-by: shazbert --- exchanges/binance/binance_types.go | 2 - exchanges/binance/binance_wrapper.go | 2 +- exchanges/binanceus/binanceus_types.go | 2 - exchanges/binanceus/binanceus_websocket.go | 8 +-- exchanges/binanceus/binanceus_wrapper.go | 2 +- exchanges/bitfinex/bitfinex_websocket.go | 10 +-- exchanges/bithumb/bithumb_websocket.go | 3 +- exchanges/bithumb/bithumb_wrapper.go | 4 +- exchanges/bitmex/bitmex_websocket.go | 6 +- exchanges/bitstamp/bitstamp_websocket.go | 4 +- exchanges/btcmarkets/btcmarkets_websocket.go | 4 +- exchanges/btse/btse_websocket.go | 6 +- exchanges/bybit/bybit_inverse_websocket.go | 3 +- exchanges/bybit/bybit_linear_websocket.go | 2 +- exchanges/bybit/bybit_options_websocket.go | 3 +- exchanges/bybit/ratelimit.go | 4 +- .../coinbasepro/coinbasepro_websocket.go | 4 +- exchanges/coinut/coinut.go | 1 - exchanges/coinut/coinut_websocket.go | 2 +- exchanges/coinut/coinut_wrapper.go | 2 +- exchanges/deribit/deribit_websocket.go | 4 +- exchanges/gateio/gateio_websocket.go | 2 +- exchanges/gateio/gateio_wrapper.go | 2 +- .../gateio/gateio_ws_delivery_futures.go | 3 +- exchanges/gateio/gateio_ws_futures.go | 3 +- exchanges/gemini/gemini_websocket.go | 2 +- exchanges/hitbtc/hitbtc_websocket.go | 7 +- exchanges/hitbtc/hitbtc_wrapper.go | 2 +- exchanges/huobi/huobi_websocket.go | 13 ++-- exchanges/huobi/huobi_wrapper.go | 4 +- exchanges/kraken/kraken_websocket.go | 1 - exchanges/kraken/kraken_wrapper.go | 4 +- exchanges/kucoin/kucoin_wrapper.go | 2 +- exchanges/okx/okx_websocket.go | 26 ++++---- exchanges/okx/okx_wrapper.go | 4 +- exchanges/poloniex/poloniex_websocket.go | 4 +- exchanges/request/limit.go | 22 ++++++- exchanges/request/request_test.go | 6 +- exchanges/stream/stream_types.go | 7 +- exchanges/stream/websocket.go | 2 +- exchanges/stream/websocket_connection.go | 66 +++++++++---------- exchanges/stream/websocket_test.go | 9 +-- exchanges/stream/websocket_types.go | 3 +- 43 files changed, 142 insertions(+), 130 deletions(-) diff --git a/exchanges/binance/binance_types.go b/exchanges/binance/binance_types.go index c267fda7..c5813a7c 100644 --- a/exchanges/binance/binance_types.go +++ b/exchanges/binance/binance_types.go @@ -10,8 +10,6 @@ import ( "github.com/thrasher-corp/gocryptotrader/types" ) -const wsRateLimitMilliseconds = 250 - // withdrawals status codes description const ( EmailSent = iota diff --git a/exchanges/binance/binance_wrapper.go b/exchanges/binance/binance_wrapper.go index 951de419..7e6fe658 100644 --- a/exchanges/binance/binance_wrapper.go +++ b/exchanges/binance/binance_wrapper.go @@ -258,7 +258,7 @@ func (b *Binance) Setup(exch *config.Exchange) error { return b.Websocket.SetupNewConnection(stream.ConnectionSetup{ ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, ResponseMaxLimit: exch.WebsocketResponseMaxLimit, - RateLimit: wsRateLimitMilliseconds, + RateLimit: request.NewWeightedRateLimitByDuration(250 * time.Millisecond), }) } diff --git a/exchanges/binanceus/binanceus_types.go b/exchanges/binanceus/binanceus_types.go index 1cab9043..07fb569a 100644 --- a/exchanges/binanceus/binanceus_types.go +++ b/exchanges/binanceus/binanceus_types.go @@ -29,8 +29,6 @@ var ( BinanceRequestParamsOrderLimitMarker = RequestParamsOrderType("LIMIT_MAKER") ) -const wsRateLimitMilliseconds = 300 - // crypto withdrawals status codes description const ( EmailSent = iota diff --git a/exchanges/binanceus/binanceus_websocket.go b/exchanges/binanceus/binanceus_websocket.go index d10cfe84..a7c18899 100644 --- a/exchanges/binanceus/binanceus_websocket.go +++ b/exchanges/binanceus/binanceus_websocket.go @@ -576,7 +576,7 @@ func (bi *Binanceus) Subscribe(channelsToSubscribe subscription.List) error { for i := range channelsToSubscribe { payload.Params = append(payload.Params, channelsToSubscribe[i].Channel) if i%50 == 0 && i != 0 { - err := bi.Websocket.Conn.SendJSONMessage(payload) + err := bi.Websocket.Conn.SendJSONMessage(context.TODO(), payload) if err != nil { return err } @@ -584,7 +584,7 @@ func (bi *Binanceus) Subscribe(channelsToSubscribe subscription.List) error { } } if len(payload.Params) > 0 { - err := bi.Websocket.Conn.SendJSONMessage(payload) + err := bi.Websocket.Conn.SendJSONMessage(context.TODO(), payload) if err != nil { return err } @@ -600,7 +600,7 @@ func (bi *Binanceus) Unsubscribe(channelsToUnsubscribe subscription.List) error for i := range channelsToUnsubscribe { payload.Params = append(payload.Params, channelsToUnsubscribe[i].Channel) if i%50 == 0 && i != 0 { - err := bi.Websocket.Conn.SendJSONMessage(payload) + err := bi.Websocket.Conn.SendJSONMessage(context.TODO(), payload) if err != nil { return err } @@ -608,7 +608,7 @@ func (bi *Binanceus) Unsubscribe(channelsToUnsubscribe subscription.List) error } } if len(payload.Params) > 0 { - err := bi.Websocket.Conn.SendJSONMessage(payload) + err := bi.Websocket.Conn.SendJSONMessage(context.TODO(), payload) if err != nil { return err } diff --git a/exchanges/binanceus/binanceus_wrapper.go b/exchanges/binanceus/binanceus_wrapper.go index 8314c22d..bbe7d6fc 100644 --- a/exchanges/binanceus/binanceus_wrapper.go +++ b/exchanges/binanceus/binanceus_wrapper.go @@ -188,7 +188,7 @@ func (bi *Binanceus) Setup(exch *config.Exchange) error { return bi.Websocket.SetupNewConnection(stream.ConnectionSetup{ ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, ResponseMaxLimit: exch.WebsocketResponseMaxLimit, - RateLimit: wsRateLimitMilliseconds, + RateLimit: request.NewWeightedRateLimitByDuration(300 * time.Millisecond), }) } diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index 8b0899cd..1a50702a 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -1710,7 +1710,7 @@ func (b *Bitfinex) GenerateDefaultSubscriptions() (subscription.List, error) { // ConfigureWS to send checksums and sequence numbers func (b *Bitfinex) ConfigureWS() error { - return b.Websocket.Conn.SendJSONMessage(map[string]interface{}{ + return b.Websocket.Conn.SendJSONMessage(context.TODO(), map[string]interface{}{ "event": "conf", "flags": bitfinexChecksumFlag + bitfinexWsSequenceFlag, }) @@ -1914,7 +1914,7 @@ func (b *Bitfinex) WsSendAuth(ctx context.Context) error { AuthNonce: nonce, DeadManSwitch: 0, } - err = b.Websocket.AuthConn.SendJSONMessage(request) + err = b.Websocket.AuthConn.SendJSONMessage(ctx, request) if err != nil { b.Websocket.SetCanUseAuthenticatedEndpoints(false) return err @@ -2028,7 +2028,7 @@ func (b *Bitfinex) WsCancelMultiOrders(orderIDs []int64) error { OrderID: orderIDs, } request := makeRequestInterface(wsCancelMultipleOrders, cancel) - return b.Websocket.AuthConn.SendJSONMessage(request) + return b.Websocket.AuthConn.SendJSONMessage(context.TODO(), request) } // WsCancelOrder authenticated cancel order request @@ -2079,13 +2079,13 @@ func (b *Bitfinex) WsCancelOrder(orderID int64) error { func (b *Bitfinex) WsCancelAllOrders() error { cancelAll := WsCancelAllOrdersRequest{All: 1} request := makeRequestInterface(wsCancelMultipleOrders, cancelAll) - return b.Websocket.AuthConn.SendJSONMessage(request) + return b.Websocket.AuthConn.SendJSONMessage(context.TODO(), request) } // WsNewOffer authenticated new offer request func (b *Bitfinex) WsNewOffer(data *WsNewOfferRequest) error { request := makeRequestInterface(wsFundingOfferNew, data) - return b.Websocket.AuthConn.SendJSONMessage(request) + return b.Websocket.AuthConn.SendJSONMessage(context.TODO(), request) } // WsCancelOffer authenticated cancel offer request diff --git a/exchanges/bithumb/bithumb_websocket.go b/exchanges/bithumb/bithumb_websocket.go index 929d68a6..b7783dd2 100644 --- a/exchanges/bithumb/bithumb_websocket.go +++ b/exchanges/bithumb/bithumb_websocket.go @@ -1,6 +1,7 @@ package bithumb import ( + "context" "encoding/json" "fmt" "net/http" @@ -203,7 +204,7 @@ func (b *Bithumb) Subscribe(channelsToSubscribe subscription.List) error { if s.Channel == "ticker" { req.TickTypes = wsDefaultTickTypes } - err := b.Websocket.Conn.SendJSONMessage(req) + err := b.Websocket.Conn.SendJSONMessage(context.TODO(), req) if err == nil { err = b.Websocket.AddSuccessfulSubscriptions(s) } diff --git a/exchanges/bithumb/bithumb_wrapper.go b/exchanges/bithumb/bithumb_wrapper.go index 8b30aee0..7c6fdb5c 100644 --- a/exchanges/bithumb/bithumb_wrapper.go +++ b/exchanges/bithumb/bithumb_wrapper.go @@ -32,8 +32,6 @@ import ( "github.com/thrasher-corp/gocryptotrader/portfolio/withdraw" ) -const wsRateLimitMillisecond = 1000 - var errNotEnoughPairs = errors.New("at least one currency is required to fetch order history") // SetDefaults sets the basic defaults for Bithumb @@ -172,7 +170,7 @@ func (b *Bithumb) Setup(exch *config.Exchange) error { return b.Websocket.SetupNewConnection(stream.ConnectionSetup{ ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, ResponseMaxLimit: exch.WebsocketResponseMaxLimit, - RateLimit: wsRateLimitMillisecond, + RateLimit: request.NewWeightedRateLimitByDuration(time.Second), }) } diff --git a/exchanges/bitmex/bitmex_websocket.go b/exchanges/bitmex/bitmex_websocket.go index 6779a97a..e604b6ed 100644 --- a/exchanges/bitmex/bitmex_websocket.go +++ b/exchanges/bitmex/bitmex_websocket.go @@ -599,7 +599,7 @@ func (b *Bitmex) Subscribe(subs subscription.List) error { req.Arguments = append(req.Arguments, cName+":"+p.String()) } } - err := b.Websocket.Conn.SendJSONMessage(req) + err := b.Websocket.Conn.SendJSONMessage(context.TODO(), req) if err == nil { err = b.Websocket.AddSuccessfulSubscriptions(subs...) } @@ -618,7 +618,7 @@ func (b *Bitmex) Unsubscribe(subs subscription.List) error { req.Arguments = append(req.Arguments, cName+":"+p.String()) } } - err := b.Websocket.Conn.SendJSONMessage(req) + err := b.Websocket.Conn.SendJSONMessage(context.TODO(), req) if err == nil { err = b.Websocket.RemoveSubscriptions(subs...) } @@ -655,7 +655,7 @@ func (b *Bitmex) websocketSendAuth(ctx context.Context) error { sendAuth.Command = "authKeyExpires" sendAuth.Arguments = append(sendAuth.Arguments, creds.Key, timestamp, signature) - err = b.Websocket.Conn.SendJSONMessage(sendAuth) + err = b.Websocket.Conn.SendJSONMessage(ctx, sendAuth) if err != nil { b.Websocket.SetCanUseAuthenticatedEndpoints(false) return err diff --git a/exchanges/bitstamp/bitstamp_websocket.go b/exchanges/bitstamp/bitstamp_websocket.go index ab887e42..efed7efb 100644 --- a/exchanges/bitstamp/bitstamp_websocket.go +++ b/exchanges/bitstamp/bitstamp_websocket.go @@ -292,7 +292,7 @@ func (b *Bitstamp) Subscribe(channelsToSubscribe subscription.List) error { req.Data.Channel = "private-" + req.Data.Channel + "-" + strconv.Itoa(int(auth.UserID)) req.Data.Auth = auth.Token } - err := b.Websocket.Conn.SendJSONMessage(req) + err := b.Websocket.Conn.SendJSONMessage(context.TODO(), req) if err == nil { err = b.Websocket.AddSuccessfulSubscriptions(s) } @@ -314,7 +314,7 @@ func (b *Bitstamp) Unsubscribe(channelsToUnsubscribe subscription.List) error { Channel: s.Channel, }, } - err := b.Websocket.Conn.SendJSONMessage(req) + err := b.Websocket.Conn.SendJSONMessage(context.TODO(), req) if err == nil { err = b.Websocket.RemoveSubscriptions(s) } diff --git a/exchanges/btcmarkets/btcmarkets_websocket.go b/exchanges/btcmarkets/btcmarkets_websocket.go index f6e415ab..46b5f1b9 100644 --- a/exchanges/btcmarkets/btcmarkets_websocket.go +++ b/exchanges/btcmarkets/btcmarkets_websocket.go @@ -374,7 +374,7 @@ func (b *BTCMarkets) Subscribe(subs subscription.List) error { r.Channels = []string{s.Channel} r.MarketIDs = s.Pairs.Strings() - err := b.Websocket.Conn.SendJSONMessage(r) + err := b.Websocket.Conn.SendJSONMessage(context.TODO(), r) if err == nil { err = b.Websocket.AddSuccessfulSubscriptions(s) } @@ -414,7 +414,7 @@ func (b *BTCMarkets) Unsubscribe(subs subscription.List) error { MarketIDs: s.Pairs.Strings(), } - err := b.Websocket.Conn.SendJSONMessage(req) + err := b.Websocket.Conn.SendJSONMessage(context.TODO(), req) if err == nil { err = b.Websocket.RemoveSubscriptions(s) } diff --git a/exchanges/btse/btse_websocket.go b/exchanges/btse/btse_websocket.go index 4bac4951..8f682b23 100644 --- a/exchanges/btse/btse_websocket.go +++ b/exchanges/btse/btse_websocket.go @@ -78,7 +78,7 @@ func (b *BTSE) WsAuthenticate(ctx context.Context) error { Operation: "authKeyExpires", Arguments: []string{creds.Key, nonce, sign}, } - return b.Websocket.Conn.SendJSONMessage(req) + return b.Websocket.Conn.SendJSONMessage(ctx, req) } func stringToOrderStatus(status string) (order.Status, error) { @@ -392,7 +392,7 @@ func (b *BTSE) Subscribe(channelsToSubscribe subscription.List) error { for i := range channelsToSubscribe { sub.Arguments = append(sub.Arguments, channelsToSubscribe[i].Channel) } - err := b.Websocket.Conn.SendJSONMessage(sub) + err := b.Websocket.Conn.SendJSONMessage(context.TODO(), sub) if err == nil { err = b.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe...) } @@ -407,7 +407,7 @@ func (b *BTSE) Unsubscribe(channelsToUnsubscribe subscription.List) error { unSub.Arguments = append(unSub.Arguments, channelsToUnsubscribe[i].Channel) } - err := b.Websocket.Conn.SendJSONMessage(unSub) + err := b.Websocket.Conn.SendJSONMessage(context.TODO(), unSub) if err == nil { err = b.Websocket.RemoveSubscriptions(channelsToUnsubscribe...) } diff --git a/exchanges/bybit/bybit_inverse_websocket.go b/exchanges/bybit/bybit_inverse_websocket.go index 60f8981c..6ea2d83e 100644 --- a/exchanges/bybit/bybit_inverse_websocket.go +++ b/exchanges/bybit/bybit_inverse_websocket.go @@ -1,6 +1,7 @@ package bybit import ( + "context" "net/http" "github.com/gorilla/websocket" @@ -71,7 +72,7 @@ func (by *Bybit) handleInversePayloadSubscription(operation string, channelSubsc for a := range payloads { // The options connection does not send the subscription request id back with the subscription notification payload // therefore the code doesn't wait for the response to check whether the subscription is successful or not. - err = by.Websocket.Conn.SendJSONMessage(payloads[a]) + err = by.Websocket.Conn.SendJSONMessage(context.TODO(), payloads[a]) if err != nil { return err } diff --git a/exchanges/bybit/bybit_linear_websocket.go b/exchanges/bybit/bybit_linear_websocket.go index a6ce8ade..8ee49abd 100644 --- a/exchanges/bybit/bybit_linear_websocket.go +++ b/exchanges/bybit/bybit_linear_websocket.go @@ -90,7 +90,7 @@ func (by *Bybit) handleLinearPayloadSubscription(operation string, channelSubscr for a := range payloads { // The options connection does not send the subscription request id back with the subscription notification payload // therefore the code doesn't wait for the response to check whether the subscription is successful or not. - err = by.Websocket.Conn.SendJSONMessage(payloads[a]) + err = by.Websocket.Conn.SendJSONMessage(context.TODO(), payloads[a]) if err != nil { return err } diff --git a/exchanges/bybit/bybit_options_websocket.go b/exchanges/bybit/bybit_options_websocket.go index 72a86b0c..d387b513 100644 --- a/exchanges/bybit/bybit_options_websocket.go +++ b/exchanges/bybit/bybit_options_websocket.go @@ -1,6 +1,7 @@ package bybit import ( + "context" "encoding/json" "net/http" "strconv" @@ -78,7 +79,7 @@ func (by *Bybit) handleOptionsPayloadSubscription(operation string, channelSubsc for a := range payloads { // The options connection does not send the subscription request id back with the subscription notification payload // therefore the code doesn't wait for the response to check whether the subscription is successful or not. - err = by.Websocket.Conn.SendJSONMessage(payloads[a]) + err = by.Websocket.Conn.SendJSONMessage(context.TODO(), payloads[a]) if err != nil { return err } diff --git a/exchanges/bybit/ratelimit.go b/exchanges/bybit/ratelimit.go index 1bdf90c3..4cdcbff2 100644 --- a/exchanges/bybit/ratelimit.go +++ b/exchanges/bybit/ratelimit.go @@ -77,7 +77,7 @@ func GetRateLimit() request.RateLimitDefinitions { amendOrderEPL: request.NewRateLimitWithWeight(time.Second, 10, 10), cancelOrderEPL: request.NewRateLimitWithWeight(time.Second, 10, 10), cancelSpotEPL: request.NewRateLimitWithWeight(time.Second, 20, 20), - cancelAllEPL: request.NewRateLimitWithWeight(time.Second, 1, 1), + cancelAllEPL: request.NewWeightedRateLimitByDuration(time.Second), cancelAllSpotEPL: request.NewRateLimitWithWeight(time.Second, 20, 20), createBatchOrderEPL: request.NewRateLimitWithWeight(time.Second, 10, 10), amendBatchOrderEPL: request.NewRateLimitWithWeight(time.Second, 10, 10), @@ -109,7 +109,7 @@ func GetRateLimit() request.RateLimitDefinitions { interTransferEPL: request.NewRateLimitWithWeight(time.Minute, 20, 1), saveTransferSubMemberEPL: request.NewRateLimitWithWeight(time.Minute, 20, 1), universalTransferEPL: request.NewRateLimitWithWeight(time.Second, 5, 5), - createWithdrawalEPL: request.NewRateLimitWithWeight(time.Second, 1, 1), + createWithdrawalEPL: request.NewWeightedRateLimitByDuration(time.Second), cancelWithdrawalEPL: request.NewRateLimitWithWeight(time.Minute, 60, 1), userCreateSubMemberEPL: request.NewRateLimitWithWeight(time.Second, 5, 5), userCreateSubAPIKeyEPL: request.NewRateLimitWithWeight(time.Second, 5, 5), diff --git a/exchanges/coinbasepro/coinbasepro_websocket.go b/exchanges/coinbasepro/coinbasepro_websocket.go index 4765eba5..647612b1 100644 --- a/exchanges/coinbasepro/coinbasepro_websocket.go +++ b/exchanges/coinbasepro/coinbasepro_websocket.go @@ -423,7 +423,7 @@ func (c *CoinbasePro) Subscribe(subs subscription.List) error { r.Channels = append(r.Channels, s.Channel) } } - err := c.Websocket.Conn.SendJSONMessage(r) + err := c.Websocket.Conn.SendJSONMessage(context.TODO(), r) if err == nil { err = c.Websocket.AddSuccessfulSubscriptions(subs...) } @@ -459,7 +459,7 @@ func (c *CoinbasePro) Unsubscribe(subs subscription.List) error { ProductIDs: s.Pairs.Strings(), }) } - err := c.Websocket.Conn.SendJSONMessage(r) + err := c.Websocket.Conn.SendJSONMessage(context.TODO(), r) if err == nil { err = c.Websocket.RemoveSubscriptions(subs...) } diff --git a/exchanges/coinut/coinut.go b/exchanges/coinut/coinut.go index f80a353b..2f17789b 100644 --- a/exchanges/coinut/coinut.go +++ b/exchanges/coinut/coinut.go @@ -43,7 +43,6 @@ const ( coinutStatusOK = "OK" coinutMaxNonce = 16777215 // See https://github.com/coinut/api/wiki/Websocket-API#nonce - wsRateLimitInMilliseconds = 33 ) var errLookupInstrumentID = errors.New("unable to lookup instrument ID") diff --git a/exchanges/coinut/coinut_websocket.go b/exchanges/coinut/coinut_websocket.go index ec74b7d2..10bb52e8 100644 --- a/exchanges/coinut/coinut_websocket.go +++ b/exchanges/coinut/coinut_websocket.go @@ -618,7 +618,7 @@ func (c *COINUT) Subscribe(subs subscription.List) error { Subscribe: true, Nonce: getNonce(), } - err = c.Websocket.Conn.SendJSONMessage(subscribe) + err = c.Websocket.Conn.SendJSONMessage(context.TODO(), subscribe) if err == nil { err = c.Websocket.AddSuccessfulSubscriptions(s) } diff --git a/exchanges/coinut/coinut_wrapper.go b/exchanges/coinut/coinut_wrapper.go index 2a8fd264..1dd0be29 100644 --- a/exchanges/coinut/coinut_wrapper.go +++ b/exchanges/coinut/coinut_wrapper.go @@ -151,7 +151,7 @@ func (c *COINUT) Setup(exch *config.Exchange) error { return c.Websocket.SetupNewConnection(stream.ConnectionSetup{ ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, ResponseMaxLimit: exch.WebsocketResponseMaxLimit, - RateLimit: wsRateLimitInMilliseconds, + RateLimit: request.NewWeightedRateLimitByDuration(33 * time.Millisecond), }) } diff --git a/exchanges/deribit/deribit_websocket.go b/exchanges/deribit/deribit_websocket.go index 9c096022..bec47df0 100644 --- a/exchanges/deribit/deribit_websocket.go +++ b/exchanges/deribit/deribit_websocket.go @@ -113,7 +113,7 @@ func (d *Deribit) WsConnect() error { d.Websocket.SetCanUseAuthenticatedEndpoints(false) } } - return d.Websocket.Conn.SendJSONMessage(setHeartBeatMessage) + return d.Websocket.Conn.SendJSONMessage(context.TODO(), setHeartBeatMessage) } func (d *Deribit) wsLogin(ctx context.Context) error { @@ -187,7 +187,7 @@ func (d *Deribit) wsHandleData(respRaw []byte) error { return fmt.Errorf("%s - err %s could not parse websocket data: %s", d.Name, err, respRaw) } if response.Method == "heartbeat" { - return d.Websocket.Conn.SendJSONMessage(pingMessage) + return d.Websocket.Conn.SendJSONMessage(context.TODO(), pingMessage) } if response.ID > 2 { if !d.Websocket.Match.IncomingWithData(response.ID, respRaw) { diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index 367e3137..a436ede3 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -30,7 +30,7 @@ import ( const ( gateioWebsocketEndpoint = "wss://api.gateio.ws/ws/v4/" - gateioWebsocketRateLimit = 120 + gateioWebsocketRateLimit = 120 * time.Millisecond spotPingChannel = "spot.ping" spotPongChannel = "spot.pong" diff --git a/exchanges/gateio/gateio_wrapper.go b/exchanges/gateio/gateio_wrapper.go index d918762d..0d03b3c4 100644 --- a/exchanges/gateio/gateio_wrapper.go +++ b/exchanges/gateio/gateio_wrapper.go @@ -227,7 +227,7 @@ func (g *Gateio) Setup(exch *config.Exchange) error { } return g.Websocket.SetupNewConnection(stream.ConnectionSetup{ URL: gateioWebsocketEndpoint, - RateLimit: gateioWebsocketRateLimit, + RateLimit: request.NewWeightedRateLimitByDuration(gateioWebsocketRateLimit), ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, ResponseMaxLimit: exch.WebsocketResponseMaxLimit, BespokeGenerateMessageID: g.GenerateWebsocketMessageID, diff --git a/exchanges/gateio/gateio_ws_delivery_futures.go b/exchanges/gateio/gateio_ws_delivery_futures.go index 1a41e21a..a2908c95 100644 --- a/exchanges/gateio/gateio_ws_delivery_futures.go +++ b/exchanges/gateio/gateio_ws_delivery_futures.go @@ -16,6 +16,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/account" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/kline" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/log" @@ -63,7 +64,7 @@ func (g *Gateio) WsDeliveryFuturesConnect() error { } err = g.Websocket.SetupNewConnection(stream.ConnectionSetup{ URL: deliveryRealBTCTradingURL, - RateLimit: gateioWebsocketRateLimit, + RateLimit: request.NewWeightedRateLimitByDuration(gateioWebsocketRateLimit), ResponseCheckTimeout: g.Config.WebsocketResponseCheckTimeout, ResponseMaxLimit: g.Config.WebsocketResponseMaxLimit, Authenticated: true, diff --git a/exchanges/gateio/gateio_ws_futures.go b/exchanges/gateio/gateio_ws_futures.go index 167e9884..a6f4207f 100644 --- a/exchanges/gateio/gateio_ws_futures.go +++ b/exchanges/gateio/gateio_ws_futures.go @@ -19,6 +19,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/kline" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" @@ -82,7 +83,7 @@ func (g *Gateio) WsFuturesConnect() error { err = g.Websocket.SetupNewConnection(stream.ConnectionSetup{ URL: futuresWebsocketBtcURL, - RateLimit: gateioWebsocketRateLimit, + RateLimit: request.NewWeightedRateLimitByDuration(gateioWebsocketRateLimit), ResponseCheckTimeout: g.Config.WebsocketResponseCheckTimeout, ResponseMaxLimit: g.Config.WebsocketResponseMaxLimit, Authenticated: true, diff --git a/exchanges/gemini/gemini_websocket.go b/exchanges/gemini/gemini_websocket.go index 566611bc..6c811ccd 100644 --- a/exchanges/gemini/gemini_websocket.go +++ b/exchanges/gemini/gemini_websocket.go @@ -112,7 +112,7 @@ func (g *Gemini) manageSubs(subs subscription.List, op wsSubOp) error { }) } - if err := g.Websocket.Conn.SendJSONMessage(req); err != nil { + if err := g.Websocket.Conn.SendJSONMessage(context.TODO(), req); err != nil { return err } diff --git a/exchanges/hitbtc/hitbtc_websocket.go b/exchanges/hitbtc/hitbtc_websocket.go index 68f68de9..ac5a439f 100644 --- a/exchanges/hitbtc/hitbtc_websocket.go +++ b/exchanges/hitbtc/hitbtc_websocket.go @@ -27,7 +27,6 @@ import ( const ( hitbtcWebsocketAddress = "wss://api.hitbtc.com/api/2/ws" rpcVersion = "2.0" - rateLimit = 20 errAuthFailed = 1002 ) @@ -524,7 +523,7 @@ func (h *HitBTC) Subscribe(channelsToSubscribe subscription.List) error { r.Params.Limit = 100 } - err := h.Websocket.Conn.SendJSONMessage(r) + err := h.Websocket.Conn.SendJSONMessage(context.TODO(), r) if err == nil { err = h.Websocket.AddSuccessfulSubscriptions(s) } @@ -560,7 +559,7 @@ func (h *HitBTC) Unsubscribe(subs subscription.List) error { r.Params.Limit = 100 } - err := h.Websocket.Conn.SendJSONMessage(r) + err := h.Websocket.Conn.SendJSONMessage(context.TODO(), r) if err == nil { err = h.Websocket.RemoveSubscriptions(s) } @@ -600,7 +599,7 @@ func (h *HitBTC) wsLogin(ctx context.Context) error { ID: h.Websocket.Conn.GenerateMessageID(false), } - err = h.Websocket.Conn.SendJSONMessage(request) + err = h.Websocket.Conn.SendJSONMessage(context.TODO(), request) if err != nil { h.Websocket.SetCanUseAuthenticatedEndpoints(false) return err diff --git a/exchanges/hitbtc/hitbtc_wrapper.go b/exchanges/hitbtc/hitbtc_wrapper.go index 3e1bfc69..4d23ba01 100644 --- a/exchanges/hitbtc/hitbtc_wrapper.go +++ b/exchanges/hitbtc/hitbtc_wrapper.go @@ -169,7 +169,7 @@ func (h *HitBTC) Setup(exch *config.Exchange) error { } return h.Websocket.SetupNewConnection(stream.ConnectionSetup{ - RateLimit: rateLimit, + RateLimit: request.NewWeightedRateLimitByDuration(20 * time.Millisecond), ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, ResponseMaxLimit: exch.WebsocketResponseMaxLimit, }) diff --git a/exchanges/huobi/huobi_websocket.go b/exchanges/huobi/huobi_websocket.go index 3c86ac62..268c6dd4 100644 --- a/exchanges/huobi/huobi_websocket.go +++ b/exchanges/huobi/huobi_websocket.go @@ -53,7 +53,6 @@ const ( authOp = "auth" loginDelay = 50 * time.Millisecond - rateLimit = 20 ) // Instantiates a communications channel between websocket connections @@ -225,7 +224,7 @@ func (h *HUOBI) wsHandleData(respRaw []byte) error { OP: "pong", TS: init.TS, } - err := h.Websocket.AuthConn.SendJSONMessage(authPing) + err := h.Websocket.AuthConn.SendJSONMessage(context.TODO(), authPing) if err != nil { log.Errorln(log.ExchangeSys, err) } @@ -445,7 +444,7 @@ func (h *HUOBI) wsHandleData(respRaw []byte) error { } func (h *HUOBI) sendPingResponse(pong int64) { - err := h.Websocket.Conn.SendJSONMessage(WsPong{Pong: pong}) + err := h.Websocket.Conn.SendJSONMessage(context.TODO(), WsPong{Pong: pong}) if err != nil { log.Errorln(log.ExchangeSys, err) } @@ -565,7 +564,7 @@ func (h *HUOBI) Subscribe(channelsToSubscribe subscription.List) error { wsAccountsOrdersEndPoint+channelsToSubscribe[i].Channel, channelsToSubscribe[i].Channel) } else { - err = h.Websocket.Conn.SendJSONMessage(WsRequest{ + err = h.Websocket.Conn.SendJSONMessage(context.TODO(), WsRequest{ Subscribe: channelsToSubscribe[i].Channel, }) } @@ -599,7 +598,7 @@ func (h *HUOBI) Unsubscribe(channelsToUnsubscribe subscription.List) error { wsAccountsOrdersEndPoint+channelsToUnsubscribe[i].Channel, channelsToUnsubscribe[i].Channel) } else { - err = h.Websocket.Conn.SendJSONMessage(WsRequest{ + err = h.Websocket.Conn.SendJSONMessage(context.TODO(), WsRequest{ Unsubscribe: channelsToUnsubscribe[i].Channel, }) } @@ -648,7 +647,7 @@ func (h *HUOBI) wsLogin(ctx context.Context) error { return err } request.Signature = crypto.Base64Encode(hmac) - err = h.Websocket.AuthConn.SendJSONMessage(request) + err = h.Websocket.AuthConn.SendJSONMessage(context.TODO(), request) if err != nil { h.Websocket.SetCanUseAuthenticatedEndpoints(false) return err @@ -673,7 +672,7 @@ func (h *HUOBI) wsAuthenticatedSubscribe(creds *account.Credentials, operation, return err } request.Signature = crypto.Base64Encode(hmac) - return h.Websocket.AuthConn.SendJSONMessage(request) + return h.Websocket.AuthConn.SendJSONMessage(context.TODO(), request) } func (h *HUOBI) wsGetAccountsList(ctx context.Context) (*WsAuthenticatedAccountsListResponse, error) { diff --git a/exchanges/huobi/huobi_wrapper.go b/exchanges/huobi/huobi_wrapper.go index 3c100230..3efcc74c 100644 --- a/exchanges/huobi/huobi_wrapper.go +++ b/exchanges/huobi/huobi_wrapper.go @@ -220,7 +220,7 @@ func (h *HUOBI) Setup(exch *config.Exchange) error { } err = h.Websocket.SetupNewConnection(stream.ConnectionSetup{ - RateLimit: rateLimit, + RateLimit: request.NewWeightedRateLimitByDuration(20 * time.Millisecond), ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, ResponseMaxLimit: exch.WebsocketResponseMaxLimit, }) @@ -229,7 +229,7 @@ func (h *HUOBI) Setup(exch *config.Exchange) error { } return h.Websocket.SetupNewConnection(stream.ConnectionSetup{ - RateLimit: rateLimit, + RateLimit: request.NewWeightedRateLimitByDuration(20 * time.Millisecond), ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, ResponseMaxLimit: exch.WebsocketResponseMaxLimit, URL: wsAccountsOrdersURL, diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index 1646adb0..eab69d7b 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -52,7 +52,6 @@ const ( krakenWsAddOrderStatus = "addOrderStatus" krakenWsCancelOrderStatus = "cancelOrderStatus" krakenWsCancelAllOrderStatus = "cancelAllStatus" - krakenWsRateLimit = 50 krakenWsPingDelay = time.Second * 27 krakenWsOrderbookDepth = 1000 ) diff --git a/exchanges/kraken/kraken_wrapper.go b/exchanges/kraken/kraken_wrapper.go index b847a49e..4f0861e4 100644 --- a/exchanges/kraken/kraken_wrapper.go +++ b/exchanges/kraken/kraken_wrapper.go @@ -232,7 +232,7 @@ func (k *Kraken) Setup(exch *config.Exchange) error { } err = k.Websocket.SetupNewConnection(stream.ConnectionSetup{ - RateLimit: krakenWsRateLimit, + RateLimit: request.NewWeightedRateLimitByDuration(50 * time.Millisecond), ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, ResponseMaxLimit: exch.WebsocketResponseMaxLimit, URL: krakenWSURL, @@ -242,7 +242,7 @@ func (k *Kraken) Setup(exch *config.Exchange) error { } return k.Websocket.SetupNewConnection(stream.ConnectionSetup{ - RateLimit: krakenWsRateLimit, + RateLimit: request.NewWeightedRateLimitByDuration(50 * time.Millisecond), ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, ResponseMaxLimit: exch.WebsocketResponseMaxLimit, URL: krakenAuthWSURL, diff --git a/exchanges/kucoin/kucoin_wrapper.go b/exchanges/kucoin/kucoin_wrapper.go index 0d6c2068..28570819 100644 --- a/exchanges/kucoin/kucoin_wrapper.go +++ b/exchanges/kucoin/kucoin_wrapper.go @@ -217,7 +217,7 @@ func (ku *Kucoin) Setup(exch *config.Exchange) error { return ku.Websocket.SetupNewConnection(stream.ConnectionSetup{ ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, ResponseMaxLimit: exch.WebsocketResponseMaxLimit, - RateLimit: 500, + RateLimit: request.NewRateLimitWithWeight(time.Second, 2, 1), }) } diff --git a/exchanges/okx/okx_websocket.go b/exchanges/okx/okx_websocket.go index 1758d17d..61544953 100644 --- a/exchanges/okx/okx_websocket.go +++ b/exchanges/okx/okx_websocket.go @@ -292,7 +292,7 @@ func (ok *Okx) WsAuth(ctx context.Context, dialer *websocket.Dialer) error { }, }, } - err = ok.Websocket.AuthConn.SendJSONMessage(request) + err = ok.Websocket.AuthConn.SendJSONMessage(ctx, request) if err != nil { return err } @@ -481,7 +481,7 @@ func (ok *Okx) handleSubscription(operation string, subscriptions subscription.L if len(authChunk) > maxConnByteLen { authRequests.Arguments = authRequests.Arguments[:len(authRequests.Arguments)-1] i-- - err = ok.Websocket.AuthConn.SendJSONMessage(authRequests) + err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), authRequests) if err != nil { return err } @@ -505,7 +505,7 @@ func (ok *Okx) handleSubscription(operation string, subscriptions subscription.L } if len(chunk) > maxConnByteLen { i-- - err = ok.Websocket.Conn.SendJSONMessage(request) + err = ok.Websocket.Conn.SendJSONMessage(context.TODO(), request) if err != nil { return err } @@ -525,13 +525,13 @@ func (ok *Okx) handleSubscription(operation string, subscriptions subscription.L } if len(request.Arguments) > 0 { - if err := ok.Websocket.Conn.SendJSONMessage(request); err != nil { + if err := ok.Websocket.Conn.SendJSONMessage(context.TODO(), request); err != nil { return err } } if len(authRequests.Arguments) > 0 && ok.Websocket.CanUseAuthenticatedEndpoints() { - if err := ok.Websocket.AuthConn.SendJSONMessage(authRequests); err != nil { + if err := ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), authRequests); err != nil { return err } } @@ -1369,7 +1369,7 @@ func (ok *Okx) WsPlaceOrder(arg *PlaceOrderRequestParam) (*OrderData, error) { Arguments: []PlaceOrderRequestParam{*arg}, Operation: okxOpOrder, } - err = ok.Websocket.AuthConn.SendJSONMessage(input) + err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), input) if err != nil { return nil, err } @@ -1428,7 +1428,7 @@ func (ok *Okx) WsPlaceMultipleOrder(args []PlaceOrderRequestParam) ([]OrderData, Arguments: args, Operation: okxOpBatchOrders, } - err = ok.Websocket.AuthConn.SendJSONMessage(input) + err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), input) if err != nil { return nil, err } @@ -1498,7 +1498,7 @@ func (ok *Okx) WsCancelOrder(arg CancelOrderRequestParam) (*OrderData, error) { Arguments: []CancelOrderRequestParam{arg}, Operation: okxOpCancelOrder, } - err = ok.Websocket.AuthConn.SendJSONMessage(input) + err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), input) if err != nil { return nil, err } @@ -1558,7 +1558,7 @@ func (ok *Okx) WsCancelMultipleOrder(args []CancelOrderRequestParam) ([]OrderDat Arguments: args, Operation: okxOpBatchCancelOrders, } - err = ok.Websocket.AuthConn.SendJSONMessage(input) + err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), input) if err != nil { return nil, err } @@ -1634,7 +1634,7 @@ func (ok *Okx) WsAmendOrder(arg *AmendOrderRequestParams) (*OrderData, error) { Operation: okxOpAmendOrder, Arguments: []AmendOrderRequestParams{*arg}, } - err = ok.Websocket.AuthConn.SendJSONMessage(input) + err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), input) if err != nil { return nil, err } @@ -1696,7 +1696,7 @@ func (ok *Okx) WsAmendMultipleOrders(args []AmendOrderRequestParams) ([]OrderDat Operation: okxOpBatchAmendOrders, Arguments: args, } - err = ok.Websocket.AuthConn.SendJSONMessage(input) + err = ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), input) if err != nil { return nil, err } @@ -1849,7 +1849,7 @@ func (ok *Okx) wsChannelSubscription(operation, channel string, assetType asset. } ok.WsRequestSemaphore <- 1 defer func() { <-ok.WsRequestSemaphore }() - return ok.Websocket.Conn.SendJSONMessage(input) + return ok.Websocket.Conn.SendJSONMessage(context.TODO(), input) } // Private Channel Websocket methods @@ -1915,7 +1915,7 @@ func (ok *Okx) wsAuthChannelSubscription(operation, channel string, assetType as } ok.WsRequestSemaphore <- 1 defer func() { <-ok.WsRequestSemaphore }() - return ok.Websocket.AuthConn.SendJSONMessage(input) + return ok.Websocket.AuthConn.SendJSONMessage(context.TODO(), input) } // WsAccountSubscription retrieve account information. Data will be pushed when triggered by diff --git a/exchanges/okx/okx_wrapper.go b/exchanges/okx/okx_wrapper.go index 6eb8b708..13c3d87e 100644 --- a/exchanges/okx/okx_wrapper.go +++ b/exchanges/okx/okx_wrapper.go @@ -227,7 +227,7 @@ func (ok *Okx) Setup(exch *config.Exchange) error { URL: okxAPIWebsocketPublicURL, ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, ResponseMaxLimit: okxWebsocketResponseMaxLimit, - RateLimit: 500, + RateLimit: request.NewRateLimitWithWeight(time.Second, 2, 1), }); err != nil { return err } @@ -237,7 +237,7 @@ func (ok *Okx) Setup(exch *config.Exchange) error { ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, ResponseMaxLimit: okxWebsocketResponseMaxLimit, Authenticated: true, - RateLimit: 500, + RateLimit: request.NewRateLimitWithWeight(time.Second, 2, 1), }) } diff --git a/exchanges/poloniex/poloniex_websocket.go b/exchanges/poloniex/poloniex_websocket.go index a5407915..65ca4330 100644 --- a/exchanges/poloniex/poloniex_websocket.go +++ b/exchanges/poloniex/poloniex_websocket.go @@ -604,7 +604,7 @@ func (p *Poloniex) manageSubs(subs subscription.List, op wsOp) error { } req.Channel = s.Pairs[0].String() } - err = p.Websocket.Conn.SendJSONMessage(req) + err = p.Websocket.Conn.SendJSONMessage(context.TODO(), req) } if err == nil { if op == wsSubscribeOp { @@ -635,7 +635,7 @@ func (p *Poloniex) wsSendAuthorisedCommand(secret, key string, op wsOp) error { Key: key, Payload: nonce, } - return p.Websocket.Conn.SendJSONMessage(request) + return p.Websocket.Conn.SendJSONMessage(context.TODO(), request) } func (p *Poloniex) processAccountMarginPosition(notification []interface{}) error { diff --git a/exchanges/request/limit.go b/exchanges/request/limit.go index cc0f193f..bdb09649 100644 --- a/exchanges/request/limit.go +++ b/exchanges/request/limit.go @@ -80,6 +80,12 @@ func NewRateLimitWithWeight(interval time.Duration, actions int, weight Weight) return GetRateLimiterWithWeight(NewRateLimit(interval, actions), weight) } +// NewWeightedRateLimitByDuration creates a new RateLimit based of time +// interval. This equates to 1 action per interval. The weight is set to 1. +func NewWeightedRateLimitByDuration(interval time.Duration) *RateLimiterWithWeight { + return NewRateLimitWithWeight(interval, 1, 1) +} + // GetRateLimiterWithWeight couples a rate limiter with a weight count into an // accepted defined rate limiter with weight struct func GetRateLimiterWithWeight(l *rate.Limiter, weight Weight) *RateLimiterWithWeight { @@ -107,12 +113,24 @@ func (r *Requester) InitiateRateLimit(ctx context.Context, e EndpointLimit) erro rateLimiter := r.limiter[e] + err := RateLimit(ctx, rateLimiter) + if err != nil { + return fmt.Errorf("cannot rate limit request %w for endpoint %d", err, e) + } + + return nil +} + +// RateLimit is a function that will rate limit a request based on the rate +// limiter provided. It will return an error if the context is cancelled or +// deadline exceeded. +func RateLimit(ctx context.Context, rateLimiter *RateLimiterWithWeight) error { if rateLimiter == nil { - return fmt.Errorf("cannot rate limit request %w for endpoint %d", errSpecificRateLimiterIsNil, e) + return errSpecificRateLimiterIsNil } if rateLimiter.Weight <= 0 { - return fmt.Errorf("cannot rate limit request %w for endpoint %d", errInvalidWeightCount, e) + return errInvalidWeightCount } var finalDelay time.Duration diff --git a/exchanges/request/request_test.go b/exchanges/request/request_test.go index 93532d0c..b52c7c4f 100644 --- a/exchanges/request/request_test.go +++ b/exchanges/request/request_test.go @@ -31,8 +31,8 @@ var serverLimit *RateLimiterWithWeight func TestMain(m *testing.M) { serverLimitInterval := time.Millisecond * 500 - serverLimit = NewRateLimitWithWeight(serverLimitInterval, 1, 1) - serverLimitRetry := NewRateLimitWithWeight(serverLimitInterval, 1, 1) + serverLimit = NewWeightedRateLimitByDuration(serverLimitInterval) + serverLimitRetry := NewWeightedRateLimitByDuration(serverLimitInterval) sm := http.NewServeMux() sm.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") @@ -201,7 +201,7 @@ func TestCheckRequest(t *testing.T) { } var globalshell = RateLimitDefinitions{ - Auth: NewRateLimitWithWeight(time.Millisecond*600, 1, 1), + Auth: NewWeightedRateLimitByDuration(time.Millisecond * 600), UnAuth: NewRateLimitWithWeight(time.Second*1, 100, 1)} func TestDoRequest(t *testing.T) { diff --git a/exchanges/stream/stream_types.go b/exchanges/stream/stream_types.go index 36bc3f29..b7f16a6a 100644 --- a/exchanges/stream/stream_types.go +++ b/exchanges/stream/stream_types.go @@ -9,13 +9,14 @@ import ( "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" ) // Connection defines a streaming services connection type Connection interface { Dial(*websocket.Dialer, http.Header) error ReadMessage() Response - SendJSONMessage(any) error + SendJSONMessage(ctx context.Context, payload any) error SetupPingHandler(PingHandler) // GenerateMessageID generates a message ID for the individual connection. // If a bespoke function is set (by using SetupNewConnection) it will use @@ -24,7 +25,7 @@ type Connection interface { GenerateMessageID(highPrecision bool) int64 SendMessageReturnResponse(ctx context.Context, signature any, request any) ([]byte, error) SendMessageReturnResponses(ctx context.Context, signature any, request any, expected int) ([][]byte, error) - SendRawMessage(messageType int, message []byte) error + SendRawMessage(ctx context.Context, messageType int, message []byte) error SetURL(string) SetProxy(string) GetURL() string @@ -41,7 +42,7 @@ type Response struct { type ConnectionSetup struct { ResponseCheckTimeout time.Duration ResponseMaxLimit time.Duration - RateLimit int64 + RateLimit *request.RateLimiterWithWeight URL string Authenticated bool ConnectionLevelReporter Reporter diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index ad3c52d8..4fa512e0 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -207,7 +207,7 @@ func (w *Websocket) SetupNewConnection(c ConnectionSetup) error { if c.ResponseCheckTimeout == 0 && c.ResponseMaxLimit == 0 && - c.RateLimit == 0 && + c.RateLimit == nil && c.URL == "" && c.ConnectionLevelReporter == nil && c.BespokeGenerateMessageID == nil { diff --git a/exchanges/stream/websocket_connection.go b/exchanges/stream/websocket_connection.go index f0598622..a37ad789 100644 --- a/exchanges/stream/websocket_connection.go +++ b/exchanges/stream/websocket_connection.go @@ -17,6 +17,7 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/log" ) @@ -54,51 +55,46 @@ func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header } // SendJSONMessage sends a JSON encoded message over the connection -func (w *WebsocketConnection) SendJSONMessage(data interface{}) error { - if !w.IsConnected() { - return fmt.Errorf("%s websocket connection: cannot send message to a disconnected websocket", w.ExchangeName) - } - - w.writeControl.Lock() - defer w.writeControl.Unlock() - - if w.Verbose { - if msg, err := json.Marshal(data); err == nil { // WriteJSON will error for us anyway - log.Debugf(log.WebsocketMgr, "%s websocket connection: sending message: %s\n", w.ExchangeName, msg) +func (w *WebsocketConnection) SendJSONMessage(ctx context.Context, data interface{}) error { + return w.writeToConn(ctx, func() error { + if w.Verbose { + if msg, err := json.Marshal(data); err == nil { // WriteJSON will error for us anyway + log.Debugf(log.WebsocketMgr, "%s websocket connection: sending message: %s\n", w.ExchangeName, msg) + } } - } - - if w.RateLimit > 0 { - time.Sleep(time.Duration(w.RateLimit) * time.Millisecond) - if !w.IsConnected() { - return fmt.Errorf("%v websocket connection: cannot send message to a disconnected websocket", w.ExchangeName) - } - } - return w.Connection.WriteJSON(data) + return w.Connection.WriteJSON(data) + }) } // SendRawMessage sends a message over the connection without JSON encoding it -func (w *WebsocketConnection) SendRawMessage(messageType int, message []byte) error { +func (w *WebsocketConnection) SendRawMessage(ctx context.Context, messageType int, message []byte) error { + return w.writeToConn(ctx, func() error { + if w.Verbose { + log.Debugf(log.WebsocketMgr, "%v websocket connection: sending message [%s]\n", w.ExchangeName, message) + } + return w.Connection.WriteMessage(messageType, message) + }) +} + +func (w *WebsocketConnection) writeToConn(ctx context.Context, writeConn func() error) error { if !w.IsConnected() { return fmt.Errorf("%v websocket connection: cannot send message to a disconnected websocket", w.ExchangeName) } - - w.writeControl.Lock() - defer w.writeControl.Unlock() - - if w.Verbose { - log.Debugf(log.WebsocketMgr, "%v websocket connection: sending message [%s]\n", w.ExchangeName, message) - } - if w.RateLimit > 0 { - time.Sleep(time.Duration(w.RateLimit) * time.Millisecond) - if !w.IsConnected() { - return fmt.Errorf("%v websocket connection: cannot send message to a disconnected websocket", w.ExchangeName) + if w.RateLimit != nil { + err := request.RateLimit(ctx, w.RateLimit) + if err != nil { + return fmt.Errorf("%s websocket connection: rate limit error: %w", w.ExchangeName, err) } } + // This lock acts as a rolling gate to prevent WriteMessage panics. Acquire after rate limit check. + w.writeControl.Lock() + defer w.writeControl.Unlock() + // NOTE: Secondary check to ensure the connection is still active after + // semacquire and potential rate limit. if !w.IsConnected() { return fmt.Errorf("%v websocket connection: cannot send message to a disconnected websocket", w.ExchangeName) } - return w.Connection.WriteMessage(messageType, message) + return writeConn() } // SetupPingHandler will automatically send ping or pong messages based on @@ -129,7 +125,7 @@ func (w *WebsocketConnection) SetupPingHandler(handler PingHandler) { ticker.Stop() return case <-ticker.C: - err := w.SendRawMessage(handler.MessageType, handler.Message) + err := w.SendRawMessage(context.TODO(), handler.MessageType, handler.Message) if err != nil { log.Errorf(log.WebsocketMgr, "%v websocket connection: ping handler failed to send message [%s]", @@ -303,7 +299,7 @@ func (w *WebsocketConnection) SendMessageReturnResponses(ctx context.Context, si } start := time.Now() - err = w.SendRawMessage(websocket.TextMessage, outbound) + err = w.SendRawMessage(ctx, websocket.TextMessage, outbound) if err != nil { return nil, err } diff --git a/exchanges/stream/websocket_test.go b/exchanges/stream/websocket_test.go index 5496ebcf..f3010ce9 100644 --- a/exchanges/stream/websocket_test.go +++ b/exchanges/stream/websocket_test.go @@ -24,6 +24,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/config" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/protocol" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" ) @@ -629,7 +630,7 @@ func TestDial(t *testing.T) { ExchangeName: "test1", Verbose: true, URL: websocketTestURL, - RateLimit: 10, + RateLimit: request.NewWeightedRateLimitByDuration(10 * time.Millisecond), ResponseMaxLimit: 7000000000, }, }, @@ -677,7 +678,7 @@ func TestSendMessage(t *testing.T) { ExchangeName: "test1", Verbose: true, URL: websocketTestURL, - RateLimit: 10, + RateLimit: request.NewWeightedRateLimitByDuration(10 * time.Millisecond), ResponseMaxLimit: 7000000000, }, }, @@ -713,11 +714,11 @@ func TestSendMessage(t *testing.T) { } t.Fatal(err) } - err = testData.WC.SendJSONMessage(Ping) + err = testData.WC.SendJSONMessage(context.Background(), Ping) if err != nil { t.Error(err) } - err = testData.WC.SendRawMessage(websocket.TextMessage, []byte(Ping)) + err = testData.WC.SendRawMessage(context.Background(), websocket.TextMessage, []byte(Ping)) if err != nil { t.Error(err) } diff --git a/exchanges/stream/websocket_types.go b/exchanges/stream/websocket_types.go index 07594991..fd872ab7 100644 --- a/exchanges/stream/websocket_types.go +++ b/exchanges/stream/websocket_types.go @@ -9,6 +9,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/config" "github.com/thrasher-corp/gocryptotrader/exchanges/fill" "github.com/thrasher-corp/gocryptotrader/exchanges/protocol" + "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" @@ -132,7 +133,7 @@ type WebsocketConnection struct { // writes methods writeControl sync.Mutex - RateLimit int64 + RateLimit *request.RateLimiterWithWeight ExchangeName string URL string ProxyURL string