exchanges/websocket: Implement subscription configuration (#1394)

* Websockets: Move Subscription to its own package

This allows the small type to be imported from both `config` and from
`stream` without an import cycle, so we don't have to repeat ourselves

* Subs: Renamed Currency to Pair

This was being mis-used through much of the code, and since we're
already touching everything, we might as well fix it

* Websockets: Add Subscription configuration

* Binance: Add subscription configuration

* Kucoin: Subscription configuration

* Simplify GenerateDefaultSubs
* Improve TestGenSubs coverage
* Test Candle Sub generation
* Support Candle intervals
* Full responsibility for formatting Channel name on GenerateDefaultSubs
  OR consumer of Subscribe
* Simplify generatePayloads as a result
* Fix test coverage of asset types in processMarketSnapshot

* Exchanges: Abstract ParallelChanOp

* Tests: Generic ws mock instances

* Kucoin: Fix intermittent conflict in test currs

Use isolated test instance for `TestGetOpenInterest`.

`TestGetOpenInterest` would occassionally change pairs before
GenerateDefault Subs.
This commit is contained in:
Gareth Kirwan
2024-01-24 05:54:07 +01:00
committed by GitHub
parent 301551ac20
commit e007f69f7c
67 changed files with 3705 additions and 3167 deletions

View File

@@ -101,9 +101,9 @@ const (
bitfinexChecksumFlag = 131072
bitfinexWsSequenceFlag = 65536
// CandlesTimeframeKey configures the timeframe in stream.ChannelSubscription.Params
// CandlesTimeframeKey configures the timeframe in subscription.Subscription.Params
CandlesTimeframeKey = "_timeframe"
// CandlesPeriodKey configures the aggregated period in stream.ChannelSubscription.Params
// CandlesPeriodKey configures the aggregated period in subscription.Subscription.Params
CandlesPeriodKey = "_period"
)

File diff suppressed because one or more lines are too long

View File

@@ -23,6 +23,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
"github.com/thrasher-corp/gocryptotrader/log"
@@ -505,7 +506,7 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
chanID, err := jsonparser.GetInt(respRaw, "chanId")
if err != nil {
return fmt.Errorf("%w: %w 'chanId': %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, errParsingWSField, err, c.Channel, c.Currency)
return fmt.Errorf("%w: %w 'chanId': %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, errParsingWSField, err, c.Channel, c.Pair)
}
// Note: chanID's int type avoids conflicts with the string type subID key because of the type difference
@@ -515,7 +516,7 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
b.Websocket.AddSuccessfulSubscriptions(*c)
if b.Verbose {
log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s ChannelID: %d\n", b.Name, c.Channel, c.Currency, chanID)
log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s ChannelID: %d\n", b.Name, c.Channel, c.Pair, chanID)
}
if !b.Websocket.Match.IncomingWithData("subscribe:"+subID, respRaw) {
return fmt.Errorf("%v channel subscribe listener not found", subID)
@@ -523,7 +524,7 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
return nil
}
func (b *Bitfinex) handleWSChannelUpdate(c *stream.ChannelSubscription, eventType string, d []interface{}) error {
func (b *Bitfinex) handleWSChannelUpdate(c *subscription.Subscription, eventType string, d []interface{}) error {
if eventType == wsChecksum {
return b.handleWSChecksum(c, d)
}
@@ -546,7 +547,7 @@ func (b *Bitfinex) handleWSChannelUpdate(c *stream.ChannelSubscription, eventTyp
return fmt.Errorf("%s unhandled channel update: %s", b.Name, c.Channel)
}
func (b *Bitfinex) handleWSChecksum(c *stream.ChannelSubscription, d []interface{}) error {
func (b *Bitfinex) handleWSChecksum(c *subscription.Subscription, d []interface{}) error {
var token int
if f, ok := d[2].(float64); !ok {
return common.GetTypeAssertError("float64", d[2], "checksum")
@@ -577,7 +578,7 @@ func (b *Bitfinex) handleWSChecksum(c *stream.ChannelSubscription, d []interface
return nil
}
func (b *Bitfinex) handleWSBookUpdate(c *stream.ChannelSubscription, d []interface{}) error {
func (b *Bitfinex) handleWSBookUpdate(c *subscription.Subscription, d []interface{}) error {
var newOrderbook []WebsocketBook
obSnapBundle, ok := d[1].([]interface{})
if !ok {
@@ -631,7 +632,7 @@ func (b *Bitfinex) handleWSBookUpdate(c *stream.ChannelSubscription, d []interfa
Amount: rateAmount})
}
}
if err := b.WsInsertSnapshot(c.Currency, c.Asset, newOrderbook, fundingRate); err != nil {
if err := b.WsInsertSnapshot(c.Pair, c.Asset, newOrderbook, fundingRate); err != nil {
return fmt.Errorf("inserting snapshot error: %s",
err)
}
@@ -663,7 +664,7 @@ func (b *Bitfinex) handleWSBookUpdate(c *stream.ChannelSubscription, d []interfa
Amount: amountRate})
}
if err := b.WsUpdateOrderbook(c, c.Currency, c.Asset, newOrderbook, int64(sequenceNo), fundingRate); err != nil {
if err := b.WsUpdateOrderbook(c, c.Pair, c.Asset, newOrderbook, int64(sequenceNo), fundingRate); err != nil {
return fmt.Errorf("updating orderbook error: %s",
err)
}
@@ -672,7 +673,7 @@ func (b *Bitfinex) handleWSBookUpdate(c *stream.ChannelSubscription, d []interfa
return nil
}
func (b *Bitfinex) handleWSCandleUpdate(c *stream.ChannelSubscription, d []interface{}) error {
func (b *Bitfinex) handleWSCandleUpdate(c *subscription.Subscription, d []interface{}) error {
candleBundle, ok := d[1].([]interface{})
if !ok || len(candleBundle) == 0 {
return nil
@@ -711,7 +712,7 @@ func (b *Bitfinex) handleWSCandleUpdate(c *stream.ChannelSubscription, d []inter
}
klineData.Exchange = b.Name
klineData.AssetType = c.Asset
klineData.Pair = c.Currency
klineData.Pair = c.Pair
b.Websocket.DataHandler <- klineData
}
case float64:
@@ -740,13 +741,13 @@ func (b *Bitfinex) handleWSCandleUpdate(c *stream.ChannelSubscription, d []inter
}
klineData.Exchange = b.Name
klineData.AssetType = c.Asset
klineData.Pair = c.Currency
klineData.Pair = c.Pair
b.Websocket.DataHandler <- klineData
}
return nil
}
func (b *Bitfinex) handleWSTickerUpdate(c *stream.ChannelSubscription, d []interface{}) error {
func (b *Bitfinex) handleWSTickerUpdate(c *subscription.Subscription, d []interface{}) error {
tickerData, ok := d[1].([]interface{})
if !ok {
return errors.New("type assertion for tickerData")
@@ -754,7 +755,7 @@ func (b *Bitfinex) handleWSTickerUpdate(c *stream.ChannelSubscription, d []inter
t := &ticker.Price{
AssetType: c.Asset,
Pair: c.Currency,
Pair: c.Pair,
ExchangeName: b.Name,
}
@@ -819,7 +820,7 @@ func (b *Bitfinex) handleWSTickerUpdate(c *stream.ChannelSubscription, d []inter
return nil
}
func (b *Bitfinex) handleWSTradesUpdate(c *stream.ChannelSubscription, eventType string, d []interface{}) error {
func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType string, d []interface{}) error {
if !b.IsSaveTradeDataEnabled() {
return nil
}
@@ -935,7 +936,7 @@ func (b *Bitfinex) handleWSTradesUpdate(c *stream.ChannelSubscription, eventType
}
trades[i] = trade.Data{
TID: strconv.FormatInt(tradeHolder[i].ID, 10),
CurrencyPair: c.Currency,
CurrencyPair: c.Pair,
Timestamp: time.UnixMilli(tradeHolder[i].Timestamp),
Price: price,
Amount: newAmount,
@@ -1508,7 +1509,7 @@ func (b *Bitfinex) WsInsertSnapshot(p currency.Pair, assetType asset.Item, books
// WsUpdateOrderbook updates the orderbook list, removing and adding to the
// orderbook sides
func (b *Bitfinex) WsUpdateOrderbook(c *stream.ChannelSubscription, p currency.Pair, assetType asset.Item, book []WebsocketBook, sequenceNo int64, fundingRate bool) error {
func (b *Bitfinex) WsUpdateOrderbook(c *subscription.Subscription, p currency.Pair, assetType asset.Item, book []WebsocketBook, sequenceNo int64, fundingRate bool) error {
orderbookUpdate := orderbook.Update{
Asset: assetType,
Pair: p,
@@ -1602,8 +1603,8 @@ func (b *Bitfinex) WsUpdateOrderbook(c *stream.ChannelSubscription, p currency.P
// resubOrderbook resubscribes the orderbook after a consistency error, probably a failed checksum,
// which forces a fresh snapshot. If we don't do this the orderbook will keep erroring and drifting.
// Flushing the orderbook happens immediately, but the ReSub itself is a go routine to avoid blocking the WS data channel
func (b *Bitfinex) resubOrderbook(c *stream.ChannelSubscription) {
if err := b.Websocket.Orderbook.FlushOrderbook(c.Currency, c.Asset); err != nil {
func (b *Bitfinex) resubOrderbook(c *subscription.Subscription) {
if err := b.Websocket.Orderbook.FlushOrderbook(c.Pair, c.Asset); err != nil {
log.Errorf(log.ExchangeSys, "%s error flushing orderbook: %v", b.Name, err)
}
@@ -1616,10 +1617,10 @@ func (b *Bitfinex) resubOrderbook(c *stream.ChannelSubscription) {
}
// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (b *Bitfinex) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, error) {
func (b *Bitfinex) GenerateDefaultSubscriptions() ([]subscription.Subscription, error) {
var channels = []string{wsBook, wsTrades, wsTicker, wsCandles}
var subscriptions []stream.ChannelSubscription
var subscriptions []subscription.Subscription
assets := b.GetAssetTypes(true)
for i := range assets {
if !b.IsAssetWebsocketSupported(assets[i]) {
@@ -1642,11 +1643,11 @@ func (b *Bitfinex) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription,
params[CandlesPeriodKey] = "30"
}
subscriptions = append(subscriptions, stream.ChannelSubscription{
Channel: channels[j],
Currency: enabledPairs[k],
Params: params,
Asset: assets[i],
subscriptions = append(subscriptions, subscription.Subscription{
Channel: channels[j],
Pair: enabledPairs[k],
Params: params,
Asset: assets[i],
})
}
}
@@ -1664,47 +1665,26 @@ func (b *Bitfinex) ConfigureWS() error {
}
// Subscribe sends a websocket message to receive data from channels
func (b *Bitfinex) Subscribe(channels []stream.ChannelSubscription) error {
return b.parallelChanOp(channels, b.subscribeToChan)
func (b *Bitfinex) Subscribe(channels []subscription.Subscription) error {
return b.ParallelChanOp(channels, b.subscribeToChan, 1)
}
// Unsubscribe sends a websocket message to stop receiving data from channels
func (b *Bitfinex) Unsubscribe(channels []stream.ChannelSubscription) error {
return b.parallelChanOp(channels, b.unsubscribeFromChan)
}
// parallelChanOp performs a single method call in parallel across streams and waits to return any errors
func (b *Bitfinex) parallelChanOp(channels []stream.ChannelSubscription, m func(*stream.ChannelSubscription) error) error {
wg := sync.WaitGroup{}
wg.Add(len(channels))
errC := make(chan error, len(channels))
for i := range channels {
go func(c *stream.ChannelSubscription) {
defer wg.Done()
if err := m(c); err != nil {
errC <- err
}
}(&channels[i])
}
wg.Wait()
close(errC)
var errs error
for err := range errC {
errs = common.AppendError(errs, err)
}
return errs
func (b *Bitfinex) Unsubscribe(channels []subscription.Subscription) error {
return b.ParallelChanOp(channels, b.unsubscribeFromChan, 1)
}
// subscribeToChan handles a single subscription and parses the result
// on success it adds the subscription to the websocket
func (b *Bitfinex) subscribeToChan(c *stream.ChannelSubscription) error {
req, err := subscribeReq(c)
func (b *Bitfinex) subscribeToChan(chans []subscription.Subscription) error {
if len(chans) != 1 {
return errors.New("subscription batching limited to 1")
}
c := chans[0]
req, err := subscribeReq(&c)
if err != nil {
return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Currency)
return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pair)
}
// subId is a single round-trip identifier that provides linking sub requests to chanIDs
@@ -1716,22 +1696,22 @@ func (b *Bitfinex) subscribeToChan(c *stream.ChannelSubscription) error {
// Otherwise we might drop the first messages after the subscribed resp
c.Key = subID // Note subID string type avoids conflicts with later chanID key
c.State = stream.ChannelSubscribing
err = b.Websocket.AddSubscription(c)
c.State = subscription.SubscribingState
err = b.Websocket.AddSubscription(&c)
if err != nil {
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err)
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Pair, err)
}
// Always remove the temporary subscription keyed by subID
defer b.Websocket.RemoveSubscriptions(*c)
defer b.Websocket.RemoveSubscriptions(c)
respRaw, err := b.Websocket.Conn.SendMessageReturnResponse("subscribe:"+subID, req)
if err != nil {
return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Currency)
return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pair)
}
if err = b.getErrResp(respRaw); err != nil {
wErr := fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Currency)
wErr := fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pair)
b.Websocket.DataHandler <- wErr
return wErr
}
@@ -1740,7 +1720,7 @@ func (b *Bitfinex) subscribeToChan(c *stream.ChannelSubscription) error {
}
// subscribeReq returns a map of request params for subscriptions
func subscribeReq(c *stream.ChannelSubscription) (map[string]interface{}, error) {
func subscribeReq(c *subscription.Subscription) (map[string]interface{}, error) {
req := map[string]interface{}{
"event": "subscribe",
"channel": c.Channel,
@@ -1763,13 +1743,13 @@ func subscribeReq(c *stream.ChannelSubscription) (map[string]interface{}, error)
prefix = "f"
}
needsDelimiter := c.Currency.Len() > 6
needsDelimiter := c.Pair.Len() > 6
var formattedPair string
if needsDelimiter {
formattedPair = c.Currency.Format(currency.PairFormat{Uppercase: true, Delimiter: ":"}).String()
formattedPair = c.Pair.Format(currency.PairFormat{Uppercase: true, Delimiter: ":"}).String()
} else {
formattedPair = currency.PairFormat{Uppercase: true}.Format(c.Currency)
formattedPair = currency.PairFormat{Uppercase: true}.Format(c.Pair)
}
if c.Channel == wsCandles {
@@ -1796,7 +1776,11 @@ func subscribeReq(c *stream.ChannelSubscription) (map[string]interface{}, error)
}
// unsubscribeFromChan sends a websocket message to stop receiving data from a channel
func (b *Bitfinex) unsubscribeFromChan(c *stream.ChannelSubscription) error {
func (b *Bitfinex) unsubscribeFromChan(chans []subscription.Subscription) error {
if len(chans) != 1 {
return errors.New("subscription batching limited to 1")
}
c := chans[0]
chanID, ok := c.Key.(int)
if !ok {
return common.GetTypeAssertError("int", c.Key, "chanID")
@@ -1818,7 +1802,7 @@ func (b *Bitfinex) unsubscribeFromChan(c *stream.ChannelSubscription) error {
return wErr
}
b.Websocket.RemoveSubscriptions(*c)
b.Websocket.RemoveSubscriptions(c)
return nil
}