BTCMarkets: Add subscription configuration (#1624)

* Subscription: Add List.GroupByPairs

* BTCMarkets: Add subscription conf

* BTCMarkets: Upgrade test config
This commit is contained in:
Gareth Kirwan
2024-12-05 04:50:10 +01:00
committed by GitHub
parent 6b79e4c0b4
commit 0c4b070ebc
8 changed files with 139 additions and 50 deletions

View File

@@ -19,7 +19,9 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions"
)
var b = &BTCMarkets{}
@@ -1134,3 +1136,33 @@ func TestGetCurrencyTradeURL(t *testing.T) {
assert.NotEmpty(t, resp)
}
}
func TestGenerateSubscriptions(t *testing.T) {
t.Parallel()
b := new(BTCMarkets)
require.NoError(t, testexch.Setup(b), "Test instance Setup must not error")
p := currency.Pairs{currency.NewPairWithDelimiter("BTC", "USD", "_"), currency.NewPairWithDelimiter("ETH", "BTC", "_")}
require.NoError(t, b.CurrencyPairs.StorePairs(asset.Spot, p, false))
require.NoError(t, b.CurrencyPairs.StorePairs(asset.Spot, p, true))
b.Websocket.SetCanUseAuthenticatedEndpoints(true)
require.True(t, b.Websocket.CanUseAuthenticatedEndpoints(), "CanUseAuthenticatedEndpoints must return true")
subs, err := b.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions must not error")
pairs, err := b.GetEnabledPairs(asset.Spot)
require.NoError(t, err, "GetEnabledPairs must not error")
exp := subscription.List{}
for _, baseSub := range b.Features.Subscriptions {
s := baseSub.Clone()
if !s.Authenticated && s.Channel != subscription.HeartbeatChannel {
s.Pairs = pairs
}
s.QualifiedChannel = channelName(s)
exp = append(exp, s)
}
testsubs.EqualLists(t, exp, subs)
assert.PanicsWithError(t,
"subscription channel not supported: wibble",
func() { channelName(&subscription.Subscription{Channel: "wibble"}) },
"should panic on invalid channel",
)
}

View File

