Deribit: Replace bespoke message IDs with uuid v7 (#1995)

* Deribit: Switch to string IDs

Switch from int to string IDs so we can use UUID.v7 instead of (the only) local high precision message id implementation

* Deribit: Dedup wsResponse / wsResponse

* Deribit: Use uuid v7 for IDs

This moves away from centralising message ids.
There's no real benefit to moving them to a central generator, since we
can one-line it, and reduce our testing plane and complexity.
And it's more concise for exchanges to say "I'm using this UUID".

* Deribit: Handle errors from StartHeartbeat

* Deribit: Simplify WS ID matching

* Exchanges: Add MessageID function to base
This commit is contained in:
Gareth Kirwan
2025-09-12 12:52:03 +07:00
committed by GitHub
parent 4ac0519a4c
commit 6907dfa6a8
7 changed files with 97 additions and 69 deletions

View File

@@ -63,6 +63,8 @@ var (
errRefreshTokenRequired = errors.New("refresh token is required")
errSubjectIDRequired = errors.New("subject id is required")
errMissingSignature = errors.New("missing signature")
errStartingHeartbeat = errors.New("error starting heartbeat")
errSendingHeartbeat = errors.New("error sending heartbeat")
websocketRequestTimeout = time.Second * 30
@@ -844,7 +846,7 @@ type TransactionsData struct {
// response
type wsInput struct {
JSONRPCVersion string `json:"jsonrpc,omitempty"`
ID int64 `json:"id,omitempty"`
ID string `json:"id,omitempty"`
Method string `json:"method"`
Params map[string]any `json:"params,omitempty"`
}
@@ -853,7 +855,7 @@ type wsInput struct {
// response
type WsRequest struct {
JSONRPCVersion string `json:"jsonrpc,omitempty"`
ID int64 `json:"id,omitempty"`
ID string `json:"id,omitempty"`
Method string `json:"method"`
Params any `json:"params,omitempty"`
}
@@ -862,16 +864,22 @@ type WsRequest struct {
// response
type WsSubscriptionInput struct {
JSONRPCVersion string `json:"jsonrpc,omitempty"`
ID int64 `json:"id,omitempty"`
ID string `json:"id,omitempty"`
Method string `json:"method"`
Params map[string][]string `json:"params,omitempty"`
}
type wsResponse struct {
JSONRPCVersion string `json:"jsonrpc,omitempty"`
ID int64 `json:"id,omitempty"`
Result any `json:"result,omitempty"`
Error struct {
ID string `json:"id,omitempty"`
Method string `json:"method"`
Params struct {
Data any `json:"data"`
Channel string `json:"channel"`
Type string `json:"type"` // Used in heartbeat and test_request messages
} `json:"params"`
Result any `json:"result,omitempty"`
Error struct {
Message string `json:"message,omitempty"`
Code int64 `json:"code,omitempty"`
Data any `json:"data"`
@@ -880,7 +888,7 @@ type wsResponse struct {
type wsLoginResponse struct {
JSONRPCVersion string `json:"jsonrpc"`
ID int64 `json:"id"`
ID string `json:"id"`
Method string `json:"method"`
Result map[string]any `json:"result"`
Error *UnmarshalError `json:"error"`
@@ -888,7 +896,7 @@ type wsLoginResponse struct {
type wsSubscriptionResponse struct {
JSONRPCVersion string `json:"jsonrpc"`
ID int64 `json:"id"`
ID string `json:"id"`
Method string `json:"method"`
Result []string `json:"result"`
}
@@ -1063,23 +1071,6 @@ type BlockTradeMoveResponse struct {
Amount float64 `json:"amount"`
}
// WsResponse represents generalized websocket subscription push data and immediate websocket call responses.
type WsResponse struct {
ID int64 `json:"id,omitempty"`
Params struct {
Data any `json:"data"`
Channel string `json:"channel"`
// Used in heartbead and test_request messages.
Type string `json:"type"`
} `json:"params"`
Method string `json:"method"`
JSONRPCVersion string `json:"jsonrpc"`
// for status "ok" and "version" push data messages
Result any `json:"result"`
}
// VersionInformation represents websocket version information
type VersionInformation struct {
Version string `json:"version"`

View File

@@ -105,23 +105,6 @@ var defaultSubscriptions = subscription.List{
{Enabled: true, Asset: asset.All, Channel: subscription.MyTradesChannel, Interval: kline.HundredMilliseconds, Authenticated: true},
}
var (
pingMessage = WsSubscriptionInput{
ID: 2,
JSONRPCVersion: rpcVersion,
Method: "public/test",
Params: map[string][]string{},
}
setHeartBeatMessage = wsInput{
ID: 1,
JSONRPCVersion: rpcVersion,
Method: "public/set_heartbeat",
Params: map[string]any{
"interval": 15,
},
}
)
// WsConnect starts a new connection with the websocket API
func (e *Exchange) WsConnect() error {
ctx := context.TODO()
@@ -129,20 +112,42 @@ func (e *Exchange) WsConnect() error {
return websocket.ErrWebsocketNotEnabled
}
var dialer gws.Dialer
err := e.Websocket.Conn.Dial(ctx, &dialer, http.Header{})
if err != nil {
if err := e.Websocket.Conn.Dial(ctx, &dialer, http.Header{}); err != nil {
return err
}
e.Websocket.Wg.Add(1)
go e.wsReadData(ctx)
go e.wsStartHeartbeat(ctx)
if e.Websocket.CanUseAuthenticatedEndpoints() {
err = e.wsLogin(ctx)
if err != nil {
if err := e.wsLogin(ctx); err != nil {
log.Errorf(log.ExchangeSys, "%v - authentication failed: %v\n", e.Name, err)
e.Websocket.SetCanUseAuthenticatedEndpoints(false)
}
}
return e.Websocket.Conn.SendJSONMessage(ctx, request.Unset, setHeartBeatMessage)
return nil
}
func (e *Exchange) wsStartHeartbeat(ctx context.Context) {
msg := wsInput{
ID: e.MessageID(),
JSONRPCVersion: rpcVersion,
Method: "public/set_heartbeat",
Params: map[string]any{
"interval": 15,
},
}
respRaw, err := e.Websocket.Conn.SendMessageReturnResponse(ctx, request.Unset, msg.ID, msg)
if err != nil {
log.Errorf(log.ExchangeSys, "%v %s: %s\n", e.Name, errStartingHeartbeat, err)
return
}
var resp wsResponse
if err := json.Unmarshal(respRaw, &resp); err != nil {
log.Errorf(log.ExchangeSys, "%v %s: %s\n", e.Name, errStartingHeartbeat, err)
}
if resp.Error.Code != 0 || resp.Error.Message != "" {
log.Errorf(log.ExchangeSys, "%s %s code: %d message: %s", e.Name, errStartingHeartbeat, resp.Error.Code, resp.Error.Message)
}
}
func (e *Exchange) wsLogin(ctx context.Context) error {
@@ -165,7 +170,7 @@ func (e *Exchange) wsLogin(ctx context.Context) error {
req := wsInput{
JSONRPCVersion: rpcVersion,
Method: "public/auth",
ID: e.Websocket.Conn.GenerateMessageID(false),
ID: e.MessageID(),
Params: map[string]any{
"grant_type": "client_signature",
"client_id": creds.Key,
@@ -208,21 +213,20 @@ func (e *Exchange) wsReadData(ctx context.Context) {
}
func (e *Exchange) wsHandleData(ctx context.Context, respRaw []byte) error {
var response WsResponse
var response wsResponse
err := json.Unmarshal(respRaw, &response)
if err != nil {
return fmt.Errorf("%s - err %s could not parse websocket data: %s", e.Name, err, respRaw)
}
if response.Method == "heartbeat" {
return e.Websocket.Conn.SendJSONMessage(ctx, request.Unset, pingMessage)
go e.wsSendHeartbeat(ctx)
return nil
}
if response.ID > 2 {
if !e.Websocket.Match.IncomingWithData(response.ID, respRaw) {
return fmt.Errorf("can't send ws incoming data to Matched channel with RequestID: %d", response.ID)
if response.ID != "" {
if strings.HasPrefix(response.ID, "hb-") {
return nil
}
return nil
} else if response.ID > 0 {
return nil
return e.Websocket.Match.RequireMatchWithData(response.ID, respRaw)
}
channels := strings.Split(response.Params.Channel, ".")
switch channels[0] {
@@ -321,11 +325,22 @@ func (e *Exchange) wsHandleData(ctx context.Context, respRaw []byte) error {
return nil
}
func (e *Exchange) wsSendHeartbeat(ctx context.Context) {
msg := WsSubscriptionInput{
ID: "hb-" + e.MessageID(),
JSONRPCVersion: rpcVersion,
Method: "public/test",
}
if err := e.Websocket.Conn.SendJSONMessage(ctx, request.Unset, msg); err != nil {
log.Errorf(log.ExchangeSys, "%v %s: %s\n", e.Name, errSendingHeartbeat, err)
}
}
func (e *Exchange) processUserOrders(respRaw []byte, channels []string) error {
if len(channels) != 4 && len(channels) != 5 {
return fmt.Errorf("%w, expected format 'user.orders.{instrument_name}.raw, user.orders.{instrument_name}.{interval}, user.orders.{kind}.{currency}.raw, or user.orders.{kind}.{currency}.{interval}', but found %s", errMalformedData, strings.Join(channels, "."))
}
var response WsResponse
var response wsResponse
orderData := []WsOrder{}
response.Params.Data = orderData
err := json.Unmarshal(respRaw, &response)
@@ -374,7 +389,7 @@ func (e *Exchange) processUserOrderChanges(respRaw []byte, channels []string) er
if len(channels) < 4 || len(channels) > 5 {
return fmt.Errorf("%w, expected format 'trades.{instrument_name}.{interval} or trades.{kind}.{currency}.{interval}', but found %s", errMalformedData, strings.Join(channels, "."))
}
var response WsResponse
var response wsResponse
changeData := &wsChanges{}
response.Params.Data = changeData
err := json.Unmarshal(respRaw, &response)
@@ -454,7 +469,7 @@ func (e *Exchange) processQuoteTicker(respRaw []byte, channels []string) error {
if err != nil {
return err
}
var response WsResponse
var response wsResponse
quoteTicker := &wsQuoteTickerInformation{}
response.Params.Data = quoteTicker
err = json.Unmarshal(respRaw, &response)
@@ -484,7 +499,7 @@ func (e *Exchange) processTrades(respRaw []byte, channels []string) error {
if len(channels) < 3 || len(channels) > 5 {
return fmt.Errorf("%w, expected format 'trades.{instrument_name}.{interval} or trades.{kind}.{currency}.{interval}', but found %s", errMalformedData, strings.Join(channels, "."))
}
var response WsResponse
var response wsResponse
var tradeList []wsTrade
response.Params.Data = &tradeList
err := json.Unmarshal(respRaw, &response)
@@ -532,7 +547,7 @@ func (e *Exchange) processIncrementalTicker(respRaw []byte, channels []string) e
if err != nil {
return err
}
var response WsResponse
var response wsResponse
incrementalTicker := &WsIncrementalTicker{}
response.Params.Data = incrementalTicker
err = json.Unmarshal(respRaw, &response)
@@ -568,7 +583,7 @@ func (e *Exchange) processTicker(respRaw []byte, channels []string) error {
if err != nil {
return err
}
var response WsResponse
var response wsResponse
tickerPriceResponse := &wsTicker{}
response.Params.Data = tickerPriceResponse
err = json.Unmarshal(respRaw, &response)
@@ -601,7 +616,7 @@ func (e *Exchange) processTicker(respRaw []byte, channels []string) error {
}
func (e *Exchange) processData(respRaw []byte, result any) error {
var response WsResponse
var response wsResponse
response.Params.Data = result
err := json.Unmarshal(respRaw, &response)
if err != nil {
@@ -619,7 +634,7 @@ func (e *Exchange) processCandleChart(respRaw []byte, channels []string) error {
if err != nil {
return err
}
var response WsResponse
var response wsResponse
candleData := &wsCandlestickData{}
response.Params.Data = candleData
err = json.Unmarshal(respRaw, &response)
@@ -641,7 +656,7 @@ func (e *Exchange) processCandleChart(respRaw []byte, channels []string) error {
}
func (e *Exchange) processOrderbook(respRaw []byte, channels []string) error {
var response WsResponse
var response wsResponse
orderbookData := &wsOrderbook{}
response.Params.Data = orderbookData
err := json.Unmarshal(respRaw, &response)
@@ -817,7 +832,7 @@ func (e *Exchange) handleSubscription(ctx context.Context, method string, subs s
r := WsSubscriptionInput{
JSONRPCVersion: rpcVersion,
ID: e.Websocket.Conn.GenerateMessageID(false),
ID: e.MessageID(),
Method: method,
Params: map[string][]string{"channels": subs.QualifiedChannels()},
}

View File

@@ -2304,7 +2304,7 @@ func (e *Exchange) SendWSRequest(ctx context.Context, epl request.EndpointLimit,
}
input := &WsRequest{
JSONRPCVersion: rpcVersion,
ID: e.Websocket.Conn.GenerateMessageID(true),
ID: e.MessageID(),
Method: method,
Params: params,
}

View File

@@ -15,6 +15,7 @@ import (
"time"
"unicode"
"github.com/gofrs/uuid"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/common/key"
"github.com/thrasher-corp/gocryptotrader/config"
@@ -1959,3 +1960,9 @@ func (*Base) WebsocketSubmitOrder(context.Context, *order.Submit) (*order.Submit
func (*Base) WebsocketSubmitOrders(context.Context, []*order.Submit) (responses []*order.SubmitResponse, err error) {
return nil, common.ErrFunctionNotSupported
}
// MessageID returns a universally unique id using UUID V7
// In the future additional params may be added to method signature to provide context for the message id for overriding exchange implementations
func (b *Base) MessageID() string {
return uuid.Must(uuid.NewV7()).String()
}

View File

@@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thrasher-corp/gocryptotrader/common"
@@ -2865,11 +2866,22 @@ func TestCheckOrderExecutionLimits(t *testing.T) {
}
func TestWebsocketSubmitOrder(t *testing.T) {
t.Parallel()
_, err := (&Base{}).WebsocketSubmitOrder(t.Context(), nil)
require.ErrorIs(t, err, common.ErrFunctionNotSupported)
}
func TestWebsocketSubmitOrders(t *testing.T) {
t.Parallel()
_, err := (&Base{}).WebsocketSubmitOrders(t.Context(), nil)
require.ErrorIs(t, err, common.ErrFunctionNotSupported)
}
func TestMessageID(t *testing.T) {
t.Parallel()
id := (new(Base)).MessageID()
require.NotEmpty(t, id, "MessageID must return a non-empty message ID")
u, err := uuid.FromString(id)
require.NoError(t, err, "MessageID must return a valid UUID")
assert.Equal(t, byte(0x7), u.Version(), "MessageID should return a V7 uuid")
}

View File

@@ -40,9 +40,7 @@ type IBotExchange interface {
GetEnabledFeatures() FeaturesEnabled
GetSupportedFeatures() FeaturesSupported
// GetTradingRequirements returns trading requirements for the exchange
GetTradingRequirements() protocol.TradingRequirements
GetCachedTicker(p currency.Pair, a asset.Item) (*ticker.Price, error)
UpdateTicker(ctx context.Context, p currency.Pair, a asset.Item) (*ticker.Price, error)
UpdateTickers(ctx context.Context, a asset.Item) error