Bitfinex: Fix websocket trade processing (#1754)

* Bitfinex: Fix WS trade processing

* Add handling for funding trades

Fixes #1746

* Linter: Disable shadow linting for err

It's been a year, and I'm still getting caught out by govet demanding I
don't shadow a var I was deliberately shadowing.
Made worse by an increase in clashes with stylecheck when they both want
opposite things on the same line.

* Bitfinex: Move websocket constants to websocket file

* Bitfinex: Rename channel consts

* Bitfinex: Send individual WS trades down the DataHandler
This commit is contained in:
Gareth Kirwan
2025-02-17 02:20:14 +00:00
committed by GitHub
parent 8fad985669
commit 5463e359bc
6 changed files with 228 additions and 269 deletions

View File

@@ -242,9 +242,7 @@ func (m *WebsocketRoutineManager) websocketDataHandler(exchName string, data int
}
m.syncer.PrintTickerSummary(&d[x], "websocket", err)
}
case order.Detail,
ticker.Price,
orderbook.Depth:
case order.Detail, ticker.Price, orderbook.Depth:
return errUseAPointer
case stream.KlineData:
if m.verbose {
@@ -347,7 +345,7 @@ func (m *WebsocketRoutineManager) websocketDataHandler(exchName string, data int
m.printAccountHoldingsChangeSummary(d[x])
}
}
case []trade.Data:
case []trade.Data, trade.Data:
if m.verbose {
log.Infof(log.Trade, "%+v", d)
}

File diff suppressed because one or more lines are too long

View File

