Bitfinex: Add subscription configuration and templating (#1597)

* Bitfinex: Correct comment about R0 OB

* Bitfinex: Test config updates

* Bitfinex: Add missing assets to configtest

* Bitfinex: Rename GenerateDefaultSubscriptions

* Bitfinex: Add Subscription configuration

* Subscriptions: Document panic in templates
This commit is contained in:
Gareth Kirwan
2024-10-18 00:23:15 +02:00
committed by GitHub
parent 9645b7b6e7
commit ba77c9946d
7 changed files with 269 additions and 203 deletions

View File

@@ -104,9 +104,7 @@ const (
bitfinexChecksumFlag = 131072
bitfinexWsSequenceFlag = 65536
// CandlesTimeframeKey configures the timeframe in subscription.Subscription.Params
CandlesTimeframeKey = "_timeframe"
// CandlesPeriodKey configures the aggregated period in subscription.Subscription.Params
// CandlesPeriodKey configures the Candles aggregated period for MarginFunding in subscription.Subscription.Params
CandlesPeriodKey = "_period"
)

File diff suppressed because one or more lines are too long

View File

@@ -11,8 +11,10 @@ import (
"strconv"
"strings"
"sync"
"text/template"
"time"
"github.com/Masterminds/sprig/v3"
"github.com/buger/jsonparser"
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
@@ -20,6 +22,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/common/crypto"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
@@ -30,6 +33,15 @@ import (
"github.com/thrasher-corp/gocryptotrader/log"
)
var defaultSubscriptions = subscription.List{
{Enabled: true, Channel: subscription.TickerChannel, Asset: asset.All},
{Enabled: true, Channel: subscription.AllTradesChannel, Asset: asset.All},
{Enabled: true, Channel: subscription.CandlesChannel, Asset: asset.Spot, Interval: kline.OneMin},
{Enabled: true, Channel: subscription.CandlesChannel, Asset: asset.Margin, Interval: kline.OneMin},
{Enabled: true, Channel: subscription.CandlesChannel, Asset: asset.MarginFunding, Interval: kline.OneMin, Params: map[string]any{CandlesPeriodKey: "p30"}},
{Enabled: true, Channel: subscription.OrderbookChannel, Asset: asset.All, Levels: 100, Params: map[string]any{"prec": "R0"}},
}
var comms = make(chan stream.Response)
type checksum struct {
@@ -41,6 +53,13 @@ type checksum struct {
var checksumStore = make(map[int]*checksum)
var cMtx sync.Mutex
var subscriptionNames = map[string]string{
subscription.TickerChannel: wsTicker,
subscription.OrderbookChannel: wsBook,
subscription.CandlesChannel: wsCandles,
subscription.AllTradesChannel: wsTrades,
}
// WsConnect starts a new websocket connection
func (b *Bitfinex) WsConnect() error {
if !b.Websocket.IsEnabled() || !b.IsEnabled() {
@@ -525,35 +544,35 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
return nil
}
func (b *Bitfinex) handleWSChannelUpdate(c *subscription.Subscription, eventType string, d []interface{}) error {
if c == nil {
func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, eventType string, d []interface{}) error {
if s == nil {
return fmt.Errorf("%w: Subscription param", common.ErrNilPointer)
}
if eventType == wsChecksum {
return b.handleWSChecksum(c, d)
return b.handleWSChecksum(s, d)
}
if eventType == wsHeartbeat {
return nil
}
if len(c.Pairs) != 1 {
if len(s.Pairs) != 1 {
return subscription.ErrNotSinglePair
}
switch c.Channel {
case wsBook:
return b.handleWSBookUpdate(c, d)
case wsCandles:
return b.handleWSCandleUpdate(c, d)
case wsTicker:
return b.handleWSTickerUpdate(c, d)
case wsTrades:
return b.handleWSTradesUpdate(c, eventType, d)
switch s.Channel {
case subscription.OrderbookChannel:
return b.handleWSBookUpdate(s, d)
case subscription.CandlesChannel:
return b.handleWSCandleUpdate(s, d)
case subscription.TickerChannel:
return b.handleWSTickerUpdate(s, d)
case subscription.AllTradesChannel:
return b.handleWSTradesUpdate(s, eventType, d)
}
return fmt.Errorf("%s unhandled channel update: %s", b.Name, c.Channel)
return fmt.Errorf("%s unhandled channel update: %s", b.Name, s.Channel)
}
func (b *Bitfinex) handleWSChecksum(c *subscription.Subscription, d []interface{}) error {
@@ -1669,44 +1688,20 @@ func (b *Bitfinex) resubOrderbook(c *subscription.Subscription) error {
return nil
}
// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (b *Bitfinex) GenerateDefaultSubscriptions() (subscription.List, error) {
var channels = []string{wsBook, wsTrades, wsTicker, wsCandles}
// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature
func (b *Bitfinex) generateSubscriptions() (subscription.List, error) {
return b.Features.Subscriptions.ExpandTemplates(b)
}
var subscriptions subscription.List
assets := b.GetAssetTypes(true)
for i := range assets {
if !b.IsAssetWebsocketSupported(assets[i]) {
continue
}
enabledPairs, err := b.GetEnabledPairs(assets[i])
if err != nil {
return nil, err
}
for j := range channels {
for k := range enabledPairs {
params := make(map[string]interface{})
if channels[j] == wsBook {
params["prec"] = "R0"
params["len"] = "100"
}
if channels[j] == wsCandles && assets[i] == asset.MarginFunding {
params[CandlesPeriodKey] = "30"
}
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: channels[j],
Pairs: currency.Pairs{enabledPairs[k]},
Params: params,
Asset: assets[i],
})
}
}
}
return subscriptions, nil
// GetSubscriptionTemplate returns a subscription channel template
func (b *Bitfinex) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(sprig.FuncMap()).Funcs(template.FuncMap{
"subToMap": subToMap,
"removeSpotFromMargin": func(ap map[asset.Item]currency.Pairs) string {
spotPairs, _ := b.GetEnabledPairs(asset.Spot)
return removeSpotFromMargin(ap, spotPairs)
},
}).Parse(subTplText)
}
// ConfigureWS to send checksums and sequence numbers
@@ -1718,26 +1713,36 @@ func (b *Bitfinex) ConfigureWS() error {
}
// Subscribe sends a websocket message to receive data from channels
func (b *Bitfinex) Subscribe(channels subscription.List) error {
return b.ParallelChanOp(channels, b.subscribeToChan, 1)
func (b *Bitfinex) Subscribe(subs subscription.List) error {
var err error
if subs, err = subs.ExpandTemplates(b); err != nil {
return err
}
return b.ParallelChanOp(subs, b.subscribeToChan, 1)
}
// Unsubscribe sends a websocket message to stop receiving data from channels
func (b *Bitfinex) Unsubscribe(channels subscription.List) error {
return b.ParallelChanOp(channels, b.unsubscribeFromChan, 1)
func (b *Bitfinex) Unsubscribe(subs subscription.List) error {
var err error
if subs, err = subs.ExpandTemplates(b); err != nil {
return err
}
return b.ParallelChanOp(subs, b.unsubscribeFromChan, 1)
}
// subscribeToChan handles a single subscription and parses the result
// on success it adds the subscription to the websocket
func (b *Bitfinex) subscribeToChan(chans subscription.List) error {
if len(chans) != 1 {
return errors.New("subscription batching limited to 1")
func (b *Bitfinex) subscribeToChan(subs subscription.List) error {
if len(subs) != 1 {
return subscription.ErrNotSinglePair
}
c := chans[0]
req, err := subscribeReq(c)
if err != nil {
return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pairs)
s := subs[0]
req := map[string]any{
"event": "subscribe",
}
if err := json.Unmarshal([]byte(s.QualifiedChannel), &req); err != nil {
return err
}
// subId is a single round-trip identifier that provides linking sub requests to chanIDs
@@ -1747,23 +1752,23 @@ func (b *Bitfinex) subscribeToChan(chans subscription.List) error {
// Add a temporary Key so we can find this Sub when we get the resp without delay or context switch
// Otherwise we might drop the first messages after the subscribed resp
c.Key = subID // Note subID string type avoids conflicts with later chanID key
if err = b.Websocket.AddSubscriptions(b.Websocket.Conn, c); err != nil {
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Pairs, err)
s.Key = subID // Note subID string type avoids conflicts with later chanID key
if err := b.Websocket.AddSubscriptions(b.Websocket.Conn, s); err != nil {
return fmt.Errorf("%w Channel: %s Pair: %s", err, s.Channel, s.Pairs)
}
// Always remove the temporary subscription keyed by subID
defer func() {
_ = b.Websocket.RemoveSubscriptions(b.Websocket.Conn, c)
_ = b.Websocket.RemoveSubscriptions(b.Websocket.Conn, s)
}()
respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, "subscribe:"+subID, req)
if err != nil {
return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pairs)
return fmt.Errorf("%w: Channel: %s Pair: %s", err, s.Channel, s.Pairs)
}
if err = b.getErrResp(respRaw); err != nil {
wErr := fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pairs)
wErr := fmt.Errorf("%w: Channel: %s Pair: %s", err, s.Channel, s.Pairs)
b.Websocket.DataHandler <- wErr
return wErr
}
@@ -1771,78 +1776,15 @@ func (b *Bitfinex) subscribeToChan(chans subscription.List) error {
return nil
}
// subscribeReq returns a map of request params for subscriptions
func subscribeReq(c *subscription.Subscription) (map[string]interface{}, error) {
if c == nil {
return nil, fmt.Errorf("%w: Subscription param", common.ErrNilPointer)
}
if len(c.Pairs) != 1 {
return nil, subscription.ErrNotSinglePair
}
pair := c.Pairs[0]
req := map[string]interface{}{
"event": "subscribe",
"channel": c.Channel,
}
for k, v := range c.Params {
switch k {
case CandlesPeriodKey, CandlesTimeframeKey:
// Skip these internal Params
case "key", "symbol":
// Ensure user's Params aren't silently overwritten
return nil, fmt.Errorf("%s %w", k, errParamNotAllowed)
default:
req[k] = v
}
}
prefix := "t"
if c.Asset == asset.MarginFunding {
prefix = "f"
}
needsDelimiter := pair.Len() > 6
var formattedPair string
if needsDelimiter {
formattedPair = pair.Format(currency.PairFormat{Uppercase: true, Delimiter: ":"}).String()
} else {
formattedPair = currency.PairFormat{Uppercase: true}.Format(pair)
}
if c.Channel == wsCandles {
timeframe := "1m"
if t, ok := c.Params[CandlesTimeframeKey]; ok {
if timeframe, ok = t.(string); !ok {
return nil, common.GetTypeAssertError("string", t, "Subscription.CandlesTimeframeKey")
}
}
fundingPeriod := ""
if p, ok := c.Params[CandlesPeriodKey]; ok {
s, cOk := p.(string)
if !cOk {
return nil, common.GetTypeAssertError("string", p, "Subscription.CandlesPeriodKey")
}
fundingPeriod = ":p" + s
}
req["key"] = "trade:" + timeframe + ":" + prefix + formattedPair + fundingPeriod
} else {
req["symbol"] = prefix + formattedPair
}
return req, nil
}
// unsubscribeFromChan sends a websocket message to stop receiving data from a channel
func (b *Bitfinex) unsubscribeFromChan(chans subscription.List) error {
if len(chans) != 1 {
func (b *Bitfinex) unsubscribeFromChan(subs subscription.List) error {
if len(subs) != 1 {
return errors.New("subscription batching limited to 1")
}
c := chans[0]
chanID, ok := c.Key.(int)
s := subs[0]
chanID, ok := s.Key.(int)
if !ok {
return common.GetTypeAssertError("int", c.Key, "chanID")
return common.GetTypeAssertError("int", s.Key, "subscription.Key")
}
req := map[string]interface{}{
@@ -1856,12 +1798,12 @@ func (b *Bitfinex) unsubscribeFromChan(chans subscription.List) error {
}
if err := b.getErrResp(respRaw); err != nil {
wErr := fmt.Errorf("%w from ChanId: %v; %w", stream.ErrUnsubscribeFailure, chanID, err)
wErr := fmt.Errorf("%w: ChanId: %v", err, chanID)
b.Websocket.DataHandler <- wErr
return wErr
}
return b.Websocket.RemoveSubscriptions(b.Websocket.Conn, c)
return b.Websocket.RemoveSubscriptions(b.Websocket.Conn, s)
}
// getErrResp takes a json response string and looks for an error event type
@@ -2143,7 +2085,7 @@ func validateCRC32(book *orderbook.Base, token int) error {
reOrderByID(book.Bids)
reOrderByID(book.Asks)
// RO precision calculation is based on order ID's and amount values
// R0 precision calculation is based on order ID's and amount values
var bids, asks []orderbook.Tranche
for i := range 25 {
if i < len(book.Bids) {
@@ -2233,3 +2175,71 @@ subSort:
break
}
}
// subToMap returns a json object of request params for subscriptions
func subToMap(s *subscription.Subscription, a asset.Item, p currency.Pair) map[string]any {
c := s.Channel
if name, ok := subscriptionNames[s.Channel]; ok {
c = name
}
req := map[string]interface{}{
"channel": c,
}
var fundingPeriod string
for k, v := range s.Params {
switch k {
case CandlesPeriodKey:
if s, ok := v.(string); !ok {
panic(common.GetTypeAssertError("string", v, "subscription.CandlesPeriodKey"))
} else {
fundingPeriod = ":" + s
}
case "key", "symbol", "len":
panic(fmt.Errorf("%w: %s", errParamNotAllowed, k)) // Ensure user's Params aren't silently overwritten
default:
req[k] = v
}
}
if s.Levels != 0 {
req["len"] = s.Levels
}
prefix := "t"
if a == asset.MarginFunding {
prefix = "f"
}
pairFmt := currency.PairFormat{Uppercase: true}
if needsDelimiter := p.Len() > 6; needsDelimiter {
pairFmt.Delimiter = ":"
}
symbol := p.Format(pairFmt).String()
if c == wsCandles {
req["key"] = "trade:" + s.Interval.Short() + ":" + prefix + symbol + fundingPeriod
} else {
req["symbol"] = prefix + symbol
}
return req
}
// removeSpotFromMargin removes spot pairs from margin pairs in the supplied AssetPairs map to avoid duplicate subscriptions
func removeSpotFromMargin(ap map[asset.Item]currency.Pairs, spotPairs currency.Pairs) string {
if p, ok := ap[asset.Margin]; ok {
ap[asset.Margin] = p.Remove(spotPairs...)
}
return ""
}
const subTplText = `
{{- removeSpotFromMargin $.AssetPairs -}}
{{ range $asset, $pairs := $.AssetPairs }}
{{- range $p := $pairs -}}
{{- subToMap $.S $asset $p | mustToJson }}
{{- $.PairSeparator }}
{{- end -}}
{{ $.AssetSeparator }}
{{- end -}}
`

View File

@@ -159,6 +159,7 @@ func (b *Bitfinex) SetDefaults() {
GlobalResultLimit: 10000,
},
},
Subscriptions: defaultSubscriptions.Clone(),
}
b.Requester, err = request.New(b.Name,
@@ -208,7 +209,7 @@ func (b *Bitfinex) Setup(exch *config.Exchange) error {
Connector: b.WsConnect,
Subscriber: b.Subscribe,
Unsubscriber: b.Unsubscribe,
GenerateSubscriptions: b.GenerateDefaultSubscriptions,
GenerateSubscriptions: b.generateSubscriptions,
Features: &b.Features.Supports.WebsocketCapabilities,
OrderbookBufferConfig: buffer.Config{
UpdateEntriesByID: true,