@@ -9,6 +9,7 @@ import (
"net/http"
"strconv"
"strings"
"text/template"
"time"
"github.com/gorilla/websocket"
@@ -33,10 +34,26 @@ const (
var (
errTypeAssertionFailure = errors.New("type assertion failure")
errChecksumFailure = errors.New("crc32 checksum failure")
authChannels = []string{fundChange, heartbeat, orderChange}
)
var defaultSubscriptions = subscription.List{
{Enabled: true, Asset: asset.Spot, Channel: subscription.TickerChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel},
{Enabled: true, Channel: subscription.MyOrdersChannel, Authenticated: true},
{Enabled: true, Channel: subscription.MyAccountChannel, Authenticated: true},
{Enabled: true, Channel: subscription.HeartbeatChannel},
}
var subscriptionNames = map[string]string{
subscription.OrderbookChannel: wsOrderbookUpdate,
subscription.TickerChannel: tick,
subscription.AllTradesChannel: tradeEndPoint,
subscription.MyOrdersChannel: orderChange,
subscription.MyAccountChannel: fundChange,
subscription.HeartbeatChannel: heartbeat,
}
// WsConnect connects to a websocket feed
func (b *BTCMarkets) WsConnect() error {
if !b.Websocket.IsEnabled() || !b.IsEnabled() {
@@ -326,29 +343,13 @@ func (b *BTCMarkets) wsHandleData(respRaw []byte) error {
return nil
}
func (b *BTCMarkets) generateDefaultSubscriptions() (subscription.List, error) {
var channels = []string{wsOrderbookUpdate, tick, tradeEndPoint}
enabledCurrencies, err := b.GetEnabledPairs(asset.Spot)
if err != nil {
return nil, err
}
var subscriptions subscription.List
for i := range channels {
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: channels[i],
Pairs: enabledCurrencies,
Asset: asset.Spot,
})
}
func (b *BTCMarkets) generateSubscriptions() (subscription.List, error) {
return b.Features.Subscriptions.ExpandTemplates(b)
}
if b.Websocket.CanUseAuthenticatedEndpoints() {
for i := range authChannels {
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: authChannels[i],
})
}
}
return subscriptions, nil
// GetSubscriptionTemplate returns a subscription channel template
func (b *BTCMarkets) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(template.FuncMap{"channelName": channelName}).Parse(subTplText)
}
// Subscribe sends a websocket message to receive data from the channel
@@ -358,13 +359,17 @@ func (b *BTCMarkets) Subscribe(subs subscription.List) error {
}
var errs error
for _, s := range subs {
if baseReq.Key == "" && common.StringSliceContains(authChannels, s.Channel) {
if err := b.authWsSubscibeReq(baseReq); err != nil {
return err
if authed := subs.Private(); len(authed) > 0 {
if err := b.signWsReq(baseReq); err != nil {
errs = err
for _, s := range authed {
errs = common.AppendError(errs, fmt.Errorf("%w: %s", request.ErrAuthRequestFailed, s))
}
subs = subs.Public()
}
}
for _, batch := range subs.GroupByPairs() {
if baseReq.MessageType == subscribe && len(b.Websocket.GetSubscriptions()) != 0 {
baseReq.MessageType = addSubscription // After first *successful* subscription API requires addSubscription
baseReq.ClientType = clientType // Note: Only addSubscription requires/accepts clientType
@@ -372,12 +377,15 @@ func (b *BTCMarkets) Subscribe(subs subscription.List) error {
r := baseReq
r.Channels = []string{s.Channel}
r.MarketIDs = s.Pairs.Strings()
r.MarketIDs = batch[0].Pairs.Strings()
r.Channels = make([]string, len(batch))
for i, s := range batch {
r.Channels[i] = s.QualifiedChannel
}
err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, r)
if err == nil {
err = b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, s)
err = b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, batch...)
}
if err != nil {
errs = common.AppendError(errs, err)
@@ -387,7 +395,7 @@ func (b *BTCMarkets) Subscribe(subs subscription.List) error {
return errs
}
func (b *BTCMarkets) authWsSubscibeReq(r *WsSubscribe) error {
func (b *BTCMarkets) signWsReq(r *WsSubscribe) error {
creds, err := b.GetCredentials(context.TODO())
if err != nil {
return err
@@ -471,11 +479,24 @@ func concat(liquidity orderbook.Tranches) string {
return c
}
// trim turns value into string, removes the decimal point and all the leading
// zeros.
// trim turns value into string, removes the decimal point and all the leading zeros
func trim(value float64) string {
valstr := strconv.FormatFloat(value, 'f', -1, 64)
valstr = strings.ReplaceAll(valstr, ".", "")
valstr = strings.TrimLeft(valstr, "0")
return valstr
}
func channelName(s *subscription.Subscription) string {
if n, ok := subscriptionNames[s.Channel]; ok {
return n
}
panic(fmt.Errorf("%w: %s", subscription.ErrNotSupported, s.Channel))
}
const subTplText = `
{{ range $asset, $pairs := $.AssetPairs }}
{{- channelName $.S -}}
{{ $.AssetSeparator }}
{{- end }}
`

View File

@@ -111,6 +111,7 @@ func (b *BTCMarkets) SetDefaults() {
GlobalResultLimit: 1000,
},
},
Subscriptions: defaultSubscriptions.Clone(),
}
b.Requester, err = request.New(b.Name,
@@ -160,7 +161,7 @@ func (b *BTCMarkets) Setup(exch *config.Exchange) error {
Connector: b.WsConnect,
Subscriber: b.Subscribe,
Unsubscriber: b.Unsubscribe,
GenerateSubscriptions: b.generateDefaultSubscriptions,
GenerateSubscriptions: b.generateSubscriptions,
Features: &b.Features.Supports.WebsocketCapabilities,
OrderbookBufferConfig: buffer.Config{
SortBuffer: true,

View File

@@ -46,6 +46,22 @@ func (l List) GroupPairs() (n List) {
return s.List()
}
// GroupByPairs groups subscriptions which have the same Pairs
func (l List) GroupByPairs() []List {
n := []List{}
outer:
for _, a := range l {
for i, b := range n {
if a.Pairs.Equal(b[0].Pairs) { // Note: b is guaranteed to have 1 element by the append(n) below
n[i] = append(n[i], a)
continue outer
}
}
n = append(n, List{a})
}
return n
}
// Clone returns a deep clone of the List
func (l List) Clone() List {
n := make(List, len(l))

View File

@@ -26,7 +26,7 @@ func TestListStrings(t *testing.T) {
},
}
exp := []string{"orderbook ETH/USDC", "ticker spot ETH/USDC,BTC/USDT"}
assert.ElementsMatch(t, exp, l.Strings(), "String must return correct sorted list")
assert.ElementsMatch(t, exp, l.Strings(), "String should return correct sorted list")
}
// TestQualifiedChannels exercises List.QualifiedChannels()
@@ -63,7 +63,26 @@ func TestListGroupPairs(t *testing.T) {
assert.Len(t, l, 5, "Orig list should not be changed")
assert.Len(t, n, 2, "New list should be grouped")
exp := []string{"ticker spot ETH/USDC,BTC/USDT", "orderbook spot ETH/USDC,BTC/USDT"}
assert.ElementsMatch(t, exp, n.Strings(), "String must return correct sorted list")
assert.ElementsMatch(t, exp, n.Strings(), "String should return correct sorted list")
}
// TestListGroupByPairs exercises List.GroupByPairs()
func TestListGroupByPairs(t *testing.T) {
t.Parallel()
l := List{
{Asset: asset.Spot, Channel: TickerChannel, Pairs: currency.Pairs{ethusdcPair, btcusdtPair}},
{Asset: asset.Spot, Channel: OrderbookChannel, Pairs: currency.Pairs{ethusdcPair, btcusdtPair}},
{Asset: asset.Spot, Channel: CandlesChannel, Pairs: currency.Pairs{ltcusdcPair, btcusdtPair}},
}
n := l.GroupByPairs()
assert.Len(t, l, 3, "Orig list should not be changed")
require.Len(t, n, 2, "New list must be grouped")
require.Len(t, n[0], 2, "New list must be grouped")
require.Len(t, n[1], 1, "New list must be grouped")
exp := []string{"ticker spot ETH/USDC,BTC/USDT", "orderbook spot ETH/USDC,BTC/USDT"}
assert.ElementsMatch(t, exp, n[0].Strings(), "String should return correct sorted list")
exp = []string{"candles spot LTC/USDC,BTC/USDT"}
assert.ElementsMatch(t, exp, n[1].Strings(), "String should return correct sorted list")
}
// TestListSetStates exercises List.SetState()
@@ -99,9 +118,9 @@ func TestListClone(t *testing.T) {
t.Parallel()
l := List{{Channel: TickerChannel}, {Channel: OrderbookChannel}}
n := l.Clone()
assert.NotSame(t, &n, &l, "Slices must not be the same")
assert.NotSame(t, &n, &l, "Slices should not be the same")
require.NotEmpty(t, n, "List must not be empty")
assert.NotSame(t, n[0], l[0], "Subscriptions must be cloned")
assert.NotSame(t, n[0], l[0], "Subscriptions should be cloned")
assert.Equal(t, n[0], l[0], "Subscriptions should be equal")
l[0].Interval = kline.OneHour
assert.NotEqual(t, n[0], l[0], "Subscriptions should be cloned")

View File

@@ -33,6 +33,7 @@ const (
MyOrdersChannel = "myOrders"
MyWalletChannel = "myWallet"
MyAccountChannel = "myAccount"
HeartbeatChannel = "heartbeat"
)
// Public errors

View File

@@ -3,7 +3,6 @@ package exchange
import (
"bufio"
"context"
"errors"
"fmt"
"log"
"net/http"
@@ -38,11 +37,8 @@ func Setup(e exchange.IBotExchange) error {
if err != nil {
return fmt.Errorf("LoadConfig() error: %w", err)
}
parts := strings.Split(fmt.Sprintf("%T", e), ".")
if len(parts) != 2 {
return errors.New("unexpected parts splitting exchange type name")
}
eName := parts[1]
e.SetDefaults()
eName := e.GetName()
exchConf, err := cfg.GetExchangeConfig(eName)
if err != nil {
return fmt.Errorf("GetExchangeConfig(`%s`) error: %w", eName, err)

View File

@@ -249,7 +249,6 @@
"websocketResponseCheckTimeout": 30000000,
"websocketResponseMaxLimit": 7000000000,
"websocketTrafficTimeout": 30000000000,
"websocketOrderbookBufferLimit": 5,
"baseCurrencies": "AUD",
"currencyPairs": {
"requestFormat": {
@@ -261,11 +260,9 @@
"delimiter": "-"
},
"useGlobalFormat": true,
"assetTypes": [
"spot"
],
"pairs": {
"spot": {
"assetEnabled": true,
"enabled": "BTC-AUD",
"available": "BTC-AUD,LTC-AUD,LTC-BTC,ETH-BTC,ETH-AUD,ETC-AUD,ETC-BTC,XRP-AUD,XRP-BTC,POWR-AUD,POWR-BTC,OMG-AUD,OMG-BTC,BCHABC-AUD,BCHABC-BTC,BCHSV-AUD,BCHSV-BTC,GNT-AUD,GNT-BTC,BAT-AUD,BAT-BTC,XLM-AUD,XLM-BTC"
}
@@ -317,7 +314,13 @@
"iban": "",
"supportedCurrencies": ""
}
]
],
"orderbook": {
"verificationBypass": false,
"websocketBufferLimit": 5,
"websocketBufferEnabled": false,
"publishPeriod": 10000000000
}
},
{
"name": "BTSE",