diff --git a/docs/ADD_NEW_EXCHANGE.md b/docs/ADD_NEW_EXCHANGE.md index 31e3d2e3..580cc048 100644 --- a/docs/ADD_NEW_EXCHANGE.md +++ b/docs/ADD_NEW_EXCHANGE.md @@ -359,6 +359,11 @@ Alternatively you can use `request.WithVerbose(context.Background())` as the `co Ensure each endpoint is implemented and has an associated test to improve test coverage and increase confidence +#### Message IDs + +Use e.MessageID() to get a UUIDv7 if the exchange supports unique string IDs. Otherwise override MessageID with a suitable alternative. +For example: Consider common.Counter for simple integer IDs if uniqueness isn't critical. + #### Authenticated functions Authenticated request function is created based on the way the exchange documentation specifies. For example, see the [Binance Spot API - Endpoint Security Types](https://developers.binance.com/docs/binance-spot-api-docs/rest-api/endpoint-security-type). diff --git a/exchanges/deribit/deribit_types.go b/exchanges/deribit/deribit_types.go index d5ede232..15815d82 100644 --- a/exchanges/deribit/deribit_types.go +++ b/exchanges/deribit/deribit_types.go @@ -63,6 +63,8 @@ var ( errRefreshTokenRequired = errors.New("refresh token is required") errSubjectIDRequired = errors.New("subject id is required") errMissingSignature = errors.New("missing signature") + errStartingHeartbeat = errors.New("error starting heartbeat") + errSendingHeartbeat = errors.New("error sending heartbeat") websocketRequestTimeout = time.Second * 30 @@ -844,7 +846,7 @@ type TransactionsData struct { // response type wsInput struct { JSONRPCVersion string `json:"jsonrpc,omitempty"` - ID int64 `json:"id,omitempty"` + ID string `json:"id,omitempty"` Method string `json:"method"` Params map[string]any `json:"params,omitempty"` } @@ -853,7 +855,7 @@ type wsInput struct { // response type WsRequest struct { JSONRPCVersion string `json:"jsonrpc,omitempty"` - ID int64 `json:"id,omitempty"` + ID string `json:"id,omitempty"` Method string `json:"method"` Params any `json:"params,omitempty"` } @@ -862,16 +864,22 @@ type WsRequest struct { // response type WsSubscriptionInput struct { JSONRPCVersion string `json:"jsonrpc,omitempty"` - ID int64 `json:"id,omitempty"` + ID string `json:"id,omitempty"` Method string `json:"method"` Params map[string][]string `json:"params,omitempty"` } type wsResponse struct { JSONRPCVersion string `json:"jsonrpc,omitempty"` - ID int64 `json:"id,omitempty"` - Result any `json:"result,omitempty"` - Error struct { + ID string `json:"id,omitempty"` + Method string `json:"method"` + Params struct { + Data any `json:"data"` + Channel string `json:"channel"` + Type string `json:"type"` // Used in heartbeat and test_request messages + } `json:"params"` + Result any `json:"result,omitempty"` + Error struct { Message string `json:"message,omitempty"` Code int64 `json:"code,omitempty"` Data any `json:"data"` @@ -880,7 +888,7 @@ type wsResponse struct { type wsLoginResponse struct { JSONRPCVersion string `json:"jsonrpc"` - ID int64 `json:"id"` + ID string `json:"id"` Method string `json:"method"` Result map[string]any `json:"result"` Error *UnmarshalError `json:"error"` @@ -888,7 +896,7 @@ type wsLoginResponse struct { type wsSubscriptionResponse struct { JSONRPCVersion string `json:"jsonrpc"` - ID int64 `json:"id"` + ID string `json:"id"` Method string `json:"method"` Result []string `json:"result"` } @@ -1063,23 +1071,6 @@ type BlockTradeMoveResponse struct { Amount float64 `json:"amount"` } -// WsResponse represents generalized websocket subscription push data and immediate websocket call responses. -type WsResponse struct { - ID int64 `json:"id,omitempty"` - Params struct { - Data any `json:"data"` - Channel string `json:"channel"` - - // Used in heartbead and test_request messages. - Type string `json:"type"` - } `json:"params"` - Method string `json:"method"` - JSONRPCVersion string `json:"jsonrpc"` - - // for status "ok" and "version" push data messages - Result any `json:"result"` -} - // VersionInformation represents websocket version information type VersionInformation struct { Version string `json:"version"` diff --git a/exchanges/deribit/deribit_websocket.go b/exchanges/deribit/deribit_websocket.go index 5f810dcb..9d987bbc 100644 --- a/exchanges/deribit/deribit_websocket.go +++ b/exchanges/deribit/deribit_websocket.go @@ -105,23 +105,6 @@ var defaultSubscriptions = subscription.List{ {Enabled: true, Asset: asset.All, Channel: subscription.MyTradesChannel, Interval: kline.HundredMilliseconds, Authenticated: true}, } -var ( - pingMessage = WsSubscriptionInput{ - ID: 2, - JSONRPCVersion: rpcVersion, - Method: "public/test", - Params: map[string][]string{}, - } - setHeartBeatMessage = wsInput{ - ID: 1, - JSONRPCVersion: rpcVersion, - Method: "public/set_heartbeat", - Params: map[string]any{ - "interval": 15, - }, - } -) - // WsConnect starts a new connection with the websocket API func (e *Exchange) WsConnect() error { ctx := context.TODO() @@ -129,20 +112,42 @@ func (e *Exchange) WsConnect() error { return websocket.ErrWebsocketNotEnabled } var dialer gws.Dialer - err := e.Websocket.Conn.Dial(ctx, &dialer, http.Header{}) - if err != nil { + if err := e.Websocket.Conn.Dial(ctx, &dialer, http.Header{}); err != nil { return err } e.Websocket.Wg.Add(1) go e.wsReadData(ctx) + go e.wsStartHeartbeat(ctx) if e.Websocket.CanUseAuthenticatedEndpoints() { - err = e.wsLogin(ctx) - if err != nil { + if err := e.wsLogin(ctx); err != nil { log.Errorf(log.ExchangeSys, "%v - authentication failed: %v\n", e.Name, err) e.Websocket.SetCanUseAuthenticatedEndpoints(false) } } - return e.Websocket.Conn.SendJSONMessage(ctx, request.Unset, setHeartBeatMessage) + return nil +} + +func (e *Exchange) wsStartHeartbeat(ctx context.Context) { + msg := wsInput{ + ID: e.MessageID(), + JSONRPCVersion: rpcVersion, + Method: "public/set_heartbeat", + Params: map[string]any{ + "interval": 15, + }, + } + respRaw, err := e.Websocket.Conn.SendMessageReturnResponse(ctx, request.Unset, msg.ID, msg) + if err != nil { + log.Errorf(log.ExchangeSys, "%v %s: %s\n", e.Name, errStartingHeartbeat, err) + return + } + var resp wsResponse + if err := json.Unmarshal(respRaw, &resp); err != nil { + log.Errorf(log.ExchangeSys, "%v %s: %s\n", e.Name, errStartingHeartbeat, err) + } + if resp.Error.Code != 0 || resp.Error.Message != "" { + log.Errorf(log.ExchangeSys, "%s %s code: %d message: %s", e.Name, errStartingHeartbeat, resp.Error.Code, resp.Error.Message) + } } func (e *Exchange) wsLogin(ctx context.Context) error { @@ -165,7 +170,7 @@ func (e *Exchange) wsLogin(ctx context.Context) error { req := wsInput{ JSONRPCVersion: rpcVersion, Method: "public/auth", - ID: e.Websocket.Conn.GenerateMessageID(false), + ID: e.MessageID(), Params: map[string]any{ "grant_type": "client_signature", "client_id": creds.Key, @@ -208,21 +213,20 @@ func (e *Exchange) wsReadData(ctx context.Context) { } func (e *Exchange) wsHandleData(ctx context.Context, respRaw []byte) error { - var response WsResponse + var response wsResponse err := json.Unmarshal(respRaw, &response) if err != nil { return fmt.Errorf("%s - err %s could not parse websocket data: %s", e.Name, err, respRaw) } if response.Method == "heartbeat" { - return e.Websocket.Conn.SendJSONMessage(ctx, request.Unset, pingMessage) + go e.wsSendHeartbeat(ctx) + return nil } - if response.ID > 2 { - if !e.Websocket.Match.IncomingWithData(response.ID, respRaw) { - return fmt.Errorf("can't send ws incoming data to Matched channel with RequestID: %d", response.ID) + if response.ID != "" { + if strings.HasPrefix(response.ID, "hb-") { + return nil } - return nil - } else if response.ID > 0 { - return nil + return e.Websocket.Match.RequireMatchWithData(response.ID, respRaw) } channels := strings.Split(response.Params.Channel, ".") switch channels[0] { @@ -321,11 +325,22 @@ func (e *Exchange) wsHandleData(ctx context.Context, respRaw []byte) error { return nil } +func (e *Exchange) wsSendHeartbeat(ctx context.Context) { + msg := WsSubscriptionInput{ + ID: "hb-" + e.MessageID(), + JSONRPCVersion: rpcVersion, + Method: "public/test", + } + if err := e.Websocket.Conn.SendJSONMessage(ctx, request.Unset, msg); err != nil { + log.Errorf(log.ExchangeSys, "%v %s: %s\n", e.Name, errSendingHeartbeat, err) + } +} + func (e *Exchange) processUserOrders(respRaw []byte, channels []string) error { if len(channels) != 4 && len(channels) != 5 { return fmt.Errorf("%w, expected format 'user.orders.{instrument_name}.raw, user.orders.{instrument_name}.{interval}, user.orders.{kind}.{currency}.raw, or user.orders.{kind}.{currency}.{interval}', but found %s", errMalformedData, strings.Join(channels, ".")) } - var response WsResponse + var response wsResponse orderData := []WsOrder{} response.Params.Data = orderData err := json.Unmarshal(respRaw, &response) @@ -374,7 +389,7 @@ func (e *Exchange) processUserOrderChanges(respRaw []byte, channels []string) er if len(channels) < 4 || len(channels) > 5 { return fmt.Errorf("%w, expected format 'trades.{instrument_name}.{interval} or trades.{kind}.{currency}.{interval}', but found %s", errMalformedData, strings.Join(channels, ".")) } - var response WsResponse + var response wsResponse changeData := &wsChanges{} response.Params.Data = changeData err := json.Unmarshal(respRaw, &response) @@ -454,7 +469,7 @@ func (e *Exchange) processQuoteTicker(respRaw []byte, channels []string) error { if err != nil { return err } - var response WsResponse + var response wsResponse quoteTicker := &wsQuoteTickerInformation{} response.Params.Data = quoteTicker err = json.Unmarshal(respRaw, &response) @@ -484,7 +499,7 @@ func (e *Exchange) processTrades(respRaw []byte, channels []string) error { if len(channels) < 3 || len(channels) > 5 { return fmt.Errorf("%w, expected format 'trades.{instrument_name}.{interval} or trades.{kind}.{currency}.{interval}', but found %s", errMalformedData, strings.Join(channels, ".")) } - var response WsResponse + var response wsResponse var tradeList []wsTrade response.Params.Data = &tradeList err := json.Unmarshal(respRaw, &response) @@ -532,7 +547,7 @@ func (e *Exchange) processIncrementalTicker(respRaw []byte, channels []string) e if err != nil { return err } - var response WsResponse + var response wsResponse incrementalTicker := &WsIncrementalTicker{} response.Params.Data = incrementalTicker err = json.Unmarshal(respRaw, &response) @@ -568,7 +583,7 @@ func (e *Exchange) processTicker(respRaw []byte, channels []string) error { if err != nil { return err } - var response WsResponse + var response wsResponse tickerPriceResponse := &wsTicker{} response.Params.Data = tickerPriceResponse err = json.Unmarshal(respRaw, &response) @@ -601,7 +616,7 @@ func (e *Exchange) processTicker(respRaw []byte, channels []string) error { } func (e *Exchange) processData(respRaw []byte, result any) error { - var response WsResponse + var response wsResponse response.Params.Data = result err := json.Unmarshal(respRaw, &response) if err != nil { @@ -619,7 +634,7 @@ func (e *Exchange) processCandleChart(respRaw []byte, channels []string) error { if err != nil { return err } - var response WsResponse + var response wsResponse candleData := &wsCandlestickData{} response.Params.Data = candleData err = json.Unmarshal(respRaw, &response) @@ -641,7 +656,7 @@ func (e *Exchange) processCandleChart(respRaw []byte, channels []string) error { } func (e *Exchange) processOrderbook(respRaw []byte, channels []string) error { - var response WsResponse + var response wsResponse orderbookData := &wsOrderbook{} response.Params.Data = orderbookData err := json.Unmarshal(respRaw, &response) @@ -817,7 +832,7 @@ func (e *Exchange) handleSubscription(ctx context.Context, method string, subs s r := WsSubscriptionInput{ JSONRPCVersion: rpcVersion, - ID: e.Websocket.Conn.GenerateMessageID(false), + ID: e.MessageID(), Method: method, Params: map[string][]string{"channels": subs.QualifiedChannels()}, } diff --git a/exchanges/deribit/deribit_ws_endpoints.go b/exchanges/deribit/deribit_ws_endpoints.go index 96e79eb2..c647b149 100644 --- a/exchanges/deribit/deribit_ws_endpoints.go +++ b/exchanges/deribit/deribit_ws_endpoints.go @@ -2304,7 +2304,7 @@ func (e *Exchange) SendWSRequest(ctx context.Context, epl request.EndpointLimit, } input := &WsRequest{ JSONRPCVersion: rpcVersion, - ID: e.Websocket.Conn.GenerateMessageID(true), + ID: e.MessageID(), Method: method, Params: params, } diff --git a/exchanges/exchange.go b/exchanges/exchange.go index d94b30b2..f6681e39 100644 --- a/exchanges/exchange.go +++ b/exchanges/exchange.go @@ -15,6 +15,7 @@ import ( "time" "unicode" + "github.com/gofrs/uuid" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/key" "github.com/thrasher-corp/gocryptotrader/config" @@ -1959,3 +1960,9 @@ func (*Base) WebsocketSubmitOrder(context.Context, *order.Submit) (*order.Submit func (*Base) WebsocketSubmitOrders(context.Context, []*order.Submit) (responses []*order.SubmitResponse, err error) { return nil, common.ErrFunctionNotSupported } + +// MessageID returns a universally unique id using UUID V7 +// In the future additional params may be added to method signature to provide context for the message id for overriding exchange implementations +func (b *Base) MessageID() string { + return uuid.Must(uuid.NewV7()).String() +} diff --git a/exchanges/exchange_test.go b/exchanges/exchange_test.go index 9b40642a..0d3e3eb4 100644 --- a/exchanges/exchange_test.go +++ b/exchanges/exchange_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/thrasher-corp/gocryptotrader/common" @@ -2865,11 +2866,22 @@ func TestCheckOrderExecutionLimits(t *testing.T) { } func TestWebsocketSubmitOrder(t *testing.T) { + t.Parallel() _, err := (&Base{}).WebsocketSubmitOrder(t.Context(), nil) require.ErrorIs(t, err, common.ErrFunctionNotSupported) } func TestWebsocketSubmitOrders(t *testing.T) { + t.Parallel() _, err := (&Base{}).WebsocketSubmitOrders(t.Context(), nil) require.ErrorIs(t, err, common.ErrFunctionNotSupported) } + +func TestMessageID(t *testing.T) { + t.Parallel() + id := (new(Base)).MessageID() + require.NotEmpty(t, id, "MessageID must return a non-empty message ID") + u, err := uuid.FromString(id) + require.NoError(t, err, "MessageID must return a valid UUID") + assert.Equal(t, byte(0x7), u.Version(), "MessageID should return a V7 uuid") +} diff --git a/exchanges/interfaces.go b/exchanges/interfaces.go index 584a6b4e..399e64a9 100644 --- a/exchanges/interfaces.go +++ b/exchanges/interfaces.go @@ -40,9 +40,7 @@ type IBotExchange interface { GetEnabledFeatures() FeaturesEnabled GetSupportedFeatures() FeaturesSupported - // GetTradingRequirements returns trading requirements for the exchange GetTradingRequirements() protocol.TradingRequirements - GetCachedTicker(p currency.Pair, a asset.Item) (*ticker.Price, error) UpdateTicker(ctx context.Context, p currency.Pair, a asset.Item) (*ticker.Price, error) UpdateTickers(ctx context.Context, a asset.Item) error