Kraken: Fix sending trades to the websocket DataHandler (#1813)

* Send trades down the DataHandler

When the tradeFeed is enabled, send the trades down the DataHandler
Add TestWSProcessTrades

* Update assertions

* Add check against null references in slices

* Add an error for ws parsing field to common

* Update kraken websocket ProcessTrades

Send individual trades down the DataHandler
Send multiple trades in test
Test error if the trade length is too short
Nits

* Fix nits
This commit is contained in:
Bea
2025-03-04 13:06:07 +07:00
committed by GitHub
parent 3d2455b5fb
commit 7fa2592e31
8 changed files with 88 additions and 26 deletions

View File

@@ -1998,7 +1998,7 @@ func TestGetErrResp(t *testing.T) {
seen++
switch seen {
case 1: // no event
assert.ErrorIs(t, testErr, errParsingWSField, "Message with no event Should get correct error type")
assert.ErrorIs(t, testErr, common.ErrParsingWSField, "Message with no event should get correct error type")
assert.ErrorContains(t, testErr, "'event'", "Message with no event error should contain missing field name")
assert.ErrorContains(t, testErr, "nightjar", "Message with no event error should contain the message")
case 2: // with {} for event

View File

@@ -508,7 +508,7 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
func (b *Bitfinex) handleWSEvent(respRaw []byte) error {
event, err := jsonparser.GetUnsafeString(respRaw, "event")
if err != nil {
return fmt.Errorf("%w 'event': %w from message: %s", errParsingWSField, err, respRaw)
return fmt.Errorf("%w 'event': %w from message: %s", common.ErrParsingWSField, err, respRaw)
}
switch event {
case wsEventSubscribed:
@@ -516,7 +516,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error {
case wsEventUnsubscribed:
chanID, err := jsonparser.GetUnsafeString(respRaw, "chanId")
if err != nil {
return fmt.Errorf("%w 'chanId': %w from message: %s", errParsingWSField, err, respRaw)
return fmt.Errorf("%w 'chanId': %w from message: %s", common.ErrParsingWSField, err, respRaw)
}
err = b.Websocket.Match.RequireMatchWithData("unsubscribe:"+chanID, respRaw)
if err != nil {
@@ -539,7 +539,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error {
case wsEventAuth:
status, err := jsonparser.GetUnsafeString(respRaw, "status")
if err != nil {
return fmt.Errorf("%w 'status': %w from message: %s", errParsingWSField, err, respRaw)
return fmt.Errorf("%w 'status': %w from message: %s", common.ErrParsingWSField, err, respRaw)
}
if status == "OK" {
var glob map[string]interface{}
@@ -551,7 +551,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error {
} else {
errCode, err := jsonparser.GetInt(respRaw, "code")
if err != nil {
log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, errParsingWSField, err, respRaw)
log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, common.ErrParsingWSField, err, respRaw)
}
return fmt.Errorf("WS auth subscription error; Status: %s Error Code: %d", status, errCode)
}
@@ -561,7 +561,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error {
case wsEventConf:
status, err := jsonparser.GetUnsafeString(respRaw, "status")
if err != nil {
return fmt.Errorf("%w 'status': %w from message: %s", errParsingWSField, err, respRaw)
return fmt.Errorf("%w 'status': %w from message: %s", common.ErrParsingWSField, err, respRaw)
}
if status != "OK" {
return fmt.Errorf("WS configure channel error; Status: %s", status)
@@ -578,7 +578,7 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error {
func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
subID, err := jsonparser.GetUnsafeString(respRaw, "subId")
if err != nil {
return fmt.Errorf("%w 'subId': %w from message: %s", errParsingWSField, err, respRaw)
return fmt.Errorf("%w 'subId': %w from message: %s", common.ErrParsingWSField, err, respRaw)
}
c := b.Websocket.GetSubscription(subID)
@@ -588,7 +588,7 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
chanID, err := jsonparser.GetInt(respRaw, "chanId")
if err != nil {
return fmt.Errorf("%w: %w 'chanId': %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, errParsingWSField, err, c.Channel, c.Pairs)
return fmt.Errorf("%w: %w 'chanId': %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, common.ErrParsingWSField, err, c.Channel, c.Pairs)
}
// Note: chanID's int type avoids conflicts with the string type subID key because of the type difference
@@ -1814,19 +1814,19 @@ func (b *Bitfinex) unsubscribeFromChan(subs subscription.List) error {
func (b *Bitfinex) getErrResp(resp []byte) error {
event, err := jsonparser.GetUnsafeString(resp, "event")
if err != nil {
return fmt.Errorf("%w 'event': %w from message: %s", errParsingWSField, err, resp)
return fmt.Errorf("%w 'event': %w from message: %s", common.ErrParsingWSField, err, resp)
}
if event != "error" {
return nil
}
errCode, err := jsonparser.GetInt(resp, "code")
if err != nil {
log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, errParsingWSField, err, resp)
log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, common.ErrParsingWSField, err, resp)
}
var apiErr error
if msg, e2 := jsonparser.GetString(resp, "msg"); e2 != nil {
log.Errorf(log.ExchangeSys, "%s %s 'msg': %s from message: %s", b.Name, errParsingWSField, e2, resp)
log.Errorf(log.ExchangeSys, "%s %s 'msg': %s from message: %s", b.Name, common.ErrParsingWSField, e2, resp)
apiErr = common.ErrUnknownError
} else {
apiErr = errors.New(msg)

View File

@@ -33,7 +33,6 @@ const (
)
var (
errParsingWSField = errors.New("error parsing WS field")
errParsingWSPair = errors.New("unable to parse currency pair from wsResponse.Channel")
errChannelHyphens = errors.New("channel name does not contain exactly 0 or 2 hyphens")
errChannelUnderscores = errors.New("channel name does not contain exactly 2 underscores")
@@ -102,7 +101,7 @@ func (b *Bitstamp) wsReadData() {
func (b *Bitstamp) wsHandleData(respRaw []byte) error {
event, err := jsonparser.GetUnsafeString(respRaw, "event")
if err != nil {
return fmt.Errorf("%w `event`: %w", errParsingWSField, err)
return fmt.Errorf("%w `event`: %w", common.ErrParsingWSField, err)
}
event = strings.TrimPrefix(event, "bts:")
@@ -132,7 +131,7 @@ func (b *Bitstamp) wsHandleData(respRaw []byte) error {
func (b *Bitstamp) handleWSSubscription(event string, respRaw []byte) error {
channel, err := jsonparser.GetUnsafeString(respRaw, "channel")
if err != nil {
return fmt.Errorf("%w `channel`: %w", errParsingWSField, err)
return fmt.Errorf("%w `channel`: %w", common.ErrParsingWSField, err)
}
event = strings.TrimSuffix(event, "scription_succeeded")
return b.Websocket.Match.RequireMatchWithData(event+":"+channel, respRaw)
@@ -402,7 +401,7 @@ func (b *Bitstamp) FetchWSAuth(ctx context.Context) (*WebsocketAuthResponse, err
func (b *Bitstamp) parseChannelName(respRaw []byte) (string, currency.Pair, error) {
channel, err := jsonparser.GetUnsafeString(respRaw, "channel")
if err != nil {
return "", currency.EMPTYPAIR, fmt.Errorf("%w `channel`: %w", errParsingWSField, err)
return "", currency.EMPTYPAIR, fmt.Errorf("%w `channel`: %w", common.ErrParsingWSField, err)
}
authParts := strings.Split(channel, "-")

View File

@@ -27,6 +27,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions"
mockws "github.com/thrasher-corp/gocryptotrader/internal/testing/websocket"
@@ -1221,6 +1222,41 @@ func TestWsHandleData(t *testing.T) {
testexch.FixtureToDataHandler(t, "testdata/wsHandleData.json", k.wsHandleData)
}
func TestWSProcessTrades(t *testing.T) {
t.Parallel()
k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.Setup(k), "Test instance Setup must not error")
err := k.Websocket.AddSubscriptions(k.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{spotTestPair}, Channel: subscription.AllTradesChannel, Key: 18788})
require.NoError(t, err, "AddSubscriptions must not error")
testexch.FixtureToDataHandler(t, "testdata/wsAllTrades.json", k.wsHandleData)
close(k.Websocket.DataHandler)
invalid := []any{"trades", []any{[]interface{}{"95873.80000", "0.00051182", "1708731380.3791859"}}}
pair := currency.NewPair(currency.XBT, currency.USD)
err = k.wsProcessTrades(invalid, pair)
require.ErrorContains(t, err, "unexpected trade data length")
expJSON := []string{
`{"AssetType":"spot","CurrencyPair":"XBT/USD","Side":"BUY","Price":95873.80000,"Amount":0.00051182,"Timestamp":"2025-02-23T23:29:40.379185914Z"}`,
`{"AssetType":"spot","CurrencyPair":"XBT/USD","Side":"SELL","Price":95940.90000,"Amount":0.00011069,"Timestamp":"2025-02-24T02:01:12.853682041Z"}`,
}
require.Len(t, k.Websocket.DataHandler, len(expJSON), "Must see correct number of trades")
for resp := range k.Websocket.DataHandler {
switch v := resp.(type) {
case trade.Data:
i := 1 - len(k.Websocket.DataHandler)
exp := trade.Data{Exchange: k.Name, CurrencyPair: spotTestPair}
require.NoErrorf(t, json.Unmarshal([]byte(expJSON[i]), &exp), "Must not error unmarshalling json %d: %s", i, expJSON[i])
require.Equalf(t, exp, v, "Trade [%d] must be correct", i)
case error:
t.Error(v)
default:
t.Errorf("Unexpected type in DataHandler: %T (%s)", v, v)
}
}
}
func TestWsOpenOrders(t *testing.T) {
t.Parallel()
k := new(Kraken) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes

View File

@@ -79,7 +79,6 @@ func init() {
var (
authToken string
errParsingWSField = errors.New("error parsing WS field")
errCancellingOrder = errors.New("error cancelling order")
errSubPairMissing = errors.New("pair missing from subscription response")
errInvalidChecksum = errors.New("invalid checksum")
@@ -531,7 +530,9 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error {
if !ok {
return errors.New("received invalid trade data")
}
if !k.IsSaveTradeDataEnabled() {
saveTradeData := k.IsSaveTradeDataEnabled()
tradeFeed := k.IsTradeFeedEnabled()
if !saveTradeData && !tradeFeed {
return nil
}
trades := make([]trade.Data, len(data))
@@ -540,24 +541,37 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error {
if !ok {
return errors.New("unidentified trade data received")
}
timeData, err := strconv.ParseFloat(t[2].(string), 64)
if len(t) < 4 {
return fmt.Errorf("%w; unexpected trade data length: %d", common.ErrParsingWSField, len(t))
}
ts, ok := t[2].(string)
if !ok {
return common.GetTypeAssertError("string", t[2], "trade.time")
}
timeData, err := strconv.ParseFloat(ts, 64)
if err != nil {
return err
}
price, err := strconv.ParseFloat(t[0].(string), 64)
p, ok := t[0].(string)
if !ok {
return common.GetTypeAssertError("string", t[0], "trade.price")
}
price, err := strconv.ParseFloat(p, 64)
if err != nil {
return err
}
amount, err := strconv.ParseFloat(t[1].(string), 64)
v, ok := t[1].(string)
if !ok {
return common.GetTypeAssertError("string", t[1], "trade.volume")
}
amount, err := strconv.ParseFloat(v, 64)
if err != nil {
return err
}
var tSide = order.Buy
s, ok := t[3].(string)
if !ok {
return common.GetTypeAssertError("string", t[3], "side")
return common.GetTypeAssertError("string", t[3], "trade.side")
}
if s == "s" {
tSide = order.Sell
@@ -573,7 +587,15 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error {
Side: tSide,
}
}
return trade.AddTradesToBuffer(k.Name, trades...)
if tradeFeed {
for i := range trades {
k.Websocket.DataHandler <- trades[i]
}
}
if saveTradeData {
return trade.AddTradesToBuffer(k.Name, trades...)
}
return nil
}
// wsProcessOrderBook handles both partial and full orderbook updates
@@ -1331,7 +1353,7 @@ func (k *Kraken) wsCancelOrder(orderID string) error {
status, err := jsonparser.GetUnsafeString(resp, "status")
if err != nil {
return fmt.Errorf("%w 'status': %w from message: %s", errParsingWSField, err, resp)
return fmt.Errorf("%w 'status': %w from message: %s", common.ErrParsingWSField, err, resp)
} else if status == "ok" {
return nil
}

View File

@@ -0,0 +1,2 @@
[119930881,[["95873.80000","0.00051182","1740353380.379186","b","l",""]],"trade","XBT/USD"]
[119930881,[["95940.90000","0.00011069","1740362472.853682","s","l",""]],"trade","XBT/USD"]