Bitfinex: Websocket subscription improvements (#1353)

* Websockets: Add keys to websocket subscriptions

* This switches all RO uses of the mutex to use a RLock method.
* The mutex used for discrete field access has had scope drift from
  name 'connectionMutex' so rename to more appropriate fieldsMutex
* The mutex used for Set/CanUseAuthEndpoints moves from the
  subscriptions endpoint to the fieldsMutex
* Add GetSubscription by key
* Expose stream.Matcher type

* Bitfinex: Subscribe and Unsubscribe atomicly

* Fix Auth failures ignored
* This change makes it so that Subscribe and Unsubscribe wait for success
** Tells the DataHandler about errors
** Errors are returned to consumers
* Subscribes concurrently to the channels
* It also simplifies the chanId to stream mapping
* Removes unable to locate chanID: %d errors which are just noise
* Paves the way for unified channelSubscription id handling
* Adds support for subId for Book subscriptions, which is more robust

* Vastly simplifies what we need to test TestWsSubscribedResponse
This test was working to ensure that the various fancy key parsing
mechanisms all worked. Now that we use subId, we just need a thorough
test of that
* Expose Match.Set in order to capture websocket incoming data
Can't see another way of doing this. Doesn't seem too bad

* Allow tests to run with auth or WS
These flags made it difficult to run the tests whilst working on
websockets

* Enable API auth and WS in testconfig
This change minimises the changes requires for a full test run against
live endpoints, so that new contributors have a clearer testing path.
I cannot see any reason to turn WS off and Auth endpoints off when we're
not going to run API tests without Creds being set, and we're not going
to do live fire tests without canManipulateRealOrders

* TestWsSubscribe and various fixes
** Enables the websocket for live non-authed integration tests by default
** Adds an integration test for subscriptions
** Changes the Ws tests to respect canManipulateRealOrders
** Uses WsConnect instead of setupWs; fixes seqNo config not sent for WS tests
** Allows api creds to live in config/testdata.json which might be
  less likely to accidentally commit, and less obtrusive

* Bitfinex: Support period and timeframe for Candles

* Fixes manual Subscribe() symbol or key formatting
* Unifies handling of params for DefaultSubscriptions and manual
  subsrciptions

* Bitfinex: Handle conf and info WS channel events

* Bitfinex: Better tests for subscriptions

* fixup! Websockets: Add keys to websocket subscriptions

* fixup! Bitfinex: Subscribe and Unsubscribe atomicly

* fixup! Websockets: Add keys to websocket subscriptions

* Websockets: Add Pending subscription status

Add a status tracker so that Sub/Unsub can prevent duplicates,
and also fixes when first message comes before we have added the sub
to the tracker

* Websockets: Add State instead of pending

This change allows more clarity about the current state and
checks for specifically already Unsubing

* Bitfinex: Fix first sub message maybe lost

The only link we have between a sub req and the sub resp is the subID.
And the only link we have between a sub message and the sub is the chanID.
We can't derive a link using Pair or anything else.

This meant that by sending the resp and its chanID down the IncomingData
channel, we allowed the channel reader to maybe process the next
message, the first message on the channel, before the runtime executed
the switch back to subscribeToChan waiting on the chan.

To fix this, we key initially on subId.(string), and then replace it
with chanId.(int64) when we have it *inside* the wsHandleData so we
know we've procedurally handled it before the next message.

subscribeToChan is then free to remove the subId keyed Sub regardless of
error or not

If there's an error, we don't need to inline handling because there
won't be any second update.

Expands test coverage to make sure those subId keyed subscriptions are
removed.

* Websocket: Validate state in SetChanState

* fixup! Bitfinex: Fix first sub message maybe lost

* Websockets: Rename RemoveUnsuccessfulSubs

Implementation doesn't imply Unsuccessful or need to.
This change supports the registering of Pending subs

* Bitfinex: Fix race in Tests
This commit is contained in:
Gareth Kirwan
2023-11-02 02:10:43 +01:00
committed by GitHub
parent dbe41a7412
commit f9437dbd08
39 changed files with 1004 additions and 668 deletions

View File

@@ -1095,7 +1095,7 @@ channels:
continue
}
// When we have a successful unsubscription, we can alert our internal management system of the success.
f.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
f.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
if errs != nil {
return errs

View File

@@ -621,7 +621,7 @@ func (b *Binance) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription
return err
}
}
b.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe...)
b.Websocket.RemoveSubscriptions(channelsToUnsubscribe...)
return nil
}

View File

@@ -614,7 +614,7 @@ func (bi *Binanceus) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscript
return err
}
}
bi.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe...)
bi.Websocket.RemoveSubscriptions(channelsToUnsubscribe...)
return nil
}

View File

@@ -21,7 +21,6 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)
@@ -101,12 +100,16 @@ const (
bitfinexChecksumFlag = 131072
bitfinexWsSequenceFlag = 65536
// CandlesTimeframeKey configures the timeframe in stream.ChannelSubscription.Params
CandlesTimeframeKey = "_timeframe"
// CandlesPeriodKey configures the aggregated period in stream.ChannelSubscription.Params
CandlesPeriodKey = "_period"
)
// Bitfinex is the overarching type across the bitfinex package
type Bitfinex struct {
exchange.Base
WebsocketSubdChannels map[int]*stream.ChannelSubscription
}
// GetPlatformStatus returns the Bifinex platform status

File diff suppressed because one or more lines are too long

View File

