diff --git a/exchanges/bitstamp/bitstamp_test.go b/exchanges/bitstamp/bitstamp_test.go index 5037b6c3..f66a2ca1 100644 --- a/exchanges/bitstamp/bitstamp_test.go +++ b/exchanges/bitstamp/bitstamp_test.go @@ -16,7 +16,9 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" "github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues" + "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" + testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions" "github.com/thrasher-corp/gocryptotrader/portfolio/banking" "github.com/thrasher-corp/gocryptotrader/portfolio/withdraw" ) @@ -760,11 +762,11 @@ func TestWsTrade(t *testing.T) { func TestWsOrderbook(t *testing.T) { pressXToJSON := []byte(`{"data": {"timestamp": "1580336834", "microtimestamp": "1580336834607546", "bids": [["9328.28", "0.05925332"], ["9327.34", "0.43120000"], ["9327.29", "0.63470860"], ["9326.59", "0.41114619"], ["9326.38", "1.06910000"], ["9323.91", "2.67930000"], ["9322.69", "0.80000000"], ["9322.57", "0.03000000"], ["9322.31", "1.36010820"], ["9319.54", "0.03090000"], ["9318.97", "0.28000000"], ["9317.61", "0.02910000"], ["9316.39", "1.08000000"], ["9316.20", "2.00000000"], ["9315.48", "1.00000000"], ["9314.72", "0.11197459"], ["9314.47", "0.32207398"], ["9312.53", "0.03961501"], ["9312.29", "1.00000000"], ["9311.78", "0.03060000"], ["9311.69", "0.32217221"], ["9310.98", "3.29000000"], ["9310.18", "0.01304192"], ["9310.13", "0.02500000"], ["9309.04", "1.00000000"], ["9309.00", "0.05000000"], ["9308.96", "0.03030000"], ["9308.91", "0.32227154"], ["9307.52", "0.32191362"], ["9307.25", "2.44280000"], ["9305.92", "3.00000000"], ["9305.62", "2.37600000"], ["9305.60", "0.21815312"], ["9305.54", "2.80000000"], ["9305.13", "0.05000000"], ["9305.02", "2.90917302"], ["9303.68", "0.02316372"], ["9303.53", "12.55000000"], ["9303.00", "0.02191430"], ["9302.94", "2.38250000"], ["9302.37", "0.01000000"], ["9301.85", "2.50000000"], ["9300.89", "0.02000000"], ["9300.40", "4.10000000"], ["9300.00", "0.33936139"], ["9298.48", "1.45200000"], ["9297.80", "0.42380000"], ["9295.44", "4.54689328"], ["9295.43", "3.20000000"], ["9295.00", "0.28669566"], ["9291.66", "14.09931321"], ["9290.13", "2.87254900"], ["9290.00", "0.67530840"], ["9285.37", "0.38033002"], ["9285.15", "5.37993528"], ["9285.00", "0.09419278"], ["9283.71", "0.15679830"], ["9280.33", "12.55000000"], ["9280.13", "3.20310000"], ["9280.00", "1.36477909"], ["9276.01", "0.00707488"], ["9275.75", "0.56974291"], ["9275.00", "5.88000000"], ["9274.00", "0.00754205"], ["9271.68", "0.01400000"], ["9271.11", "15.37188500"], ["9270.00", "0.06674325"], ["9268.79", "24.54320000"], ["9257.18", "12.55000000"], ["9256.30", "0.17876365"], ["9255.71", "13.82642967"], ["9254.79", "0.96329407"], ["9250.00", "0.78214958"], ["9245.34", "4.90200000"], ["9245.13", "0.10000000"], ["9240.00", "0.44383459"], ["9238.84", "13.16615207"], ["9234.11", "0.43317656"], ["9234.10", "12.55000000"], ["9231.28", "11.79290000"], ["9230.09", "4.15059441"], ["9227.69", "0.00791097"], ["9225.00", "0.44768346"], ["9224.49", "0.85857203"], ["9223.50", "5.61001041"], ["9216.01", "0.03222653"], ["9216.00", "0.05000000"], ["9213.54", "0.71253866"], ["9212.50", "2.86768195"], ["9211.07", "12.55000000"], ["9210.00", "0.54288817"], ["9208.00", "1.00000000"], ["9206.06", "2.62587578"], ["9205.98", "15.40000000"], ["9205.52", "0.01710603"], ["9205.37", "0.03524953"], ["9205.11", "0.15000000"], ["9205.00", "0.01534763"], ["9204.76", "7.00600000"], ["9203.00", "0.01090000"]], "asks": [["9337.10", "0.03000000"], ["9340.85", "2.67820000"], ["9340.95", "0.02900000"], ["9341.17", "1.00000000"], ["9341.41", "2.13966390"], ["9341.61", "0.20000000"], ["9341.97", "0.11199911"], ["9341.98", "3.00000000"], ["9342.26", "0.32112762"], ["9343.87", "1.00000000"], ["9344.17", "3.57250000"], ["9345.04", "0.32103450"], ["9345.41", "4.90000000"], ["9345.69", "1.03000000"], ["9345.80", "0.03000000"], ["9346.00", "0.10200000"], ["9346.69", "0.02397394"], ["9347.41", "1.00000000"], ["9347.82", "0.32094177"], ["9348.23", "0.02880000"], ["9348.62", "11.96287551"], ["9349.31", "2.44270000"], ["9349.47", "0.96000000"], ["9349.86", "4.50000000"], ["9350.37", "0.03300000"], ["9350.57", "0.34682266"], ["9350.60", "0.32085527"], ["9351.45", "0.31147923"], ["9352.31", "0.28000000"], ["9352.86", "9.80000000"], ["9353.73", "0.02360739"], ["9354.00", "0.45000000"], ["9354.12", "0.03000000"], ["9354.29", "3.82446861"], ["9356.20", "0.64000000"], ["9356.90", "0.02316372"], ["9357.30", "2.50000000"], ["9357.70", "2.38240000"], ["9358.92", "6.00000000"], ["9359.97", "0.34898075"], ["9359.98", "2.30000000"], ["9362.56", "2.37600000"], ["9365.00", "0.64000000"], ["9365.16", "1.70030306"], ["9365.27", "3.03000000"], ["9369.99", "2.47102665"], ["9370.00", "3.15688574"], ["9370.21", "2.32720000"], ["9371.78", "13.20000000"], ["9371.89", "0.96293482"], ["9375.08", "4.74762500"], ["9384.34", "1.45200000"], ["9384.49", "16.42310000"], ["9385.66", "0.34382112"], ["9388.19", "0.00268265"], ["9392.20", "0.20980000"], ["9392.40", "0.10320000"], ["9393.00", "0.20980000"], ["9395.40", "0.40000000"], ["9398.86", "24.54310000"], ["9400.00", "0.05489988"], ["9400.33", "0.00495100"], ["9400.45", "0.00484700"], ["9402.92", "17.20000000"], ["9404.18", "10.00000000"], ["9418.89", "16.38000000"], ["9419.41", "3.06700000"], ["9420.40", "12.50000000"], ["9421.11", "0.10500000"], ["9434.47", "0.03215805"], ["9434.48", "0.28285714"], ["9434.49", "15.83000000"], ["9435.13", "0.15000000"], ["9438.93", "0.00368800"], ["9439.19", "0.69343985"], ["9442.86", "0.10000000"], ["9443.96", "12.50000000"], ["9444.00", "0.06004471"], ["9444.97", "0.01494896"], ["9447.00", "0.01234000"], ["9448.97", "0.14500000"], ["9449.00", "0.05000000"], ["9450.00", "11.13426018"], ["9451.87", "15.90000000"], ["9452.00", "0.20000000"], ["9454.25", "0.01100000"], ["9454.51", "0.02409062"], ["9455.05", "0.00600063"], ["9456.00", "0.27965118"], ["9456.10", "0.17000000"], ["9459.00", "0.00320000"], ["9459.98", "0.02460685"], ["9459.99", "8.11000000"], ["9460.00", "0.08500000"], ["9464.36", "0.56957951"], ["9464.54", "0.69158059"], ["9465.00", "21.00002015"], ["9467.57", "12.50000000"], ["9468.00", "0.08800000"], ["9469.09", "13.94000000"]]}, "event": "data", "channel": "order_book_btcusd"}`) err := b.wsHandleData(pressXToJSON) - require.NoError(t, err, "TestWsOrderbook must not error") + require.NoError(t, err, "wsHandleData must not error") pressXToJSON = []byte(`{"data": {"timestamp": "1580336834", "microtimestamp": "1580336834607546", "bids": [["9328.28", "0.05925332"], ["9327.34", "0.43120000"], ["9327.29", "0.63470860"], ["9326.59", "0.41114619"], ["9326.38", "1.06910000"], ["9323.91", "2.67930000"], ["9322.69", "0.80000000"], ["9322.57", "0.03000000"], ["9322.31", "1.36010820"], ["9319.54", "0.03090000"], ["9318.97", "0.28000000"], ["9317.61", "0.02910000"], ["9316.39", "1.08000000"], ["9316.20", "2.00000000"], ["9315.48", "1.00000000"], ["9314.72", "0.11197459"], ["9314.47", "0.32207398"], ["9312.53", "0.03961501"], ["9312.29", "1.00000000"], ["9311.78", "0.03060000"], ["9311.69", "0.32217221"], ["9310.98", "3.29000000"], ["9310.18", "0.01304192"], ["9310.13", "0.02500000"], ["9309.04", "1.00000000"], ["9309.00", "0.05000000"], ["9308.96", "0.03030000"], ["9308.91", "0.32227154"], ["9307.52", "0.32191362"], ["9307.25", "2.44280000"], ["9305.92", "3.00000000"], ["9305.62", "2.37600000"], ["9305.60", "0.21815312"], ["9305.54", "2.80000000"], ["9305.13", "0.05000000"], ["9305.02", "2.90917302"], ["9303.68", "0.02316372"], ["9303.53", "12.55000000"], ["9303.00", "0.02191430"], ["9302.94", "2.38250000"], ["9302.37", "0.01000000"], ["9301.85", "2.50000000"], ["9300.89", "0.02000000"], ["9300.40", "4.10000000"], ["9300.00", "0.33936139"], ["9298.48", "1.45200000"], ["9297.80", "0.42380000"], ["9295.44", "4.54689328"], ["9295.43", "3.20000000"], ["9295.00", "0.28669566"], ["9291.66", "14.09931321"], ["9290.13", "2.87254900"], ["9290.00", "0.67530840"], ["9285.37", "0.38033002"], ["9285.15", "5.37993528"], ["9285.00", "0.09419278"], ["9283.71", "0.15679830"], ["9280.33", "12.55000000"], ["9280.13", "3.20310000"], ["9280.00", "1.36477909"], ["9276.01", "0.00707488"], ["9275.75", "0.56974291"], ["9275.00", "5.88000000"], ["9274.00", "0.00754205"], ["9271.68", "0.01400000"], ["9271.11", "15.37188500"], ["9270.00", "0.06674325"], ["9268.79", "24.54320000"], ["9257.18", "12.55000000"], ["9256.30", "0.17876365"], ["9255.71", "13.82642967"], ["9254.79", "0.96329407"], ["9250.00", "0.78214958"], ["9245.34", "4.90200000"], ["9245.13", "0.10000000"], ["9240.00", "0.44383459"], ["9238.84", "13.16615207"], ["9234.11", "0.43317656"], ["9234.10", "12.55000000"], ["9231.28", "11.79290000"], ["9230.09", "4.15059441"], ["9227.69", "0.00791097"], ["9225.00", "0.44768346"], ["9224.49", "0.85857203"], ["9223.50", "5.61001041"], ["9216.01", "0.03222653"], ["9216.00", "0.05000000"], ["9213.54", "0.71253866"], ["9212.50", "2.86768195"], ["9211.07", "12.55000000"], ["9210.00", "0.54288817"], ["9208.00", "1.00000000"], ["9206.06", "2.62587578"], ["9205.98", "15.40000000"], ["9205.52", "0.01710603"], ["9205.37", "0.03524953"], ["9205.11", "0.15000000"], ["9205.00", "0.01534763"], ["9204.76", "7.00600000"], ["9203.00", "0.01090000"]], "asks": [["9337.10", "0.03000000"], ["9340.85", "2.67820000"], ["9340.95", "0.02900000"], ["9341.17", "1.00000000"], ["9341.41", "2.13966390"], ["9341.61", "0.20000000"], ["9341.97", "0.11199911"], ["9341.98", "3.00000000"], ["9342.26", "0.32112762"], ["9343.87", "1.00000000"], ["9344.17", "3.57250000"], ["9345.04", "0.32103450"], ["9345.41", "4.90000000"], ["9345.69", "1.03000000"], ["9345.80", "0.03000000"], ["9346.00", "0.10200000"], ["9346.69", "0.02397394"], ["9347.41", "1.00000000"], ["9347.82", "0.32094177"], ["9348.23", "0.02880000"], ["9348.62", "11.96287551"], ["9349.31", "2.44270000"], ["9349.47", "0.96000000"], ["9349.86", "4.50000000"], ["9350.37", "0.03300000"], ["9350.57", "0.34682266"], ["9350.60", "0.32085527"], ["9351.45", "0.31147923"], ["9352.31", "0.28000000"], ["9352.86", "9.80000000"], ["9353.73", "0.02360739"], ["9354.00", "0.45000000"], ["9354.12", "0.03000000"], ["9354.29", "3.82446861"], ["9356.20", "0.64000000"], ["9356.90", "0.02316372"], ["9357.30", "2.50000000"], ["9357.70", "2.38240000"], ["9358.92", "6.00000000"], ["9359.97", "0.34898075"], ["9359.98", "2.30000000"], ["9362.56", "2.37600000"], ["9365.00", "0.64000000"], ["9365.16", "1.70030306"], ["9365.27", "3.03000000"], ["9369.99", "2.47102665"], ["9370.00", "3.15688574"], ["9370.21", "2.32720000"], ["9371.78", "13.20000000"], ["9371.89", "0.96293482"], ["9375.08", "4.74762500"], ["9384.34", "1.45200000"], ["9384.49", "16.42310000"], ["9385.66", "0.34382112"], ["9388.19", "0.00268265"], ["9392.20", "0.20980000"], ["9392.40", "0.10320000"], ["9393.00", "0.20980000"], ["9395.40", "0.40000000"], ["9398.86", "24.54310000"], ["9400.00", "0.05489988"], ["9400.33", "0.00495100"], ["9400.45", "0.00484700"], ["9402.92", "17.20000000"], ["9404.18", "10.00000000"], ["9418.89", "16.38000000"], ["9419.41", "3.06700000"], ["9420.40", "12.50000000"], ["9421.11", "0.10500000"], ["9434.47", "0.03215805"], ["9434.48", "0.28285714"], ["9434.49", "15.83000000"], ["9435.13", "0.15000000"], ["9438.93", "0.00368800"], ["9439.19", "0.69343985"], ["9442.86", "0.10000000"], ["9443.96", "12.50000000"], ["9444.00", "0.06004471"], ["9444.97", "0.01494896"], ["9447.00", "0.01234000"], ["9448.97", "0.14500000"], ["9449.00", "0.05000000"], ["9450.00", "11.13426018"], ["9451.87", "15.90000000"], ["9452.00", "0.20000000"], ["9454.25", "0.01100000"], ["9454.51", "0.02409062"], ["9455.05", "0.00600063"], ["9456.00", "0.27965118"], ["9456.10", "0.17000000"], ["9459.00", "0.00320000"], ["9459.98", "0.02460685"], ["9459.99", "8.11000000"], ["9460.00", "0.08500000"], ["9464.36", "0.56957951"], ["9464.54", "0.69158059"], ["9465.00", "21.00002015"], ["9467.57", "12.50000000"], ["9468.00", "0.08800000"], ["9469.09", "13.94000000"]]}, "event": "data", "channel": ""}`) err = b.wsHandleData(pressXToJSON) - require.Equal(t, errWSPairParsingError, err, "TestWsOrderbook must error parsing error") + require.ErrorIs(t, err, errChannelUnderscores, "wsHandleData must error parsing channel") } func TestWsOrderbook2(t *testing.T) { @@ -1035,3 +1037,48 @@ func TestGetCurrencyTradeURL(t *testing.T) { assert.NotEmpty(t, resp) } } + +func TestGenerateSubscriptions(t *testing.T) { + t.Parallel() + b.Websocket.SetCanUseAuthenticatedEndpoints(true) + require.True(t, b.Websocket.CanUseAuthenticatedEndpoints(), "CanUseAuthenticatedEndpoints must return true") + subs, err := b.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions must not error") + exp := subscription.List{} + pairs, err := b.GetEnabledPairs(asset.Spot) + require.NoError(t, err, "GetEnabledPairs must not error") + for _, baseSub := range b.Features.Subscriptions { + for _, p := range pairs.Format(currency.PairFormat{Uppercase: false}) { + s := baseSub.Clone() + s.Pairs = currency.Pairs{p} + s.QualifiedChannel = channelName(s) + "_" + p.String() + exp = append(exp, s) + } + } + testsubs.EqualLists(t, exp, subs) + assert.PanicsWithError(t, + "subscription channel not supported: wibble", + func() { channelName(&subscription.Subscription{Channel: "wibble"}) }, + "should panic on invalid channel", + ) +} + +func TestSubscribe(t *testing.T) { + t.Parallel() + b := new(Bitstamp) + require.NoError(t, testexch.Setup(b), "Test instance Setup must not error") + subs, err := b.Features.Subscriptions.ExpandTemplates(b) + require.NoError(t, err, "ExpandTemplates must not error") + b.Features.Subscriptions = subscription.List{} + testexch.SetupWs(t, b) + err = b.Subscribe(subs) + require.NoError(t, err, "Subscribe must not error") + for _, s := range subs { + assert.Equalf(t, subscription.SubscribedState, s.State(), "Subscription %s should be subscribed", s) + } + err = b.Unsubscribe(subs) + require.NoError(t, err, "UnSubscribe must not error") + for _, s := range subs { + assert.Equalf(t, subscription.UnsubscribedState, s.State(), "Subscription %s should be subscribed", s) + } +} diff --git a/exchanges/bitstamp/bitstamp_types.go b/exchanges/bitstamp/bitstamp_types.go index 36f29e62..6590a096 100644 --- a/exchanges/bitstamp/bitstamp_types.go +++ b/exchanges/bitstamp/bitstamp_types.go @@ -1,8 +1,6 @@ package bitstamp import ( - "errors" - "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/types" ) @@ -21,8 +19,6 @@ const ( SellOrder ) -var errWSPairParsingError = errors.New("unable to parse currency pair from wsResponse.Channel") - // Ticker holds ticker information type Ticker struct { Last float64 `json:"last,string"` @@ -220,10 +216,8 @@ type websocketData struct { } type websocketResponse struct { - Event string `json:"event"` - Channel string `json:"channel"` - channelType string - pair currency.Pair + Event string `json:"event"` + Channel string `json:"channel"` } type websocketTradeResponse struct { diff --git a/exchanges/bitstamp/bitstamp_websocket.go b/exchanges/bitstamp/bitstamp_websocket.go index 08dbbf25..de45f2fb 100644 --- a/exchanges/bitstamp/bitstamp_websocket.go +++ b/exchanges/bitstamp/bitstamp_websocket.go @@ -8,13 +8,16 @@ import ( "net/http" "strconv" "strings" + "text/template" "time" + "github.com/buger/jsonparser" "github.com/gorilla/websocket" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/currency" exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/kline" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" "github.com/thrasher-corp/gocryptotrader/exchanges/request" @@ -30,19 +33,28 @@ const ( ) var ( + errParsingWSField = errors.New("error parsing WS field") + errParsingWSPair = errors.New("unable to parse currency pair from wsResponse.Channel") + errChannelHyphens = errors.New("channel name does not contain exactly 0 or 2 hyphens") + errChannelUnderscores = errors.New("channel name does not contain exactly 2 underscores") + hbMsg = []byte(`{"event":"bts:heartbeat"}`) - - defaultSubChannels = []string{ - bitstampAPIWSTrades, - bitstampAPIWSOrderbook, - } - - defaultAuthSubChannels = []string{ - bitstampAPIWSMyOrders, - bitstampAPIWSMyTrades, - } ) +var defaultSubscriptions = subscription.List{ + {Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds}, + {Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel}, + {Enabled: true, Asset: asset.Spot, Channel: subscription.MyOrdersChannel, Authenticated: true}, + {Enabled: true, Asset: asset.Spot, Channel: subscription.MyTradesChannel, Authenticated: true}, +} + +var subscriptionNames = map[string]string{ + subscription.OrderbookChannel: bitstampAPIWSOrderbook, + subscription.AllTradesChannel: bitstampAPIWSTrades, + subscription.MyOrdersChannel: bitstampAPIWSMyOrders, + subscription.MyTradesChannel: bitstampAPIWSMyTrades, +} + // WsConnect connects to a websocket feed func (b *Bitstamp) WsConnect() error { if !b.Websocket.IsEnabled() || !b.IsEnabled() { @@ -88,78 +100,55 @@ func (b *Bitstamp) wsReadData() { } func (b *Bitstamp) wsHandleData(respRaw []byte) error { - wsResponse := &websocketResponse{} - if err := json.Unmarshal(respRaw, wsResponse); err != nil { - return err + event, err := jsonparser.GetUnsafeString(respRaw, "event") + if err != nil { + return fmt.Errorf("%w `event`: %w", errParsingWSField, err) } - if err := b.parseChannelName(wsResponse); err != nil { - return err - } - - switch wsResponse.Event { - case "bts:heartbeat": + event = strings.TrimPrefix(event, "bts:") + switch event { + case "heartbeat": return nil - case "bts:subscribe", "bts:subscription_succeeded": - if b.Verbose { - log.Debugf(log.ExchangeSys, "%v - Websocket subscription acknowledgement", b.Name) - } - case "bts:unsubscribe": - if b.Verbose { - log.Debugf(log.ExchangeSys, "%v - Websocket unsubscribe acknowledgement", b.Name) - } - case "bts:request_reconnect": - if b.Verbose { - log.Debugf(log.ExchangeSys, "%v - Websocket reconnection request received", b.Name) - } + case "subscription_succeeded", "unsubscription_succeeded": + return b.handleWSSubscription(event, respRaw) + case "data": + return b.handleWSOrderbook(respRaw) + case "trade": + return b.handleWSTrade(respRaw) + case "order_created", "order_deleted", "order_changed": + return b.handleWSOrder(event, respRaw) + case "request_reconnect": go func() { - err := b.Websocket.Shutdown() - if err != nil { + if err := b.Websocket.Shutdown(); err != nil { // Connection monitor will reconnect log.Errorf(log.WebsocketMgr, "%s failed to shutdown websocket: %v", b.Name, err) } - }() // Connection monitor will reconnect - case "data": - if err := b.handleWSOrderbook(wsResponse, respRaw); err != nil { - return err - } - case "trade": - if err := b.handleWSTrade(wsResponse, respRaw); err != nil { - return err - } - case "order_created", "order_deleted", "order_changed": - // Only process MyOrders, not orders from the LiveOrder channel - if wsResponse.channelType == bitstampAPIWSMyOrders { - if err := b.handleWSOrder(wsResponse, respRaw); err != nil { - return err - } - } + }() default: b.Websocket.DataHandler <- stream.UnhandledMessageWarning{Message: b.Name + stream.UnhandledMessage + string(respRaw)} } return nil } -func (b *Bitstamp) handleWSOrderbook(wsResp *websocketResponse, msg []byte) error { - if wsResp.pair.IsEmpty() { - return errWSPairParsingError - } - - wsOrderBookTemp := websocketOrderBookResponse{} - err := json.Unmarshal(msg, &wsOrderBookTemp) +func (b *Bitstamp) handleWSSubscription(event string, respRaw []byte) error { + channel, err := jsonparser.GetUnsafeString(respRaw, "channel") if err != nil { - return err + return fmt.Errorf("%w `channel`: %w", errParsingWSField, err) } - - return b.wsUpdateOrderbook(&wsOrderBookTemp.Data, wsResp.pair, asset.Spot) + event = strings.TrimSuffix(event, "scription_succeeded") + if !b.Websocket.Match.IncomingWithData(event+":"+channel, respRaw) { + return fmt.Errorf("%w: %s", stream.ErrNoMessageListener, event+":"+channel) + } + return nil } -func (b *Bitstamp) handleWSTrade(wsResp *websocketResponse, msg []byte) error { +func (b *Bitstamp) handleWSTrade(msg []byte) error { if !b.IsSaveTradeDataEnabled() { return nil } - if wsResp.pair.IsEmpty() { - return errWSPairParsingError + _, p, err := b.parseChannelName(msg) + if err != nil { + return err } wsTradeTemp := websocketTradeResponse{} @@ -173,7 +162,7 @@ func (b *Bitstamp) handleWSTrade(wsResp *websocketResponse, msg []byte) error { } return trade.AddTradesToBuffer(b.Name, trade.Data{ Timestamp: time.Unix(wsTradeTemp.Data.Timestamp, 0), - CurrencyPair: wsResp.pair, + CurrencyPair: p, AssetType: asset.Spot, Exchange: b.Name, Price: wsTradeTemp.Data.Price, @@ -183,7 +172,15 @@ func (b *Bitstamp) handleWSTrade(wsResp *websocketResponse, msg []byte) error { }) } -func (b *Bitstamp) handleWSOrder(wsResp *websocketResponse, msg []byte) error { +func (b *Bitstamp) handleWSOrder(event string, msg []byte) error { + channel, p, err := b.parseChannelName(msg) + if err != nil { + return err + } + if channel != bitstampAPIWSMyOrders { + return nil // Only process MyOrders, not orders from the LiveOrder channel + } + r := &websocketOrderResponse{} if err := json.Unmarshal(msg, &r); err != nil { return err @@ -194,7 +191,7 @@ func (b *Bitstamp) handleWSOrder(wsResp *websocketResponse, msg []byte) error { } var status order.Status - switch wsResp.Event { + switch event { case "order_created": status = order.New case "order_changed": @@ -224,7 +221,7 @@ func (b *Bitstamp) handleWSOrder(wsResp *websocketResponse, msg []byte) error { Status: status, AssetType: asset.Spot, Date: r.Order.Microtimestamp.Time(), - Pair: wsResp.pair, + Pair: p, } b.Websocket.DataHandler <- d @@ -232,101 +229,78 @@ func (b *Bitstamp) handleWSOrder(wsResp *websocketResponse, msg []byte) error { return nil } -func (b *Bitstamp) generateDefaultSubscriptions() (subscription.List, error) { - enabledCurrencies, err := b.GetEnabledPairs(asset.Spot) +func (b *Bitstamp) generateSubscriptions() (subscription.List, error) { + return b.Features.Subscriptions.ExpandTemplates(b) +} + +// GetSubscriptionTemplate returns a subscription channel template +func (b *Bitstamp) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) { + return template.New("master.tmpl").Funcs(template.FuncMap{"channelName": channelName}).Parse(subTplText) +} + +// Subscribe sends a websocket message to receive data from a list of channels +func (b *Bitstamp) Subscribe(subs subscription.List) error { + return b.manageSubsWithCreds(subs, "sub") +} + +// Unsubscribe sends a websocket message to stop receiving data from a list of channels +func (b *Bitstamp) Unsubscribe(subs subscription.List) error { + return b.manageSubsWithCreds(subs, "unsub") +} + +func (b *Bitstamp) manageSubsWithCreds(subs subscription.List, op string) error { + var errs error + var creds *WebsocketAuthResponse + if authed := subs.Private(); len(authed) > 0 { + creds, errs = b.FetchWSAuth(context.TODO()) + } + return common.AppendError(errs, b.ParallelChanOp(subs, func(s subscription.List) error { return b.manageSubs(s, op, creds) }, 1)) +} + +func (b *Bitstamp) manageSubs(subs subscription.List, op string, creds *WebsocketAuthResponse) error { + subs, errs := subs.ExpandTemplates(b) + for _, s := range subs { + req := websocketEventRequest{ + Event: "bts:" + op + "scribe", + Data: websocketData{ + Channel: s.QualifiedChannel, + }, + } + if s.Authenticated { + if creds == nil { + return request.ErrAuthRequestFailed + } + req.Data.Channel = "private-" + req.Data.Channel + "-" + strconv.Itoa(int(creds.UserID)) + req.Data.Auth = creds.Token + } + _, err := b.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, op+":"+req.Data.Channel, req) + if err == nil { + if op == "sub" { + err = b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, s) + } else { + err = b.Websocket.RemoveSubscriptions(b.Websocket.Conn, s) + } + } + if err != nil { + errs = common.AppendError(errs, err) + } + } + + return errs +} + +func (b *Bitstamp) handleWSOrderbook(msg []byte) error { + _, p, err := b.parseChannelName(msg) if err != nil { - return nil, err - } - var subscriptions subscription.List - for i := range enabledCurrencies { - p, err := b.FormatExchangeCurrency(enabledCurrencies[i], asset.Spot) - if err != nil { - return nil, err - } - for j := range defaultSubChannels { - subscriptions = append(subscriptions, &subscription.Subscription{ - Channel: defaultSubChannels[j] + "_" + p.String(), - Asset: asset.Spot, - Pairs: currency.Pairs{p}, - }) - } - if b.Websocket.CanUseAuthenticatedEndpoints() { - for j := range defaultAuthSubChannels { - subscriptions = append(subscriptions, &subscription.Subscription{ - Channel: defaultAuthSubChannels[j] + "_" + p.String(), - Asset: asset.Spot, - Pairs: currency.Pairs{p}, - Params: map[string]interface{}{ - "auth": struct{}{}, - }, - }) - } - } - } - return subscriptions, nil -} - -// Subscribe sends a websocket message to receive data from the channel -func (b *Bitstamp) Subscribe(channelsToSubscribe subscription.List) error { - var errs error - var auth *WebsocketAuthResponse - - for i := range channelsToSubscribe { - if _, ok := channelsToSubscribe[i].Params["auth"]; ok { - var err error - auth, err = b.FetchWSAuth(context.TODO()) - if err != nil { - errs = common.AppendError(errs, err) - } - break - } + return err } - for _, s := range channelsToSubscribe { - req := websocketEventRequest{ - Event: "bts:subscribe", - Data: websocketData{ - Channel: s.Channel, - }, - } - if _, ok := s.Params["auth"]; ok && auth != nil { - req.Data.Channel = "private-" + req.Data.Channel + "-" + strconv.Itoa(int(auth.UserID)) - req.Data.Auth = auth.Token - } - err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req) - if err == nil { - err = b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, s) - } - if err != nil { - errs = common.AppendError(errs, err) - } + wsOrderBookResp := websocketOrderBookResponse{} + if err := json.Unmarshal(msg, &wsOrderBookResp); err != nil { + return err } + update := &wsOrderBookResp.Data - return errs -} - -// Unsubscribe sends a websocket message to stop receiving data from the channel -func (b *Bitstamp) Unsubscribe(channelsToUnsubscribe subscription.List) error { - var errs error - for _, s := range channelsToUnsubscribe { - req := websocketEventRequest{ - Event: "bts:unsubscribe", - Data: websocketData{ - Channel: s.Channel, - }, - } - err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req) - if err == nil { - err = b.Websocket.RemoveSubscriptions(b.Websocket.Conn, s) - } - if err != nil { - errs = common.AppendError(errs, err) - } - } - return errs -} - -func (b *Bitstamp) wsUpdateOrderbook(update *websocketOrderBook, p currency.Pair, assetType asset.Item) error { if len(update.Asks) == 0 && len(update.Bids) == 0 { return errors.New("no orderbook data") } @@ -336,7 +310,7 @@ func (b *Bitstamp) wsUpdateOrderbook(update *websocketOrderBook, p currency.Pair Asks: make(orderbook.Tranches, len(update.Asks)), Pair: p, LastUpdated: time.UnixMicro(update.Microtimestamp), - Asset: assetType, + Asset: asset.Spot, Exchange: b.Name, VerifyOrderbook: b.CanVerifyOrderbook, } @@ -427,35 +401,58 @@ func (b *Bitstamp) FetchWSAuth(ctx context.Context) (*WebsocketAuthResponse, err return resp, nil } -// parseChannel splits the ws response channel and sets the channel type and pair -func (b *Bitstamp) parseChannelName(r *websocketResponse) error { - if r.Channel == "" { - return nil +// parseChannelName splits the ws message channel and returns the channel name and pair +func (b *Bitstamp) parseChannelName(respRaw []byte) (string, currency.Pair, error) { + channel, err := jsonparser.GetUnsafeString(respRaw, "channel") + if err != nil { + return "", currency.EMPTYPAIR, fmt.Errorf("%w `channel`: %w", errParsingWSField, err) } - chanName := r.Channel - authParts := strings.Split(r.Channel, "-") + authParts := strings.Split(channel, "-") switch len(authParts) { case 1: // Not an auth channel case 3: - chanName = authParts[1] + channel = authParts[1] default: - return fmt.Errorf("channel name does not contain exactly 0 or 2 hyphens: %v", r.Channel) + return "", currency.EMPTYPAIR, fmt.Errorf("%w: %s", errChannelHyphens, channel) } - parts := strings.Split(chanName, "_") + parts := strings.Split(channel, "_") if len(parts) != 3 { - return fmt.Errorf("%w: channel name does not contain exactly 2 underscores: %v", errWSPairParsingError, r.Channel) + return "", currency.EMPTYPAIR, fmt.Errorf("%w: %s", errChannelUnderscores, channel) } - r.channelType = parts[0] + "_" + parts[1] - symbol := parts[2] - enabledPairs, err := b.GetEnabledPairs(asset.Spot) - if err == nil { - r.pair, err = enabledPairs.DeriveFrom(symbol) + if err != nil { + return "", currency.EMPTYPAIR, err } - return err + pair, err := enabledPairs.DeriveFrom(parts[2]) + if err != nil { + return "", currency.EMPTYPAIR, fmt.Errorf("%w: %s", errParsingWSPair, err) + } + + return parts[0] + "_" + parts[1], pair, nil } + +// channelName converts global channel Names to exchange specific ones +// panics if name is not supported, so should be called within a recover chain +func channelName(s *subscription.Subscription) string { + if s, ok := subscriptionNames[s.Channel]; ok { + return s + } + panic(fmt.Errorf("%w: %s", subscription.ErrNotSupported, s.Channel)) +} + +const subTplText = ` +{{ range $asset, $pairs := $.AssetPairs }} + {{- with $name := channelName $.S }} + {{- range $p := $pairs -}} + {{- $name -}} _ {{- $p -}} + {{ $.PairSeparator }} + {{- end -}} + {{- end }} + {{ $.AssetSeparator }} +{{- end }} +` diff --git a/exchanges/bitstamp/bitstamp_wrapper.go b/exchanges/bitstamp/bitstamp_wrapper.go index f8ff0632..36d02f80 100644 --- a/exchanges/bitstamp/bitstamp_wrapper.go +++ b/exchanges/bitstamp/bitstamp_wrapper.go @@ -107,6 +107,7 @@ func (b *Bitstamp) SetDefaults() { GlobalResultLimit: 1000, }, }, + Subscriptions: defaultSubscriptions.Clone(), } b.Requester, err = request.New(b.Name, @@ -156,7 +157,7 @@ func (b *Bitstamp) Setup(exch *config.Exchange) error { Connector: b.WsConnect, Subscriber: b.Subscribe, Unsubscriber: b.Unsubscribe, - GenerateSubscriptions: b.generateDefaultSubscriptions, + GenerateSubscriptions: b.generateSubscriptions, Features: &b.Features.Supports.WebsocketCapabilities, }) if err != nil { diff --git a/testdata/configtest.json b/testdata/configtest.json index 51de85a8..bfe19e2d 100644 --- a/testdata/configtest.json +++ b/testdata/configtest.json @@ -1008,7 +1008,7 @@ }, "enabled": { "autoPairUpdates": true, - "websocketAPI": false + "websocketAPI": true } }, "bankAccounts": [