BTSE: Add subscription configuration (#1616)

* BTSE: Add subscription configuration

* BTSE: Upgrade test config
This commit is contained in:
Gareth Kirwan
2024-11-12 04:29:58 +01:00
committed by GitHub
parent 0c0ae7bcaa
commit 8f9ebcb521
4 changed files with 98 additions and 47 deletions

View File

@@ -22,8 +22,10 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions"
)
// Please supply your own keys here to do better tests
@@ -749,3 +751,23 @@ func TestStripExponent(t *testing.T) {
_, err = (&MarketPair{Symbol: "M_BTC_ETH"}).StripExponent()
assert.ErrorIs(t, err, errInvalidPairSymbol, "Should error on a symbol with too many underscores")
}
func TestGenerateSubscriptions(t *testing.T) {
t.Parallel()
b := new(BTSE)
require.NoError(t, testexch.Setup(b), "Test instance Setup must not error")
exp := subscription.List{
{Channel: subscription.AllTradesChannel, QualifiedChannel: "tradeHistory:BTC-USD", Asset: asset.Spot, Pairs: currency.Pairs{spotPair}},
{Channel: subscription.MyTradesChannel, QualifiedChannel: "notificationApi"},
}
b.Websocket.SetCanUseAuthenticatedEndpoints(true)
subs, err := b.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions must not error")
testsubs.EqualLists(t, exp, subs)
_, err = subscription.List{{Channel: subscription.OrderbookChannel}}.ExpandTemplates(b)
assert.ErrorContains(t, err, "Channel not supported", "Sub template must error on unsupported channels")
}

View File

@@ -4,10 +4,10 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"text/template"
"time"
"github.com/gorilla/websocket"
@@ -28,6 +28,16 @@ const (
btseWebsocketTimer = time.Second * 57
)
var subscriptionNames = map[string]string{
subscription.MyTradesChannel: "notificationApi",
subscription.AllTradesChannel: "tradeHistory",
}
var defaultSubscriptions = subscription.List{
{Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel},
{Enabled: true, Channel: subscription.MyTradesChannel, Authenticated: true},
}
// WsConnect connects the websocket client
func (b *BTSE) WsConnect() error {
if !b.Websocket.IsEnabled() || !b.IsEnabled() {
@@ -64,7 +74,7 @@ func (b *BTSE) WsAuthenticate(ctx context.Context) error {
return err
}
nonce := strconv.FormatInt(time.Now().UnixMilli(), 10)
path := "/spotWS" + nonce
path := "/ws/spot" + nonce
hmac, err := crypto.GetHMAC(crypto.HashSHA512_384,
[]byte((path)),
@@ -361,56 +371,70 @@ func (b *BTSE) orderbookFilter(price, amount float64) bool {
return price == 0 || amount == 0
}
// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (b *BTSE) GenerateDefaultSubscriptions() (subscription.List, error) {
var channels = []string{"orderBookL2Api:%s_0", "tradeHistory:%s"}
pairs, err := b.GetEnabledPairs(asset.Spot)
if err != nil {
return nil, err
}
var subscriptions subscription.List
if b.Websocket.CanUseAuthenticatedEndpoints() {
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: "notificationApi",
})
}
for i := range channels {
for j := range pairs {
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: fmt.Sprintf(channels[i], pairs[j]),
Pairs: currency.Pairs{pairs[j]},
Asset: asset.Spot,
})
}
}
return subscriptions, nil
// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature
func (b *BTSE) generateSubscriptions() (subscription.List, error) {
return b.Features.Subscriptions.ExpandTemplates(b)
}
// Subscribe sends a websocket message to receive data from the channel
func (b *BTSE) Subscribe(channelsToSubscribe subscription.List) error {
var sub wsSub
sub.Operation = "subscribe"
for i := range channelsToSubscribe {
sub.Arguments = append(sub.Arguments, channelsToSubscribe[i].Channel)
// GetSubscriptionTemplate returns a subscription channel template
func (b *BTSE) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(template.FuncMap{
"channelName": channelName,
"isSymbolChannel": isSymbolChannel,
}).Parse(subTplText)
}
// Subscribe sends a websocket message to receive data from a list of channels
func (b *BTSE) Subscribe(subs subscription.List) error {
req := wsSub{Operation: "subscribe"}
for _, s := range subs {
req.Arguments = append(req.Arguments, s.QualifiedChannel)
}
err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, sub)
err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req)
if err == nil {
err = b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, channelsToSubscribe...)
err = b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, subs...)
}
return err
}
// Unsubscribe sends a websocket message to stop receiving data from the channel
func (b *BTSE) Unsubscribe(channelsToUnsubscribe subscription.List) error {
var unSub wsSub
unSub.Operation = "unsubscribe"
for i := range channelsToUnsubscribe {
unSub.Arguments = append(unSub.Arguments,
channelsToUnsubscribe[i].Channel)
// Unsubscribe sends a websocket message to stop receiving data from a list of channels
func (b *BTSE) Unsubscribe(subs subscription.List) error {
req := wsSub{Operation: "unsubscribe"}
for _, s := range subs {
req.Arguments = append(req.Arguments, s.QualifiedChannel)
}
err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, unSub)
err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req)
if err == nil {
err = b.Websocket.RemoveSubscriptions(b.Websocket.Conn, channelsToUnsubscribe...)
err = b.Websocket.RemoveSubscriptions(b.Websocket.Conn, subs...)
}
return err
}
// channelName returns the correct channel name for the asset
func channelName(s *subscription.Subscription) string {
if name, ok := subscriptionNames[s.Channel]; ok {
return name
}
panic("Channel not supported: " + s.Channel)
}
// isSymbolChannel returns if the channel expects receive a symbol
func isSymbolChannel(s *subscription.Subscription) bool {
return s.Channel != subscription.MyTradesChannel
}
const subTplText = `
{{- with $name := channelName $.S }}
{{ range $asset, $pairs := $.AssetPairs }}
{{- if isSymbolChannel $.S }}
{{- range $p := $pairs -}}
{{- $name -}} : {{- $p -}}
{{- $.PairSeparator }}
{{- end }}
{{- else -}}
{{ $name }}
{{- end }}
{{- $.AssetSeparator }}
{{- end }}
{{- end }}
`