@@ -12,9 +12,11 @@ import (
var (
errSetCannotBeEmpty = errors.New("set cannot be empty")
errSubNotFound = errors.New("could not find matching subscription")
errTypeAssert = errors.New("type assertion failed")
errNoSeqNo = errors.New("no sequence number")
errUnknownError = errors.New("unknown error")
errParamNotAllowed = errors.New("param not allowed")
errParsingWSField = errors.New("error parsing WS field")
)
// AccountV2Data stores account v2 data
@@ -659,6 +661,12 @@ const (
wsTicker = "ticker"
wsTrades = "trades"
wsError = "error"
wsEventSubscribed = "subscribed"
wsEventUnsubscribed = "unsubscribed"
wsEventAuth = "auth"
wsEventError = "error"
wsEventConf = "conf"
wsEventInfo = "info"
)
// WsAuthRequest container for WS auth request

View File

@@ -13,6 +13,7 @@ import (
"sync"
"time"
"github.com/buger/jsonparser"
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/common/convert"
@@ -132,76 +133,31 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
}
switch d := result.(type) {
case map[string]interface{}:
event := d["event"]
switch event {
case "subscribed":
chanID, ok := d["chanId"].(float64)
if !ok {
return errors.New("unable to type assert chanId")
}
channel, ok := d["channel"].(string)
if !ok {
return errors.New("unable to type assert channel")
}
symbol, ok := d["symbol"].(string)
if !ok {
key, ok := d["key"].(string)
if !ok {
return fmt.Errorf("subscribed to channel but no symbol or key: %v", channel)
}
if channel != wsCandles {
// status channel not implemented at all yet.
return fmt.Errorf("%v channel subscription keys: %w", channel, common.ErrNotYetImplemented)
}
var err error
symbol, err = symbolFromCandleKey(key)
if err != nil {
return err
}
}
if err := b.WsAddSubscriptionChannel(int(chanID), channel, symbol); err != nil {
return err
}
case "unsubscribed":
chanID, ok := d["chanId"].(float64)
if !ok {
return errors.New("unable to type assert chanId")
}
delete(b.WebsocketSubdChannels, int(chanID))
case "auth":
status, ok := d["status"].(string)
if !ok {
return errors.New("unable to type assert status")
}
if status == "OK" {
b.Websocket.DataHandler <- d
} else if status == "fail" {
if code, ok := d["code"].(string); ok {
return fmt.Errorf("websocket unable to AUTH. Error code: %s",
code)
}
return errors.New("websocket unable to auth")
}
}
return b.handleWSEvent(respRaw)
case []interface{}:
var chanID int
if f, ok := d[0].(float64); !ok {
chanIDFloat, ok := d[0].(float64)
if !ok {
return common.GetTypeAssertError("float64", d[0], "chanID")
} else { //nolint:revive // using lexical variable requires else statement
chanID = int(f)
}
chanID := int(chanIDFloat)
eventType, hasEventType := d[1].(string)
if chanID != 0 {
if c, ok := b.WebsocketSubdChannels[chanID]; ok {
return b.handleWSChannelUpdate(c, chanID, eventType, d)
if c := b.Websocket.GetSubscription(chanID); c != nil {
return b.handleWSChannelUpdate(c, eventType, d)
}
return fmt.Errorf("unable to locate chanID: %d", chanID)
if b.Verbose {
log.Warnf(log.ExchangeSys, "%s %s; dropped WS message: %s", b.Name, stream.ErrSubscriptionNotFound, respRaw)
}
// We didn't have a mapping for this chanID; This probably means we have unsubscribed OR
// received our first message before processing the sub chanID
// In either case it's okay. No point in erroring because there's nothing we can do about it, and it happens often
return nil
}
if !hasEventType {
return errors.New("WS message without eventType or chanID")
return errors.New("WS message without eventType")
}
switch eventType {
@@ -469,9 +425,107 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
return nil
}
func (b *Bitfinex) handleWSChannelUpdate(c *stream.ChannelSubscription, chanID int, eventType string, d []interface{}) error {
func (b *Bitfinex) handleWSEvent(respRaw []byte) error {
event, err := jsonparser.GetUnsafeString(respRaw, "event")
if err != nil {
return fmt.Errorf("%w 'event': %w from message: %s", errParsingWSField, err, respRaw)
}
switch event {
case wsEventSubscribed:
return b.handleWSSubscribed(respRaw)
case wsEventUnsubscribed:
chanID, err := jsonparser.GetUnsafeString(respRaw, "chanId")
if err != nil {
return fmt.Errorf("%w 'chanId': %w from message: %s", errParsingWSField, err, respRaw)
}
if !b.Websocket.Match.IncomingWithData("unsubscribe:"+chanID, respRaw) {
return fmt.Errorf("%v channel unsubscribe listener not found", chanID)
}
case wsEventError:
if subID, err := jsonparser.GetUnsafeString(respRaw, "subId"); err == nil {
if !b.Websocket.Match.IncomingWithData("subscribe:"+subID, respRaw) {
return fmt.Errorf("%v channel subscribe listener not found", subID)
}
} else if chanID, err := jsonparser.GetUnsafeString(respRaw, "chanId"); err == nil {
if !b.Websocket.Match.IncomingWithData("unsubscribe:"+chanID, respRaw) {
return fmt.Errorf("%v channel unsubscribe listener not found", chanID)
}
} else {
return fmt.Errorf("unknown channel error; Message: %s", respRaw)
}
case wsEventAuth:
status, err := jsonparser.GetUnsafeString(respRaw, "status")
if err != nil {
return fmt.Errorf("%w 'status': %w from message: %s", errParsingWSField, err, respRaw)
}
if status == "OK" {
var glob map[string]interface{}
if err := json.Unmarshal(respRaw, &glob); err != nil {
return fmt.Errorf("unable to Unmarshal auth resp; Error: %w Msg: %v", err, respRaw)
}
// TODO - Send a better value down the channel
b.Websocket.DataHandler <- glob
} else {
errCode, err := jsonparser.GetInt(respRaw, "code")
if err != nil {
log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, errParsingWSField, err, respRaw)
}
return fmt.Errorf("WS auth subscription error; Status: %s Error Code: %d", status, errCode)
}
case wsEventInfo:
// Nothing to do with info for now.
// version or platform.status might be useful in the future.
case wsEventConf:
status, err := jsonparser.GetUnsafeString(respRaw, "status")
if err != nil {
return fmt.Errorf("%w 'status': %w from message: %s", errParsingWSField, err, respRaw)
}
if status != "OK" {
return fmt.Errorf("WS configure channel error; Status: %s", status)
}
default:
return fmt.Errorf("unknown WS event msg: %s", respRaw)
}
return nil
}
// handleWSSubscribed parses a subscription response and registers the chanID key immediately, before updating subscribeToChan via IncomingWithData chan
// wsHandleData happens sequentially, so by rekeying on chanID immediately we ensure the first message is not dropped
func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
subID, err := jsonparser.GetUnsafeString(respRaw, "subId")
if err != nil {
return fmt.Errorf("%w 'subId': %w from message: %s", errParsingWSField, err, respRaw)
}
c := b.Websocket.GetSubscription(subID)
if c == nil {
return fmt.Errorf("%w: %w subID: %s", stream.ErrSubscriptionFailure, stream.ErrSubscriptionNotFound, subID)
}
chanID, err := jsonparser.GetInt(respRaw, "chanId")
if err != nil {
return fmt.Errorf("%w: %w 'chanId': %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, errParsingWSField, err, c.Channel, c.Currency)
}
// Note: chanID's int type avoids conflicts with the string type subID key because of the type difference
c.Key = int(chanID)
// subscribeToChan removes the old subID keyed Subscription
b.Websocket.AddSuccessfulSubscriptions(*c)
if b.Verbose {
log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s ChannelID: %d\n", b.Name, c.Channel, c.Currency, chanID)
}
if !b.Websocket.Match.IncomingWithData("subscribe:"+subID, respRaw) {
return fmt.Errorf("%v channel subscribe listener not found", subID)
}
return nil
}
func (b *Bitfinex) handleWSChannelUpdate(c *stream.ChannelSubscription, eventType string, d []interface{}) error {
if eventType == wsChecksum {
return b.handleWSChecksum(chanID, d)
return b.handleWSChecksum(c, d)
}
if eventType == wsHeartbeat {
@@ -480,7 +534,7 @@ func (b *Bitfinex) handleWSChannelUpdate(c *stream.ChannelSubscription, chanID i
switch c.Channel {
case wsBook:
return b.handleWSBookUpdate(c, chanID, d)
return b.handleWSBookUpdate(c, d)
case wsCandles:
return b.handleWSCandleUpdate(c, d)
case wsTicker:
@@ -492,7 +546,7 @@ func (b *Bitfinex) handleWSChannelUpdate(c *stream.ChannelSubscription, chanID i
return fmt.Errorf("%s unhandled channel update: %s", b.Name, c.Channel)
}
func (b *Bitfinex) handleWSChecksum(chanID int, d []interface{}) error {
func (b *Bitfinex) handleWSChecksum(c *stream.ChannelSubscription, d []interface{}) error {
var token int
if f, ok := d[2].(float64); !ok {
return common.GetTypeAssertError("float64", d[2], "checksum")
@@ -509,6 +563,11 @@ func (b *Bitfinex) handleWSChecksum(chanID int, d []interface{}) error {
seqNo = int64(f)
}
chanID, ok := c.Key.(int)
if !ok {
return common.GetTypeAssertError("int", c.Key, "ChanID") // Should be impossible
}
cMtx.Lock()
checksumStore[chanID] = &checksum{
Token: token,
@@ -518,7 +577,7 @@ func (b *Bitfinex) handleWSChecksum(chanID int, d []interface{}) error {
return nil
}
func (b *Bitfinex) handleWSBookUpdate(c *stream.ChannelSubscription, chanID int, d []interface{}) error {
func (b *Bitfinex) handleWSBookUpdate(c *stream.ChannelSubscription, d []interface{}) error {
var newOrderbook []WebsocketBook
obSnapBundle, ok := d[1].([]interface{})
if !ok {
@@ -604,7 +663,7 @@ func (b *Bitfinex) handleWSBookUpdate(c *stream.ChannelSubscription, chanID int,
Amount: amountRate})
}
if err := b.WsUpdateOrderbook(c.Currency, c.Asset, newOrderbook, chanID, int64(sequenceNo), fundingRate); err != nil {
if err := b.WsUpdateOrderbook(c, c.Currency, c.Asset, newOrderbook, int64(sequenceNo), fundingRate); err != nil {
return fmt.Errorf("updating orderbook error: %s",
err)
}
@@ -1405,8 +1464,7 @@ func (b *Bitfinex) wsHandleOrder(data []interface{}) {
b.Websocket.DataHandler <- &od
}
// WsInsertSnapshot add the initial orderbook snapshot when subscribed to a
// channel
// WsInsertSnapshot add the initial orderbook snapshot when subscribed to a channel
func (b *Bitfinex) WsInsertSnapshot(p currency.Pair, assetType asset.Item, books []WebsocketBook, fundingRate bool) error {
if len(books) == 0 {
return errors.New("no orderbooks submitted")
@@ -1450,7 +1508,7 @@ func (b *Bitfinex) WsInsertSnapshot(p currency.Pair, assetType asset.Item, books
// WsUpdateOrderbook updates the orderbook list, removing and adding to the
// orderbook sides
func (b *Bitfinex) WsUpdateOrderbook(p currency.Pair, assetType asset.Item, book []WebsocketBook, channelID int, sequenceNo int64, fundingRate bool) error {
func (b *Bitfinex) WsUpdateOrderbook(c *stream.ChannelSubscription, p currency.Pair, assetType asset.Item, book []WebsocketBook, sequenceNo int64, fundingRate bool) error {
orderbookUpdate := orderbook.Update{
Asset: assetType,
Pair: p,
@@ -1506,13 +1564,18 @@ func (b *Bitfinex) WsUpdateOrderbook(p currency.Pair, assetType asset.Item, book
}
}
chanID, ok := c.Key.(int)
if !ok {
return common.GetTypeAssertError("int", c.Key, "ChanID") // Should be impossible
}
cMtx.Lock()
checkme := checksumStore[channelID]
checkme := checksumStore[chanID]
if checkme == nil {
cMtx.Unlock()
return b.Websocket.Orderbook.Update(&orderbookUpdate)
}
checksumStore[channelID] = nil
checksumStore[chanID] = nil
cMtx.Unlock()
if checkme.Sequence+1 == sequenceNo {
@@ -1528,9 +1591,7 @@ func (b *Bitfinex) WsUpdateOrderbook(p currency.Pair, assetType asset.Item, book
if err = validateCRC32(ob, checkme.Token); err != nil {
log.Errorf(log.WebsocketMgr, "%s websocket orderbook update error, will resubscribe orderbook: %v", b.Name, err)
if suberr := b.resubOrderbook(p, assetType); suberr != nil {
log.Errorf(log.ExchangeSys, "%s error resubscribing orderbook: %v", b.Name, suberr)
}
b.resubOrderbook(c)
return err
}
}
@@ -1540,37 +1601,22 @@ func (b *Bitfinex) WsUpdateOrderbook(p currency.Pair, assetType asset.Item, book
// resubOrderbook resubscribes the orderbook after a consistency error, probably a failed checksum,
// which forces a fresh snapshot. If we don't do this the orderbook will keep erroring and drifting.
func (b *Bitfinex) resubOrderbook(p currency.Pair, assetType asset.Item) error {
if err := b.Websocket.Orderbook.FlushOrderbook(p, assetType); err != nil {
return err
// Flushing the orderbook happens immediately, but the ReSub itself is a go routine to avoid blocking the WS data channel
func (b *Bitfinex) resubOrderbook(c *stream.ChannelSubscription) {
if err := b.Websocket.Orderbook.FlushOrderbook(c.Currency, c.Asset); err != nil {
log.Errorf(log.ExchangeSys, "%s error flushing orderbook: %v", b.Name, err)
}
c, err := b.chanForSub(wsBook, assetType, p)
if err != nil {
return err
}
return b.Websocket.ResubscribeToChannel(c)
}
// chanForSub returns an existing channel subscription for a given channel/asset/pair
func (b *Bitfinex) chanForSub(cName string, assetType asset.Item, pair currency.Pair) (*stream.ChannelSubscription, error) {
want := &stream.ChannelSubscription{
Channel: cName,
Currency: pair,
Asset: assetType,
}
subs := b.Websocket.GetSubscriptions()
for i := range subs {
if subs[i].Equal(want) {
return &subs[i], nil
// Resub will block so we have to do this in a goro
go func() {
if err := b.Websocket.ResubscribeToChannel(c); err != nil {
log.Errorf(log.ExchangeSys, "%s error resubscribing orderbook: %v", b.Name, err)
}
}
return nil, errSubNotFound
}()
}
// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (b *Bitfinex) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, error) {
var wsPairFormat = currency.PairFormat{Uppercase: true}
var channels = []string{wsBook, wsTrades, wsTicker, wsCandles}
var subscriptions []stream.ChannelSubscription
@@ -1592,29 +1638,8 @@ func (b *Bitfinex) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription,
params["len"] = "100"
}
prefix := "t"
if assets[i] == asset.MarginFunding {
prefix = "f"
}
needsDelimiter := enabledPairs[k].Len() > 6
var formattedPair string
if needsDelimiter {
formattedPair = enabledPairs[k].Format(currency.PairFormat{Uppercase: true, Delimiter: ":"}).String()
} else {
formattedPair = wsPairFormat.Format(enabledPairs[k])
}
if channels[j] == wsCandles {
// TODO: Add ability to select timescale && funding period
fundingPeriod := ""
if assets[i] == asset.MarginFunding {
fundingPeriod = ":p30"
}
params["key"] = "trade:1m:" + prefix + formattedPair + fundingPeriod
} else {
params["symbol"] = prefix + formattedPair
if channels[j] == wsCandles && assets[i] == asset.MarginFunding {
params[CandlesPeriodKey] = "30"
}
subscriptions = append(subscriptions, stream.ChannelSubscription{
@@ -1630,31 +1655,6 @@ func (b *Bitfinex) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription,
return subscriptions, nil
}
// Subscribe sends a websocket message to receive data from the channel
func (b *Bitfinex) Subscribe(channelsToSubscribe []stream.ChannelSubscription) error {
var errs error
for i := range channelsToSubscribe {
req := make(map[string]interface{})
req["event"] = "subscribe"
req["channel"] = channelsToSubscribe[i].Channel
for k, v := range channelsToSubscribe[i].Params {
// Resubscribing channels might already have this set
if k != "chanId" {
req[k] = v
}
}
err := b.Websocket.Conn.SendJSONMessage(req)
if err != nil {
errs = common.AppendError(errs, err)
continue
}
b.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
}
return errs
}
// ConfigureWS to send checksums and sequence numbers
func (b *Bitfinex) ConfigureWS() error {
return b.Websocket.Conn.SendJSONMessage(map[string]interface{}{
@@ -1663,37 +1663,193 @@ func (b *Bitfinex) ConfigureWS() error {
})
}
// Unsubscribe sends a websocket message to stop receiving data from the channel
func (b *Bitfinex) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) error {
var errs error
for i := range channelsToUnsubscribe {
idAny, ok := channelsToUnsubscribe[i].Params["chanId"]
if !ok {
errs = common.AppendError(errs, fmt.Errorf("cannot unsubscribe from a channel without an id"))
continue
}
chanID, ok := idAny.(int)
if !ok {
errs = common.AppendError(errs, fmt.Errorf("chanId is not an int"))
continue
}
// Subscribe sends a websocket message to receive data from channels
func (b *Bitfinex) Subscribe(channels []stream.ChannelSubscription) error {
return b.parallelChanOp(channels, b.subscribeToChan)
}
req := map[string]interface{}{
"event": "unsubscribe",
"chanId": chanID,
}
// Unsubscribe sends a websocket message to stop receiving data from channels
func (b *Bitfinex) Unsubscribe(channels []stream.ChannelSubscription) error {
return b.parallelChanOp(channels, b.unsubscribeFromChan)
}
err := b.Websocket.Conn.SendJSONMessage(req)
if err != nil {
errs = common.AppendError(errs, err)
continue
}
// We do this before the unsubscribed event comes back so we can subscribe again when called from ResubcribeToChannel
b.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
// parallelChanOp performs a single method call in parallel across streams and waits to return any errors
func (b *Bitfinex) parallelChanOp(channels []stream.ChannelSubscription, m func(*stream.ChannelSubscription) error) error {
wg := sync.WaitGroup{}
wg.Add(len(channels))
errC := make(chan error, len(channels))
for i := range channels {
go func(c *stream.ChannelSubscription) {
defer wg.Done()
if err := m(c); err != nil {
errC <- err
}
}(&channels[i])
}
wg.Wait()
close(errC)
var errs error
for err := range errC {
errs = common.AppendError(errs, err)
}
return errs
}
// subscribeToChan handles a single subscription and parses the result
// on success it adds the subscription to the websocket
func (b *Bitfinex) subscribeToChan(c *stream.ChannelSubscription) error {
req, err := subscribeReq(c)
if err != nil {
return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Currency)
}
// subId is a single round-trip identifier that provides linking sub requests to chanIDs
// Although docs only mention subId for wsBook, it works for all chans
subID := strconv.FormatInt(b.Websocket.Conn.GenerateMessageID(false), 10)
req["subId"] = subID
// Add a temporary Key so we can find this Sub when we get the resp without delay or context switch
// Otherwise we might drop the first messages after the subscribed resp
c.Key = subID // Note subID string type avoids conflicts with later chanID key
c.State = stream.ChannelSubscribing
err = b.Websocket.AddSubscription(c)
if err != nil {
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err)
}
// Always remove the temporary subscription keyed by subID
defer b.Websocket.RemoveSubscriptions(*c)
respRaw, err := b.Websocket.Conn.SendMessageReturnResponse("subscribe:"+subID, req)
if err != nil {
return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Currency)
}
if err = b.getErrResp(respRaw); err != nil {
wErr := fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Currency)
b.Websocket.DataHandler <- wErr
return wErr
}
return nil
}
// subscribeReq returns a map of request params for subscriptions
func subscribeReq(c *stream.ChannelSubscription) (map[string]interface{}, error) {
req := map[string]interface{}{
"event": "subscribe",
"channel": c.Channel,
}
for k, v := range c.Params {
switch k {
case CandlesPeriodKey, CandlesTimeframeKey:
// Skip these internal Params
case "key", "symbol":
// Ensure user's Params aren't silently overwritten
return nil, fmt.Errorf("%s %w", k, errParamNotAllowed)
default:
req[k] = v
}
}
prefix := "t"
if c.Asset == asset.MarginFunding {
prefix = "f"
}
needsDelimiter := c.Currency.Len() > 6
var formattedPair string
if needsDelimiter {
formattedPair = c.Currency.Format(currency.PairFormat{Uppercase: true, Delimiter: ":"}).String()
} else {
formattedPair = currency.PairFormat{Uppercase: true}.Format(c.Currency)
}
if c.Channel == wsCandles {
timeframe := "1m"
if t, ok := c.Params[CandlesTimeframeKey]; ok {
if timeframe, ok = t.(string); !ok {
return nil, common.GetTypeAssertError("string", t, "Subscription.CandlesTimeframeKey")
}
}
fundingPeriod := ""
if p, ok := c.Params[CandlesPeriodKey]; ok {
s, cOk := p.(string)
if !cOk {
return nil, common.GetTypeAssertError("string", p, "Subscription.CandlesPeriodKey")
}
fundingPeriod = ":p" + s
}
req["key"] = "trade:" + timeframe + ":" + prefix + formattedPair + fundingPeriod
} else {
req["symbol"] = prefix + formattedPair
}
return req, nil
}
// unsubscribeFromChan sends a websocket message to stop receiving data from a channel
func (b *Bitfinex) unsubscribeFromChan(c *stream.ChannelSubscription) error {
chanID, ok := c.Key.(int)
if !ok {
return common.GetTypeAssertError("int", c.Key, "chanID")
}
req := map[string]interface{}{
"event": "unsubscribe",
"chanId": chanID,
}
respRaw, err := b.Websocket.Conn.SendMessageReturnResponse("unsubscribe:"+strconv.Itoa(chanID), req)
if err != nil {
return err
}
if err := b.getErrResp(respRaw); err != nil {
wErr := fmt.Errorf("%w from ChanId: %v; %w", stream.ErrUnsubscribeFailure, chanID, err)
b.Websocket.DataHandler <- wErr
return wErr
}
b.Websocket.RemoveSubscriptions(*c)
return nil
}
// getErrResp takes a json response string and looks for an error event type
// If found it parses the error code and message as a wrapped error and returns it
// It might log parsing errors about the nature of the error
// If the error message is not defined it will return a wrapped errUnknownError
func (b *Bitfinex) getErrResp(resp []byte) error {
event, err := jsonparser.GetUnsafeString(resp, "event")
if err != nil {
return fmt.Errorf("%w 'event': %w from message: %s", errParsingWSField, err, resp)
}
if event != "error" {
return nil
}
errCode, err := jsonparser.GetInt(resp, "code")
if err != nil {
log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", b.Name, errParsingWSField, err, resp)
}
var apiErr error
if msg, e2 := jsonparser.GetString(resp, "msg"); e2 != nil {
log.Errorf(log.ExchangeSys, "%s %s 'msg': %s from message: %s", b.Name, errParsingWSField, e2, resp)
apiErr = errUnknownError
} else {
apiErr = errors.New(msg)
}
return fmt.Errorf("%w (code: %d)", apiErr, errCode)
}
// WsSendAuth sends a authenticated event payload
func (b *Bitfinex) WsSendAuth(ctx context.Context) error {
creds, err := b.GetCredentials(ctx)
@@ -1726,56 +1882,6 @@ func (b *Bitfinex) WsSendAuth(ctx context.Context) error {
return nil
}
// WsAddSubscriptionChannel adds a confirmed channel subscription mapping from id to original params
func (b *Bitfinex) WsAddSubscriptionChannel(chanID int, channel, symbol string) error {
assetType, pair, err := assetPairFromSymbol(symbol)
if err != nil {
return err
}
var c *stream.ChannelSubscription
s := b.Websocket.GetSubscriptions()
for i := range s {
if strings.EqualFold(s[i].Channel, channel) && s[i].Currency.Equal(pair) && s[i].Asset == assetType {
c = &s[i]
break
}
}
if c == nil {
log.Errorf(log.ExchangeSys,
"%s Could not find an existing channel subscription: %s Pair: %s ChannelID: %d Asset: %s\n",
b.Name,
channel,
pair,
chanID,
assetType)
c = &stream.ChannelSubscription{
Channel: channel,
Currency: pair,
Asset: assetType,
}
}
if c.Params == nil {
c.Params = map[string]interface{}{}
}
c.Params["chanId"] = chanID
b.WebsocketSubdChannels[chanID] = c
if b.Verbose {
log.Debugf(log.ExchangeSys,
"%s Subscribed to Channel: %s Pair: %s ChannelID: %d\n",
b.Name,
channel,
pair,
chanID)
}
return nil
}
// WsNewOrder authenticated new order request
func (b *Bitfinex) WsNewOrder(data *WsNewOrderRequest) (string, error) {
data.CustomID = b.Websocket.AuthConn.GenerateMessageID(false)
@@ -2087,42 +2193,3 @@ subSort:
break
}
}
func assetPairFromSymbol(symbol string) (asset.Item, currency.Pair, error) {
assetType := asset.Spot
if symbol == "" {
return assetType, currency.EMPTYPAIR, nil
}
switch symbol[0] {
case 'f':
assetType = asset.MarginFunding
case 't':
assetType = asset.Spot
default:
return assetType, currency.EMPTYPAIR, fmt.Errorf("unknown pair prefix: %v", symbol[0])
}
pair, err := currency.NewPairFromString(symbol[1:])
return assetType, pair, err
}
// symbolFromCandleKey extracts the symbol or pair from a subscribed channel key
// e.g. trade:1h:tBTC, trade:1h:tBTC:CNHT, trade:1m:fBTC:p30 and trade:1m:fBTC:a30:p2:p30
func symbolFromCandleKey(key string) (string, error) {
parts := strings.Split(key, ":")
if len(parts) < 3 {
return "", fmt.Errorf("subscription key has too few parts, need 3: %v", key)
}
parts = parts[2:]
if parts[0][0] == 'f' {
// Margin Funding subscription has one currency, and suffixes
return parts[0], nil
}
if len(parts) > 2 {
return "", fmt.Errorf("subscription key has too many parts for trade types: %v", key)
}
return strings.Join(parts, ":"), nil
}

View File

@@ -60,7 +60,6 @@ func (b *Bitfinex) SetDefaults() {
b.Name = "Bitfinex"
b.Enabled = true
b.Verbose = true
b.WebsocketSubdChannels = make(map[int]*stream.ChannelSubscription)
b.API.CredentialsValidator.RequiresKey = true
b.API.CredentialsValidator.RequiresSecret = true

View File

@@ -0,0 +1,5 @@
{"bird": "great eared nightjar", "you_are_welcome":true}
{"event": {}}
{"event": "sneezegasm"}
{"event": "error"}
{"event": "error", "msg":"redcoats", "code":42}

View File

@@ -657,7 +657,7 @@ func (b *Bitmex) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription)
if err != nil {
return err
}
b.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe...)
b.Websocket.RemoveSubscriptions(channelsToUnsubscribe...)
return nil
}

View File

@@ -317,7 +317,7 @@ func (b *Bitstamp) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscriptio
errs = common.AppendError(errs, err)
continue
}
b.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
b.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
return errs
}

View File

@@ -363,7 +363,7 @@ func (b *Bittrex) unsubscribeSlice(channelsToUnsubscribe []stream.ChannelSubscri
errs = common.AppendError(errs, errors.New("unable to unsubscribe from "+channels[i]+" - error code "+response.Response[i].ErrorCode))
continue
}
b.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
b.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
return errs
}

View File

@@ -429,7 +429,7 @@ func (b *BTCMarkets) Unsubscribe(subs []stream.ChannelSubscription) error {
if err != nil {
return err
}
b.Websocket.RemoveSuccessfulUnsubscriptions(subs...)
b.Websocket.RemoveSubscriptions(subs...)
return nil
}

View File

@@ -411,6 +411,6 @@ func (b *BTSE) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) e
if err != nil {
return err
}
b.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe...)
b.Websocket.RemoveSubscriptions(channelsToUnsubscribe...)
return nil
}

View File

@@ -170,7 +170,7 @@ func (by *Bybit) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription)
errs = common.AppendError(errs, err)
continue
}
by.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
by.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
return errs
}

View File

@@ -166,7 +166,7 @@ func (by *Bybit) UnsubscribeCoin(channelsToUnsubscribe []stream.ChannelSubscript
errs = common.AppendError(errs, err)
continue
}
by.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
by.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
return errs
}

View File

@@ -118,7 +118,7 @@ func (by *Bybit) UnsubscribeFutures(channelsToUnsubscribe []stream.ChannelSubscr
errs = common.AppendError(errs, err)
continue
}
by.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
by.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
return errs
}

View File

@@ -123,7 +123,7 @@ func (by *Bybit) UnsubscribeUSDT(channelsToUnsubscribe []stream.ChannelSubscript
errs = common.AppendError(errs, err)
continue
}
by.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
by.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
if errs != nil {
return errs

View File

@@ -481,6 +481,6 @@ unsubscriptions:
if err != nil {
return err
}
c.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe...)
c.Websocket.RemoveSubscriptions(channelsToUnsubscribe...)
return nil
}

View File

@@ -684,7 +684,7 @@ func (c *COINUT) Unsubscribe(channelToUnsubscribe []stream.ChannelSubscription)
channelToUnsubscribe[i].Channel))
continue
}
c.Websocket.RemoveSuccessfulUnsubscriptions(channelToUnsubscribe[i])
c.Websocket.RemoveSubscriptions(channelToUnsubscribe[i])
}
return errs
}

View File

@@ -712,7 +712,7 @@ func (g *Gateio) handleSubscription(event string, channelsToSubscribe []stream.C
if payloads[k].Event == "subscribe" {
g.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[k])
} else {
g.Websocket.RemoveSuccessfulUnsubscriptions(channelsToSubscribe[k])
g.Websocket.RemoveSubscriptions(channelsToSubscribe[k])
}
}
}

View File

@@ -331,7 +331,7 @@ func (g *Gateio) handleOptionsSubscription(event string, channelsToSubscribe []s
if payloads[k].Event == "subscribe" {
g.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[k])
} else {
g.Websocket.RemoveSuccessfulUnsubscriptions(channelsToSubscribe[k])
g.Websocket.RemoveSubscriptions(channelsToSubscribe[k])
}
}
}