@@ -1,6 +1,7 @@
package bitfinex
import (
"encoding/json"
"errors"
"sync"
"time"
@@ -8,13 +9,13 @@ import (
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/types"
)
var (
errSetCannotBeEmpty = errors.New("set cannot be empty")
errNoSeqNo = errors.New("no sequence number")
errParamNotAllowed = errors.New("param not allowed")
errParsingWSField = errors.New("error parsing WS field")
errTickerInvalidSymbol = errors.New("invalid ticker symbol")
errTickerInvalidResp = errors.New("invalid ticker response format")
errTickerInvalidFieldCount = errors.New("invalid ticker response field count")
@@ -488,16 +489,18 @@ type WebsocketBook struct {
Period int64
}
// WebsocketTrade holds trade information
type WebsocketTrade struct {
// wsTrade holds trade information
type wsTrade struct {
ID int64
Timestamp int64
Price float64
Timestamp types.Time
Amount float64
// Funding rate of the trade
Rate float64
// Funding offer period in days
Period int64
Price float64
Period int64 // Funding offer period in days
}
// UnmarshalJSON unmarshals json bytes into a wsTrade
func (t *wsTrade) UnmarshalJSON(data []byte) error {
return json.Unmarshal(data, &[5]any{&t.ID, &t.Timestamp, &t.Amount, &t.Price, &t.Period})
}
// Candle holds OHLC data
@@ -613,63 +616,6 @@ type WebsocketHandshake struct {
Version float64 `json:"version"`
}
const (
authenticatedBitfinexWebsocketEndpoint = "wss://api.bitfinex.com/ws/2"
publicBitfinexWebsocketEndpoint = "wss://api-pub.bitfinex.com/ws/2"
pong = "pong"
wsHeartbeat = "hb"
wsChecksum = "cs"
wsPositionSnapshot = "ps"
wsPositionNew = "pn"
wsPositionUpdate = "pu"
wsPositionClose = "pc"
wsWalletSnapshot = "ws"
wsWalletUpdate = "wu"
wsTradeExecutionUpdate = "tu"
wsTradeExecuted = "te"
wsFundingCreditSnapshot = "fcs"
wsFundingCreditNew = "fcn"
wsFundingCreditUpdate = "fcu"
wsFundingCreditCancel = "fcc"
wsFundingLoanSnapshot = "fls"
wsFundingLoanNew = "fln"
wsFundingLoanUpdate = "flu"
wsFundingLoanCancel = "flc"
wsFundingTradeExecuted = "fte"
wsFundingTradeUpdate = "ftu"
wsFundingInfoUpdate = "fiu"
wsBalanceUpdate = "bu"
wsMarginInfoUpdate = "miu"
wsNotification = "n"
wsOrderSnapshot = "os"
wsOrderNew = "on"
wsOrderUpdate = "ou"
wsOrderCancel = "oc"
wsRequest = "-req"
wsOrderNewRequest = wsOrderNew + wsRequest
wsOrderUpdateRequest = wsOrderUpdate + wsRequest
wsOrderCancelRequest = wsOrderCancel + wsRequest
wsFundingOfferSnapshot = "fos"
wsFundingOfferNew = "fon"
wsFundingOfferUpdate = "fou"
wsFundingOfferCancel = "foc"
wsFundingOfferNewRequest = wsFundingOfferNew + wsRequest
wsFundingOfferUpdateRequest = wsFundingOfferUpdate + wsRequest
wsFundingOfferCancelRequest = wsFundingOfferCancel + wsRequest
wsCancelMultipleOrders = "oc_multi"
wsBook = "book"
wsCandles = "candles"
wsTicker = "ticker"
wsTrades = "trades"
wsError = "error"
wsEventSubscribed = "subscribed"
wsEventUnsubscribed = "unsubscribed"
wsEventAuth = "auth"
wsEventError = "error"
wsEventConf = "conf"
wsEventInfo = "info"
)
// WsAuthRequest container for WS auth request
type WsAuthRequest struct {
Event string `json:"event"`

View File

@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"hash/crc32"
"math"
"net/http"
"sort"
"strconv"
@@ -33,6 +34,67 @@ import (
"github.com/thrasher-corp/gocryptotrader/log"
)
var (
errParsingWSField = errors.New("error parsing WS field")
)
const (
authenticatedBitfinexWebsocketEndpoint = "wss://api.bitfinex.com/ws/2"
publicBitfinexWebsocketEndpoint = "wss://api-pub.bitfinex.com/ws/2"
pong = "pong"
wsHeartbeat = "hb"
wsChecksum = "cs"
wsPositionSnapshot = "ps"
wsPositionNew = "pn"
wsPositionUpdate = "pu"
wsPositionClose = "pc"
wsWalletSnapshot = "ws"
wsWalletUpdate = "wu"
wsTradeUpdated = "tu"
wsTradeExecuted = "te"
wsFundingCreditSnapshot = "fcs"
wsFundingCreditNew = "fcn"
wsFundingCreditUpdate = "fcu"
wsFundingCreditCancel = "fcc"
wsFundingLoanSnapshot = "fls"
wsFundingLoanNew = "fln"
wsFundingLoanUpdate = "flu"
wsFundingLoanCancel = "flc"
wsFundingTradeExecuted = "fte"
wsFundingTradeUpdated = "ftu"
wsFundingInfoUpdate = "fiu"
wsBalanceUpdate = "bu"
wsMarginInfoUpdate = "miu"
wsNotification = "n"
wsOrderSnapshot = "os"
wsOrderNew = "on"
wsOrderUpdate = "ou"
wsOrderCancel = "oc"
wsRequest = "-req"
wsOrderNewRequest = wsOrderNew + wsRequest
wsOrderUpdateRequest = wsOrderUpdate + wsRequest
wsOrderCancelRequest = wsOrderCancel + wsRequest
wsFundingOfferSnapshot = "fos"
wsFundingOfferNew = "fon"
wsFundingOfferUpdate = "fou"
wsFundingOfferCancel = "foc"
wsFundingOfferNewRequest = wsFundingOfferNew + wsRequest
wsFundingOfferUpdateRequest = wsFundingOfferUpdate + wsRequest
wsFundingOfferCancelRequest = wsFundingOfferCancel + wsRequest
wsCancelMultipleOrders = "oc_multi"
wsBookChannel = "book"
wsCandlesChannel = "candles"
wsTickerChannel = "ticker"
wsTradesChannel = "trades"
wsError = "error"
wsEventSubscribed = "subscribed"
wsEventUnsubscribed = "unsubscribed"
wsEventAuth = "auth"
wsEventError = "error"
wsEventConf = "conf"
wsEventInfo = "info"
)
var defaultSubscriptions = subscription.List{
{Enabled: true, Channel: subscription.TickerChannel, Asset: asset.All},
{Enabled: true, Channel: subscription.AllTradesChannel, Asset: asset.All},
@@ -54,10 +116,10 @@ var checksumStore = make(map[int]*checksum)
var cMtx sync.Mutex
var subscriptionNames = map[string]string{
subscription.TickerChannel: wsTicker,
subscription.OrderbookChannel: wsBook,
subscription.CandlesChannel: wsCandles,
subscription.AllTradesChannel: wsTrades,
subscription.TickerChannel: wsTickerChannel,
subscription.OrderbookChannel: wsBookChannel,
subscription.CandlesChannel: wsCandlesChannel,
subscription.AllTradesChannel: wsTradesChannel,
}
// WsConnect starts a new websocket connection
@@ -162,8 +224,8 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
eventType, hasEventType := d[1].(string)
if chanID != 0 {
if c := b.Websocket.GetSubscription(chanID); c != nil {
return b.handleWSChannelUpdate(c, eventType, d)
if s := b.Websocket.GetSubscription(chanID); s != nil {
return b.handleWSChannelUpdate(s, respRaw, eventType, d)
}
if b.Verbose {
log.Warnf(log.ExchangeSys, "%s %s; dropped WS message: %s", b.Name, subscription.ErrNotFound, respRaw)
@@ -201,8 +263,8 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
return b.handleWSPositionSnapshot(d)
case wsPositionNew, wsPositionUpdate, wsPositionClose:
return b.handleWSPositionUpdate(d)
case wsTradeExecuted, wsTradeExecutionUpdate:
return b.handleWSTradeUpdate(d, eventType)
case wsTradeExecuted, wsTradeUpdated:
return b.handleWSMyTradeUpdate(d, eventType)
case wsFundingOfferSnapshot:
if snapBundle, ok := d[2].([]interface{}); ok && len(snapBundle) > 0 {
if _, ok := snapBundle[0].([]interface{}); ok {
@@ -398,7 +460,7 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
b.Websocket.DataHandler <- fundingInfo
}
}
case wsFundingTradeExecuted, wsFundingTradeUpdate:
case wsFundingTradeExecuted, wsFundingTradeUpdated:
if data, ok := d[2].([]interface{}); ok && len(data) > 0 {
var wsFundingTrade WsFundingTrade
tradeID, ok := data[0].(float64)
@@ -546,16 +608,15 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
return b.Websocket.Match.RequireMatchWithData("subscribe:"+subID, respRaw)
}
func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, eventType string, d []interface{}) error {
func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, respRaw []byte, eventType string, d []interface{}) error {
if s == nil {
return fmt.Errorf("%w: Subscription param", common.ErrNilPointer)
}
if eventType == wsChecksum {
switch eventType {
case wsChecksum:
return b.handleWSChecksum(s, d)
}
if eventType == wsHeartbeat {
case wsHeartbeat:
return nil
}
@@ -571,7 +632,7 @@ func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, eventType
case subscription.TickerChannel:
return b.handleWSTickerUpdate(s, d)
case subscription.AllTradesChannel:
return b.handleWSTradesUpdate(s, eventType, d)
return b.handleWSAllTrades(s, respRaw)
}
return fmt.Errorf("%s unhandled channel update: %s", b.Name, s.Channel)
@@ -871,139 +932,77 @@ func (b *Bitfinex) handleWSTickerUpdate(c *subscription.Subscription, d []interf
return nil
}
func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType string, d []interface{}) error {
if c == nil {
func (b *Bitfinex) handleWSAllTrades(s *subscription.Subscription, respRaw []byte) error {
feedEnabled := b.IsTradeFeedEnabled()
if !feedEnabled && !b.IsSaveTradeDataEnabled() {
return nil
}
if s == nil {
return fmt.Errorf("%w: Subscription param", common.ErrNilPointer)
}
if len(c.Pairs) != 1 {
if len(s.Pairs) != 1 {
return subscription.ErrNotSinglePair
}
if !b.IsSaveTradeDataEnabled() {
return nil
v, valueType, _, err := jsonparser.Get(respRaw, "[1]")
if err != nil {
return fmt.Errorf("%w `tradesUpdate[1]`: %w", errParsingWSField, err)
}
if c.Asset == asset.MarginFunding {
return nil
}
var tradeHolder []WebsocketTrade
switch len(d) {
case 2:
snapshot, ok := d[1].([]interface{})
if !ok {
return errors.New("unable to type assert trade snapshot data")
}
for i := range snapshot {
elem, ok := snapshot[i].([]interface{})
if !ok {
return errors.New("unable to type assert trade snapshot element data")
}
tradeID, ok := elem[0].(float64)
if !ok {
return errors.New("unable to type assert trade ID")
}
timestamp, ok := elem[1].(float64)
if !ok {
return errors.New("unable to type assert trade timestamp")
}
amount, ok := elem[2].(float64)
if !ok {
return errors.New("unable to type assert trade amount")
}
wsTrade := WebsocketTrade{
ID: int64(tradeID),
Timestamp: int64(timestamp),
Amount: amount,
}
if len(elem) == 5 {
rate, ok := elem[3].(float64)
if !ok {
return errors.New("unable to type assert trade rate")
}
wsTrade.Rate = rate
period, ok := elem[4].(float64)
if !ok {
return errors.New("unable to type assert trade period")
}
wsTrade.Period = int64(period)
} else {
price, ok := elem[3].(float64)
if !ok {
return errors.New("unable to type assert trade price")
}
wsTrade.Rate = price
}
tradeHolder = append(tradeHolder, wsTrade)
}
case 3:
if eventType != wsFundingTradeUpdate && eventType != wsTradeExecutionUpdate {
return fmt.Errorf("unhandled WS trade update event: %s", eventType)
}
data, ok := d[2].([]interface{})
if !ok {
return errors.New("trade data type assertion error")
}
tradeID, ok := data[0].(float64)
if !ok {
return errors.New("unable to type assert trade ID")
}
timestamp, ok := data[1].(float64)
if !ok {
return errors.New("unable to type assert trade timestamp")
}
amount, ok := data[2].(float64)
if !ok {
return errors.New("unable to type assert trade amount")
}
wsTrade := WebsocketTrade{
ID: int64(tradeID),
Timestamp: int64(timestamp),
Amount: amount,
}
if len(data) == 5 {
rate, ok := data[3].(float64)
if !ok {
return errors.New("unable to type assert trade rate")
}
period, ok := data[4].(float64)
if !ok {
return errors.New("unable to type assert trade period")
}
wsTrade.Rate = rate
wsTrade.Period = int64(period)
var wsTrades []*wsTrade
switch valueType {
case jsonparser.String:
if t, err := b.handleWSPublicTradeUpdate(respRaw); err != nil {
return fmt.Errorf("%w `tradesUpdate[2]`: %w", errParsingWSField, err)
} else {
price, ok := data[3].(float64)
if !ok {
return errors.New("unable to type assert trade price")
}
wsTrade.Price = price
wsTrades = []*wsTrade{t}
}
tradeHolder = append(tradeHolder, wsTrade)
case jsonparser.Array:
if wsTrades, err = b.handleWSPublicTradesSnapshot(v); err != nil {
return fmt.Errorf("%w `tradesSnapshot`: %w", errParsingWSField, err)
}
default:
return fmt.Errorf("%w `tradesUpdate[1]`: %w `%s`", errParsingWSField, jsonparser.UnknownValueTypeError, valueType)
}
trades := make([]trade.Data, len(tradeHolder))
for i := range tradeHolder {
side := order.Buy
newAmount := tradeHolder[i].Amount
if newAmount < 0 {
side = order.Sell
newAmount *= -1
}
price := tradeHolder[i].Price
if price == 0 && tradeHolder[i].Rate > 0 {
price = tradeHolder[i].Rate
}
trades[i] = trade.Data{
TID: strconv.FormatInt(tradeHolder[i].ID, 10),
CurrencyPair: c.Pairs[0],
Timestamp: time.UnixMilli(tradeHolder[i].Timestamp),
Price: price,
Amount: newAmount,
trades := make([]trade.Data, len(wsTrades))
for _, w := range wsTrades {
t := trade.Data{
Exchange: b.Name,
AssetType: c.Asset,
Side: side,
AssetType: s.Asset,
CurrencyPair: s.Pairs[0],
TID: strconv.FormatInt(w.ID, 10),
Timestamp: w.Timestamp.Time().UTC(),
Side: order.Buy,
Amount: w.Amount,
Price: w.Price,
}
if w.Period != 0 {
t.AssetType = asset.MarginFunding
}
if t.Amount < 0 {
t.Side = order.Sell
t.Amount = math.Abs(t.Amount)
}
if feedEnabled {
b.Websocket.DataHandler <- t
}
}
if b.IsSaveTradeDataEnabled() {
err = trade.AddTradesToBuffer(b.GetName(), trades...)
}
return err
}
return b.AddTradesToBuffer(trades...)
func (b *Bitfinex) handleWSPublicTradesSnapshot(v []byte) ([]*wsTrade, error) {
var trades []*wsTrade
return trades, json.Unmarshal(v, &trades)
}
func (b *Bitfinex) handleWSPublicTradeUpdate(respRaw []byte) (*wsTrade, error) {
v, _, _, err := jsonparser.Get(respRaw, "[2]")
if err != nil {
return nil, err
}
t := &wsTrade{}
return t, json.Unmarshal(v, t)
}
func (b *Bitfinex) handleWSNotification(d []interface{}, respRaw []byte) error {
@@ -1175,7 +1174,7 @@ func (b *Bitfinex) handleWSPositionUpdate(d []interface{}) error {
return nil
}
func (b *Bitfinex) handleWSTradeUpdate(d []interface{}, eventType string) error {
func (b *Bitfinex) handleWSMyTradeUpdate(d []interface{}, eventType string) error {
tradeData, ok := d[2].([]interface{})
if !ok {
return common.GetTypeAssertError("[]interface{}", d[2], "tradeUpdate")
@@ -1748,7 +1747,7 @@ func (b *Bitfinex) subscribeToChan(subs subscription.List) error {
}
// subId is a single round-trip identifier that provides linking sub requests to chanIDs
// Although docs only mention subId for wsBook, it works for all chans
// Although docs only mention subId for wsBookChannel, it works for all chans
subID := strconv.FormatInt(b.Websocket.Conn.GenerateMessageID(false), 10)
req["subId"] = subID
@@ -2218,7 +2217,7 @@ func subToMap(s *subscription.Subscription, a asset.Item, p currency.Pair) map[s
pairFmt.Delimiter = ":"
}
symbol := p.Format(pairFmt).String()
if c == wsCandles {
if c == wsCandlesChannel {
req["key"] = "trade:" + s.Interval.Short() + ":" + prefix + symbol + fundingPeriod
} else {
req["symbol"] = prefix + symbol

View File

@@ -0,0 +1,5 @@
[18788,[[412685577,1580268444802,11.1998,176.3],[412685578,1580268484802,-5,176.29952759],[412685579,1580269005757,4.2,0.1244,12]],1]
[18788,"te",[5690221201,1734237017719,0.00991467,102570],2]
[18788,"tu",[5690221202,1734237017704,-0.01925285,102560],3]
[18788,"fte",[5690221203,1734237018019,0.00991467,102550,30],4]
[18788,"ftu",[5690221204,1734237018094,-0.01925285,102540,30],5]

View File

@@ -643,7 +643,8 @@
},
"enabled": {
"autoPairUpdates": true,
"websocketAPI": true
"websocketAPI": true,
"tradeFeed": true
}
},
"bankAccounts": [