Bitfinex: Resubscribe orderbook after checksum err (#1303)

* Bitfinex: Resubscribe orderbook after checksum err

* Bitfinex: chanForSub return only first match
This commit is contained in:
Gareth Kirwan
2023-09-14 06:38:01 +01:00
committed by GitHub
parent c8ec22fe92
commit ade2d9c5d2
3 changed files with 55 additions and 3 deletions

View File

@@ -11,6 +11,7 @@ import (
"time"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/config"
"github.com/thrasher-corp/gocryptotrader/core"
@@ -1854,3 +1855,20 @@ func TestCancelMultipleOrdersV2(t *testing.T) {
t.Error(err)
}
}
func TestChanForSub(t *testing.T) {
t.Parallel()
p := currency.NewPairWithDelimiter("DOGE", "XLM", "-")
s, err := b.chanForSub(wsBook, asset.Spot, p)
assert.ErrorIs(t, err, errSubNotFound, "Correct error returned when stream when sub not found")
assert.Nil(t, s, "No stream returned when sub not found")
// Add a spare sub to ensure we don't get only-answer-is-right syndrome
b.Websocket.AddSuccessfulSubscriptions(stream.ChannelSubscription{Asset: asset.Spot, Currency: btcusdPair, Channel: wsTicker})
want := stream.ChannelSubscription{Asset: asset.Spot, Currency: p, Channel: wsBook}
b.Websocket.AddSuccessfulSubscriptions(want)
s, err = b.chanForSub(wsBook, asset.Spot, p)
assert.Nil(t, err, "No error returned when sub found")
assert.EqualValues(t, want, *s, "Correct Sub found")
}

View File

@@ -11,8 +11,9 @@ import (
)
var (
errTypeAssert = errors.New("type assertion failed")
errSetCannotBeEmpty = errors.New("set cannot be empty")
errSubNotFound = errors.New("could not find matching subscription")
errTypeAssert = errors.New("type assertion failed")
)
// AccountV2Data stores account v2 data

View File

@@ -1525,8 +1525,11 @@ func (b *Bitfinex) WsUpdateOrderbook(p currency.Pair, assetType asset.Item, book
err)
}
err = validateCRC32(ob, checkme.Token)
if err != nil {
if err = validateCRC32(ob, checkme.Token); err != nil {
log.Errorf(log.WebsocketMgr, "%s websocket orderbook update error, will resubscribe orderbook: %v", b.Name, err)
if suberr := b.resubOrderbook(p, assetType); suberr != nil {
log.Errorf(log.ExchangeSys, "%s error resubscribing orderbook: %v", b.Name, suberr)
}
return err
}
}
@@ -1534,6 +1537,36 @@ func (b *Bitfinex) WsUpdateOrderbook(p currency.Pair, assetType asset.Item, book
return b.Websocket.Orderbook.Update(&orderbookUpdate)
}
// resubOrderbook resubscribes the orderbook after a consistency error, probably a failed checksum,
// which forces a fresh snapshot. If we don't do this the orderbook will keep erroring and drifting.
func (b *Bitfinex) resubOrderbook(p currency.Pair, assetType asset.Item) error {
if err := b.Websocket.Orderbook.FlushOrderbook(p, assetType); err != nil {
return err
}
c, err := b.chanForSub(wsBook, assetType, p)
if err != nil {
return err
}
return b.Websocket.ResubscribeToChannel(c)
}
// chanForSub returns an existing channel subscription for a given channel/asset/pair
func (b *Bitfinex) chanForSub(cName string, assetType asset.Item, pair currency.Pair) (*stream.ChannelSubscription, error) {
want := &stream.ChannelSubscription{
Channel: cName,
Currency: pair,
Asset: assetType,
}
subs := b.Websocket.GetSubscriptions()
for i := range subs {
if subs[i].Equal(want) {
return &subs[i], nil
}
}
return nil, errSubNotFound
}
// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (b *Bitfinex) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, error) {
var wsPairFormat = currency.PairFormat{Uppercase: true}