From 7fa2592e31497d16771642245af688046b3dbea7 Mon Sep 17 00:00:00 2001 From: Bea Date: Tue, 4 Mar 2025 13:06:07 +0700 Subject: [PATCH] Kraken: Fix sending trades to the websocket DataHandler (#1813) * Send trades down the DataHandler When the tradeFeed is enabled, send the trades down the DataHandler Add TestWSProcessTrades * Update assertions * Add check against null references in slices * Add an error for ws parsing field to common * Update kraken websocket ProcessTrades Send individual trades down the DataHandler Send multiple trades in test Test error if the trade length is too short Nits * Fix nits --- common/common.go | 1 + exchanges/bitfinex/bitfinex_test.go | 2 +- exchanges/bitfinex/bitfinex_websocket.go | 20 +++++------ exchanges/bitstamp/bitstamp_websocket.go | 7 ++-- exchanges/kraken/kraken_test.go | 36 +++++++++++++++++++ exchanges/kraken/kraken_websocket.go | 42 ++++++++++++++++------ exchanges/kraken/testdata/wsAllTrades.json | 2 ++ testdata/configtest.json | 4 ++- 8 files changed, 88 insertions(+), 26 deletions(-) create mode 100644 exchanges/kraken/testdata/wsAllTrades.json diff --git a/common/common.go b/common/common.go index 65c05172..7fb5cf92 100644 --- a/common/common.go +++ b/common/common.go @@ -72,6 +72,7 @@ var ( ErrUnknownError = errors.New("unknown error") ErrGettingField = errors.New("error getting field") ErrSettingField = errors.New("error setting field") + ErrParsingWSField = errors.New("error parsing websocket field") ) var ( diff --git a/exchanges/bitfinex/bitfinex_test.go b/exchanges/bitfinex/bitfinex_test.go index 1340b7cb..81b6db82 100644 --- a/exchanges/bitfinex/bitfinex_test.go +++ b/exchanges/bitfinex/bitfinex_test.go @@ -1998,7 +1998,7 @@ func TestGetErrResp(t *testing.T) { seen++ switch seen { case 1: // no event - assert.ErrorIs(t, testErr, errParsingWSField, "Message with no event Should get correct error type") + assert.ErrorIs(t, testErr, common.ErrParsingWSField, "Message with no event should get correct error type") assert.ErrorContains(t, testErr, "'event'", "Message with no event error should contain missing field name") assert.ErrorContains(t, testErr, "nightjar", "Message with no event error should contain the message") case 2: // with {} for event diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index d5adeed6..d39a00fb 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -508,7 +508,7 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error { func (b *Bitfinex) handleWSEvent(respRaw []byte) error { event, err := jsonparser.GetUnsafeString(respRaw, "event") if err != nil { - return fmt.Errorf("%w 'event': %w from message: %s", errParsingWSField, err, respRaw) + return fmt.Errorf("%w 'event': %w from message: %s", common.ErrParsingWSField, err, respRaw) } switch event { case wsEventSubscribed: @@ -516,7 +516,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error { case wsEventUnsubscribed: chanID, err := jsonparser.GetUnsafeString(respRaw, "chanId") if err != nil { - return fmt.Errorf("%w 'chanId': %w from message: %s", errParsingWSField, err, respRaw) + return fmt.Errorf("%w 'chanId': %w from message: %s", common.ErrParsingWSField, err, respRaw) } err = b.Websocket.Match.RequireMatchWithData("unsubscribe:"+chanID, respRaw) if err != nil { @@ -539,7 +539,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error { case wsEventAuth: status, err := jsonparser.GetUnsafeString(respRaw, "status") if err != nil { - return fmt.Errorf("%w 'status': %w from message: %s", errParsingWSField, err, respRaw) + return fmt.Errorf("%w 'status': %w from message: %s", common.ErrParsingWSField, err, respRaw) } if status == "OK" { var glob map[string]interface{} @@ -551,7 +551,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error { } else { errCode, err := jsonparser.GetInt(respRaw, "code") if err != nil { - log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, errParsingWSField, err, respRaw) + log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, common.ErrParsingWSField, err, respRaw) } return fmt.Errorf("WS auth subscription error; Status: %s Error Code: %d", status, errCode) } @@ -561,7 +561,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error { case wsEventConf: status, err := jsonparser.GetUnsafeString(respRaw, "status") if err != nil { - return fmt.Errorf("%w 'status': %w from message: %s", errParsingWSField, err, respRaw) + return fmt.Errorf("%w 'status': %w from message: %s", common.ErrParsingWSField, err, respRaw) } if status != "OK" { return fmt.Errorf("WS configure channel error; Status: %s", status) @@ -578,7 +578,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error { func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error { subID, err := jsonparser.GetUnsafeString(respRaw, "subId") if err != nil { - return fmt.Errorf("%w 'subId': %w from message: %s", errParsingWSField, err, respRaw) + return fmt.Errorf("%w 'subId': %w from message: %s", common.ErrParsingWSField, err, respRaw) } c := b.Websocket.GetSubscription(subID) @@ -588,7 +588,7 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error { chanID, err := jsonparser.GetInt(respRaw, "chanId") if err != nil { - return fmt.Errorf("%w: %w 'chanId': %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, errParsingWSField, err, c.Channel, c.Pairs) + return fmt.Errorf("%w: %w 'chanId': %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, common.ErrParsingWSField, err, c.Channel, c.Pairs) } // Note: chanID's int type avoids conflicts with the string type subID key because of the type difference @@ -1814,19 +1814,19 @@ func (b *Bitfinex) unsubscribeFromChan(subs subscription.List) error { func (b *Bitfinex) getErrResp(resp []byte) error { event, err := jsonparser.GetUnsafeString(resp, "event") if err != nil { - return fmt.Errorf("%w 'event': %w from message: %s", errParsingWSField, err, resp) + return fmt.Errorf("%w 'event': %w from message: %s", common.ErrParsingWSField, err, resp) } if event != "error" { return nil } errCode, err := jsonparser.GetInt(resp, "code") if err != nil { - log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, errParsingWSField, err, resp) + log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, common.ErrParsingWSField, err, resp) } var apiErr error if msg, e2 := jsonparser.GetString(resp, "msg"); e2 != nil { - log.Errorf(log.ExchangeSys, "%s %s 'msg': %s from message: %s", b.Name, errParsingWSField, e2, resp) + log.Errorf(log.ExchangeSys, "%s %s 'msg': %s from message: %s", b.Name, common.ErrParsingWSField, e2, resp) apiErr = common.ErrUnknownError } else { apiErr = errors.New(msg) diff --git a/exchanges/bitstamp/bitstamp_websocket.go b/exchanges/bitstamp/bitstamp_websocket.go index c8339963..404c795d 100644 --- a/exchanges/bitstamp/bitstamp_websocket.go +++ b/exchanges/bitstamp/bitstamp_websocket.go @@ -33,7 +33,6 @@ const ( ) var ( - errParsingWSField = errors.New("error parsing WS field") errParsingWSPair = errors.New("unable to parse currency pair from wsResponse.Channel") errChannelHyphens = errors.New("channel name does not contain exactly 0 or 2 hyphens") errChannelUnderscores = errors.New("channel name does not contain exactly 2 underscores") @@ -102,7 +101,7 @@ func (b *Bitstamp) wsReadData() { func (b *Bitstamp) wsHandleData(respRaw []byte) error { event, err := jsonparser.GetUnsafeString(respRaw, "event") if err != nil { - return fmt.Errorf("%w `event`: %w", errParsingWSField, err) + return fmt.Errorf("%w `event`: %w", common.ErrParsingWSField, err) } event = strings.TrimPrefix(event, "bts:") @@ -132,7 +131,7 @@ func (b *Bitstamp) wsHandleData(respRaw []byte) error { func (b *Bitstamp) handleWSSubscription(event string, respRaw []byte) error { channel, err := jsonparser.GetUnsafeString(respRaw, "channel") if err != nil { - return fmt.Errorf("%w `channel`: %w", errParsingWSField, err) + return fmt.Errorf("%w `channel`: %w", common.ErrParsingWSField, err) } event = strings.TrimSuffix(event, "scription_succeeded") return b.Websocket.Match.RequireMatchWithData(event+":"+channel, respRaw) @@ -402,7 +401,7 @@ func (b *Bitstamp) FetchWSAuth(ctx context.Context) (*WebsocketAuthResponse, err func (b *Bitstamp) parseChannelName(respRaw []byte) (string, currency.Pair, error) { channel, err := jsonparser.GetUnsafeString(respRaw, "channel") if err != nil { - return "", currency.EMPTYPAIR, fmt.Errorf("%w `channel`: %w", errParsingWSField, err) + return "", currency.EMPTYPAIR, fmt.Errorf("%w `channel`: %w", common.ErrParsingWSField, err) } authParts := strings.Split(channel, "-") diff --git a/exchanges/kraken/kraken_test.go b/exchanges/kraken/kraken_test.go index 234ac536..99a75865 100644 --- a/exchanges/kraken/kraken_test.go +++ b/exchanges/kraken/kraken_test.go @@ -27,6 +27,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" + "github.com/thrasher-corp/gocryptotrader/exchanges/trade" testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions" mockws "github.com/thrasher-corp/gocryptotrader/internal/testing/websocket" @@ -1221,6 +1222,41 @@ func TestWsHandleData(t *testing.T) { testexch.FixtureToDataHandler(t, "testdata/wsHandleData.json", k.wsHandleData) } +func TestWSProcessTrades(t *testing.T) { + t.Parallel() + + k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + require.NoError(t, testexch.Setup(k), "Test instance Setup must not error") + err := k.Websocket.AddSubscriptions(k.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{spotTestPair}, Channel: subscription.AllTradesChannel, Key: 18788}) + require.NoError(t, err, "AddSubscriptions must not error") + testexch.FixtureToDataHandler(t, "testdata/wsAllTrades.json", k.wsHandleData) + close(k.Websocket.DataHandler) + + invalid := []any{"trades", []any{[]interface{}{"95873.80000", "0.00051182", "1708731380.3791859"}}} + pair := currency.NewPair(currency.XBT, currency.USD) + err = k.wsProcessTrades(invalid, pair) + require.ErrorContains(t, err, "unexpected trade data length") + + expJSON := []string{ + `{"AssetType":"spot","CurrencyPair":"XBT/USD","Side":"BUY","Price":95873.80000,"Amount":0.00051182,"Timestamp":"2025-02-23T23:29:40.379185914Z"}`, + `{"AssetType":"spot","CurrencyPair":"XBT/USD","Side":"SELL","Price":95940.90000,"Amount":0.00011069,"Timestamp":"2025-02-24T02:01:12.853682041Z"}`, + } + require.Len(t, k.Websocket.DataHandler, len(expJSON), "Must see correct number of trades") + for resp := range k.Websocket.DataHandler { + switch v := resp.(type) { + case trade.Data: + i := 1 - len(k.Websocket.DataHandler) + exp := trade.Data{Exchange: k.Name, CurrencyPair: spotTestPair} + require.NoErrorf(t, json.Unmarshal([]byte(expJSON[i]), &exp), "Must not error unmarshalling json %d: %s", i, expJSON[i]) + require.Equalf(t, exp, v, "Trade [%d] must be correct", i) + case error: + t.Error(v) + default: + t.Errorf("Unexpected type in DataHandler: %T (%s)", v, v) + } + } +} + func TestWsOpenOrders(t *testing.T) { t.Parallel() k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index bb1c3711..8103ce9e 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -79,7 +79,6 @@ func init() { var ( authToken string - errParsingWSField = errors.New("error parsing WS field") errCancellingOrder = errors.New("error cancelling order") errSubPairMissing = errors.New("pair missing from subscription response") errInvalidChecksum = errors.New("invalid checksum") @@ -531,7 +530,9 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error { if !ok { return errors.New("received invalid trade data") } - if !k.IsSaveTradeDataEnabled() { + saveTradeData := k.IsSaveTradeDataEnabled() + tradeFeed := k.IsTradeFeedEnabled() + if !saveTradeData && !tradeFeed { return nil } trades := make([]trade.Data, len(data)) @@ -540,24 +541,37 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error { if !ok { return errors.New("unidentified trade data received") } - timeData, err := strconv.ParseFloat(t[2].(string), 64) + if len(t) < 4 { + return fmt.Errorf("%w; unexpected trade data length: %d", common.ErrParsingWSField, len(t)) + } + ts, ok := t[2].(string) + if !ok { + return common.GetTypeAssertError("string", t[2], "trade.time") + } + timeData, err := strconv.ParseFloat(ts, 64) if err != nil { return err } - - price, err := strconv.ParseFloat(t[0].(string), 64) + p, ok := t[0].(string) + if !ok { + return common.GetTypeAssertError("string", t[0], "trade.price") + } + price, err := strconv.ParseFloat(p, 64) if err != nil { return err } - - amount, err := strconv.ParseFloat(t[1].(string), 64) + v, ok := t[1].(string) + if !ok { + return common.GetTypeAssertError("string", t[1], "trade.volume") + } + amount, err := strconv.ParseFloat(v, 64) if err != nil { return err } var tSide = order.Buy s, ok := t[3].(string) if !ok { - return common.GetTypeAssertError("string", t[3], "side") + return common.GetTypeAssertError("string", t[3], "trade.side") } if s == "s" { tSide = order.Sell @@ -573,7 +587,15 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error { Side: tSide, } } - return trade.AddTradesToBuffer(k.Name, trades...) + if tradeFeed { + for i := range trades { + k.Websocket.DataHandler <- trades[i] + } + } + if saveTradeData { + return trade.AddTradesToBuffer(k.Name, trades...) + } + return nil } // wsProcessOrderBook handles both partial and full orderbook updates @@ -1331,7 +1353,7 @@ func (k *Kraken) wsCancelOrder(orderID string) error { status, err := jsonparser.GetUnsafeString(resp, "status") if err != nil { - return fmt.Errorf("%w 'status': %w from message: %s", errParsingWSField, err, resp) + return fmt.Errorf("%w 'status': %w from message: %s", common.ErrParsingWSField, err, resp) } else if status == "ok" { return nil } diff --git a/exchanges/kraken/testdata/wsAllTrades.json b/exchanges/kraken/testdata/wsAllTrades.json new file mode 100644 index 00000000..db862606 --- /dev/null +++ b/exchanges/kraken/testdata/wsAllTrades.json @@ -0,0 +1,2 @@ +[119930881,[["95873.80000","0.00051182","1740353380.379186","b","l",""]],"trade","XBT/USD"] +[119930881,[["95940.90000","0.00011069","1740362472.853682","s","l",""]],"trade","XBT/USD"] diff --git a/testdata/configtest.json b/testdata/configtest.json index 960f5519..eaf1073c 100644 --- a/testdata/configtest.json +++ b/testdata/configtest.json @@ -1968,7 +1968,9 @@ }, "enabled": { "autoPairUpdates": true, - "websocketAPI": true + "websocketAPI": true, + "saveTradeData": false, + "tradeFeed": true } }, "bankAccounts": [