View File

@@ -171,7 +171,7 @@ func (g *Gemini) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription)
return err
}
g.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe...)
g.Websocket.RemoveSubscriptions(channelsToUnsubscribe...)
return nil
}

View File

@@ -558,7 +558,7 @@ func (h *HitBTC) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription)
errs = common.AppendError(errs, err)
continue
}
h.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
h.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
if errs != nil {
return errs

View File

@@ -606,7 +606,7 @@ func (h *HUOBI) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription)
errs = common.AppendError(errs, err)
continue
}
h.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
h.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
continue
}
err := h.Websocket.Conn.SendJSONMessage(WsRequest{
@@ -616,7 +616,7 @@ func (h *HUOBI) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription)
errs = common.AppendError(errs, err)
continue
}
h.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe[i])
h.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
}
if errs != nil {
return errs

View File

@@ -1347,7 +1347,7 @@ channels:
errs = common.AppendError(errs, err)
continue
}
k.Websocket.RemoveSuccessfulUnsubscriptions(unsubs[i].Channels...)
k.Websocket.RemoveSubscriptions(unsubs[i].Channels...)
continue
}
@@ -1356,7 +1356,7 @@ channels:
errs = common.AppendError(errs, err)
continue
}
k.Websocket.RemoveSuccessfulUnsubscriptions(unsubs[i].Channels...)
k.Websocket.RemoveSubscriptions(unsubs[i].Channels...)
}
return errs
}

