subscriptions: Add templating support and integrate with Binance (#1568)

* Subscriptions: Add List.AssetPairs

* Subscriptions: Add Template and QualifiedChannel

These fields separate the concept of what the channel is from the
qualified resource name

* Subscriptions: Add List.SetStates()

* Subscriptions: Add List.QualifiedChannels

* Subscriptions: Rename testsubs.EqualLists

* Binance: Switch to ExpandTemplates

* Binance: Update ConfigTest format

* Subscriptions: Test Coverage improvements

* Subscriptions: Reenterant List.ExpandTemplates

* Subscriptions: Move templates from subscriptions to exchanges

* Binance: Inline subscription template and improvements
This commit is contained in:
Gareth Kirwan
2024-07-09 12:53:00 +07:00
committed by GitHub
parent 00c5c95468
commit c601575c66
27 changed files with 886 additions and 232 deletions

View File

@@ -21,7 +21,6 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
)
// Binance is the overarching type across the Binance package
@@ -111,13 +110,6 @@ var (
errEitherLoanOrCollateralAmountsMustBeSet = errors.New("either loan or collateral amounts must be set")
)
var subscriptionNames = map[string]string{
subscription.TickerChannel: "ticker",
subscription.OrderbookChannel: "depth",
subscription.CandlesChannel: "kline",
subscription.AllTradesChannel: "trade",
}
// GetExchangeInfo returns exchange information. Check binance_types for more
// information
func (b *Binance) GetExchangeInfo(ctx context.Context) (ExchangeInfo, error) {

View File

@@ -28,9 +28,9 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/margin"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)
@@ -1981,26 +1981,23 @@ func BenchmarkWsHandleData(bb *testing.B) {
func TestSubscribe(t *testing.T) {
t.Parallel()
b := b
channels := subscription.List{
{Channel: "btcusdt@ticker"},
{Channel: "btcusdt@trade"},
}
b := new(Binance) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.Setup(b), "Test instance Setup must not error")
channels, err := b.generateSubscriptions() // Note: We grab this before it's overwritten by MockWsInstance below
require.NoError(t, err, "generateSubscriptions must not error")
if mockTests {
exp := []string{"btcusdt@depth@100ms", "btcusdt@kline_1m", "btcusdt@ticker", "btcusdt@trade", "dogeusdt@depth@100ms", "dogeusdt@kline_1m", "dogeusdt@ticker", "dogeusdt@trade"}
mock := func(msg []byte, w *websocket.Conn) error {
var req WsPayload
err := json.Unmarshal(msg, &req)
require.NoError(t, err, "Unmarshal should not error")
require.Len(t, req.Params, len(channels), "Params should only have 2 channel") // Failure might mean mockWSInstance default Subs is not empty
assert.Equal(t, req.Params[0], channels[0].Channel, "Channel name should be correct")
assert.Equal(t, req.Params[1], channels[1].Channel, "Channel name should be correct")
require.NoError(t, json.Unmarshal(msg, &req), "Unmarshal should not error")
require.ElementsMatch(t, req.Params, exp, "Params should have correct channels")
return w.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf(`{"result":null,"id":%d}`, req.ID)))
}
b = testexch.MockWsInstance[Binance](t, testexch.CurryWsMockUpgrader(t, mock))
} else {
testexch.SetupWs(t, b)
}
err := b.Subscribe(channels)
err = b.Subscribe(channels)
require.NoError(t, err, "Subscribe should not error")
err = b.Unsubscribe(channels)
require.NoError(t, err, "Unsubscribe should not error")
@@ -2019,7 +2016,6 @@ func TestSubscribeBadResp(t *testing.T) {
}
b := testexch.MockWsInstance[Binance](t, testexch.CurryWsMockUpgrader(t, mock)) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
err := b.Subscribe(channels)
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Subscribe should error ErrSubscriptionFailure")
assert.ErrorIs(t, err, errUnknownError, "Subscribe should error errUnknownError")
assert.ErrorContains(t, err, "carrots", "Subscribe should error containing the carrots")
}
@@ -2434,61 +2430,42 @@ func TestSeedLocalCache(t *testing.T) {
func TestGenerateSubscriptions(t *testing.T) {
t.Parallel()
expected := subscription.List{}
exp := subscription.List{}
pairs, err := b.GetEnabledPairs(asset.Spot)
assert.NoError(t, err, "GetEnabledPairs should not error")
wsFmt := currency.PairFormat{Uppercase: false, Delimiter: ""}
baseExp := subscription.List{
{Channel: subscription.CandlesChannel, QualifiedChannel: "kline_1m", Asset: asset.Spot, Interval: kline.OneMin},
{Channel: subscription.OrderbookChannel, QualifiedChannel: "depth@100ms", Asset: asset.Spot, Interval: kline.HundredMilliseconds},
{Channel: subscription.TickerChannel, QualifiedChannel: "ticker", Asset: asset.Spot},
{Channel: subscription.AllTradesChannel, QualifiedChannel: "trade", Asset: asset.Spot},
}
for _, p := range pairs {
for _, c := range []string{"kline_1m", "depth@100ms", "ticker", "trade"} {
expected = append(expected, &subscription.Subscription{
Channel: p.Format(currency.PairFormat{Delimiter: "", Uppercase: false}).String() + "@" + c,
Pairs: currency.Pairs{p},
Asset: asset.Spot,
})
for _, baseSub := range baseExp {
sub := baseSub.Clone()
sub.Pairs = currency.Pairs{p}
sub.QualifiedChannel = wsFmt.Format(p) + "@" + sub.QualifiedChannel
exp = append(exp, sub)
}
}
subs, err := b.generateSubscriptions()
assert.NoError(t, err, "GenerateSubscriptions should not error")
if assert.Len(t, subs, len(expected), "Should have the correct number of subs") {
assert.ElementsMatch(t, subs, expected, "Should get the correct subscriptions")
}
require.NoError(t, err, "generateSubscriptions should not error")
testsubs.EqualLists(t, exp, subs)
}
func TestChannelName(t *testing.T) {
_, err := channelName(&subscription.Subscription{Channel: "Wobbegongs"})
assert.ErrorIs(t, err, stream.ErrSubscriptionNotSupported, "Invalid channel name should return ErrSubNotSupported")
assert.ErrorContains(t, err, "Wobbegong", "Invalid channel name error should contain at least one shark")
// TestFormatChannelInterval exercises formatChannelInterval
func TestFormatChannelInterval(t *testing.T) {
t.Parallel()
assert.Equal(t, "@1000ms", formatChannelInterval(&subscription.Subscription{Channel: subscription.OrderbookChannel, Interval: kline.ThousandMilliseconds}), "1s should format correctly for Orderbook")
assert.Equal(t, "@1m", formatChannelInterval(&subscription.Subscription{Channel: subscription.OrderbookChannel, Interval: kline.OneMin}), "Orderbook should format correctly")
assert.Equal(t, "_15m", formatChannelInterval(&subscription.Subscription{Channel: subscription.CandlesChannel, Interval: kline.FifteenMin}), "Candles should format correctly")
}
n, err := channelName(&subscription.Subscription{Channel: subscription.TickerChannel})
assert.NoError(t, err, "Ticker channel should not error")
assert.Equal(t, "ticker", n, "Ticker channel name should be correct")
n, err = channelName(&subscription.Subscription{Channel: subscription.AllTradesChannel})
assert.NoError(t, err, "AllTrades channel should not error")
assert.Equal(t, "trade", n, "Trades channel name should be correct")
n, err = channelName(&subscription.Subscription{Channel: subscription.OrderbookChannel})
assert.NoError(t, err, "Orderbook channel should not error")
assert.Equal(t, "depth@0s", n, "Orderbook with no update rate should return 0s") // It's not channelName's job to supply defaults
n, err = channelName(&subscription.Subscription{Channel: subscription.OrderbookChannel, Interval: kline.Interval(time.Second)})
assert.NoError(t, err, "Orderbook channel should not error")
assert.Equal(t, "depth@1000ms", n, "Orderbook with 1s update rate should 1000ms")
n, err = channelName(&subscription.Subscription{Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds})
assert.NoError(t, err, "Orderbook channel should not error")
assert.Equal(t, "depth@100ms", n, "Orderbook with update rate should return it in the depth channel name")
n, err = channelName(&subscription.Subscription{Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds, Levels: 5})
assert.NoError(t, err, "Orderbook channel should not error")
assert.Equal(t, "depth@5@100ms", n, "Orderbook with Level should return it in the depth channel name")
n, err = channelName(&subscription.Subscription{Channel: subscription.CandlesChannel, Interval: kline.FifteenMin})
assert.NoError(t, err, "Candles channel should not error")
assert.Equal(t, "kline_15m", n, "Candles with interval should return it in the depth channel name")
n, err = channelName(&subscription.Subscription{Channel: subscription.CandlesChannel})
assert.NoError(t, err, "Candles channel should not error")
assert.Equal(t, "kline_0s", n, "Candles with no interval should return 0s") // It's not channelName's job to supply defaults
// TestFormatChannelLevels exercises formatChannelLevels
func TestFormatChannelLevels(t *testing.T) {
t.Parallel()
assert.Equal(t, "10", formatChannelLevels(&subscription.Subscription{Channel: subscription.OrderbookChannel, Levels: 10}), "Levels should format correctly")
assert.Empty(t, formatChannelLevels(&subscription.Subscription{Channel: subscription.OrderbookChannel, Levels: 0}), "Levels should format correctly")
}
var websocketDepthUpdate = []byte(`{"E":1608001030784,"U":7145637266,"a":[["19455.19000000","0.59490200"],["19455.37000000","0.00000000"],["19456.11000000","0.00000000"],["19456.16000000","0.00000000"],["19458.67000000","0.06400000"],["19460.73000000","0.05139800"],["19461.43000000","0.00000000"],["19464.59000000","0.00000000"],["19466.03000000","0.45000000"],["19466.36000000","0.00000000"],["19508.67000000","0.00000000"],["19572.96000000","0.00217200"],["24386.00000000","0.00256600"]],"b":[["19455.18000000","2.94649200"],["19453.15000000","0.01233600"],["19451.18000000","0.00000000"],["19446.85000000","0.11427900"],["19446.74000000","0.00000000"],["19446.73000000","0.00000000"],["19444.45000000","0.14937800"],["19426.75000000","0.00000000"],["19416.36000000","0.36052100"]],"e":"depthUpdate","s":"BTCUSDT","u":7145637297}`)

View File

@@ -8,6 +8,7 @@ import (
"net/http"
"strconv"
"strings"
"text/template"
"time"
"github.com/buger/jsonparser"
@@ -503,130 +504,82 @@ func (b *Binance) UpdateLocalBuffer(wsdp *WebsocketDepthStream) (bool, error) {
return false, err
}
// generateSubscriptions generates the default subscription set
func (b *Binance) generateSubscriptions() (subscription.List, error) {
var channels = make([]string, 0, len(b.Features.Subscriptions))
for i := range b.Features.Subscriptions {
name, err := channelName(b.Features.Subscriptions[i])
if err != nil {
return nil, err
}
channels = append(channels, name)
}
var subscriptions subscription.List
pairs, err := b.GetEnabledPairs(asset.Spot)
if err != nil {
return nil, err
}
for y := range pairs {
for z := range channels {
lp := pairs[y].Lower()
lp.Delimiter = ""
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: lp.String() + "@" + channels[z],
Pairs: currency.Pairs{pairs[y]},
Asset: asset.Spot,
})
for _, s := range b.Features.Subscriptions {
if s.Asset == asset.Empty {
// Handle backwards compatibility with config without assets, all binance subs are spot
s.Asset = asset.Spot
}
}
return subscriptions, nil
return b.Features.Subscriptions.ExpandTemplates(b)
}
// channelName converts a Subscription Config into binance format channel suffix
func channelName(s *subscription.Subscription) (string, error) {
name, ok := subscriptionNames[s.Channel]
if !ok {
return name, fmt.Errorf("%w: %s", stream.ErrSubscriptionNotSupported, s.Channel)
}
var subTemplate *template.Template
// GetSubscriptionTemplate returns a subscription channel template
func (b *Binance) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
var err error
if subTemplate == nil {
subTemplate, err = template.New("subscriptions.tmpl").
Funcs(template.FuncMap{
"interval": formatChannelInterval,
"levels": formatChannelLevels,
"fmt": currency.EMPTYFORMAT.Format,
}).
Parse(subTplText)
}
return subTemplate, err
}
func formatChannelLevels(s *subscription.Subscription) string {
if s.Levels != 0 {
return strconv.Itoa(s.Levels)
}
return ""
}
func formatChannelInterval(s *subscription.Subscription) string {
switch s.Channel {
case subscription.OrderbookChannel:
if s.Levels != 0 {
name += "@" + strconv.Itoa(s.Levels)
}
if s.Interval.Duration() == time.Second {
name += "@1000ms"
} else {
name += "@" + s.Interval.Short()
return "@1000ms"
}
return "@" + s.Interval.Short()
case subscription.CandlesChannel:
name += "_" + s.Interval.Short()
return "_" + s.Interval.Short()
}
return name, nil
return ""
}
// Subscribe subscribes to a set of channels
func (b *Binance) Subscribe(channels subscription.List) error {
return b.ParallelChanOp(channels, b.subscribeToChan, 50)
}
// subscribeToChan handles a single subscription and parses the result
// on success it adds the subscription to the websocket
func (b *Binance) subscribeToChan(chans subscription.List) error {
id := b.Websocket.Conn.GenerateMessageID(false)
cNames := make([]string, len(chans))
for i := range chans {
c := chans[i]
cNames[i] = c.Channel
if err := b.Websocket.AddSubscriptions(c); err != nil {
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Pairs, err)
}
}
req := WsPayload{
Method: wsSubscribeMethod,
Params: cNames,
ID: id,
}
respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(id, req)
if err == nil {
if v, d, _, rErr := jsonparser.Get(respRaw, "result"); rErr != nil {
err = rErr
} else if d != jsonparser.Null { // null is the only expected and acceptable response
err = fmt.Errorf("%w: %s", errUnknownError, v)
}
}
if err != nil {
if err2 := b.Websocket.RemoveSubscriptions(chans...); err2 != nil {
err = common.AppendError(err, err2)
}
err = fmt.Errorf("%w: %w; Channels: %s", stream.ErrSubscriptionFailure, err, strings.Join(cNames, ", "))
b.Websocket.DataHandler <- err
} else {
for _, s := range chans {
if sErr := s.SetState(subscription.SubscribedState); sErr != nil {
err = common.AppendError(err, sErr)
}
}
}
return err
return b.ParallelChanOp(channels, func(l subscription.List) error { return b.manageSubs(wsSubscribeMethod, l) }, 50)
}
// Unsubscribe unsubscribes from a set of channels
func (b *Binance) Unsubscribe(channels subscription.List) error {
return b.ParallelChanOp(channels, b.unsubscribeFromChan, 50)
return b.ParallelChanOp(channels, func(l subscription.List) error { return b.manageSubs(wsUnsubscribeMethod, l) }, 50)
}
// unsubscribeFromChan sends a websocket message to stop receiving data from a channel
func (b *Binance) unsubscribeFromChan(chans subscription.List) error {
id := b.Websocket.Conn.GenerateMessageID(false)
cNames := make([]string, len(chans))
for i := range chans {
cNames[i] = chans[i].Channel
// manageSubs subscribes or unsubscribes from a list of subscriptions
func (b *Binance) manageSubs(op string, subs subscription.List) error {
if op == wsSubscribeMethod {
if err := b.Websocket.AddSubscriptions(subs...); err != nil { // Note: AddSubscription will set state to subscribing
return err
}
} else {
if err := subs.SetStates(subscription.UnsubscribingState); err != nil {
return err
}
}
req := WsPayload{
Method: wsUnsubscribeMethod,
Params: cNames,
ID: id,
ID: b.Websocket.Conn.GenerateMessageID(false),
Method: op,
Params: subs.QualifiedChannels(),
}
respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(id, req)
respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(req.ID, req)
if err == nil {
if v, d, _, rErr := jsonparser.Get(respRaw, "result"); rErr != nil {
err = rErr
@@ -636,10 +589,20 @@ func (b *Binance) unsubscribeFromChan(chans subscription.List) error {
}
if err != nil {
err = fmt.Errorf("%w: %w; Channels: %s", stream.ErrUnsubscribeFailure, err, strings.Join(cNames, ", "))
err = fmt.Errorf("%w; Channels: %s", err, strings.Join(subs.QualifiedChannels(), ", "))
b.Websocket.DataHandler <- err
if op == wsSubscribeMethod {
if err2 := b.Websocket.RemoveSubscriptions(subs...); err2 != nil {
err = common.AppendError(err, err2)
}
}
} else {
err = b.Websocket.RemoveSubscriptions(chans...)
if op == wsSubscribeMethod {
err = common.AppendError(err, subs.SetStates(subscription.SubscribedState))
} else {
err = b.Websocket.RemoveSubscriptions(subs...)
}
}
return err
@@ -1051,3 +1014,16 @@ func (o *orderbookManager) stopNeedsFetchingBook(pair currency.Pair) error {
state.needsFetchingBook = false
return nil
}
const subTplText = `
{{ range $pair := index $.AssetPairs $.S.Asset }}
{{ fmt $pair -}} @
{{- with $c := $.S.Channel -}}
{{ if eq $c "ticker" -}} ticker
{{ else if eq $c "allTrades" -}} trade
{{ else if eq $c "candles" -}} kline {{- interval $.S }}
{{ else if eq $c "orderbook" -}} depth {{- levels $.S }}{{ interval $.S }}
{{- end }}{{ end }}
{{ $.PairSeparator }}
{{end}}
`

View File

@@ -189,10 +189,10 @@ func (b *Binance) SetDefaults() {
},
},
Subscriptions: subscription.List{
{Enabled: true, Channel: subscription.TickerChannel},
{Enabled: true, Channel: subscription.AllTradesChannel},
{Enabled: true, Channel: subscription.CandlesChannel, Interval: kline.OneMin},
{Enabled: true, Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds},
{Enabled: true, Asset: asset.Spot, Channel: subscription.TickerChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel, Interval: kline.OneMin},
{Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds},
},
}
@@ -222,16 +222,14 @@ func (b *Binance) SetDefaults() {
// Setup takes in the supplied exchange configuration details and sets params
func (b *Binance) Setup(exch *config.Exchange) error {
err := exch.Validate()
if err != nil {
if err := exch.Validate(); err != nil {
return err
}
if !exch.Enabled {
b.SetEnabled(false)
return nil
}
err = b.SetupDefaults(exch)
if err != nil {
if err := b.SetupDefaults(exch); err != nil {
return err
}
ePoint, err := b.API.Endpoints.GetURL(exchange.WebsocketSpot)