exchanges/websocket: Expose Trades/Fills feed through data channel (#814)

* Expose trade feed websocket exchange data through data channel

Most relevant to applications that import GCT as a lib, this allows
them to (through configuration, disabled by default) receive trade data
through the data channel similarly to the orderbook feed.

* exchanges: allow exposure of trade websocket feed through data channel

* Expose fill feed websocket abstracted exchange data through data channel

* exchanges: allow exposure of fill websocket feed through data channel
This commit is contained in:
Luis Rascão
2021-10-28 00:25:15 +01:00
committed by GitHub
parent 8617b50ff6
commit 4531fdcb4a
15 changed files with 261 additions and 29 deletions

View File

@@ -282,6 +282,8 @@ type FeaturesEnabledConfig struct {
AutoPairUpdates bool `json:"autoPairUpdates"`
Websocket bool `json:"websocketAPI"`
SaveTradeData bool `json:"saveTradeData"`
TradeFeed bool `json:"tradeFeed"`
FillsFeed bool `json:"fillsFeed"`
}
// FeaturesConfig stores the exchanges supported and enabled features

View File

@@ -8,10 +8,12 @@ import (
"github.com/thrasher-corp/gocryptotrader/config"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/fill"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
"github.com/thrasher-corp/gocryptotrader/log"
)
@@ -245,6 +247,14 @@ func (m *websocketRoutineManager) WebsocketDataHandler(exchName string, data int
if m.verbose {
m.printAccountHoldingsChangeSummary(d)
}
case []trade.Data:
if m.verbose {
log.Infof(log.Trade, "%+v", d)
}
case []fill.Data:
if m.verbose {
log.Infof(log.Fill, "%+v", d)
}
default:
if m.verbose {
log.Warnf(log.WebsocketMgr,

View File

@@ -167,6 +167,14 @@ func (b *Base) SetFeatureDefaults() {
b.SetSaveTradeDataStatus(b.Config.Features.Enabled.SaveTradeData)
}
if b.IsTradeFeedEnabled() != b.Config.Features.Enabled.TradeFeed {
b.SetTradeFeedStatus(b.Config.Features.Enabled.TradeFeed)
}
if b.IsFillsFeedEnabled() != b.Config.Features.Enabled.FillsFeed {
b.SetFillsFeedStatus(b.Config.Features.Enabled.FillsFeed)
}
b.Features.Enabled.AutoPairUpdates = b.Config.Features.Enabled.AutoPairUpdates
}
}
@@ -1208,6 +1216,48 @@ func (b *Base) SetSaveTradeDataStatus(enabled bool) {
}
}
// IsTradeFeedEnabled checks the state of
// TradeFeed in a concurrent-friendly manner
func (b *Base) IsTradeFeedEnabled() bool {
b.settingsMutex.RLock()
isEnabled := b.Features.Enabled.TradeFeed
b.settingsMutex.RUnlock()
return isEnabled
}
// SetTradeFeedStatus locks and sets the status of
// the config and the exchange's setting for TradeFeed
func (b *Base) SetTradeFeedStatus(enabled bool) {
b.settingsMutex.Lock()
defer b.settingsMutex.Unlock()
b.Features.Enabled.TradeFeed = enabled
b.Config.Features.Enabled.TradeFeed = enabled
if b.Verbose {
log.Debugf(log.Trade, "Set %v 'TradeFeed' to %v", b.Name, enabled)
}
}
// IsFillsFeedEnabled checks the state of
// FillsFeed in a concurrent-friendly manner
func (b *Base) IsFillsFeedEnabled() bool {
b.settingsMutex.RLock()
isEnabled := b.Features.Enabled.FillsFeed
b.settingsMutex.RUnlock()
return isEnabled
}
// SetFillsFeedStatus locks and sets the status of
// the config and the exchange's setting for FillsFeed
func (b *Base) SetFillsFeedStatus(enabled bool) {
b.settingsMutex.Lock()
defer b.settingsMutex.Unlock()
b.Features.Enabled.FillsFeed = enabled
b.Config.Features.Enabled.FillsFeed = enabled
if b.Verbose {
log.Debugf(log.Trade, "Set %v 'FillsFeed' to %v", b.Name, enabled)
}
}
// NewEndpoints declares default and running URLs maps
func (b *Base) NewEndpoints() *Endpoints {
return &Endpoints{

View File

@@ -159,6 +159,8 @@ type FeaturesEnabled struct {
AutoPairUpdates bool
Kline kline.ExchangeCapabilitiesEnabled
SaveTradeData bool
TradeFeed bool
FillsFeed bool
}
// FeaturesSupported stores the exchanges supported features

22
exchanges/fill/fill.go Normal file
View File

@@ -0,0 +1,22 @@
package fill
// Setup sets up the fill processor
func (f *Fills) Setup(fillsFeedEnabled bool, c chan interface{}) {
f.dataHandler = c
f.fillsFeedEnabled = fillsFeedEnabled
}
// Update disseminates fill data through the data channel if so
// configured
func (f *Fills) Update(data ...Data) error {
if len(data) == 0 {
// nothing to do
return nil
}
if f.fillsFeedEnabled {
f.dataHandler <- data
}
return nil
}

View File

@@ -0,0 +1,30 @@
package fill
import (
"time"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
)
// Fills is used to hold data and methods related to fill dissemination
type Fills struct {
dataHandler chan interface{}
fillsFeedEnabled bool
}
// Data defines fill data
type Data struct {
ID string
Timestamp time.Time
Exchange string
AssetType asset.Item
CurrencyPair currency.Pair
Side order.Side
OrderID string
ClientOrderID string
TradeID string
Price float64
Amount float64
}

View File

@@ -17,6 +17,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/fill"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
@@ -298,7 +299,10 @@ func (f *FTX) wsHandleData(respRaw []byte) error {
return err
}
case wsTrades:
if !f.IsSaveTradeDataEnabled() {
saveTradeData := f.IsSaveTradeDataEnabled()
if !saveTradeData &&
!f.IsTradeFeedEnabled() {
return nil
}
var resultData WsTradeDataStore
@@ -327,7 +331,7 @@ func (f *FTX) wsHandleData(respRaw []byte) error {
TID: strconv.FormatInt(resultData.TradeData[z].ID, 10),
})
}
return trade.AddTradesToBuffer(f.Name, trades...)
return f.Websocket.Trade.Update(saveTradeData, trades...)
case wsOrders:
var resultData WsOrderDataStore
err = json.Unmarshal(respRaw, &resultData)
@@ -377,12 +381,46 @@ func (f *FTX) wsHandleData(respRaw []byte) error {
resp.Pair = pair
f.Websocket.DataHandler <- &resp
case wsFills:
if !f.IsFillsFeedEnabled() {
return nil
}
var resultData WsFillsDataStore
err = json.Unmarshal(respRaw, &resultData)
if err != nil {
return err
}
f.Websocket.DataHandler <- resultData.FillsData
var side order.Side
side, err = order.StringToOrderSide(resultData.FillsData.Side)
if err != nil {
f.Websocket.DataHandler <- order.ClassificationError{
Exchange: f.Name,
Err: err,
}
}
p, err = currency.NewPairFromString(resultData.FillsData.Market)
if err != nil {
return err
}
a, err = f.GetPairAssetType(p)
if err != nil {
return err
}
return f.Websocket.Fills.Update(fill.Data{
ID: strconv.FormatInt(resultData.FillsData.ID, 10),
Timestamp: resultData.FillsData.Time,
Exchange: f.Name,
AssetType: a,
CurrencyPair: p,
Side: side,
OrderID: strconv.FormatInt(resultData.FillsData.OrderID, 10),
TradeID: strconv.FormatInt(resultData.FillsData.TradeID, 10),
Price: resultData.FillsData.Price,
Amount: resultData.FillsData.Size,
})
default:
f.Websocket.DataHandler <- stream.UnhandledMessageWarning{Message: f.Name + stream.UnhandledMessage + string(respRaw)}
}

View File

@@ -1,12 +1,14 @@
package ftx
import (
"fmt"
"testing"
"time"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/fill"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
@@ -20,8 +22,20 @@ func parseRaw(t *testing.T, input string) interface{} {
Quote: currency.USDT,
},
}
dataC := make(chan interface{}, 1)
fills := fill.Fills{}
fills.Setup(true, dataC)
x := FTX{
exchange.Base{
Name: "FTX",
Features: exchange.Features{
Enabled: exchange.FeaturesEnabled{
FillsFeed: true,
},
},
CurrencyPairs: currency.PairsManager{
Pairs: map[asset.Item]*currency.PairStore{
asset.Spot: {
@@ -35,7 +49,8 @@ func parseRaw(t *testing.T, input string) interface{} {
},
},
Websocket: &stream.Websocket{
DataHandler: make(chan interface{}, 1),
DataHandler: dataC,
Fills: fills,
},
},
}
@@ -43,7 +58,15 @@ func parseRaw(t *testing.T, input string) interface{} {
if err := x.wsHandleData([]byte(input)); err != nil {
t.Fatal(err)
}
return <-x.Websocket.DataHandler
var ret interface{}
select {
case ret = <-x.Websocket.DataHandler:
default:
t.Error(fmt.Errorf("timed out waiting for channel data"))
}
return ret
}
func TestFTX_wsHandleData_Details(t *testing.T) {
@@ -153,10 +176,7 @@ func TestFTX_wsHandleData_wsFills(t *testing.T) {
"type": "update",
"data": {
"id": 1234567890,
"market": "MARKET",
"future": "FUTURE",
"baseCurrency": "BTC",
"quoteCurrency": "USDT",
"market": "BTC-USDT",
"type": "order",
"side": "sell",
"price": 32768,
@@ -171,27 +191,22 @@ func TestFTX_wsHandleData_wsFills(t *testing.T) {
}
}`
p := parseRaw(t, input)
x, ok := p.(WsFills)
x, ok := p.([]fill.Data)
if !ok {
t.Fatalf("have %T, want ftx.WsFills", p)
t.Fatalf("have %T, want []fill.Data", p)
}
if x.ID != 1234567890 ||
x.Market != "MARKET" ||
x.Future != "FUTURE" ||
x.BaseCurrency != "BTC" ||
x.QuoteCurrency != "USDT" ||
x.Type != "order" ||
x.Side != "sell" ||
x.Price != 32768 ||
x.Size != 2 ||
x.OrderID != 23456789012 ||
!x.Time.Equal(time.Unix(1628346762, 373010000).UTC()) ||
x.TradeID != 3456789012 ||
x.FeeRate != 8 ||
x.Fee != 16 ||
x.FeeCurrency != "FTT" ||
x.Liquidity != "maker" {
t.Error("parsed values do not match")
if x[0].Exchange != "FTX" ||
x[0].ID != "1234567890" ||
x[0].OrderID != "23456789012" ||
x[0].CurrencyPair.Base.String() != "BTC" ||
x[0].CurrencyPair.Quote.String() != "USDT" ||
x[0].Side != order.Sell ||
x[0].TradeID != "3456789012" ||
x[0].Price != 32768 ||
x[0].Amount != 2 ||
!x[0].Timestamp.Equal(time.Unix(1628346762, 373010000).UTC()) {
t.Errorf("parsed values do not match, x: %#v", x)
}
}

View File

@@ -190,6 +190,8 @@ func (f *FTX) Setup(exch *config.Exchange) error {
Unsubscriber: f.Unsubscribe,
GenerateSubscriptions: f.GenerateDefaultSubscriptions,
Features: &f.Features.Supports.WebsocketCapabilities,
TradeFeed: f.Features.Enabled.TradeFeed,
FillsFeed: f.Features.Enabled.FillsFeed,
})
if err != nil {
return err

View File

@@ -145,11 +145,22 @@ func (w *Websocket) Setup(s *WebsocketSetup) error {
w.Wg = new(sync.WaitGroup)
w.SetCanUseAuthenticatedEndpoints(s.ExchangeConfig.API.AuthenticatedWebsocketSupport)
return w.Orderbook.Setup(s.ExchangeConfig,
if err := w.Orderbook.Setup(s.ExchangeConfig,
s.SortBuffer,
s.SortBufferByUpdateIDs,
s.UpdateEntriesByID,
w.DataHandler); err != nil {
return err
}
w.Trade.Setup(w.exchangeName,
s.TradeFeed,
w.DataHandler)
w.Fills.Setup(s.FillsFeed,
w.DataHandler)
return nil
}
// SetupNewConnection sets up an auth or unauth streaming connection

View File

@@ -6,8 +6,10 @@ import (
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/config"
"github.com/thrasher-corp/gocryptotrader/exchanges/fill"
"github.com/thrasher-corp/gocryptotrader/exchanges/protocol"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
)
// Websocket functionality list and state consts
@@ -72,6 +74,12 @@ type Websocket struct {
// Orderbook is a local buffer of orderbooks
Orderbook buffer.Orderbook
// Trade is a notifier of occurring trades
Trade trade.Trade
// Fills is a notifier of occurring fills
Fills fill.Fills
// trafficAlert monitors if there is a halt in traffic throughput
TrafficAlert chan struct{}
// ReadMessageErrors will received all errors from ws.ReadMessage() and
@@ -100,6 +108,9 @@ type WebsocketSetup struct {
SortBuffer bool
SortBufferByUpdateIDs bool
UpdateEntriesByID bool
TradeFeed bool
// Fill data config values
FillsFeed bool
}
// WebsocketConnection contains all the data needed to send a message to a WS

View File

@@ -27,6 +27,35 @@ func (p *Processor) setup(wg *sync.WaitGroup) {
go p.Run(wg)
}
// Setup configures necessary fields to the `Trade` structure that govern trade data
// processing.
func (t *Trade) Setup(exchangeName string, tradeFeedEnabled bool, c chan interface{}) {
t.exchangeName = exchangeName
t.dataHandler = c
t.tradeFeedEnabled = tradeFeedEnabled
}
// Update processes trade data, either by saving it or routing it through
// the data channel.
func (t *Trade) Update(save bool, data ...Data) error {
if len(data) == 0 {
// nothing to do
return nil
}
if t.tradeFeedEnabled {
t.dataHandler <- data
}
if save {
if err := AddTradesToBuffer(t.exchangeName, data...); err != nil {
return err
}
}
return nil
}
// AddTradesToBuffer will push trade data onto the buffer
func AddTradesToBuffer(exchangeName string, data ...Data) error {
cfg := database.DB.GetConfig()

View File

@@ -24,6 +24,14 @@ var (
ErrNoTradesSupplied = errors.New("no trades supplied")
)
// Trade used to hold data and methods related to trade dissemination and
// storage
type Trade struct {
exchangeName string
dataHandler chan interface{}
tradeFeedEnabled bool
}
// Data defines trade data
type Data struct {
ID uuid.UUID `json:"ID,omitempty"`

View File

@@ -162,4 +162,5 @@ func init() {
Ticker = registerNewSubLogger("TICKER")
OrderBook = registerNewSubLogger("ORDERBOOK")
Trade = registerNewSubLogger("TRADE")
Fill = registerNewSubLogger("FILL")
}

View File

@@ -31,6 +31,7 @@ var (
Ticker *SubLogger
OrderBook *SubLogger
Trade *SubLogger
Fill *SubLogger
)
// logFields is used to store data in a non-global and thread-safe manner