From 676b2e0367751cb56323959c47bbc7d85830726a Mon Sep 17 00:00:00 2001 From: Bea Date: Tue, 8 Apr 2025 08:01:50 +0700 Subject: [PATCH] Deribit: Fix sending trades to the websocket DataHandler (#1856) * Fix sending trades to the DataHandler Additionally set trade direction type to order.Side and set the time to UTC * Amend the len check to make it scalable * Fix the nit --- exchanges/deribit/deribit_test.go | 47 +++++++++++++++++++++ exchanges/deribit/deribit_types.go | 3 +- exchanges/deribit/deribit_websocket.go | 30 ++++++++----- exchanges/deribit/testdata/wsAllTrades.json | 2 + 4 files changed, 71 insertions(+), 11 deletions(-) create mode 100644 exchanges/deribit/testdata/wsAllTrades.json diff --git a/exchanges/deribit/deribit_test.go b/exchanges/deribit/deribit_test.go index 559747a6..4c273a4d 100644 --- a/exchanges/deribit/deribit_test.go +++ b/exchanges/deribit/deribit_test.go @@ -712,6 +712,53 @@ func TestWSRetrieveLastTradesByInstrumentAndTime(t *testing.T) { } } +func TestWSProcessTrades(t *testing.T) { + t.Parallel() + + d := new(Deribit) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + require.NoError(t, testexch.Setup(d), "Setup instance must not error") + testexch.FixtureToDataHandler(t, "testdata/wsAllTrades.json", d.wsHandleData) + close(d.Websocket.DataHandler) + + p, a, err := d.getAssetPairByInstrument("BTC-PERPETUAL") + require.NoError(t, err, "getAssetPairByInstrument must not error") + + exp := []trade.Data{ + { + Exchange: d.Name, + CurrencyPair: p, + Timestamp: time.UnixMilli(1742627465811).UTC(), + Price: 84295.5, + Amount: 8430.0, + Side: order.Buy, + TID: "356130997", + AssetType: a, + }, + { + Exchange: d.Name, + CurrencyPair: p, + Timestamp: time.UnixMilli(1742627361899).UTC(), + Price: 84319.0, + Amount: 580.0, + Side: order.Sell, + TID: "356130979", + AssetType: a, + }, + } + require.Len(t, d.Websocket.DataHandler, len(exp), "Must see the correct number of trades") + for resp := range d.Websocket.DataHandler { + switch v := resp.(type) { + case trade.Data: + i := 1 - len(d.Websocket.DataHandler) + require.Equalf(t, exp[i], v, "Trade [%d] must be correct", i) + case error: + t.Error(v) + default: + t.Errorf("Unexpected type in DataHandler: %T(%s)", v, v) + } + } +} + func TestGetOrderbookData(t *testing.T) { t.Parallel() _, err := d.GetOrderbook(context.Background(), "", 0) diff --git a/exchanges/deribit/deribit_types.go b/exchanges/deribit/deribit_types.go index bc89e8a1..0a271dad 100644 --- a/exchanges/deribit/deribit_types.go +++ b/exchanges/deribit/deribit_types.go @@ -6,6 +6,7 @@ import ( "time" "github.com/thrasher-corp/gocryptotrader/encoding/json" + "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/types" ) @@ -1326,7 +1327,7 @@ type wsTrade struct { MarkPrice float64 `json:"mark_price"` InstrumentName string `json:"instrument_name"` IndexPrice float64 `json:"index_price"` - Direction string `json:"direction"` + Direction order.Side `json:"direction"` Amount float64 `json:"amount"` } diff --git a/exchanges/deribit/deribit_websocket.go b/exchanges/deribit/deribit_websocket.go index e0fb3ae4..1f3f2dc7 100644 --- a/exchanges/deribit/deribit_websocket.go +++ b/exchanges/deribit/deribit_websocket.go @@ -475,6 +475,12 @@ func (d *Deribit) processQuoteTicker(respRaw []byte, channels []string) error { } func (d *Deribit) processTrades(respRaw []byte, channels []string) error { + tradeFeed := d.IsTradeFeedEnabled() + saveTradeData := d.IsSaveTradeDataEnabled() + if !tradeFeed && !saveTradeData { + return nil + } + if len(channels) < 3 || len(channels) > 5 { return fmt.Errorf("%w, expected format 'trades.{instrument_name}.{interval} or trades.{kind}.{currency}.{interval}', but found %s", errMalformedData, strings.Join(channels, ".")) } @@ -488,30 +494,34 @@ func (d *Deribit) processTrades(respRaw []byte, channels []string) error { if len(tradeList) == 0 { return fmt.Errorf("%v, empty list of trades found", common.ErrNoResponse) } - tradeDatas := make([]trade.Data, len(tradeList)) - for x := range tradeDatas { + tradesData := make([]trade.Data, len(tradeList)) + for x := range tradesData { var cp currency.Pair var a asset.Item cp, a, err = d.getAssetPairByInstrument(tradeList[x].InstrumentName) if err != nil { return err } - side, err := order.StringToOrderSide(tradeList[x].Direction) - if err != nil { - return err - } - tradeDatas[x] = trade.Data{ + tradesData[x] = trade.Data{ CurrencyPair: cp, Exchange: d.Name, - Timestamp: tradeList[x].Timestamp.Time(), + Timestamp: tradeList[x].Timestamp.Time().UTC(), Price: tradeList[x].Price, Amount: tradeList[x].Amount, - Side: side, + Side: tradeList[x].Direction, TID: tradeList[x].TradeID, AssetType: a, } } - return trade.AddTradesToBuffer(tradeDatas...) + if tradeFeed { + for i := range tradesData { + d.Websocket.DataHandler <- tradesData[i] + } + } + if saveTradeData { + return trade.AddTradesToBuffer(tradesData...) + } + return nil } func (d *Deribit) processIncrementalTicker(respRaw []byte, channels []string) error { diff --git a/exchanges/deribit/testdata/wsAllTrades.json b/exchanges/deribit/testdata/wsAllTrades.json new file mode 100644 index 00000000..e349e815 --- /dev/null +++ b/exchanges/deribit/testdata/wsAllTrades.json @@ -0,0 +1,2 @@ +{"jsonrpc":"2.0","method":"subscription","params":{"channel":"trades.BTC-PERPETUAL.agg2","data":[{"timestamp":1742627465811,"price":84295.5,"direction":"buy","index_price":84319.98,"instrument_name":"BTC-PERPETUAL","trade_seq":242126092,"amount":8430.0,"mark_price":84292.11,"tick_direction":2,"contracts":843.0,"trade_id":"356130997"}]}} +{"jsonrpc":"2.0","method":"subscription","params":{"channel":"trades.BTC-PERPETUAL.agg2","data":[{"timestamp":1742627361899,"price":84319.0,"direction":"sell","index_price":84348.37,"instrument_name":"BTC-PERPETUAL","trade_seq":242126084,"amount":580.0,"mark_price":84319.46,"tick_direction":2,"contracts":58.0,"trade_id":"356130979"}]}} \ No newline at end of file