View File

@@ -140,6 +140,7 @@ func (b *BTSE) SetDefaults() {
GlobalResultLimit: 300,
},
},
Subscriptions: defaultSubscriptions.Clone(),
}
b.Requester, err = request.New(b.Name,
@@ -190,7 +191,7 @@ func (b *BTSE) Setup(exch *config.Exchange) error {
Connector: b.WsConnect,
Subscriber: b.Subscribe,
Unsubscriber: b.Unsubscribe,
GenerateSubscriptions: b.GenerateDefaultSubscriptions,
GenerateSubscriptions: b.generateSubscriptions,
Features: &b.Features.Supports.WebsocketCapabilities,
})
if err != nil {

View File

@@ -339,11 +339,9 @@
"delimiter": "-"
},
"useGlobalFormat": true,
"assetTypes": [
"spot"
],
"pairs": {
"spot": {
"assetEnabled": true,
"enabled": "BTC-USD",
"available": "BTC-CNY,BTC-EUR,BTC-GBP,BTC-HKD,BTC-JPY,BTC-SGD,BTC-USD,ETH-CNY,ETH-EUR,ETH-GBP,ETH-HKD,ETH-JPY,ETH-SGD,ETH-USD,LTC-CNY,LTC-EUR,LTC-GBP,LTC-HKD,LTC-JPY,LTC-SGD,LTC-USD,USDT-CNY,USDT-EUR,USDT-GBP,USDT-HKD,USDT-JPY,USDT-SGD,USDT-USD,XMR-CNY,XMR-EUR,XMR-GBP,XMR-HKD,XMR-JPY,XMR-SGD,XMR-USD"
}
@@ -394,7 +392,13 @@
"iban": "",
"supportedCurrencies": ""
}
]
],
"orderbook": {
"verificationBypass": false,
"websocketBufferLimit": 5,
"websocketBufferEnabled": false,
"publishPeriod": 10000000000
}
},
{
"name": "Binance",