diff --git a/cmd/documentation/exchange_templates/exchange_websocket_readme.tmpl b/cmd/documentation/exchange_templates/exchange_websocket_readme.tmpl index 35131949..c06929e2 100644 --- a/cmd/documentation/exchange_templates/exchange_websocket_readme.tmpl +++ b/cmd/documentation/exchange_templates/exchange_websocket_readme.tmpl @@ -109,7 +109,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error { Unsubscriber: e.SpotUnsubscribe, GenerateSubscriptions: e.GenerateDefaultSubscriptionsSpot, Connector: e.WsConnectSpot, - BespokeGenerateMessageID: e.GenerateWebsocketMessageID, }); err != nil { return err } @@ -125,7 +124,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error { Unsubscriber: e.FuturesUnsubscribe, GenerateSubscriptions: func() (subscription.List, error) { return e.GenerateFuturesDefaultSubscriptions(currency.USDT) }, Connector: e.WsFuturesConnect, - BespokeGenerateMessageID: e.GenerateWebsocketMessageID, }); err != nil { return err } diff --git a/docs/ADD_NEW_EXCHANGE.md b/docs/ADD_NEW_EXCHANGE.md index b772228d..ac630590 100644 --- a/docs/ADD_NEW_EXCHANGE.md +++ b/docs/ADD_NEW_EXCHANGE.md @@ -361,8 +361,9 @@ Ensure each endpoint is implemented and has an associated test to improve test c #### Message IDs -Use e.MessageID() to get a UUIDv7 if the exchange supports unique string IDs. Otherwise override MessageID with a suitable alternative. -For example: Consider common.Counter for simple integer IDs if uniqueness isn't critical. +* e.MessageID() to get a UUIDv7 if the exchange supports unique string IDs +* e.MessageSequence() to get a simple integer ID if uniqueness is not critical +* Otherwise override MessageID with a suitable alternative #### Authenticated functions diff --git a/exchange/websocket/README.md b/exchange/websocket/README.md index 67855de7..aeec5cfc 100644 --- a/exchange/websocket/README.md +++ b/exchange/websocket/README.md @@ -127,7 +127,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error { Unsubscriber: e.SpotUnsubscribe, GenerateSubscriptions: e.GenerateDefaultSubscriptionsSpot, Connector: e.WsConnectSpot, - BespokeGenerateMessageID: e.GenerateWebsocketMessageID, }); err != nil { return err } @@ -143,7 +142,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error { Unsubscriber: e.FuturesUnsubscribe, GenerateSubscriptions: func() (subscription.List, error) { return e.GenerateFuturesDefaultSubscriptions(currency.USDT) }, Connector: e.WsFuturesConnect, - BespokeGenerateMessageID: e.GenerateWebsocketMessageID, }); err != nil { return err } diff --git a/exchange/websocket/connection.go b/exchange/websocket/connection.go index 82473f65..c5b9376b 100644 --- a/exchange/websocket/connection.go +++ b/exchange/websocket/connection.go @@ -5,11 +5,9 @@ import ( "compress/flate" "compress/gzip" "context" - "crypto/rand" "errors" "fmt" "io" - "math/big" "net" "net/http" "net/url" @@ -38,10 +36,6 @@ type Connection interface { Dial(context.Context, *gws.Dialer, http.Header) error ReadMessage() Response SetupPingHandler(request.EndpointLimit, PingHandler) - // GenerateMessageID generates a message ID for the individual connection. If a bespoke function is set - // (by using SetupNewConnection) it will use that, otherwise it will use the defaultGenerateMessageID function - // defined in websocket_connection.go. - GenerateMessageID(highPrecision bool) int64 // SendMessageReturnResponse will send a WS message to the connection and wait for response SendMessageReturnResponse(ctx context.Context, epl request.EndpointLimit, signature, request any) ([]byte, error) // SendMessageReturnResponses will send a WS message to the connection and wait for N responses @@ -94,10 +88,6 @@ type ConnectionSetup struct { // received from the exchange's websocket server. This function should // handle the incoming message and pass it to the appropriate data handler. Handler func(ctx context.Context, conn Connection, incoming []byte) error - // RequestIDGenerator is a function that returns a unique message ID. - // This is useful for when an exchange connection requires a unique or - // structured message ID for each message sent. - RequestIDGenerator func() int64 // Authenticate will be called to authenticate the connection Authenticate func(ctx context.Context, conn Connection) error // MessageFilter defines the criteria used to match messages to a specific connection. @@ -135,7 +125,6 @@ type connection struct { ResponseMaxLimit time.Duration Traffic chan struct{} readMessageErrors chan error - requestIDGenerator func() int64 } // Dial sets proxy urls and then connects to the websocket @@ -337,33 +326,6 @@ func (c *connection) parseBinaryResponse(resp []byte) ([]byte, error) { return standardMessage, reader.Close() } -// GenerateMessageID generates a message ID for the individual connection. -// If a bespoke function is set (by using SetupNewConnection) it will use that, -// otherwise it will use the defaultGenerateMessageID function. -func (c *connection) GenerateMessageID(highPrec bool) int64 { - if c.requestIDGenerator != nil { - return c.requestIDGenerator() - } - return c.defaultGenerateMessageID(highPrec) -} - -// defaultGenerateMessageID generates the default message ID -func (c *connection) defaultGenerateMessageID(highPrec bool) int64 { - var minValue int64 = 1e8 - var maxValue int64 = 2e8 - if highPrec { - maxValue = 2e12 - minValue = 1e12 - } - // utilization of hard coded positive numbers and default crypto/rand - // io.reader will panic on error instead of returning - randomNumber, err := rand.Int(rand.Reader, big.NewInt(maxValue-minValue+1)) - if err != nil { - panic(err) - } - return randomNumber.Int64() + minValue -} - // Shutdown shuts down and closes specific connection func (c *connection) Shutdown() error { if err := common.NilGuard(c, c.Connection); err != nil { diff --git a/exchange/websocket/manager.go b/exchange/websocket/manager.go index f0c6c0fc..853c8665 100644 --- a/exchange/websocket/manager.go +++ b/exchange/websocket/manager.go @@ -305,7 +305,7 @@ func (m *Manager) SetupNewConnection(c *ConnectionSetup) error { return err } - if c.ResponseCheckTimeout == 0 && c.ResponseMaxLimit == 0 && c.RateLimit == nil && c.URL == "" && c.ConnectionLevelReporter == nil && c.RequestIDGenerator == nil { + if c.ResponseCheckTimeout == 0 && c.ResponseMaxLimit == 0 && c.RateLimit == nil && c.URL == "" && c.ConnectionLevelReporter == nil { return fmt.Errorf("%w: %w", errConnSetup, errExchangeConfigEmpty) } @@ -401,7 +401,6 @@ func (m *Manager) getConnectionFromSetup(c *ConnectionSetup) *connection { Match: match, RateLimit: c.RateLimit, Reporter: c.ConnectionLevelReporter, - requestIDGenerator: c.RequestIDGenerator, RateLimitDefinitions: m.rateLimitDefinitions, } } diff --git a/exchange/websocket/manager_test.go b/exchange/websocket/manager_test.go index af24cee6..15217caa 100644 --- a/exchange/websocket/manager_test.go +++ b/exchange/websocket/manager_test.go @@ -564,7 +564,7 @@ func TestSendMessageReturnResponse(t *testing.T) { Subscription: testRequestData{ Name: "ticker", }, - RequestID: wc.GenerateMessageID(false), + RequestID: 12345, } _, err = wc.SendMessageReturnResponse(t.Context(), request.Unset, req.RequestID, req) @@ -758,37 +758,6 @@ func TestCanUseAuthenticatedWebsocketForWrapper(t *testing.T) { assert.True(t, ws.CanUseAuthenticatedWebsocketForWrapper(), "CanUseAuthenticatedWebsocketForWrapper should return true") } -func TestGenerateMessageID(t *testing.T) { - t.Parallel() - wc := connection{} - const spins = 1000 - ids := make([]int64, spins) - for i := range spins { - id := wc.GenerateMessageID(true) - assert.NotContains(t, ids, id, "GenerateMessageID should not generate the same ID twice") - ids[i] = id - } - - wc.requestIDGenerator = func() int64 { return 42 } - assert.EqualValues(t, 42, wc.GenerateMessageID(true), "GenerateMessageID should use bespokeGenerateMessageID") -} - -// 7002502 166.7 ns/op 48 B/op 3 allocs/op -func BenchmarkGenerateMessageID_High(b *testing.B) { - wc := connection{} - for b.Loop() { - _ = wc.GenerateMessageID(true) - } -} - -// 6536250 186.1 ns/op 48 B/op 3 allocs/op -func BenchmarkGenerateMessageID_Low(b *testing.B) { - wc := connection{} - for b.Loop() { - _ = wc.GenerateMessageID(false) - } -} - func TestCheckWebsocketURL(t *testing.T) { err := checkWebsocketURL("") assert.ErrorIs(t, err, errInvalidWebsocketURL, "checkWebsocketURL should error correctly on empty string") @@ -1135,7 +1104,7 @@ func TestLatency(t *testing.T) { Event: "subscribe", Pairs: []string{currency.NewPairWithDelimiter("XBT", "USD", "/").String()}, Subscription: testRequestData{Name: "ticker"}, - RequestID: wc.GenerateMessageID(false), + RequestID: 12346, } _, err = wc.SendMessageReturnResponse(t.Context(), request.Unset, req.RequestID, req) diff --git a/exchanges/binance/binance_test.go b/exchanges/binance/binance_test.go index 64a06277..ba76d96d 100644 --- a/exchanges/binance/binance_test.go +++ b/exchanges/binance/binance_test.go @@ -1998,7 +1998,7 @@ func TestSubscribe(t *testing.T) { var req WsPayload require.NoError(tb, json.Unmarshal(msg, &req), "Unmarshal must not error") require.ElementsMatch(tb, req.Params, exp, "Params must have correct channels") - return w.WriteMessage(gws.TextMessage, fmt.Appendf(nil, `{"result":null,"id":%d}`, req.ID)) + return w.WriteMessage(gws.TextMessage, fmt.Appendf(nil, `{"result":null,"id":"%s"}`, req.ID)) } e = testexch.MockWsInstance[Exchange](t, mockws.CurryWsMockUpgrader(t, mock)) } else { @@ -2020,7 +2020,7 @@ func TestSubscribeBadResp(t *testing.T) { var req WsPayload err := json.Unmarshal(msg, &req) require.NoError(tb, err, "Unmarshal must not error") - return w.WriteMessage(gws.TextMessage, fmt.Appendf(nil, `{"result":{"error":"carrots"},"id":%d}`, req.ID)) + return w.WriteMessage(gws.TextMessage, fmt.Appendf(nil, `{"result":{"error":"carrots"},"id":"%s"}`, req.ID)) } b := testexch.MockWsInstance[Exchange](t, mockws.CurryWsMockUpgrader(t, mock)) err := b.Subscribe(channels) diff --git a/exchanges/binance/binance_types.go b/exchanges/binance/binance_types.go index 8e3ef17b..c6a43e01 100644 --- a/exchanges/binance/binance_types.go +++ b/exchanges/binance/binance_types.go @@ -837,7 +837,7 @@ type WsListStatusData struct { type WsPayload struct { Method string `json:"method"` Params []string `json:"params"` - ID int64 `json:"id"` + ID string `json:"id"` } // CrossMarginInterestData stores cross margin data for borrowing diff --git a/exchanges/binance/binance_websocket.go b/exchanges/binance/binance_websocket.go index 2a47a9ba..4d98d888 100644 --- a/exchanges/binance/binance_websocket.go +++ b/exchanges/binance/binance_websocket.go @@ -166,7 +166,7 @@ func (e *Exchange) wsReadData() { } func (e *Exchange) wsHandleData(respRaw []byte) error { - if id, err := jsonparser.GetInt(respRaw, "id"); err == nil { + if id, err := jsonparser.GetString(respRaw, "id"); err == nil { if e.Websocket.Match.IncomingWithData(id, respRaw) { return nil } @@ -570,7 +570,7 @@ func (e *Exchange) manageSubs(ctx context.Context, op string, subs subscription. } req := WsPayload{ - ID: e.Websocket.Conn.GenerateMessageID(false), + ID: e.MessageID(), Method: op, Params: subs.QualifiedChannels(), } diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index c68de06c..37d28196 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -1716,7 +1716,7 @@ func (e *Exchange) subscribeToChan(ctx context.Context, subs subscription.List) // subId is a single round-trip identifier that provides linking sub requests to chanIDs // Although docs only mention subId for wsBookChannel, it works for all chans - subID := strconv.FormatInt(e.Websocket.Conn.GenerateMessageID(false), 10) + subID := e.MessageID() req["subId"] = subID // Add a temporary Key so we can find this Sub when we get the resp without delay or context switch @@ -1829,7 +1829,7 @@ func (e *Exchange) WsSendAuth(ctx context.Context) error { // WsNewOrder authenticated new order request func (e *Exchange) WsNewOrder(ctx context.Context, data *WsNewOrderRequest) (string, error) { - data.CustomID = e.Websocket.AuthConn.GenerateMessageID(false) + data.CustomID = e.MessageSequence() req := makeRequestInterface(wsOrderNew, data) resp, err := e.Websocket.AuthConn.SendMessageReturnResponse(ctx, request.Unset, data.CustomID, req) if err != nil { diff --git a/exchanges/bybit/bybit.go b/exchanges/bybit/bybit.go index 475a95cd..b1f7c1c7 100644 --- a/exchanges/bybit/bybit.go +++ b/exchanges/bybit/bybit.go @@ -29,8 +29,7 @@ import ( type Exchange struct { exchange.Base - messageIDSeq common.Counter - account accountTypeHolder + account accountTypeHolder } const ( diff --git a/exchanges/bybit/bybit_test.go b/exchanges/bybit/bybit_test.go index 7eb2dab4..f4111258 100644 --- a/exchanges/bybit/bybit_test.go +++ b/exchanges/bybit/bybit_test.go @@ -2936,7 +2936,6 @@ type FixtureConnection struct { websocket.Connection } -func (d *FixtureConnection) GenerateMessageID(bool) int64 { return 1337 } func (d *FixtureConnection) SetupPingHandler(request.EndpointLimit, websocket.PingHandler) {} func (d *FixtureConnection) Dial(context.Context, *gws.Dialer, http.Header) error { return d.dialError } diff --git a/exchanges/bybit/bybit_websocket.go b/exchanges/bybit/bybit_websocket.go index ba48085b..a7bdfde2 100644 --- a/exchanges/bybit/bybit_websocket.go +++ b/exchanges/bybit/bybit_websocket.go @@ -102,7 +102,7 @@ func (e *Exchange) WsConnect(ctx context.Context, conn websocket.Connection) err // WebsocketAuthenticatePrivateConnection sends an authentication message to the private websocket for inbound account // data func (e *Exchange) WebsocketAuthenticatePrivateConnection(ctx context.Context, conn websocket.Connection) error { - req, err := e.GetAuthenticationPayload(ctx, strconv.FormatInt(conn.GenerateMessageID(false), 10)) + req, err := e.GetAuthenticationPayload(ctx, e.MessageID()) if err != nil { return err } @@ -170,7 +170,7 @@ func (e *Exchange) GetAuthenticationPayload(ctx context.Context, requestID strin }, nil } -func (e *Exchange) handleSubscriptions(conn websocket.Connection, operation string, subs subscription.List) (args []SubscriptionArgument, err error) { +func (e *Exchange) handleSubscriptions(_ websocket.Connection, operation string, subs subscription.List) (args []SubscriptionArgument, err error) { subs, err = subs.ExpandTemplates(e) if err != nil { return @@ -181,7 +181,7 @@ func (e *Exchange) handleSubscriptions(conn websocket.Connection, operation stri args = append(args, SubscriptionArgument{ auth: b[0].Authenticated, Operation: operation, - RequestID: strconv.FormatInt(conn.GenerateMessageID(false), 10), + RequestID: e.MessageID(), Arguments: b.QualifiedChannels(), associatedSubs: b, }) @@ -719,7 +719,7 @@ func hasPotentialDelimiter(a asset.Item) bool { // TODO: Remove this function when template expansion is across all assets func (e *Exchange) submitDirectSubscription(ctx context.Context, conn websocket.Connection, a asset.Item, operation string, channelsToSubscribe subscription.List) error { - payloads, err := e.directSubscriptionPayload(conn, a, operation, channelsToSubscribe) + payloads, err := e.directSubscriptionPayload(a, operation, channelsToSubscribe) if err != nil { return err } @@ -757,17 +757,17 @@ func (e *Exchange) submitDirectSubscription(ctx context.Context, conn websocket. } // TODO: Remove this function when template expansion is across all assets -func (e *Exchange) directSubscriptionPayload(conn websocket.Connection, assetType asset.Item, operation string, channelsToSubscribe subscription.List) ([]SubscriptionArgument, error) { +func (e *Exchange) directSubscriptionPayload(assetType asset.Item, operation string, channelsToSubscribe subscription.List) ([]SubscriptionArgument, error) { var args []SubscriptionArgument arg := SubscriptionArgument{ Operation: operation, - RequestID: strconv.FormatInt(conn.GenerateMessageID(false), 10), + RequestID: e.MessageID(), Arguments: []string{}, } authArg := SubscriptionArgument{ auth: true, Operation: operation, - RequestID: strconv.FormatInt(conn.GenerateMessageID(false), 10), + RequestID: e.MessageID(), Arguments: []string{}, } @@ -812,7 +812,7 @@ func (e *Exchange) directSubscriptionPayload(conn websocket.Connection, assetTyp args = append(args, arg) arg = SubscriptionArgument{ Operation: operation, - RequestID: strconv.FormatInt(conn.GenerateMessageID(false), 10), + RequestID: e.MessageID(), Arguments: []string{}, } } diff --git a/exchanges/bybit/bybit_websocket_requests.go b/exchanges/bybit/bybit_websocket_requests.go index 0e1478b7..be19d65e 100644 --- a/exchanges/bybit/bybit_websocket_requests.go +++ b/exchanges/bybit/bybit_websocket_requests.go @@ -75,9 +75,6 @@ func (e *Exchange) sendWebsocketTradeRequest(ctx context.Context, op, orderLinkI return nil, err } - tn := time.Now() - requestID := strconv.FormatInt(outbound.GenerateMessageID(false), 10) - // Set up a listener to wait for the response to come back from the inbound connection. The request is sent through // the outbound trade connection, the response can come back through the inbound private connection before the // outbound connection sends its acknowledgement. @@ -86,9 +83,10 @@ func (e *Exchange) sendWebsocketTradeRequest(ctx context.Context, op, orderLinkI return nil, err } + requestID := e.MessageID() outResp, err := outbound.SendMessageReturnResponse(ctx, limit, requestID, WebsocketGeneralPayload{ RequestID: requestID, - Header: map[string]string{"X-BAPI-TIMESTAMP": strconv.FormatInt(tn.UnixMilli(), 10)}, + Header: map[string]string{"X-BAPI-TIMESTAMP": strconv.FormatInt(time.Now().UnixMilli(), 10)}, Operation: op, Arguments: []any{payload}, }) diff --git a/exchanges/bybit/bybit_wrapper.go b/exchanges/bybit/bybit_wrapper.go index d6364a7a..cdf94c82 100644 --- a/exchanges/bybit/bybit_wrapper.go +++ b/exchanges/bybit/bybit_wrapper.go @@ -255,7 +255,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error { Handler: func(_ context.Context, conn websocket.Connection, resp []byte) error { return e.wsHandleData(conn, asset.Spot, resp) }, - RequestIDGenerator: e.messageIDSeq.IncrementAndGet, }); err != nil { return err } @@ -277,7 +276,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error { Handler: func(_ context.Context, conn websocket.Connection, resp []byte) error { return e.wsHandleData(conn, asset.Options, resp) }, - RequestIDGenerator: e.messageIDSeq.IncrementAndGet, }); err != nil { return err } @@ -305,8 +303,7 @@ func (e *Exchange) Setup(exch *config.Exchange) error { Handler: func(_ context.Context, conn websocket.Connection, resp []byte) error { return e.wsHandleData(conn, asset.USDTMarginedFutures, resp) }, - RequestIDGenerator: e.messageIDSeq.IncrementAndGet, - MessageFilter: asset.USDTMarginedFutures, // Unused but it allows us to differentiate between the two linear futures types. + MessageFilter: asset.USDTMarginedFutures, // Unused but it allows us to differentiate between the two linear futures types. }); err != nil { return err } @@ -334,8 +331,7 @@ func (e *Exchange) Setup(exch *config.Exchange) error { Handler: func(_ context.Context, conn websocket.Connection, resp []byte) error { return e.wsHandleData(conn, asset.USDCMarginedFutures, resp) }, - RequestIDGenerator: e.messageIDSeq.IncrementAndGet, - MessageFilter: asset.USDCMarginedFutures, // Unused but it allows us to differentiate between the two linear futures types. + MessageFilter: asset.USDCMarginedFutures, // Unused but it allows us to differentiate between the two linear futures types. }); err != nil { return err } @@ -357,7 +353,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error { Handler: func(_ context.Context, conn websocket.Connection, resp []byte) error { return e.wsHandleData(conn, asset.CoinMarginedFutures, resp) }, - RequestIDGenerator: e.messageIDSeq.IncrementAndGet, }); err != nil { return err } @@ -376,7 +371,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error { Handler: func(_ context.Context, conn websocket.Connection, resp []byte) error { return e.wsHandleTradeData(conn, resp) }, - RequestIDGenerator: e.messageIDSeq.IncrementAndGet, Authenticate: e.WebsocketAuthenticateTradeConnection, MessageFilter: OutboundTradeConnection, SubscriptionsNotRequired: true, @@ -400,7 +394,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error { Subscriber: e.authSubscribe, Unsubscriber: e.authUnsubscribe, Handler: e.wsHandleAuthenticatedData, - RequestIDGenerator: e.messageIDSeq.IncrementAndGet, Authenticate: e.WebsocketAuthenticatePrivateConnection, MessageFilter: InboundPrivateConnection, }) diff --git a/exchanges/exchange.go b/exchanges/exchange.go index f30114e8..1067e428 100644 --- a/exchanges/exchange.go +++ b/exchanges/exchange.go @@ -1960,3 +1960,9 @@ func (*Base) WebsocketCancelOrder(context.Context, *order.Cancel) error { func (b *Base) MessageID() string { return uuid.Must(uuid.NewV7()).String() } + +// MessageSequence returns a sequential message sequence number from common.Counter +// It is not universally unique but should be unique and sequential within each *Base instance +func (b *Base) MessageSequence() int64 { + return b.messageSequence.IncrementAndGet() +} diff --git a/exchanges/exchange_types.go b/exchanges/exchange_types.go index b8ed92b7..5d4b092e 100644 --- a/exchanges/exchange_types.go +++ b/exchanges/exchange_types.go @@ -4,6 +4,7 @@ import ( "sync" "time" + "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/config" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchange/websocket" @@ -257,6 +258,7 @@ type Base struct { AssetWebsocketSupport *currencystate.States + messageSequence common.Counter } // url lookup consts diff --git a/exchanges/gateio/gateio_test.go b/exchanges/gateio/gateio_test.go index afe4b62a..490b5830 100644 --- a/exchanges/gateio/gateio_test.go +++ b/exchanges/gateio/gateio_test.go @@ -2768,7 +2768,6 @@ func TestGetSettlementCurrency(t *testing.T) { type FixtureConnection struct{ websocket.Connection } -func (d *FixtureConnection) GenerateMessageID(bool) int64 { return 1337 } func (d *FixtureConnection) SendMessageReturnResponse(context.Context, request.EndpointLimit, any, any) ([]byte, error) { return []byte(`{"time":1726121320,"time_ms":1726121320745,"id":1,"conn_id":"f903779a148987ca","trace_id":"d8ee37cd14347e4ed298d44e69aedaa7","channel":"spot.tickers","event":"subscribe","payload":["BRETT_USDT"],"result":{"status":"success"},"requestId":"d8ee37cd14347e4ed298d44e69aedaa7"}`), nil } @@ -2778,12 +2777,12 @@ func TestHandleSubscriptions(t *testing.T) { subs := subscription.List{{Channel: subscription.OrderbookChannel}} - err := e.handleSubscription(t.Context(), &FixtureConnection{}, subscribeEvent, subs, func(context.Context, websocket.Connection, string, subscription.List) ([]WsInput, error) { + err := e.handleSubscription(t.Context(), &FixtureConnection{}, subscribeEvent, subs, func(context.Context, string, subscription.List) ([]WsInput, error) { return []WsInput{{}}, nil }) require.NoError(t, err) - err = e.handleSubscription(t.Context(), &FixtureConnection{}, unsubscribeEvent, subs, func(context.Context, websocket.Connection, string, subscription.List) ([]WsInput, error) { + err = e.handleSubscription(t.Context(), &FixtureConnection{}, unsubscribeEvent, subs, func(context.Context, string, subscription.List) ([]WsInput, error) { return []WsInput{{}}, nil }) require.NoError(t, err) diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index 3a449bf6..23173658 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -124,7 +124,7 @@ func (e *Exchange) websocketLogin(ctx context.Context, conn websocket.Connection signature := hex.EncodeToString(mac.Sum(nil)) payload := WebsocketPayload{ - RequestID: strconv.FormatInt(conn.GenerateMessageID(false), 10), + RequestID: e.MessageID(), APIKey: creds.Key, Signature: signature, Timestamp: strconv.FormatInt(tn, 10), @@ -640,7 +640,7 @@ func (e *Exchange) manageSubs(ctx context.Context, event string, conn websocket. for _, s := range subs { if err := func() error { - msg, err := e.manageSubReq(ctx, event, conn, s) + msg, err := e.manageSubReq(ctx, event, s) if err != nil { return err } @@ -667,9 +667,9 @@ func (e *Exchange) manageSubs(ctx context.Context, event string, conn websocket. } // manageSubReq constructs the subscription management message for a subscription -func (e *Exchange) manageSubReq(ctx context.Context, event string, conn websocket.Connection, s *subscription.Subscription) (*WsInput, error) { +func (e *Exchange) manageSubReq(ctx context.Context, event string, s *subscription.Subscription) (*WsInput, error) { req := &WsInput{ - ID: conn.GenerateMessageID(false), + ID: e.MessageSequence(), Event: event, Channel: channelName(s), Time: time.Now().Unix(), @@ -886,11 +886,11 @@ const subTplText = ` ` // GeneratePayload returns the payload for a websocket message -type GeneratePayload func(ctx context.Context, conn websocket.Connection, event string, channelsToSubscribe subscription.List) ([]WsInput, error) +type GeneratePayload func(ctx context.Context, event string, channelsToSubscribe subscription.List) ([]WsInput, error) // handleSubscription sends a websocket message to receive data from the channel func (e *Exchange) handleSubscription(ctx context.Context, conn websocket.Connection, event string, channelsToSubscribe subscription.List, generatePayload GeneratePayload) error { - payloads, err := generatePayload(ctx, conn, event, channelsToSubscribe) + payloads, err := generatePayload(ctx, event, channelsToSubscribe) if err != nil { return err } @@ -941,7 +941,7 @@ func (e *Exchange) SendWebsocketRequest(ctx context.Context, epl request.Endpoin Channel: channel, Event: "api", Payload: WebsocketPayload{ - RequestID: strconv.FormatInt(conn.GenerateMessageID(false), 10), + RequestID: e.MessageID(), RequestParam: paramPayload, Timestamp: strconv.FormatInt(tn, 10), }, diff --git a/exchanges/gateio/gateio_websocket_delivery_futures.go b/exchanges/gateio/gateio_websocket_delivery_futures.go index ea65bb04..e26bcc53 100644 --- a/exchanges/gateio/gateio_websocket_delivery_futures.go +++ b/exchanges/gateio/gateio_websocket_delivery_futures.go @@ -111,7 +111,7 @@ func (e *Exchange) DeliveryFuturesUnsubscribe(ctx context.Context, conn websocke return e.handleSubscription(ctx, conn, unsubscribeEvent, channelsToUnsubscribe, e.generateDeliveryFuturesPayload) } -func (e *Exchange) generateDeliveryFuturesPayload(ctx context.Context, conn websocket.Connection, event string, channelsToSubscribe subscription.List) ([]WsInput, error) { +func (e *Exchange) generateDeliveryFuturesPayload(ctx context.Context, event string, channelsToSubscribe subscription.List) ([]WsInput, error) { if len(channelsToSubscribe) == 0 { return nil, errors.New("cannot generate payload, no channels supplied") } @@ -194,7 +194,7 @@ func (e *Exchange) generateDeliveryFuturesPayload(ctx context.Context, conn webs } } outbound = append(outbound, WsInput{ - ID: conn.GenerateMessageID(false), + ID: e.MessageSequence(), Event: event, Channel: channelsToSubscribe[i].Channel, Payload: params, diff --git a/exchanges/gateio/gateio_websocket_futures.go b/exchanges/gateio/gateio_websocket_futures.go index 1dd6eceb..61bd28a7 100644 --- a/exchanges/gateio/gateio_websocket_futures.go +++ b/exchanges/gateio/gateio_websocket_futures.go @@ -194,7 +194,7 @@ func (e *Exchange) WsHandleFuturesData(ctx context.Context, conn websocket.Conne } } -func (e *Exchange) generateFuturesPayload(ctx context.Context, conn websocket.Connection, event string, channelsToSubscribe subscription.List) ([]WsInput, error) { +func (e *Exchange) generateFuturesPayload(ctx context.Context, event string, channelsToSubscribe subscription.List) ([]WsInput, error) { if len(channelsToSubscribe) == 0 { return nil, errors.New("cannot generate payload, no channels supplied") } @@ -280,7 +280,7 @@ func (e *Exchange) generateFuturesPayload(ctx context.Context, conn websocket.Co } } outbound = append(outbound, WsInput{ - ID: conn.GenerateMessageID(false), + ID: e.MessageSequence(), Event: event, Channel: channelsToSubscribe[i].Channel, Payload: params, diff --git a/exchanges/gateio/gateio_websocket_option.go b/exchanges/gateio/gateio_websocket_option.go index 0d4e0ed9..7c8c55e2 100644 --- a/exchanges/gateio/gateio_websocket_option.go +++ b/exchanges/gateio/gateio_websocket_option.go @@ -160,7 +160,7 @@ getEnabledPairs: return subscriptions, nil } -func (e *Exchange) generateOptionsPayload(ctx context.Context, conn websocket.Connection, event string, channelsToSubscribe subscription.List) ([]WsInput, error) { +func (e *Exchange) generateOptionsPayload(ctx context.Context, event string, channelsToSubscribe subscription.List) ([]WsInput, error) { if len(channelsToSubscribe) == 0 { return nil, errors.New("cannot generate payload, no channels supplied") } @@ -262,7 +262,7 @@ func (e *Exchange) generateOptionsPayload(ctx context.Context, conn websocket.Co params...) } payloads[i] = WsInput{ - ID: conn.GenerateMessageID(false), + ID: e.MessageSequence(), Event: event, Channel: channelsToSubscribe[i].Channel, Payload: params, diff --git a/exchanges/gateio/gateio_wrapper.go b/exchanges/gateio/gateio_wrapper.go index d02d3493..8c126d03 100644 --- a/exchanges/gateio/gateio_wrapper.go +++ b/exchanges/gateio/gateio_wrapper.go @@ -219,7 +219,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error { Connector: e.WsConnectSpot, Authenticate: e.authenticateSpot, MessageFilter: asset.Spot, - RequestIDGenerator: e.messageIDSeq.IncrementAndGet, }) if err != nil { return err @@ -237,10 +236,9 @@ func (e *Exchange) Setup(exch *config.Exchange) error { GenerateSubscriptions: func() (subscription.List, error) { return e.GenerateFuturesDefaultSubscriptions(asset.USDTMarginedFutures) }, - Connector: e.WsFuturesConnect, - Authenticate: e.authenticateFutures, - MessageFilter: asset.USDTMarginedFutures, - RequestIDGenerator: e.messageIDSeq.IncrementAndGet, + Connector: e.WsFuturesConnect, + Authenticate: e.authenticateFutures, + MessageFilter: asset.USDTMarginedFutures, }) if err != nil { return err @@ -259,9 +257,8 @@ func (e *Exchange) Setup(exch *config.Exchange) error { GenerateSubscriptions: func() (subscription.List, error) { return e.GenerateFuturesDefaultSubscriptions(asset.CoinMarginedFutures) }, - Connector: e.WsFuturesConnect, - MessageFilter: asset.CoinMarginedFutures, - RequestIDGenerator: e.messageIDSeq.IncrementAndGet, + Connector: e.WsFuturesConnect, + MessageFilter: asset.CoinMarginedFutures, }) if err != nil { return err @@ -281,7 +278,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error { GenerateSubscriptions: e.GenerateDeliveryFuturesDefaultSubscriptions, Connector: e.WsDeliveryFuturesConnect, MessageFilter: asset.DeliveryFutures, - RequestIDGenerator: e.messageIDSeq.IncrementAndGet, }) if err != nil { return err @@ -298,7 +294,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error { GenerateSubscriptions: e.GenerateOptionsDefaultSubscriptions, Connector: e.WsOptionsConnect, MessageFilter: asset.Options, - RequestIDGenerator: e.messageIDSeq.IncrementAndGet, }) } diff --git a/exchanges/hitbtc/hitbtc_test.go b/exchanges/hitbtc/hitbtc_test.go index 7f96c0b4..5ae57be1 100644 --- a/exchanges/hitbtc/hitbtc_test.go +++ b/exchanges/hitbtc/hitbtc_test.go @@ -568,7 +568,7 @@ func TestWsGetCurrenciesJSON(t *testing.T) { "delisted": false, "payoutFee": "0.001" }, - "id": 123 + "id": "c4ce77f5-1c50-435a-b623-4961191ca129" }`) err := e.wsHandleData(pressXToJSON) if err != nil { @@ -589,7 +589,7 @@ func TestWsGetSymbolsJSON(t *testing.T) { "provideLiquidityRate": "-0.0001", "feeCurrency": "BTC" }, - "id": 123 + "id": "1c847290-b366-412b-b8f5-dc630ed5b147" }`) err := e.wsHandleData(pressXToJSON) if err != nil { @@ -744,7 +744,7 @@ func TestWsSubmitOrderJSON(t *testing.T) { "updatedAt": "2017-10-20T12:29:43.166Z", "reportType": "new" }, - "id": 123 + "id": "99f55c70-1166-49a7-87e9-3b54a00ad893" }`) err := e.wsHandleData(pressXToJSON) if err != nil { @@ -771,7 +771,7 @@ func TestWsCancelOrderJSON(t *testing.T) { "updatedAt": "2017-10-20T12:31:26.174Z", "reportType": "canceled" }, - "id": 123 + "id": "2ce46937-2770-4453-ac99-ee87939bf5bb" }`) err := e.wsHandleData(pressXToJSON) if err != nil { @@ -799,7 +799,7 @@ func TestWsCancelReplaceJSON(t *testing.T) { "reportType": "replaced", "originalRequestClientOrderId": "9cbe79cb6f864b71a811402a48d4b5b1" }, - "id": 123 + "id": "91e925d3-3b95-4e29-8ae7-938fd5006709" }`) err := e.wsHandleData(pressXToJSON) if err != nil { @@ -827,7 +827,7 @@ func TestWsGetTradesRequestResponse(t *testing.T) { "reserved": "0.00200000" } ], - "id": 123 + "id": "4b1f1391-215e-4d12-972c-5cea9d50edf4" }`) err := e.wsHandleData(pressXToJSON) if err != nil { @@ -857,7 +857,7 @@ func TestWsGetActiveOrdersRequestJSON(t *testing.T) { "originalRequestClientOrderId": "9cbe79cb6f864b71a811402a48d4b5b1" } ], - "id": 123 + "id": "9e67b440-2eec-445a-be3a-e81f962c8391" }`) err := e.wsHandleData(pressXToJSON) if err != nil { diff --git a/exchanges/hitbtc/hitbtc_types.go b/exchanges/hitbtc/hitbtc_types.go index 7d0f54bb..50bf6a82 100644 --- a/exchanges/hitbtc/hitbtc_types.go +++ b/exchanges/hitbtc/hitbtc_types.go @@ -283,7 +283,7 @@ type capture struct { Method string `json:"method,omitempty"` Result any `json:"result"` Error ResponseError `json:"error"` - ID int64 `json:"id,omitempty"` + ID string `json:"id,omitempty"` } // ResponseError contains error codes from JSON responses @@ -297,7 +297,7 @@ type WsRequest struct { JSONRPCVersion string `json:"jsonrpc,omitempty"` Method string `json:"method"` Params *WsParams `json:"params,omitempty"` - ID int64 `json:"id,omitempty"` + ID string `json:"id,omitempty"` } // WsParams are websocket params for a request @@ -359,7 +359,7 @@ type WsTrade struct { type WsLoginRequest struct { Method string `json:"method"` Params WsLoginData `json:"params"` - ID int64 `json:"id,omitempty"` + ID string `json:"id,omitempty"` } // WsLoginData sets credentials for WsLoginRequest @@ -378,17 +378,17 @@ type wsActiveOrdersResponse struct { type wsReportResponse struct { OrderData wsOrderData `json:"params"` - ID int64 `json:"id"` + ID string `json:"id"` } type wsOrderResponse struct { OrderData wsOrderData `json:"result"` - ID int64 `json:"id"` + ID string `json:"id"` } type wsActiveOrderRequestResponse struct { OrderData []wsOrderData `json:"result"` - ID int64 `json:"id"` + ID string `json:"id"` } // wsOrderData Active order data for WsActiveOrdersResponse @@ -446,12 +446,12 @@ type WsReportResponseData struct { type WsSubmitOrderRequest struct { Method string `json:"method"` Params WsSubmitOrderRequestData `json:"params"` - ID int64 `json:"id"` + ID string `json:"id"` } // WsSubmitOrderRequestData WS request data type WsSubmitOrderRequestData struct { - ClientOrderID int64 `json:"clientOrderId,string,omitempty"` + ClientOrderID string `json:"clientOrderId,omitempty"` Symbol string `json:"symbol"` Side string `json:"side"` Price float64 `json:"price,string"` @@ -461,7 +461,7 @@ type WsSubmitOrderRequestData struct { // WsSubmitOrderSuccessResponse WS response type WsSubmitOrderSuccessResponse struct { Result WsSubmitOrderSuccessResponseData `json:"result"` - ID int64 `json:"id"` + ID string `json:"id"` Error ResponseError `json:"error"` } @@ -499,7 +499,7 @@ type WsSubmitOrderErrorResponseData struct { // WsCancelOrderResponse WS response type WsCancelOrderResponse struct { Result WsCancelOrderResponseData `json:"result"` - ID int64 `json:"id"` + ID string `json:"id"` Error ResponseError `json:"error"` } @@ -524,7 +524,7 @@ type WsCancelOrderResponseData struct { // WsReplaceOrderResponse WS response type WsReplaceOrderResponse struct { Result WsReplaceOrderResponseData `json:"result"` - ID int64 `json:"id"` + ID string `json:"id"` Error ResponseError `json:"error"` } @@ -550,7 +550,7 @@ type WsReplaceOrderResponseData struct { // WsGetActiveOrdersResponse WS response type WsGetActiveOrdersResponse struct { Result []WsGetActiveOrdersResponseData `json:"result"` - ID int64 `json:"id"` + ID string `json:"id"` Error ResponseError `json:"error"` } @@ -576,7 +576,7 @@ type WsGetActiveOrdersResponseData struct { // WsGetTradingBalanceResponse WS response type WsGetTradingBalanceResponse struct { Result []WsGetTradingBalanceResponseData `json:"result"` - ID int64 `json:"id"` + ID string `json:"id"` Error ResponseError `json:"error"` } @@ -591,7 +591,7 @@ type WsGetTradingBalanceResponseData struct { type WsCancelOrderRequest struct { Method string `json:"method"` Params WsCancelOrderRequestData `json:"params"` - ID int64 `json:"id"` + ID string `json:"id"` } // WsCancelOrderRequestData WS request data @@ -603,7 +603,7 @@ type WsCancelOrderRequestData struct { type WsReplaceOrderRequest struct { Method string `json:"method"` Params WsReplaceOrderRequestData `json:"params"` - ID int64 `json:"id,omitempty"` + ID string `json:"id,omitempty"` } // WsReplaceOrderRequestData WS request data @@ -618,7 +618,7 @@ type WsReplaceOrderRequestData struct { type WsGetCurrenciesRequest struct { Method string `json:"method"` Params WsGetCurrenciesRequestParameters `json:"params"` - ID int64 `json:"id"` + ID string `json:"id"` } // WsGetCurrenciesRequestParameters parameters @@ -629,7 +629,7 @@ type WsGetCurrenciesRequestParameters struct { // WsGetCurrenciesResponse currency response type WsGetCurrenciesResponse struct { Result WsGetCurrenciesResponseData `json:"result"` - ID int64 `json:"id"` + ID string `json:"id"` Error ResponseError `json:"error"` } @@ -652,7 +652,7 @@ type WsGetCurrenciesResponseData struct { type WsGetSymbolsRequest struct { Method string `json:"method"` Params WsGetSymbolsRequestParameters `json:"params"` - ID int64 `json:"id"` + ID string `json:"id"` } // WsGetSymbolsRequestParameters request parameters @@ -663,7 +663,7 @@ type WsGetSymbolsRequestParameters struct { // WsGetSymbolsResponse symbol response type WsGetSymbolsResponse struct { Result WsGetSymbolsResponseData `json:"result"` - ID int64 `json:"id"` + ID string `json:"id"` Error ResponseError `json:"error"` } @@ -683,7 +683,7 @@ type WsGetSymbolsResponseData struct { type WsGetTradesRequest struct { Method string `json:"method"` Params WsGetTradesRequestParameters `json:"params"` - ID int64 `json:"id"` + ID string `json:"id"` } // WsGetTradesRequestParameters trade request params @@ -698,7 +698,7 @@ type WsGetTradesRequestParameters struct { type WsGetTradesResponse struct { Jsonrpc string `json:"jsonrpc"` Result WsGetTradesResponseData `json:"result"` - ID int64 `json:"id"` + ID string `json:"id"` Error ResponseError `json:"error"` } diff --git a/exchanges/hitbtc/hitbtc_websocket.go b/exchanges/hitbtc/hitbtc_websocket.go index 1e04f844..949a7091 100644 --- a/exchanges/hitbtc/hitbtc_websocket.go +++ b/exchanges/hitbtc/hitbtc_websocket.go @@ -102,7 +102,7 @@ func (e *Exchange) wsGetTableName(respRaw []byte) (string, error) { if init.Error.Code == errAuthFailed { e.Websocket.SetCanUseAuthenticatedEndpoints(false) } - if init.ID > 0 { + if init.ID != "" { if e.Websocket.Match.IncomingWithData(init.ID, respRaw) { return "", nil } @@ -519,7 +519,7 @@ func (e *Exchange) manageSubs(ctx context.Context, op string, subs subscription. for _, s := range subs { r := WsRequest{ JSONRPCVersion: rpcVersion, - ID: e.Websocket.Conn.GenerateMessageID(false), + ID: e.MessageID(), } if err := json.Unmarshal([]byte(s.QualifiedChannel), &r); err != nil { errs = common.AppendError(errs, err) @@ -565,7 +565,7 @@ func (e *Exchange) wsLogin(ctx context.Context) error { Nonce: n, Signature: hex.EncodeToString(hmac), }, - ID: e.Websocket.Conn.GenerateMessageID(false), + ID: e.MessageID(), } err = e.Websocket.Conn.SendJSONMessage(ctx, request.Unset, req) @@ -583,7 +583,7 @@ func (e *Exchange) wsPlaceOrder(ctx context.Context, pair currency.Pair, side st return nil, fmt.Errorf("%v not authenticated, cannot place order", e.Name) } - id := e.Websocket.Conn.GenerateMessageID(false) + id := e.MessageID() fPair, err := e.FormatExchangeCurrency(pair, asset.Spot) if err != nil { return nil, err @@ -625,7 +625,7 @@ func (e *Exchange) wsCancelOrder(ctx context.Context, clientOrderID string) (*Ws Params: WsCancelOrderRequestData{ ClientOrderID: clientOrderID, }, - ID: e.Websocket.Conn.GenerateMessageID(false), + ID: e.MessageID(), } resp, err := e.Websocket.Conn.SendMessageReturnResponse(ctx, request.Unset, req.ID, req) if err != nil { @@ -655,7 +655,7 @@ func (e *Exchange) wsReplaceOrder(ctx context.Context, clientOrderID string, qua Quantity: quantity, Price: price, }, - ID: e.Websocket.Conn.GenerateMessageID(false), + ID: e.MessageID(), } resp, err := e.Websocket.Conn.SendMessageReturnResponse(ctx, request.Unset, req.ID, req) if err != nil { @@ -680,7 +680,7 @@ func (e *Exchange) wsGetActiveOrders(ctx context.Context) (*wsActiveOrdersRespon req := WsReplaceOrderRequest{ Method: "getOrders", Params: WsReplaceOrderRequestData{}, - ID: e.Websocket.Conn.GenerateMessageID(false), + ID: e.MessageID(), } resp, err := e.Websocket.Conn.SendMessageReturnResponse(ctx, request.Unset, req.ID, req) if err != nil { @@ -705,7 +705,7 @@ func (e *Exchange) wsGetTradingBalance(ctx context.Context) (*WsGetTradingBalanc req := WsReplaceOrderRequest{ Method: "getTradingBalance", Params: WsReplaceOrderRequestData{}, - ID: e.Websocket.Conn.GenerateMessageID(false), + ID: e.MessageID(), } resp, err := e.Websocket.Conn.SendMessageReturnResponse(ctx, request.Unset, req.ID, req) if err != nil { @@ -729,7 +729,7 @@ func (e *Exchange) wsGetCurrencies(ctx context.Context, currencyItem currency.Co Params: WsGetCurrenciesRequestParameters{ Currency: currencyItem, }, - ID: e.Websocket.Conn.GenerateMessageID(false), + ID: e.MessageID(), } resp, err := e.Websocket.Conn.SendMessageReturnResponse(ctx, request.Unset, req.ID, req) if err != nil { @@ -758,7 +758,7 @@ func (e *Exchange) wsGetSymbols(ctx context.Context, c currency.Pair) (*WsGetSym Params: WsGetSymbolsRequestParameters{ Symbol: fPair.String(), }, - ID: e.Websocket.Conn.GenerateMessageID(false), + ID: e.MessageID(), } resp, err := e.Websocket.Conn.SendMessageReturnResponse(ctx, request.Unset, req.ID, req) if err != nil { @@ -790,7 +790,7 @@ func (e *Exchange) wsGetTrades(ctx context.Context, c currency.Pair, limit int64 Sort: sort, By: by, }, - ID: e.Websocket.Conn.GenerateMessageID(false), + ID: e.MessageID(), } resp, err := e.Websocket.Conn.SendMessageReturnResponse(ctx, request.Unset, req.ID, req) if err != nil { diff --git a/exchanges/hitbtc/hitbtc_wrapper.go b/exchanges/hitbtc/hitbtc_wrapper.go index 11405f2c..977599a0 100644 --- a/exchanges/hitbtc/hitbtc_wrapper.go +++ b/exchanges/hitbtc/hitbtc_wrapper.go @@ -443,7 +443,7 @@ func (e *Exchange) SubmitOrder(ctx context.Context, o *order.Submit) (*order.Sub if err != nil { return nil, err } - orderID = strconv.FormatInt(response.ID, 10) + orderID = response.ID if response.Result.CumQuantity == o.Amount { status = order.Filled } diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index 48dc0820..5f3f99d1 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -813,7 +813,7 @@ func (e *Exchange) manageSubs(ctx context.Context, op string, subs subscription. reqFmt := currency.PairFormat{Uppercase: true, Delimiter: "/"} r := &WebsocketSubRequest{ Event: op, - RequestID: e.Websocket.Conn.GenerateMessageID(false), + RequestID: e.MessageSequence(), Subscription: WebsocketSubscriptionData{ Name: s.QualifiedChannel, Depth: s.Levels, @@ -1040,7 +1040,7 @@ func (e *Exchange) wsAddOrder(ctx context.Context, req *WsAddOrderRequest) (stri if req == nil { return "", common.ErrNilPointer } - req.RequestID = e.Websocket.AuthConn.GenerateMessageID(false) + req.RequestID = e.MessageSequence() req.Event = krakenWsAddOrder req.Token = e.websocketAuthToken() jsonResp, err := e.Websocket.AuthConn.SendMessageReturnResponse(ctx, request.Unset, req.RequestID, req) @@ -1079,7 +1079,7 @@ func (e *Exchange) wsCancelOrders(ctx context.Context, orderIDs []string) error // wsCancelOrder cancels an open order func (e *Exchange) wsCancelOrder(ctx context.Context, orderID string) error { - id := e.Websocket.AuthConn.GenerateMessageID(false) + id := e.MessageSequence() req := WsCancelOrderRequest{ Event: krakenWsCancelOrder, Token: e.websocketAuthToken(), @@ -1110,14 +1110,13 @@ func (e *Exchange) wsCancelOrder(ctx context.Context, orderID string) error { // wsCancelAllOrders cancels all opened orders // Returns number (count param) of affected orders or 0 if no open orders found func (e *Exchange) wsCancelAllOrders(ctx context.Context) (*WsCancelOrderResponse, error) { - id := e.Websocket.AuthConn.GenerateMessageID(false) req := WsCancelOrderRequest{ Event: krakenWsCancelAll, Token: e.websocketAuthToken(), - RequestID: id, + RequestID: e.MessageSequence(), } - jsonResp, err := e.Websocket.AuthConn.SendMessageReturnResponse(ctx, request.Unset, id, req) + jsonResp, err := e.Websocket.AuthConn.SendMessageReturnResponse(ctx, request.Unset, req.RequestID, req) if err != nil { return &WsCancelOrderResponse{}, err } diff --git a/exchanges/kucoin/kucoin_websocket.go b/exchanges/kucoin/kucoin_websocket.go index f89c7819..f266e1c0 100644 --- a/exchanges/kucoin/kucoin_websocket.go +++ b/exchanges/kucoin/kucoin_websocket.go @@ -6,7 +6,6 @@ import ( "fmt" "net/http" "slices" - "strconv" "strings" "sync" "text/template" @@ -227,7 +226,7 @@ func (e *Exchange) wsHandleData(ctx context.Context, respData []byte) error { return nil } if resp.ID != "" { - return e.Websocket.Match.RequireMatchWithData("msgID:"+resp.ID, respData) + return e.Websocket.Match.RequireMatchWithData(resp.ID, respData) } topicInfo := strings.Split(resp.Topic, ":") switch topicInfo[0] { @@ -1023,15 +1022,14 @@ func (e *Exchange) Unsubscribe(subscriptions subscription.List) error { func (e *Exchange) manageSubscriptions(ctx context.Context, subs subscription.List, operation string) error { var errs error for _, s := range subs { - msgID := strconv.FormatInt(e.Websocket.Conn.GenerateMessageID(false), 10) req := WsSubscriptionInput{ - ID: msgID, + ID: e.MessageID(), Type: operation, Topic: s.QualifiedChannel, PrivateChannel: s.Authenticated, Response: true, } - if respRaw, err := e.Websocket.Conn.SendMessageReturnResponse(ctx, request.Unset, "msgID:"+msgID, req); err != nil { + if respRaw, err := e.Websocket.Conn.SendMessageReturnResponse(ctx, request.Unset, req.ID, req); err != nil { errs = common.AppendError(errs, err) } else { rType, err := jsonparser.GetUnsafeString(respRaw, "type") diff --git a/exchanges/okx/okx.go b/exchanges/okx/okx.go index bd3f8672..a05b2683 100644 --- a/exchanges/okx/okx.go +++ b/exchanges/okx/okx.go @@ -33,7 +33,6 @@ import ( type Exchange struct { exchange.Base - messageIDSeq common.Counter instrumentsInfoMapLock sync.Mutex instrumentsInfoMap map[string][]Instrument } diff --git a/exchanges/okx/okx_wrapper.go b/exchanges/okx/okx_wrapper.go index a0951579..45cc2b05 100644 --- a/exchanges/okx/okx_wrapper.go +++ b/exchanges/okx/okx_wrapper.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/gofrs/uuid" "github.com/shopspring/decimal" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/key" @@ -224,7 +225,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error { ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, ResponseMaxLimit: websocketResponseMaxLimit, RateLimit: request.NewRateLimitWithWeight(time.Second, 2, 1), - RequestIDGenerator: e.messageIDSeq.IncrementAndGet, }); err != nil { return err } @@ -235,7 +235,6 @@ func (e *Exchange) Setup(exch *config.Exchange) error { ResponseMaxLimit: websocketResponseMaxLimit, Authenticated: true, RateLimit: request.NewRateLimitWithWeight(time.Second, 2, 1), - RequestIDGenerator: e.messageIDSeq.IncrementAndGet, }) } @@ -3038,3 +3037,8 @@ func (e *Exchange) GetCurrencyTradeURL(ctx context.Context, a asset.Item, cp cur return "", fmt.Errorf("%w %q", asset.ErrNotSupported, a) } } + +// MessageID returns a universally unique ID using UUID V7, with hyphens removed to fit the maximum 32-character field for okx +func (e *Exchange) MessageID() string { + return strings.Replace(uuid.Must(uuid.NewV7()).String(), "-", "", 4) +} diff --git a/exchanges/okx/ws_requests.go b/exchanges/okx/ws_requests.go index b4084e3c..881e7883 100644 --- a/exchanges/okx/ws_requests.go +++ b/exchanges/okx/ws_requests.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "reflect" - "strconv" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/encoding/json" @@ -28,10 +27,8 @@ func (e *Exchange) WSPlaceOrder(ctx context.Context, arg *PlaceOrderRequestParam return nil, err } - id := strconv.FormatInt(e.Websocket.AuthConn.GenerateMessageID(false), 10) - var resp []*OrderData - if err := e.SendAuthenticatedWebsocketRequest(ctx, placeOrderEPL, id, "order", []PlaceOrderRequestParam{*arg}, &resp); err != nil { + if err := e.SendAuthenticatedWebsocketRequest(ctx, placeOrderEPL, e.MessageID(), "order", []PlaceOrderRequestParam{*arg}, &resp); err != nil { return nil, err } return singleItem(resp) @@ -49,10 +46,8 @@ func (e *Exchange) WSPlaceMultipleOrders(ctx context.Context, args []PlaceOrderR } } - id := strconv.FormatInt(e.Websocket.AuthConn.GenerateMessageID(false), 10) - var resp []*OrderData - return resp, e.SendAuthenticatedWebsocketRequest(ctx, placeMultipleOrdersEPL, id, "batch-orders", args, &resp) + return resp, e.SendAuthenticatedWebsocketRequest(ctx, placeMultipleOrdersEPL, e.MessageID(), "batch-orders", args, &resp) } // WSCancelOrder cancels an order @@ -67,10 +62,8 @@ func (e *Exchange) WSCancelOrder(ctx context.Context, arg *CancelOrderRequestPar return nil, order.ErrOrderIDNotSet } - id := strconv.FormatInt(e.Websocket.AuthConn.GenerateMessageID(false), 10) - var resp []*OrderData - if err := e.SendAuthenticatedWebsocketRequest(ctx, cancelOrderEPL, id, "cancel-order", []CancelOrderRequestParam{*arg}, &resp); err != nil { + if err := e.SendAuthenticatedWebsocketRequest(ctx, cancelOrderEPL, e.MessageID(), "cancel-order", []CancelOrderRequestParam{*arg}, &resp); err != nil { return nil, err } @@ -92,10 +85,8 @@ func (e *Exchange) WSCancelMultipleOrders(ctx context.Context, args []CancelOrde } } - id := strconv.FormatInt(e.Websocket.AuthConn.GenerateMessageID(false), 10) - var resp []*OrderData - return resp, e.SendAuthenticatedWebsocketRequest(ctx, cancelMultipleOrdersEPL, id, "batch-cancel-orders", args, &resp) + return resp, e.SendAuthenticatedWebsocketRequest(ctx, cancelMultipleOrdersEPL, e.MessageID(), "batch-cancel-orders", args, &resp) } // WSAmendOrder amends an order @@ -113,10 +104,8 @@ func (e *Exchange) WSAmendOrder(ctx context.Context, arg *AmendOrderRequestParam return nil, errInvalidNewSizeOrPriceInformation } - id := strconv.FormatInt(e.Websocket.AuthConn.GenerateMessageID(false), 10) - var resp []*OrderData - if err := e.SendAuthenticatedWebsocketRequest(ctx, amendOrderEPL, id, "amend-order", []AmendOrderRequestParams{*arg}, &resp); err != nil { + if err := e.SendAuthenticatedWebsocketRequest(ctx, amendOrderEPL, e.MessageID(), "amend-order", []AmendOrderRequestParams{*arg}, &resp); err != nil { return nil, err } return singleItem(resp) @@ -140,10 +129,8 @@ func (e *Exchange) WSAmendMultipleOrders(ctx context.Context, args []AmendOrderR } } - id := strconv.FormatInt(e.Websocket.AuthConn.GenerateMessageID(false), 10) - var resp []*OrderData - return resp, e.SendAuthenticatedWebsocketRequest(ctx, amendMultipleOrdersEPL, id, "batch-amend-orders", args, &resp) + return resp, e.SendAuthenticatedWebsocketRequest(ctx, amendMultipleOrdersEPL, e.MessageID(), "batch-amend-orders", args, &resp) } // WSMassCancelOrders cancels all MMP pending orders of an instrument family. Only applicable to Option in Portfolio Margin mode, and MMP privilege is required. @@ -161,12 +148,10 @@ func (e *Exchange) WSMassCancelOrders(ctx context.Context, args []CancelMassReqP } } - id := strconv.FormatInt(e.Websocket.AuthConn.GenerateMessageID(false), 10) - var resps []*struct { Result bool `json:"result"` } - if err := e.SendAuthenticatedWebsocketRequest(ctx, amendOrderEPL, id, "mass-cancel", args, &resps); err != nil { + if err := e.SendAuthenticatedWebsocketRequest(ctx, amendOrderEPL, e.MessageID(), "mass-cancel", args, &resps); err != nil { return err } @@ -188,10 +173,8 @@ func (e *Exchange) WSPlaceSpreadOrder(ctx context.Context, arg *SpreadOrderParam return nil, err } - id := strconv.FormatInt(e.Websocket.AuthConn.GenerateMessageID(false), 10) - var resp []*SpreadOrderResponse - if err := e.SendAuthenticatedWebsocketRequest(ctx, placeSpreadOrderEPL, id, "sprd-order", []SpreadOrderParam{*arg}, &resp); err != nil { + if err := e.SendAuthenticatedWebsocketRequest(ctx, placeSpreadOrderEPL, e.MessageID(), "sprd-order", []SpreadOrderParam{*arg}, &resp); err != nil { return nil, err } @@ -210,10 +193,8 @@ func (e *Exchange) WSAmendSpreadOrder(ctx context.Context, arg *AmendSpreadOrder return nil, errSizeOrPriceIsRequired } - id := strconv.FormatInt(e.Websocket.AuthConn.GenerateMessageID(false), 10) - var resp []*SpreadOrderResponse - if err := e.SendAuthenticatedWebsocketRequest(ctx, amendSpreadOrderEPL, id, "sprd-amend-order", []AmendSpreadOrderParam{*arg}, &resp); err != nil { + if err := e.SendAuthenticatedWebsocketRequest(ctx, amendSpreadOrderEPL, e.MessageID(), "sprd-amend-order", []AmendSpreadOrderParam{*arg}, &resp); err != nil { return nil, err } @@ -234,10 +215,8 @@ func (e *Exchange) WSCancelSpreadOrder(ctx context.Context, orderID, clientOrder arg["clOrdId"] = clientOrderID } - id := strconv.FormatInt(e.Websocket.AuthConn.GenerateMessageID(false), 10) - var resp []*SpreadOrderResponse - if err := e.SendAuthenticatedWebsocketRequest(ctx, cancelSpreadOrderEPL, id, "sprd-cancel-order", []map[string]string{arg}, &resp); err != nil { + if err := e.SendAuthenticatedWebsocketRequest(ctx, cancelSpreadOrderEPL, e.MessageID(), "sprd-cancel-order", []map[string]string{arg}, &resp); err != nil { return nil, err } @@ -251,10 +230,8 @@ func (e *Exchange) WSCancelAllSpreadOrders(ctx context.Context, spreadID string) arg["sprdId"] = spreadID } - id := strconv.FormatInt(e.Websocket.AuthConn.GenerateMessageID(false), 10) - var resps []*ResponseResult - if err := e.SendAuthenticatedWebsocketRequest(ctx, cancelAllSpreadOrderEPL, id, "sprd-mass-cancel", []map[string]string{arg}, &resps); err != nil { + if err := e.SendAuthenticatedWebsocketRequest(ctx, cancelAllSpreadOrderEPL, e.MessageID(), "sprd-mass-cancel", []map[string]string{arg}, &resps); err != nil { return err } diff --git a/exchanges/okx/ws_requests_test.go b/exchanges/okx/ws_requests_test.go index 48893d63..5b388c93 100644 --- a/exchanges/okx/ws_requests_test.go +++ b/exchanges/okx/ws_requests_test.go @@ -4,6 +4,7 @@ import ( "errors" "testing" + "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/thrasher-corp/gocryptotrader/common" @@ -281,3 +282,20 @@ func TestSingleItem(t *testing.T) { require.NoError(t, err) require.NotNil(t, got) } + +func TestMessageID(t *testing.T) { + t.Parallel() + id := new(Exchange).MessageID() + require.Len(t, id, 32, "Must return the correct length of message id") + u, err := uuid.FromString(id) + require.NoError(t, err, "MessageID must return a valid UUID") + assert.Equal(t, byte(0x7), u.Version(), "MessageID should return a V7 uuid") +} + +// BenchmarkMessageID-8 4736883 259.9 ns/op 96 B/op 4 allocs/op +func BenchmarkMessageID(b *testing.B) { + e := new(Exchange) + for b.Loop() { + _ = e.MessageID() + } +}