From 4531fdcb4a6af0ff25d54d907042b7c33563767d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Rasc=C3=A3o?= Date: Thu, 28 Oct 2021 00:25:15 +0100 Subject: [PATCH] exchanges/websocket: Expose Trades/Fills feed through data channel (#814) * Expose trade feed websocket exchange data through data channel Most relevant to applications that import GCT as a lib, this allows them to (through configuration, disabled by default) receive trade data through the data channel similarly to the orderbook feed. * exchanges: allow exposure of trade websocket feed through data channel * Expose fill feed websocket abstracted exchange data through data channel * exchanges: allow exposure of fill websocket feed through data channel --- config/config_types.go | 2 + engine/websocketroutine_manager.go | 10 +++++ exchanges/exchange.go | 50 ++++++++++++++++++++++ exchanges/exchange_types.go | 2 + exchanges/fill/fill.go | 22 ++++++++++ exchanges/fill/fill_types.go | 30 +++++++++++++ exchanges/ftx/ftx_websocket.go | 44 +++++++++++++++++-- exchanges/ftx/ftx_websocket_test.go | 65 ++++++++++++++++++----------- exchanges/ftx/ftx_wrapper.go | 2 + exchanges/stream/websocket.go | 13 +++++- exchanges/stream/websocket_types.go | 11 +++++ exchanges/trade/trade.go | 29 +++++++++++++ exchanges/trade/trade_types.go | 8 ++++ log/logger_setup.go | 1 + log/sublogger_types.go | 1 + 15 files changed, 261 insertions(+), 29 deletions(-) create mode 100644 exchanges/fill/fill.go create mode 100644 exchanges/fill/fill_types.go diff --git a/config/config_types.go b/config/config_types.go index 428f220c..38810f3f 100644 --- a/config/config_types.go +++ b/config/config_types.go @@ -282,6 +282,8 @@ type FeaturesEnabledConfig struct { AutoPairUpdates bool `json:"autoPairUpdates"` Websocket bool `json:"websocketAPI"` SaveTradeData bool `json:"saveTradeData"` + TradeFeed bool `json:"tradeFeed"` + FillsFeed bool `json:"fillsFeed"` } // FeaturesConfig stores the exchanges supported and enabled features diff --git a/engine/websocketroutine_manager.go b/engine/websocketroutine_manager.go index 4b96e1e4..3e3ffc9b 100644 --- a/engine/websocketroutine_manager.go +++ b/engine/websocketroutine_manager.go @@ -8,10 +8,12 @@ import ( "github.com/thrasher-corp/gocryptotrader/config" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/account" + "github.com/thrasher-corp/gocryptotrader/exchanges/fill" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" + "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" ) @@ -245,6 +247,14 @@ func (m *websocketRoutineManager) WebsocketDataHandler(exchName string, data int if m.verbose { m.printAccountHoldingsChangeSummary(d) } + case []trade.Data: + if m.verbose { + log.Infof(log.Trade, "%+v", d) + } + case []fill.Data: + if m.verbose { + log.Infof(log.Fill, "%+v", d) + } default: if m.verbose { log.Warnf(log.WebsocketMgr, diff --git a/exchanges/exchange.go b/exchanges/exchange.go index 93b0f975..83093aa0 100644 --- a/exchanges/exchange.go +++ b/exchanges/exchange.go @@ -167,6 +167,14 @@ func (b *Base) SetFeatureDefaults() { b.SetSaveTradeDataStatus(b.Config.Features.Enabled.SaveTradeData) } + if b.IsTradeFeedEnabled() != b.Config.Features.Enabled.TradeFeed { + b.SetTradeFeedStatus(b.Config.Features.Enabled.TradeFeed) + } + + if b.IsFillsFeedEnabled() != b.Config.Features.Enabled.FillsFeed { + b.SetFillsFeedStatus(b.Config.Features.Enabled.FillsFeed) + } + b.Features.Enabled.AutoPairUpdates = b.Config.Features.Enabled.AutoPairUpdates } } @@ -1208,6 +1216,48 @@ func (b *Base) SetSaveTradeDataStatus(enabled bool) { } } +// IsTradeFeedEnabled checks the state of +// TradeFeed in a concurrent-friendly manner +func (b *Base) IsTradeFeedEnabled() bool { + b.settingsMutex.RLock() + isEnabled := b.Features.Enabled.TradeFeed + b.settingsMutex.RUnlock() + return isEnabled +} + +// SetTradeFeedStatus locks and sets the status of +// the config and the exchange's setting for TradeFeed +func (b *Base) SetTradeFeedStatus(enabled bool) { + b.settingsMutex.Lock() + defer b.settingsMutex.Unlock() + b.Features.Enabled.TradeFeed = enabled + b.Config.Features.Enabled.TradeFeed = enabled + if b.Verbose { + log.Debugf(log.Trade, "Set %v 'TradeFeed' to %v", b.Name, enabled) + } +} + +// IsFillsFeedEnabled checks the state of +// FillsFeed in a concurrent-friendly manner +func (b *Base) IsFillsFeedEnabled() bool { + b.settingsMutex.RLock() + isEnabled := b.Features.Enabled.FillsFeed + b.settingsMutex.RUnlock() + return isEnabled +} + +// SetFillsFeedStatus locks and sets the status of +// the config and the exchange's setting for FillsFeed +func (b *Base) SetFillsFeedStatus(enabled bool) { + b.settingsMutex.Lock() + defer b.settingsMutex.Unlock() + b.Features.Enabled.FillsFeed = enabled + b.Config.Features.Enabled.FillsFeed = enabled + if b.Verbose { + log.Debugf(log.Trade, "Set %v 'FillsFeed' to %v", b.Name, enabled) + } +} + // NewEndpoints declares default and running URLs maps func (b *Base) NewEndpoints() *Endpoints { return &Endpoints{ diff --git a/exchanges/exchange_types.go b/exchanges/exchange_types.go index 30ad3440..83f5d868 100644 --- a/exchanges/exchange_types.go +++ b/exchanges/exchange_types.go @@ -159,6 +159,8 @@ type FeaturesEnabled struct { AutoPairUpdates bool Kline kline.ExchangeCapabilitiesEnabled SaveTradeData bool + TradeFeed bool + FillsFeed bool } // FeaturesSupported stores the exchanges supported features diff --git a/exchanges/fill/fill.go b/exchanges/fill/fill.go new file mode 100644 index 00000000..66dfa22f --- /dev/null +++ b/exchanges/fill/fill.go @@ -0,0 +1,22 @@ +package fill + +// Setup sets up the fill processor +func (f *Fills) Setup(fillsFeedEnabled bool, c chan interface{}) { + f.dataHandler = c + f.fillsFeedEnabled = fillsFeedEnabled +} + +// Update disseminates fill data through the data channel if so +// configured +func (f *Fills) Update(data ...Data) error { + if len(data) == 0 { + // nothing to do + return nil + } + + if f.fillsFeedEnabled { + f.dataHandler <- data + } + + return nil +} diff --git a/exchanges/fill/fill_types.go b/exchanges/fill/fill_types.go new file mode 100644 index 00000000..9b6f4a86 --- /dev/null +++ b/exchanges/fill/fill_types.go @@ -0,0 +1,30 @@ +package fill + +import ( + "time" + + "github.com/thrasher-corp/gocryptotrader/currency" + "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/order" +) + +// Fills is used to hold data and methods related to fill dissemination +type Fills struct { + dataHandler chan interface{} + fillsFeedEnabled bool +} + +// Data defines fill data +type Data struct { + ID string + Timestamp time.Time + Exchange string + AssetType asset.Item + CurrencyPair currency.Pair + Side order.Side + OrderID string + ClientOrderID string + TradeID string + Price float64 + Amount float64 +} diff --git a/exchanges/ftx/ftx_websocket.go b/exchanges/ftx/ftx_websocket.go index ad0dfa02..311b39bd 100644 --- a/exchanges/ftx/ftx_websocket.go +++ b/exchanges/ftx/ftx_websocket.go @@ -17,6 +17,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/currency" exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/fill" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" @@ -298,7 +299,10 @@ func (f *FTX) wsHandleData(respRaw []byte) error { return err } case wsTrades: - if !f.IsSaveTradeDataEnabled() { + saveTradeData := f.IsSaveTradeDataEnabled() + + if !saveTradeData && + !f.IsTradeFeedEnabled() { return nil } var resultData WsTradeDataStore @@ -327,7 +331,7 @@ func (f *FTX) wsHandleData(respRaw []byte) error { TID: strconv.FormatInt(resultData.TradeData[z].ID, 10), }) } - return trade.AddTradesToBuffer(f.Name, trades...) + return f.Websocket.Trade.Update(saveTradeData, trades...) case wsOrders: var resultData WsOrderDataStore err = json.Unmarshal(respRaw, &resultData) @@ -377,12 +381,46 @@ func (f *FTX) wsHandleData(respRaw []byte) error { resp.Pair = pair f.Websocket.DataHandler <- &resp case wsFills: + if !f.IsFillsFeedEnabled() { + return nil + } + var resultData WsFillsDataStore err = json.Unmarshal(respRaw, &resultData) if err != nil { return err } - f.Websocket.DataHandler <- resultData.FillsData + + var side order.Side + side, err = order.StringToOrderSide(resultData.FillsData.Side) + if err != nil { + f.Websocket.DataHandler <- order.ClassificationError{ + Exchange: f.Name, + Err: err, + } + } + + p, err = currency.NewPairFromString(resultData.FillsData.Market) + if err != nil { + return err + } + a, err = f.GetPairAssetType(p) + if err != nil { + return err + } + + return f.Websocket.Fills.Update(fill.Data{ + ID: strconv.FormatInt(resultData.FillsData.ID, 10), + Timestamp: resultData.FillsData.Time, + Exchange: f.Name, + AssetType: a, + CurrencyPair: p, + Side: side, + OrderID: strconv.FormatInt(resultData.FillsData.OrderID, 10), + TradeID: strconv.FormatInt(resultData.FillsData.TradeID, 10), + Price: resultData.FillsData.Price, + Amount: resultData.FillsData.Size, + }) default: f.Websocket.DataHandler <- stream.UnhandledMessageWarning{Message: f.Name + stream.UnhandledMessage + string(respRaw)} } diff --git a/exchanges/ftx/ftx_websocket_test.go b/exchanges/ftx/ftx_websocket_test.go index 4986335d..4326592c 100644 --- a/exchanges/ftx/ftx_websocket_test.go +++ b/exchanges/ftx/ftx_websocket_test.go @@ -1,12 +1,14 @@ package ftx import ( + "fmt" "testing" "time" "github.com/thrasher-corp/gocryptotrader/currency" exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/fill" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" @@ -20,8 +22,20 @@ func parseRaw(t *testing.T, input string) interface{} { Quote: currency.USDT, }, } + + dataC := make(chan interface{}, 1) + + fills := fill.Fills{} + fills.Setup(true, dataC) + x := FTX{ exchange.Base{ + Name: "FTX", + Features: exchange.Features{ + Enabled: exchange.FeaturesEnabled{ + FillsFeed: true, + }, + }, CurrencyPairs: currency.PairsManager{ Pairs: map[asset.Item]*currency.PairStore{ asset.Spot: { @@ -35,7 +49,8 @@ func parseRaw(t *testing.T, input string) interface{} { }, }, Websocket: &stream.Websocket{ - DataHandler: make(chan interface{}, 1), + DataHandler: dataC, + Fills: fills, }, }, } @@ -43,7 +58,15 @@ func parseRaw(t *testing.T, input string) interface{} { if err := x.wsHandleData([]byte(input)); err != nil { t.Fatal(err) } - return <-x.Websocket.DataHandler + + var ret interface{} + select { + case ret = <-x.Websocket.DataHandler: + default: + t.Error(fmt.Errorf("timed out waiting for channel data")) + } + + return ret } func TestFTX_wsHandleData_Details(t *testing.T) { @@ -153,10 +176,7 @@ func TestFTX_wsHandleData_wsFills(t *testing.T) { "type": "update", "data": { "id": 1234567890, - "market": "MARKET", - "future": "FUTURE", - "baseCurrency": "BTC", - "quoteCurrency": "USDT", + "market": "BTC-USDT", "type": "order", "side": "sell", "price": 32768, @@ -171,27 +191,22 @@ func TestFTX_wsHandleData_wsFills(t *testing.T) { } }` p := parseRaw(t, input) - x, ok := p.(WsFills) + x, ok := p.([]fill.Data) if !ok { - t.Fatalf("have %T, want ftx.WsFills", p) + t.Fatalf("have %T, want []fill.Data", p) } - if x.ID != 1234567890 || - x.Market != "MARKET" || - x.Future != "FUTURE" || - x.BaseCurrency != "BTC" || - x.QuoteCurrency != "USDT" || - x.Type != "order" || - x.Side != "sell" || - x.Price != 32768 || - x.Size != 2 || - x.OrderID != 23456789012 || - !x.Time.Equal(time.Unix(1628346762, 373010000).UTC()) || - x.TradeID != 3456789012 || - x.FeeRate != 8 || - x.Fee != 16 || - x.FeeCurrency != "FTT" || - x.Liquidity != "maker" { - t.Error("parsed values do not match") + + if x[0].Exchange != "FTX" || + x[0].ID != "1234567890" || + x[0].OrderID != "23456789012" || + x[0].CurrencyPair.Base.String() != "BTC" || + x[0].CurrencyPair.Quote.String() != "USDT" || + x[0].Side != order.Sell || + x[0].TradeID != "3456789012" || + x[0].Price != 32768 || + x[0].Amount != 2 || + !x[0].Timestamp.Equal(time.Unix(1628346762, 373010000).UTC()) { + t.Errorf("parsed values do not match, x: %#v", x) } } diff --git a/exchanges/ftx/ftx_wrapper.go b/exchanges/ftx/ftx_wrapper.go index b7697be0..8954efe9 100644 --- a/exchanges/ftx/ftx_wrapper.go +++ b/exchanges/ftx/ftx_wrapper.go @@ -190,6 +190,8 @@ func (f *FTX) Setup(exch *config.Exchange) error { Unsubscriber: f.Unsubscribe, GenerateSubscriptions: f.GenerateDefaultSubscriptions, Features: &f.Features.Supports.WebsocketCapabilities, + TradeFeed: f.Features.Enabled.TradeFeed, + FillsFeed: f.Features.Enabled.FillsFeed, }) if err != nil { return err diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index 8d9609b8..da171b50 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -145,11 +145,22 @@ func (w *Websocket) Setup(s *WebsocketSetup) error { w.Wg = new(sync.WaitGroup) w.SetCanUseAuthenticatedEndpoints(s.ExchangeConfig.API.AuthenticatedWebsocketSupport) - return w.Orderbook.Setup(s.ExchangeConfig, + if err := w.Orderbook.Setup(s.ExchangeConfig, s.SortBuffer, s.SortBufferByUpdateIDs, s.UpdateEntriesByID, + w.DataHandler); err != nil { + return err + } + + w.Trade.Setup(w.exchangeName, + s.TradeFeed, w.DataHandler) + + w.Fills.Setup(s.FillsFeed, + w.DataHandler) + + return nil } // SetupNewConnection sets up an auth or unauth streaming connection diff --git a/exchanges/stream/websocket_types.go b/exchanges/stream/websocket_types.go index 189a5c31..4c3c4827 100644 --- a/exchanges/stream/websocket_types.go +++ b/exchanges/stream/websocket_types.go @@ -6,8 +6,10 @@ import ( "github.com/gorilla/websocket" "github.com/thrasher-corp/gocryptotrader/config" + "github.com/thrasher-corp/gocryptotrader/exchanges/fill" "github.com/thrasher-corp/gocryptotrader/exchanges/protocol" "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" + "github.com/thrasher-corp/gocryptotrader/exchanges/trade" ) // Websocket functionality list and state consts @@ -72,6 +74,12 @@ type Websocket struct { // Orderbook is a local buffer of orderbooks Orderbook buffer.Orderbook + // Trade is a notifier of occurring trades + Trade trade.Trade + + // Fills is a notifier of occurring fills + Fills fill.Fills + // trafficAlert monitors if there is a halt in traffic throughput TrafficAlert chan struct{} // ReadMessageErrors will received all errors from ws.ReadMessage() and @@ -100,6 +108,9 @@ type WebsocketSetup struct { SortBuffer bool SortBufferByUpdateIDs bool UpdateEntriesByID bool + TradeFeed bool + // Fill data config values + FillsFeed bool } // WebsocketConnection contains all the data needed to send a message to a WS diff --git a/exchanges/trade/trade.go b/exchanges/trade/trade.go index 1aa09777..b5b4e049 100644 --- a/exchanges/trade/trade.go +++ b/exchanges/trade/trade.go @@ -27,6 +27,35 @@ func (p *Processor) setup(wg *sync.WaitGroup) { go p.Run(wg) } +// Setup configures necessary fields to the `Trade` structure that govern trade data +// processing. +func (t *Trade) Setup(exchangeName string, tradeFeedEnabled bool, c chan interface{}) { + t.exchangeName = exchangeName + t.dataHandler = c + t.tradeFeedEnabled = tradeFeedEnabled +} + +// Update processes trade data, either by saving it or routing it through +// the data channel. +func (t *Trade) Update(save bool, data ...Data) error { + if len(data) == 0 { + // nothing to do + return nil + } + + if t.tradeFeedEnabled { + t.dataHandler <- data + } + + if save { + if err := AddTradesToBuffer(t.exchangeName, data...); err != nil { + return err + } + } + + return nil +} + // AddTradesToBuffer will push trade data onto the buffer func AddTradesToBuffer(exchangeName string, data ...Data) error { cfg := database.DB.GetConfig() diff --git a/exchanges/trade/trade_types.go b/exchanges/trade/trade_types.go index 337611c9..0a7ec8a0 100644 --- a/exchanges/trade/trade_types.go +++ b/exchanges/trade/trade_types.go @@ -24,6 +24,14 @@ var ( ErrNoTradesSupplied = errors.New("no trades supplied") ) +// Trade used to hold data and methods related to trade dissemination and +// storage +type Trade struct { + exchangeName string + dataHandler chan interface{} + tradeFeedEnabled bool +} + // Data defines trade data type Data struct { ID uuid.UUID `json:"ID,omitempty"` diff --git a/log/logger_setup.go b/log/logger_setup.go index 05e8e5b9..a58c29ee 100644 --- a/log/logger_setup.go +++ b/log/logger_setup.go @@ -162,4 +162,5 @@ func init() { Ticker = registerNewSubLogger("TICKER") OrderBook = registerNewSubLogger("ORDERBOOK") Trade = registerNewSubLogger("TRADE") + Fill = registerNewSubLogger("FILL") } diff --git a/log/sublogger_types.go b/log/sublogger_types.go index 52e6e57f..e264a14e 100644 --- a/log/sublogger_types.go +++ b/log/sublogger_types.go @@ -31,6 +31,7 @@ var ( Ticker *SubLogger OrderBook *SubLogger Trade *SubLogger + Fill *SubLogger ) // logFields is used to store data in a non-global and thread-safe manner