View File

@@ -926,9 +926,9 @@ func (o *Okcoin) handleSubscriptions(operation string, subs []stream.ChannelSubs
if operation == "unsubscribe" {
if authenticatedChannelSubscription {
o.Websocket.RemoveSuccessfulUnsubscriptions(authChannels...)
o.Websocket.RemoveSubscriptions(authChannels...)
} else {
o.Websocket.RemoveSuccessfulUnsubscriptions(channels...)
o.Websocket.RemoveSubscriptions(channels...)
}
} else {
if authenticatedChannelSubscription {
@@ -968,7 +968,7 @@ func (o *Okcoin) handleSubscriptions(operation string, subs []stream.ChannelSubs
}
}
if operation == "unsubscribe" {
o.Websocket.RemoveSuccessfulUnsubscriptions(channels...)
o.Websocket.RemoveSubscriptions(channels...)
} else {
o.Websocket.AddSuccessfulSubscriptions(channels...)
}

View File

@@ -478,7 +478,7 @@ func (ok *Okx) handleSubscription(operation string, subscriptions []stream.Chann
return err
}
if operation == operationUnsubscribe {
ok.Websocket.RemoveSuccessfulUnsubscriptions(channels...)
ok.Websocket.RemoveSubscriptions(channels...)
} else {
ok.Websocket.AddSuccessfulSubscriptions(channels...)
}
@@ -500,7 +500,7 @@ func (ok *Okx) handleSubscription(operation string, subscriptions []stream.Chann
return err
}
if operation == operationUnsubscribe {
ok.Websocket.RemoveSuccessfulUnsubscriptions(channels...)
ok.Websocket.RemoveSubscriptions(channels...)
} else {
ok.Websocket.AddSuccessfulSubscriptions(channels...)
}
@@ -529,7 +529,7 @@ func (ok *Okx) handleSubscription(operation string, subscriptions []stream.Chann
if operation == operationUnsubscribe {
channels = append(channels, authChannels...)
ok.Websocket.RemoveSuccessfulUnsubscriptions(channels...)
ok.Websocket.RemoveSubscriptions(channels...)
} else {
channels = append(channels, authChannels...)
ok.Websocket.AddSuccessfulSubscriptions(channels...)

View File

@@ -639,7 +639,7 @@ channels:
errs = common.AppendError(errs, err)
continue channels
}
p.Websocket.RemoveSuccessfulUnsubscriptions(unsub[i])
p.Websocket.RemoveSubscriptions(unsub[i])
continue channels
case strings.EqualFold(strconv.FormatInt(wsTickerDataID, 10),
unsub[i].Channel):
@@ -652,7 +652,7 @@ channels:
errs = common.AppendError(errs, err)
continue
}
p.Websocket.RemoveSuccessfulUnsubscriptions(unsub[i])
p.Websocket.RemoveSubscriptions(unsub[i])
}
if errs != nil {
return errs

View File

@@ -5,7 +5,7 @@ import (
"sync"
)
// NewMatch returns a new matcher
// NewMatch returns a new Match
func NewMatch() *Match {
return &Match{
m: make(map[interface{}]chan []byte),
@@ -21,13 +21,20 @@ type Match struct {
mu sync.Mutex
}
// Matcher defines a payload matching return mechanism
type Matcher struct {
C chan []byte
sig interface{}
m *Match
}
// Incoming matches with request, disregarding the returned payload
func (m *Match) Incoming(signature interface{}) bool {
return m.IncomingWithData(signature, nil)
}
// IncomingWithData matches with requests and takes in the returned payload, to
// be processed outside of a stream processing routine
// be processed outside of a stream processing routine and returns true if a handler was found
func (m *Match) IncomingWithData(signature interface{}, data []byte) bool {
m.mu.Lock()
defer m.mu.Unlock()
@@ -44,35 +51,28 @@ func (m *Match) IncomingWithData(signature interface{}, data []byte) bool {
return false
}
// Sets the signature response channel for incoming data
func (m *Match) set(signature interface{}) (matcher, error) {
// Set the signature response channel for incoming data
func (m *Match) Set(signature interface{}) (Matcher, error) {
var ch chan []byte
m.mu.Lock()
if _, ok := m.m[signature]; ok {
m.mu.Unlock()
return matcher{}, errors.New("signature collision")
return Matcher{}, errors.New("signature collision")
}
// This is buffered so we don't need to wait for receiver.
ch = make(chan []byte, 1)
m.m[signature] = ch
m.mu.Unlock()
return matcher{
return Matcher{
C: ch,
sig: signature,
m: m,
}, nil
}
// matcher defines a payload matching return mechanism
type matcher struct {
C chan []byte
sig interface{}
m *Match
}
// Cleanup closes underlying channel and deletes signature from map
func (m *matcher) Cleanup() {
func (m *Matcher) Cleanup() {
m.m.mu.Lock()
close(m.C)
delete(m.m.m, m.sig)

View File

@@ -18,12 +18,12 @@ func TestMatch(t *testing.T) {
t.Fatal("should not be able to match")
}
m, err := nm.set("hello")
m, err := nm.Set("hello")
if err != nil {
t.Fatal(err)
}
_, err = nm.set("hello")
_, err = nm.Set("hello")
if err == nil {
t.Fatal("error cannot be nil as this collision cannot occur")
}

View File

@@ -31,12 +31,31 @@ type Response struct {
Raw []byte
}
// ChannelSubscription container for streaming subscriptions
// DefaultChannelKey is the fallback key for AddSuccessfulSubscriptions
type DefaultChannelKey struct {
Channel string
Currency currency.Pair
Asset asset.Item
}
// ChannelState tracks the status of a subscription channel
type ChannelState uint8
const (
ChannelStateUnknown ChannelState = iota // ChannelStateUnknown means subscription state is not registered, but doesn't imply Inactive
ChannelSubscribing // ChannelSubscribing means channel is in the process of subscribing
ChannelSubscribed // ChannelSubscribed means the channel has finished a successful and acknowledged subscription
ChannelUnsubscribing // ChannelUnsubscribing means the channel has started to unsubscribe, but not yet confirmed
)
// ChannelSubscription container for streaming subscription channels
type ChannelSubscription struct {
Key any
Channel string
Currency currency.Pair
Asset asset.Item
Params map[string]interface{}
State ChannelState
}
// ConnectionSetup defines variables for an individual stream connection

View File

@@ -5,7 +5,6 @@ import (
"fmt"
"net"
"net/url"
"strings"
"sync"
"time"
@@ -25,8 +24,16 @@ const (
)
var (
// ErrSubscriptionNotFound defines an error when a subscription is not found
ErrSubscriptionNotFound = errors.New("subscription not found")
// ErrSubscribedAlready defines an error when a channel is already subscribed
ErrSubscribedAlready = errors.New("duplicate subscription")
// ErrSubscriptionFailure defines an error when a subscription fails
ErrSubscriptionFailure = errors.New("subscription failure")
// ErrUnsubscribeFailure defines an error when a unsubscribe fails
ErrUnsubscribeFailure = errors.New("unsubscribe failure")
// ErrChannelInStateAlready defines an error when a subscription channel is already in a new state
ErrChannelInStateAlready = errors.New("channel already in state")
// ErrAlreadyDisabled is returned when you double-disable the websocket
ErrAlreadyDisabled = errors.New("websocket already disabled")
// ErrNotConnected defines an error when websocket is not connected
@@ -52,7 +59,8 @@ var (
errSubscriptionsExceedsLimit = errors.New("subscriptions exceeds limit")
errInvalidMaxSubscriptions = errors.New("max subscriptions cannot be less than 0")
errNoSubscriptionsSupplied = errors.New("no subscriptions supplied")
errChannelSubscriptionAlreadySubscribed = errors.New("channel subscription already subscribed")
errChannelAlreadySubscribed = errors.New("channel already subscribed")
errInvalidChannelState = errors.New("invalid Channel state")
)
var globalReporter Reporter
@@ -373,9 +381,9 @@ func (w *Websocket) connectionMonitor() error {
if w.checkAndSetMonitorRunning() {
return errAlreadyRunning
}
w.connectionMutex.RLock()
w.fieldMutex.RLock()
delay := w.connectionMonitorDelay
w.connectionMutex.RUnlock()
w.fieldMutex.RUnlock()
go func() {
timer := time.NewTimer(delay)
@@ -477,7 +485,7 @@ func (w *Websocket) Shutdown() error {
// flush any subscriptions from last connection if needed
w.subscriptionMutex.Lock()
w.subscriptions = nil
w.subscriptions = subscriptionMap{}
w.subscriptionMutex.Unlock()
close(w.ShutdownC)
@@ -537,7 +545,7 @@ func (w *Websocket) FlushChannels() error {
if len(newsubs) != 0 {
// Purge subscription list as there will be conflicts
w.subscriptionMutex.Lock()
w.subscriptions = nil
w.subscriptions = subscriptionMap{}
w.subscriptionMutex.Unlock()
return w.SubscribeToChannels(newsubs)
}
@@ -629,73 +637,73 @@ func (w *Websocket) trafficMonitor() {
}
func (w *Websocket) setConnectedStatus(b bool) {
w.connectionMutex.Lock()
w.fieldMutex.Lock()
w.connected = b
w.connectionMutex.Unlock()
w.fieldMutex.Unlock()
}
// IsConnected returns status of connection
func (w *Websocket) IsConnected() bool {
w.connectionMutex.RLock()
defer w.connectionMutex.RUnlock()
w.fieldMutex.RLock()
defer w.fieldMutex.RUnlock()
return w.connected
}
func (w *Websocket) setConnectingStatus(b bool) {
w.connectionMutex.Lock()
w.fieldMutex.Lock()
w.connecting = b
w.connectionMutex.Unlock()
w.fieldMutex.Unlock()
}
// IsConnecting returns status of connecting
func (w *Websocket) IsConnecting() bool {
w.connectionMutex.RLock()
defer w.connectionMutex.RUnlock()
w.fieldMutex.RLock()
defer w.fieldMutex.RUnlock()
return w.connecting
}
func (w *Websocket) setEnabled(b bool) {
w.connectionMutex.Lock()
w.fieldMutex.Lock()
w.enabled = b
w.connectionMutex.Unlock()
w.fieldMutex.Unlock()
}
// IsEnabled returns status of enabled
func (w *Websocket) IsEnabled() bool {
w.connectionMutex.RLock()
defer w.connectionMutex.RUnlock()
w.fieldMutex.RLock()
defer w.fieldMutex.RUnlock()
return w.enabled
}
func (w *Websocket) setInit(b bool) {
w.connectionMutex.Lock()
w.fieldMutex.Lock()
w.Init = b
w.connectionMutex.Unlock()
w.fieldMutex.Unlock()
}
// IsInit returns status of init
func (w *Websocket) IsInit() bool {
w.connectionMutex.RLock()
defer w.connectionMutex.RUnlock()
w.fieldMutex.RLock()
defer w.fieldMutex.RUnlock()
return w.Init
}
func (w *Websocket) setTrafficMonitorRunning(b bool) {
w.connectionMutex.Lock()
w.fieldMutex.Lock()
w.trafficMonitorRunning = b
w.connectionMutex.Unlock()
w.fieldMutex.Unlock()
}
// IsTrafficMonitorRunning returns status of the traffic monitor
func (w *Websocket) IsTrafficMonitorRunning() bool {
w.connectionMutex.RLock()
defer w.connectionMutex.RUnlock()
w.fieldMutex.RLock()
defer w.fieldMutex.RUnlock()
return w.trafficMonitorRunning
}
func (w *Websocket) checkAndSetMonitorRunning() (alreadyRunning bool) {
w.connectionMutex.Lock()
defer w.connectionMutex.Unlock()
w.fieldMutex.Lock()
defer w.fieldMutex.Unlock()
if w.connectionMonitorRunning {
return true
}
@@ -704,28 +712,28 @@ func (w *Websocket) checkAndSetMonitorRunning() (alreadyRunning bool) {
}
func (w *Websocket) setConnectionMonitorRunning(b bool) {
w.connectionMutex.Lock()
w.fieldMutex.Lock()
w.connectionMonitorRunning = b
w.connectionMutex.Unlock()
w.fieldMutex.Unlock()
}
// IsConnectionMonitorRunning returns status of connection monitor
func (w *Websocket) IsConnectionMonitorRunning() bool {
w.connectionMutex.RLock()
defer w.connectionMutex.RUnlock()
w.fieldMutex.RLock()
defer w.fieldMutex.RUnlock()
return w.connectionMonitorRunning
}
func (w *Websocket) setDataMonitorRunning(b bool) {
w.connectionMutex.Lock()
w.fieldMutex.Lock()
w.dataMonitorRunning = b
w.connectionMutex.Unlock()
w.fieldMutex.Unlock()
}
// IsDataMonitorRunning returns status of data monitor
func (w *Websocket) IsDataMonitorRunning() bool {
w.connectionMutex.RLock()
defer w.connectionMutex.RUnlock()
w.fieldMutex.RLock()
defer w.fieldMutex.RUnlock()
return w.dataMonitorRunning
}
@@ -862,52 +870,44 @@ func (w *Websocket) GetName() string {
// GetChannelDifference finds the difference between the subscribed channels
// and the new subscription list when pairs are disabled or enabled.
func (w *Websocket) GetChannelDifference(genSubs []ChannelSubscription) (sub, unsub []ChannelSubscription) {
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
w.subscriptionMutex.RLock()
unsubMap := make(map[any]ChannelSubscription, len(w.subscriptions))
for k, c := range w.subscriptions {
unsubMap[k] = *c
}
w.subscriptionMutex.RUnlock()
oldsubs:
for x := range w.subscriptions {
for y := range genSubs {
if w.subscriptions[x].Equal(&genSubs[y]) {
continue oldsubs
}
for i := range genSubs {
key := genSubs[i].EnsureKeyed()
if _, ok := unsubMap[key]; ok {
delete(unsubMap, key) // If it's in both then we remove it from the unsubscribe list
} else {
sub = append(sub, genSubs[i]) // If it's in genSubs but not existing subs we want to subscribe
}
unsub = append(unsub, w.subscriptions[x])
}
newsubs:
for x := range genSubs {
for y := range w.subscriptions {
if genSubs[x].Equal(&w.subscriptions[y]) {
continue newsubs
}
}
sub = append(sub, genSubs[x])
for _, c := range unsubMap {
unsub = append(unsub, c)
}
return
}
// UnsubscribeChannels unsubscribes from a websocket channel
func (w *Websocket) UnsubscribeChannels(channels []ChannelSubscription) error {
if len(channels) == 0 {
return fmt.Errorf("%s websocket: channels not populated cannot remove",
w.exchangeName)
return fmt.Errorf("%s websocket: %w", w.exchangeName, errNoSubscriptionsSupplied)
}
w.subscriptionMutex.Lock()
w.subscriptionMutex.RLock()
channels:
for x := range channels {
for y := range w.subscriptions {
if channels[x].Equal(&w.subscriptions[y]) {
continue channels
}
for i := range channels {
key := channels[i].EnsureKeyed()
if _, ok := w.subscriptions[key]; !ok {
w.subscriptionMutex.RUnlock()
return fmt.Errorf("%s websocket: %w: %+v", w.exchangeName, ErrSubscriptionNotFound, channels[i])
}
w.subscriptionMutex.Unlock()
return fmt.Errorf("%s websocket: subscription not found in list: %+v",
w.exchangeName,
channels[x])
}
w.subscriptionMutex.Unlock()
w.subscriptionMutex.RUnlock()
return w.Unsubscriber(channels)
}
@@ -922,69 +922,138 @@ func (w *Websocket) ResubscribeToChannel(subscribedChannel *ChannelSubscription)
// SubscribeToChannels appends supplied channels to channelsToSubscribe
func (w *Websocket) SubscribeToChannels(channels []ChannelSubscription) error {
err := w.checkSubscriptions(channels)
if err != nil {
if err := w.checkSubscriptions(channels); err != nil {
return fmt.Errorf("%s websocket: %w", w.exchangeName, common.AppendError(ErrSubscriptionFailure, err))
}
err = w.Subscriber(channels)
if err != nil {
if err := w.Subscriber(channels); err != nil {
return fmt.Errorf("%s websocket: %w", w.exchangeName, common.AppendError(ErrSubscriptionFailure, err))
}
return nil
}
// AddSubscription adds a subscription to the subscription lists
// Unlike AddSubscriptions this method will error if the subscription already exists
func (w *Websocket) AddSubscription(c *ChannelSubscription) error {
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
if w.subscriptions == nil {
w.subscriptions = subscriptionMap{}
}
key := c.EnsureKeyed()
if _, ok := w.subscriptions[key]; ok {
return ErrSubscribedAlready
}
n := *c // Fresh copy; we don't want to use the pointer we were given and allow encapsulation/locks to be bypassed
w.subscriptions[key] = &n
return nil
}
// SetSubscriptionState sets an existing subscription state
// returns an error if the subscription is not found, or the new state is already set
func (w *Websocket) SetSubscriptionState(c *ChannelSubscription, state ChannelState) error {
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
if w.subscriptions == nil {
w.subscriptions = subscriptionMap{}
}
key := c.EnsureKeyed()
p, ok := w.subscriptions[key]
if !ok {
return ErrSubscriptionNotFound
}
if state == p.State {
return ErrChannelInStateAlready
}
if state > ChannelUnsubscribing {
return errInvalidChannelState
}
p.State = state
return nil
}
// AddSuccessfulSubscriptions adds subscriptions to the subscription lists that
// has been successfully subscribed
func (w *Websocket) AddSuccessfulSubscriptions(channels ...ChannelSubscription) {
w.subscriptionMutex.Lock()
w.subscriptions = append(w.subscriptions, channels...)
w.subscriptionMutex.Unlock()
}
// RemoveSuccessfulUnsubscriptions removes subscriptions from the subscription
// list that has been successfulling unsubscribed
func (w *Websocket) RemoveSuccessfulUnsubscriptions(channels ...ChannelSubscription) {
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
for x := range channels {
for y := range w.subscriptions {
if channels[x].Equal(&w.subscriptions[y]) {
w.subscriptions[y] = w.subscriptions[len(w.subscriptions)-1]
w.subscriptions[len(w.subscriptions)-1] = ChannelSubscription{}
w.subscriptions = w.subscriptions[:len(w.subscriptions)-1]
break
}
}
if w.subscriptions == nil {
w.subscriptions = subscriptionMap{}
}
for _, cN := range channels {
c := cN // cN is an iteration var; Not safe to make a pointer to
key := c.EnsureKeyed()
c.State = ChannelSubscribed
w.subscriptions[key] = &c
}
}
// Equal two WebsocketChannelSubscription to determine equality
func (w *ChannelSubscription) Equal(s *ChannelSubscription) bool {
return strings.EqualFold(w.Channel, s.Channel) &&
w.Currency.Equal(s.Currency)
}
// GetSubscriptions returns a copied list of subscriptions
// and is a private member that cannot be manipulated
func (w *Websocket) GetSubscriptions() []ChannelSubscription {
// RemoveSubscriptions removes subscriptions from the subscription list
func (w *Websocket) RemoveSubscriptions(channels ...ChannelSubscription) {
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
return append(w.subscriptions[:0:0], w.subscriptions...)
if w.subscriptions == nil {
w.subscriptions = subscriptionMap{}
}
for i := range channels {
key := channels[i].EnsureKeyed()
delete(w.subscriptions, key)
}
}
// EnsureKeyed sets the default key on a channel if it doesn't have one
// Returns key for convenience
func (c *ChannelSubscription) EnsureKeyed() any {
if c.Key == nil {
c.Key = DefaultChannelKey{
Channel: c.Channel,
Asset: c.Asset,
Currency: c.Currency,
}
}
return c.Key
}
// GetSubscription returns a pointer to a copy of the subscription at the key provided
// returns nil if no subscription is at that key or the key is nil
func (w *Websocket) GetSubscription(key any) *ChannelSubscription {
if key == nil || w == nil || w.subscriptions == nil {
return nil
}
w.subscriptionMutex.RLock()
defer w.subscriptionMutex.RUnlock()
if s, ok := w.subscriptions[key]; ok {
c := *s
return &c
}
return nil
}
// GetSubscriptions returns a new slice of the subscriptions
func (w *Websocket) GetSubscriptions() []ChannelSubscription {
w.subscriptionMutex.RLock()
defer w.subscriptionMutex.RUnlock()
subs := make([]ChannelSubscription, 0, len(w.subscriptions))
for _, c := range w.subscriptions {
subs = append(subs, *c)
}
return subs
}
// SetCanUseAuthenticatedEndpoints sets canUseAuthenticatedEndpoints val in
// a thread safe manner
func (w *Websocket) SetCanUseAuthenticatedEndpoints(val bool) {
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
w.fieldMutex.Lock()
defer w.fieldMutex.Unlock()
w.canUseAuthenticatedEndpoints = val
}
// CanUseAuthenticatedEndpoints gets canUseAuthenticatedEndpoints val in
// a thread safe manner
func (w *Websocket) CanUseAuthenticatedEndpoints() bool {
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
w.fieldMutex.RLock()
defer w.fieldMutex.RUnlock()
return w.canUseAuthenticatedEndpoints
}
@@ -1018,8 +1087,8 @@ func (w *Websocket) checkSubscriptions(subs []ChannelSubscription) error {
return errNoSubscriptionsSupplied
}
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
w.subscriptionMutex.RLock()
defer w.subscriptionMutex.RUnlock()
if w.MaxSubscriptionsPerConnection > 0 && len(w.subscriptions)+len(subs) > w.MaxSubscriptionsPerConnection {
return fmt.Errorf("%w: current subscriptions: %v, incoming subscriptions: %v, max subscriptions per connection: %v - please reduce enabled pairs",
@@ -1029,12 +1098,12 @@ func (w *Websocket) checkSubscriptions(subs []ChannelSubscription) error {
w.MaxSubscriptionsPerConnection)
}
for x := range subs {
for y := range w.subscriptions {
if subs[x].Equal(&w.subscriptions[y]) {
return fmt.Errorf("%w for %+v", errChannelSubscriptionAlreadySubscribed, subs[x])
}
for i := range subs {
key := subs[i].EnsureKeyed()
if _, ok := w.subscriptions[key]; ok {
return fmt.Errorf("%w for %+v", errChannelAlreadySubscribed, subs[i])
}
}
return nil
}

View File

@@ -22,7 +22,7 @@ import (
// SendMessageReturnResponse will send a WS message to the connection and wait
// for response
func (w *WebsocketConnection) SendMessageReturnResponse(signature, request interface{}) ([]byte, error) {
m, err := w.Match.set(signature)
m, err := w.Match.Set(signature)
if err != nil {
return nil, err
}

View File

@@ -9,6 +9,7 @@ import (
"fmt"
"net"
"net/http"
"sort"
"strconv"
"strings"
"sync"
@@ -16,8 +17,10 @@ import (
"time"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/thrasher-corp/gocryptotrader/config"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/protocol"
)
@@ -52,6 +55,10 @@ type testResponse struct {
RequestID int64 `json:"reqid,omitempty"`
}
type testSubKey struct {
Mood string
}
var defaultSetup = &WebsocketSetup{
ExchangeConfig: &config.Exchange{
Features: &config.FeaturesConfig{
@@ -71,9 +78,9 @@ var defaultSetup = &WebsocketSetup{
GenerateSubscriptions: func() ([]ChannelSubscription, error) {
return []ChannelSubscription{
{Channel: "TestSub"},
{Channel: "TestSub2"},
{Channel: "TestSub3"},
{Channel: "TestSub4"},
{Channel: "TestSub2", Key: "purple"},
{Channel: "TestSub3", Key: testSubKey{"mauve"}},
{Channel: "TestSub4", Key: 42},
}, nil
},
Features: &protocol.Features{Subscribe: true, Unsubscribe: true},
@@ -495,62 +502,48 @@ func TestWebsocket(t *testing.T) {
func TestSubscribeUnsubscribe(t *testing.T) {
t.Parallel()
ws := *New()
err := ws.Setup(defaultSetup)
if err != nil {
t.Fatal(err)
}
assert.NoError(t, ws.Setup(defaultSetup), "WS Setup should not error")
fnSub := func(subs []ChannelSubscription) error {
ws.AddSuccessfulSubscriptions(subs...)
return nil
}
fnUnsub := func(unsubs []ChannelSubscription) error {
ws.RemoveSuccessfulUnsubscriptions(unsubs...)
ws.RemoveSubscriptions(unsubs...)
return nil
}
ws.Subscriber = fnSub
ws.Unsubscriber = fnUnsub
err = ws.UnsubscribeChannels(nil)
if err == nil {
t.Fatal("error cannot be nil")
}
// Generate test sub
subs, err := ws.GenerateSubs()
if err != nil {
t.Fatal(err)
assert.NoError(t, err, "Generating test subscriptions should not error")
assert.ErrorIs(t, ws.UnsubscribeChannels(nil), errNoSubscriptionsSupplied, "Unsubscribing from nil should error")
assert.ErrorIs(t, ws.UnsubscribeChannels(subs), ErrSubscriptionNotFound, "Unsubscribing should error when not subscribed")
assert.Nil(t, ws.GetSubscription(42), "GetSubscription on empty internal map should return")
assert.NoError(t, ws.SubscribeToChannels(subs), "Basic Subscribing should not error")
assert.Len(t, ws.GetSubscriptions(), 4, "Should have 4 subscriptions")
byDefKey := ws.GetSubscription(DefaultChannelKey{Channel: "TestSub"})
if assert.NotNil(t, byDefKey, "GetSubscription by default key should find a channel") {
assert.Equal(t, "TestSub", byDefKey.Channel, "GetSubscription by default key should return a pointer a copy of the right channel")
assert.NotSame(t, byDefKey, ws.subscriptions["TestSub"], "GetSubscription returns a fresh pointer")
}
// unsub when no subscribed channel
err = ws.UnsubscribeChannels(subs)
if err == nil {
t.Fatal("error cannot be nil")
if assert.NotNil(t, ws.GetSubscription("purple"), "GetSubscription by string key should find a channel") {
assert.Equal(t, "TestSub2", ws.GetSubscription("purple").Channel, "GetSubscription by string key should return a pointer a copy of the right channel")
}
err = ws.SubscribeToChannels(subs)
if err != nil {
t.Fatal(err)
if assert.NotNil(t, ws.GetSubscription(testSubKey{"mauve"}), "GetSubscription by type key should find a channel") {
assert.Equal(t, "TestSub3", ws.GetSubscription(testSubKey{"mauve"}).Channel, "GetSubscription by type key should return a pointer a copy of the right channel")
}
// subscribe when already subscribed
err = ws.SubscribeToChannels(subs)
if err == nil {
t.Fatal("error cannot be nil")
}
// subscribe to nothing
err = ws.SubscribeToChannels(nil)
if err == nil {
t.Fatal("error cannot be nil")
}
err = ws.UnsubscribeChannels(subs)
if err != nil {
t.Fatal(err)
if assert.NotNil(t, ws.GetSubscription(42), "GetSubscription by int key should find a channel") {
assert.Equal(t, "TestSub4", ws.GetSubscription(42).Channel, "GetSubscription by int key should return a pointer a copy of the right channel")
}
assert.Nil(t, ws.GetSubscription(nil), "GetSubscription by nil should return nil")
assert.Nil(t, ws.GetSubscription(45), "GetSubscription by invalid key should return nil")
assert.ErrorIs(t, ws.SubscribeToChannels(subs), errChannelAlreadySubscribed, "Subscribe should error when already subscribed")
assert.ErrorIs(t, ws.SubscribeToChannels(nil), errNoSubscriptionsSupplied, "Subscribe to nil should error")
assert.NoError(t, ws.UnsubscribeChannels(subs), "Unsubscribing should not error")
}
// TestResubscribe tests Resubscribing to existing subscriptions
func TestResubscribe(t *testing.T) {
t.Parallel()
ws := *New()
@@ -558,41 +551,66 @@ func TestResubscribe(t *testing.T) {
wackedOutSetup := *defaultSetup
wackedOutSetup.MaxWebsocketSubscriptionsPerConnection = -1
err := ws.Setup(&wackedOutSetup)
if !errors.Is(err, errInvalidMaxSubscriptions) {
t.Fatalf("received: '%v' but expected: '%v'", err, errInvalidMaxSubscriptions)
}
assert.ErrorIs(t, err, errInvalidMaxSubscriptions, "Invalid MaxWebsocketSubscriptionsPerConnection should error")
err = ws.Setup(defaultSetup)
if err != nil {
t.Fatal(err)
}
assert.NoError(t, err, "WS Setup should not error")
fnSub := func(subs []ChannelSubscription) error {
ws.AddSuccessfulSubscriptions(subs...)
return nil
}
fnUnsub := func(unsubs []ChannelSubscription) error {
ws.RemoveSuccessfulUnsubscriptions(unsubs...)
ws.RemoveSubscriptions(unsubs...)
return nil
}
ws.Subscriber = fnSub
ws.Unsubscriber = fnUnsub
channel := []ChannelSubscription{{Channel: "resubTest"}}
err = ws.ResubscribeToChannel(&channel[0])
if err == nil {
t.Fatal("error cannot be nil")
}
err = ws.SubscribeToChannels(channel)
if err != nil {
t.Fatal(err)
}
assert.ErrorIs(t, ws.ResubscribeToChannel(&channel[0]), ErrSubscriptionNotFound, "Resubscribe should error when channel isn't subscribed yet")
assert.NoError(t, ws.SubscribeToChannels(channel), "Subscribe should not error")
assert.NoError(t, ws.ResubscribeToChannel(&channel[0]), "Resubscribe should not error now the channel is subscribed")
}
err = ws.ResubscribeToChannel(&channel[0])
if err != nil {
t.Fatal("error cannot be nil")
}
// TestSubscriptionState tests Subscription state changes
func TestSubscriptionState(t *testing.T) {
t.Parallel()
ws := New()
c := &ChannelSubscription{Key: 42, Channel: "Gophers", State: ChannelSubscribing}
assert.ErrorIs(t, ws.SetSubscriptionState(c, ChannelUnsubscribing), ErrSubscriptionNotFound, "Setting an imaginary sub should error")
assert.NoError(t, ws.AddSubscription(c), "Adding first subscription should not error")
found := ws.GetSubscription(42)
assert.NotNil(t, found, "Should find the subscription")
assert.Equal(t, ChannelSubscribing, found.State, "Subscription should be Subscribing")
assert.ErrorIs(t, ws.AddSubscription(c), ErrSubscribedAlready, "Adding an already existing sub should error")
assert.ErrorIs(t, ws.SetSubscriptionState(c, ChannelSubscribing), ErrChannelInStateAlready, "Setting Same state should error")
assert.ErrorIs(t, ws.SetSubscriptionState(c, ChannelUnsubscribing+1), errInvalidChannelState, "Setting an invalid state should error")
ws.AddSuccessfulSubscriptions(*c)
found = ws.GetSubscription(42)
assert.NotNil(t, found, "Should find the subscription")
assert.Equal(t, found.State, ChannelSubscribed, "Subscription should be subscribed state")
assert.NoError(t, ws.SetSubscriptionState(c, ChannelUnsubscribing), "Setting Unsub state should not error")
found = ws.GetSubscription(42)
assert.Equal(t, found.State, ChannelUnsubscribing, "Subscription should be unsubscribing state")
}
// TestRemoveSubscriptions tests removing a subscription
func TestRemoveSubscriptions(t *testing.T) {
t.Parallel()
ws := New()
c := &ChannelSubscription{Key: 42, Channel: "Unite!"}
assert.NoError(t, ws.AddSubscription(c), "Adding first subscription should not error")
assert.NotNil(t, ws.GetSubscription(42), "Added subscription should be findable")
ws.RemoveSubscriptions(*c)
assert.Nil(t, ws.GetSubscription(42), "Remove should have removed the sub")
}
// TestConnectionMonitorNoConnection logic test
@@ -618,18 +636,64 @@ func TestConnectionMonitorNoConnection(t *testing.T) {
}
}
// TestSliceCopyDoesntImpactBoth logic test
func TestGetSubscriptions(t *testing.T) {
// TestGetSubscription logic test
func TestGetSubscription(t *testing.T) {
t.Parallel()
assert.Nil(t, (*Websocket).GetSubscription(nil, "imaginary"), "GetSubscription on a nil Websocket should return nil")
assert.Nil(t, (&Websocket{}).GetSubscription("empty"), "GetSubscription on a Websocket with no sub map should return nil")
w := Websocket{
subscriptions: []ChannelSubscription{
{
subscriptions: subscriptionMap{
42: {
Channel: "hello3",
},
},
}
if !strings.EqualFold("hello3", w.GetSubscriptions()[0].Channel) {
t.Error("Subscriptions was not copied properly")
assert.Nil(t, w.GetSubscription(43), "GetSubscription with an invalid key should return nil")
c := w.GetSubscription(42)
if assert.NotNil(t, c, "GetSubscription with an valid key should return a channel") {
assert.Equal(t, "hello3", c.Channel, "GetSubscription should return the correct channel details")
}
}
// TestGetSubscriptions logic test
func TestGetSubscriptions(t *testing.T) {
t.Parallel()
w := Websocket{
subscriptions: subscriptionMap{
42: {
Channel: "hello3",
},
},
}
assert.Equal(t, "hello3", w.GetSubscriptions()[0].Channel, "GetSubscriptions should return the correct channel details")
}
// TestEnsureKeyed logic test
func TestEnsureKeyed(t *testing.T) {
t.Parallel()
c := ChannelSubscription{
Channel: "candles",
Asset: asset.Spot,
Currency: currency.NewPair(currency.BTC, currency.USDT),
}
k1, ok := c.EnsureKeyed().(DefaultChannelKey)
if assert.True(t, ok, "EnsureKeyed should return a DefaultChannelKey") {
assert.Exactly(t, k1, c.Key, "EnsureKeyed should set the same key")
assert.Equal(t, k1.Channel, c.Channel, "DefaultChannelKey channel should be correct")
assert.Equal(t, k1.Asset, c.Asset, "DefaultChannelKey asset should be correct")
assert.Equal(t, k1.Currency, c.Currency, "DefaultChannelKey currency should be correct")
}
type platypus string
c = ChannelSubscription{
Key: platypus("Gerald"),
Channel: "orderbook",
Asset: asset.Margin,
Currency: currency.NewPair(currency.ETH, currency.USDC),
}
k2, ok := c.EnsureKeyed().(platypus)
if assert.True(t, ok, "EnsureKeyed should return a platypus") {
assert.Exactly(t, k2, c.Key, "EnsureKeyed should set the same key")
assert.EqualValues(t, "Gerald", k2, "key should have the correct value")
}
}
@@ -1024,15 +1088,10 @@ func TestGetChannelDifference(t *testing.T) {
},
}
subs, unsubs := web.GetChannelDifference(newChans)
if len(subs) != 3 {
t.Fatal("error mismatch")
}
assert.Len(t, subs, 3, "Should get the correct number of subs")
assert.Len(t, unsubs, 0, "Should get the correct number of unsubs")
if len(unsubs) != 0 {
t.Fatal("error mismatch")
}
web.subscriptions = subs
web.AddSuccessfulSubscriptions(subs...)
flushedSubs := []ChannelSubscription{
{
@@ -1041,12 +1100,8 @@ func TestGetChannelDifference(t *testing.T) {
}
subs, unsubs = web.GetChannelDifference(flushedSubs)
if len(subs) != 0 {
t.Fatal("error mismatch")
}
if len(unsubs) != 2 {
t.Fatal("error mismatch")
}
assert.Len(t, subs, 0, "Should get the correct number of subs")
assert.Len(t, unsubs, 2, "Should get the correct number of unsubs")
flushedSubs = []ChannelSubscription{
{
@@ -1058,11 +1113,13 @@ func TestGetChannelDifference(t *testing.T) {
}
subs, unsubs = web.GetChannelDifference(flushedSubs)
if len(subs) != 1 {
t.Fatal("error mismatch")
if assert.Len(t, subs, 1, "Should get the correct number of subs") {
assert.Equal(t, subs[0].Channel, "Test4", "Should subscribe to the right channel")
}
if len(unsubs) != 2 {
t.Fatal("error mismatch")
if assert.Len(t, unsubs, 2, "Should get the correct number of unsubs") {
sort.Slice(unsubs, func(i, j int) bool { return unsubs[i].Channel <= unsubs[j].Channel })
assert.Equal(t, unsubs[0].Channel, "Test1", "Should unsubscribe from the right channels")
assert.Equal(t, unsubs[1].Channel, "Test3", "Should unsubscribe from the right channels")
}
}
@@ -1177,9 +1234,7 @@ func TestFlushChannels(t *testing.T) {
if err != nil {
t.Fatal(err)
}
web.subscriptionMutex.Lock()
web.subscriptions = subs
web.subscriptionMutex.Unlock()
web.AddSuccessfulSubscriptions(subs...)
err = web.FlushChannels()
if err != nil {
t.Fatal(err)
@@ -1199,12 +1254,14 @@ func TestFlushChannels(t *testing.T) {
t.Fatal(err)
}
web.subscriptionMutex.Lock()
web.subscriptions = []ChannelSubscription{
{
web.subscriptions = subscriptionMap{
41: {
Key: 41,
Channel: "match channel",
Currency: currency.NewPair(currency.BTC, currency.AUD),
},
{
42: {
Key: 42,
Channel: "unsub channel",
Currency: currency.NewPair(currency.THETA, currency.USDT),
},
@@ -1416,10 +1473,10 @@ func TestCheckSubscriptions(t *testing.T) {
ws.MaxSubscriptionsPerConnection = 2
ws.subscriptions = []ChannelSubscription{{Channel: "test"}}
err = ws.checkSubscriptions([]ChannelSubscription{{Channel: "test"}})
if !errors.Is(err, errChannelSubscriptionAlreadySubscribed) {
t.Fatalf("received: %v, but expected: %v", err, errChannelSubscriptionAlreadySubscribed)
ws.subscriptions = subscriptionMap{42: {Key: 42, Channel: "test"}}
err = ws.checkSubscriptions([]ChannelSubscription{{Key: 42, Channel: "test"}})
if !errors.Is(err, errChannelAlreadySubscribed) {
t.Fatalf("received: %v, but expected: %v", err, errChannelAlreadySubscribed)
}
err = ws.checkSubscriptions([]ChannelSubscription{{}})

View File

@@ -22,6 +22,8 @@ const (
UnhandledMessage = " - Unhandled websocket message: "
)
type subscriptionMap map[any]*ChannelSubscription
// Websocket defines a return type for websocket connections via the interface
// wrapper for routine processing
type Websocket struct {
@@ -43,11 +45,11 @@ type Websocket struct {
runningURLAuth string
exchangeName string
m sync.Mutex
connectionMutex sync.RWMutex
fieldMutex sync.RWMutex
connector func() error
subscriptionMutex sync.Mutex
subscriptions []ChannelSubscription
subscriptionMutex sync.RWMutex
subscriptions subscriptionMap
Subscribe chan []ChannelSubscription
Unsubscribe chan []ChannelSubscription

1
go.mod
View File

@@ -3,6 +3,7 @@ module github.com/thrasher-corp/gocryptotrader
go 1.20
require (
github.com/buger/jsonparser v1.1.1
github.com/d5/tengo/v2 v2.16.1
github.com/gofrs/uuid v4.4.0+incompatible
github.com/gorilla/mux v1.8.0

2
go.sum
View File

@@ -55,6 +55,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/boombuler/barcode v1.0.1 h1:NDBbPmhS+EqABEs5Kg3n/5ZNjy73Pz7SIV+KCeqyXcs=
github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=

View File

@@ -543,8 +543,8 @@
}
},
"api": {
"authenticatedSupport": false,
"authenticatedWebsocketApiSupport": false,
"authenticatedSupport": true,
"authenticatedWebsocketApiSupport": true,
"endpoints": {
"url": "NON_DEFAULT_HTTP_LINK_TO_EXCHANGE_API",
"urlSecondary": "NON_DEFAULT_HTTP_LINK_TO_EXCHANGE_API",
@@ -571,7 +571,7 @@
},
"enabled": {
"autoPairUpdates": true,
"websocketAPI": false
"websocketAPI": true
}
},
"bankAccounts": [