mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-05-14 07:26:47 +00:00
* Engine: Fix false test passes for nil OrderManager TestWebsocketRoutineManagerSetup tests that passing a nil value returns errNilOrderManager; However that's not actually what would happen when order manager is configured off. The arguments to setupWebsocketRoutineManager are interface types. When a nil pointer of interface type is passed, it does NOT equal nil. nil is a primitive type. A nil pointer of interface type has the value (nil;type). See [the Go lang spec](https://golang.org/ref/spec#Comparison_operators): ``` Interface values are comparable. Two interface values are equal if they have identical dynamic types and equal dynamic values or if both have value nil. ``` So that means that whilst this test was passing, because it was sending in a real nil value and comparing it to a real nil, that's not what would happen at runtime. At runtime the bot.OrderManager would be a nil pointer to a concrete type *OrderManager, and so not comparible to nil. This commit just fixes that oversight, and explains the often misunderstood mechanics of comparing interface types to nil. In practical terms this means that the tests assert that the WSRM would not run without a OM, but in fact it actually would. And panic later. This commit SHOULD introduce a FAILing test. Sorry if you're bisecting. * WSM: Fix error on OrderManager not enabled It's okay for OrderManager to not be enabled; It's configurable. Remove the WSM setup protection and move it to runtime using IsRunning. Left the WSM deps so they can be fixed as apporpriate to each.
464 lines
11 KiB
Go
464 lines
11 KiB
Go
package engine
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/thrasher-corp/gocryptotrader/common"
|
|
"github.com/thrasher-corp/gocryptotrader/currency"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/fill"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
|
|
"github.com/thrasher-corp/gocryptotrader/log"
|
|
)
|
|
|
|
// setupWebsocketRoutineManager creates a new websocket routine manager
|
|
func setupWebsocketRoutineManager(exchangeManager iExchangeManager, orderManager iOrderManager, syncer iCurrencyPairSyncer, cfg *currency.Config, verbose bool) (*WebsocketRoutineManager, error) {
|
|
if exchangeManager == nil {
|
|
return nil, errNilExchangeManager
|
|
}
|
|
if syncer == nil {
|
|
return nil, errNilCurrencyPairSyncer
|
|
}
|
|
if cfg == nil {
|
|
return nil, errNilCurrencyConfig
|
|
}
|
|
if cfg.CurrencyPairFormat == nil {
|
|
return nil, errNilCurrencyPairFormat
|
|
}
|
|
man := &WebsocketRoutineManager{
|
|
verbose: verbose,
|
|
exchangeManager: exchangeManager,
|
|
orderManager: orderManager,
|
|
syncer: syncer,
|
|
currencyConfig: cfg,
|
|
}
|
|
return man, man.registerWebsocketDataHandler(man.websocketDataHandler, false)
|
|
}
|
|
|
|
// Start runs the subsystem
|
|
func (m *WebsocketRoutineManager) Start() error {
|
|
if m == nil {
|
|
return fmt.Errorf("websocket routine manager %w", ErrNilSubsystem)
|
|
}
|
|
|
|
if m.currencyConfig == nil {
|
|
return errNilCurrencyConfig
|
|
}
|
|
|
|
if m.currencyConfig.CurrencyPairFormat == nil {
|
|
return errNilCurrencyPairFormat
|
|
}
|
|
|
|
if !atomic.CompareAndSwapInt32(&m.state, stoppedState, startingState) {
|
|
return ErrSubSystemAlreadyStarted
|
|
}
|
|
|
|
m.shutdown = make(chan struct{})
|
|
|
|
go func() {
|
|
m.websocketRoutine()
|
|
// It's okay for this to fail, just means shutdown has started
|
|
atomic.CompareAndSwapInt32(&m.state, startingState, readyState)
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
// IsRunning safely checks whether the subsystem is running
|
|
func (m *WebsocketRoutineManager) IsRunning() bool {
|
|
if m == nil {
|
|
return false
|
|
}
|
|
return atomic.LoadInt32(&m.state) == readyState
|
|
}
|
|
|
|
// Stop attempts to shutdown the subsystem
|
|
func (m *WebsocketRoutineManager) Stop() error {
|
|
if m == nil {
|
|
return fmt.Errorf("websocket routine manager %w", ErrNilSubsystem)
|
|
}
|
|
|
|
m.mu.Lock()
|
|
if atomic.LoadInt32(&m.state) == stoppedState {
|
|
m.mu.Unlock()
|
|
return fmt.Errorf("websocket routine manager %w", ErrSubSystemNotStarted)
|
|
}
|
|
atomic.StoreInt32(&m.state, stoppedState)
|
|
m.mu.Unlock()
|
|
|
|
close(m.shutdown)
|
|
m.wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
// websocketRoutine Initial routine management system for websocket
|
|
func (m *WebsocketRoutineManager) websocketRoutine() {
|
|
if m.verbose {
|
|
log.Debugln(log.WebsocketMgr, "Connecting exchange websocket services...")
|
|
}
|
|
exchanges, err := m.exchangeManager.GetExchanges()
|
|
if err != nil {
|
|
log.Errorf(log.WebsocketMgr, "websocket routine manager cannot get exchanges: %v", err)
|
|
}
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(len(exchanges))
|
|
for i := range exchanges {
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
if exchanges[i].SupportsWebsocket() {
|
|
if m.verbose {
|
|
log.Debugf(log.WebsocketMgr,
|
|
"Exchange %s websocket support: Yes Enabled: %v",
|
|
exchanges[i].GetName(),
|
|
common.IsEnabled(exchanges[i].IsWebsocketEnabled()),
|
|
)
|
|
}
|
|
|
|
ws, err := exchanges[i].GetWebsocket()
|
|
if err != nil {
|
|
log.Errorf(
|
|
log.WebsocketMgr,
|
|
"Exchange %s GetWebsocket error: %s",
|
|
exchanges[i].GetName(),
|
|
err,
|
|
)
|
|
return
|
|
}
|
|
|
|
if ws.IsEnabled() {
|
|
err = ws.Connect()
|
|
if err != nil {
|
|
log.Errorf(log.WebsocketMgr, "%v", err)
|
|
}
|
|
|
|
err = m.websocketDataReceiver(ws)
|
|
if err != nil {
|
|
log.Errorf(log.WebsocketMgr, "%v", err)
|
|
}
|
|
|
|
err = ws.FlushChannels()
|
|
if err != nil {
|
|
log.Errorf(log.WebsocketMgr, "Failed to subscribe: %v", err)
|
|
}
|
|
}
|
|
} else if m.verbose {
|
|
log.Debugf(log.WebsocketMgr,
|
|
"Exchange %s websocket support: No",
|
|
exchanges[i].GetName(),
|
|
)
|
|
}
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
// WebsocketDataReceiver handles websocket data coming from a websocket feed
|
|
// associated with an exchange
|
|
func (m *WebsocketRoutineManager) websocketDataReceiver(ws *stream.Websocket) error {
|
|
if m == nil {
|
|
return fmt.Errorf("websocket routine manager %w", ErrNilSubsystem)
|
|
}
|
|
|
|
if ws == nil {
|
|
return errNilWebsocket
|
|
}
|
|
|
|
if atomic.LoadInt32(&m.state) == stoppedState {
|
|
return errRoutineManagerNotStarted
|
|
}
|
|
|
|
m.wg.Add(1)
|
|
go func() {
|
|
defer m.wg.Done()
|
|
for {
|
|
select {
|
|
case <-m.shutdown:
|
|
return
|
|
case data := <-ws.ToRoutine:
|
|
if data == nil {
|
|
log.Errorf(log.WebsocketMgr, "exchange %s nil data sent to websocket", ws.GetName())
|
|
}
|
|
m.mu.RLock()
|
|
for x := range m.dataHandlers {
|
|
err := m.dataHandlers[x](ws.GetName(), data)
|
|
if err != nil {
|
|
log.Errorln(log.WebsocketMgr, err)
|
|
}
|
|
}
|
|
m.mu.RUnlock()
|
|
}
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
// websocketDataHandler is the default central point for exchange websocket
|
|
// implementations to send processed data which will then pass that to an
|
|
// appropriate handler.
|
|
func (m *WebsocketRoutineManager) websocketDataHandler(exchName string, data interface{}) error {
|
|
switch d := data.(type) {
|
|
case string:
|
|
log.Infoln(log.WebsocketMgr, d)
|
|
case error:
|
|
return fmt.Errorf("exchange %s websocket error - %s", exchName, data)
|
|
case stream.FundingData:
|
|
if m.verbose {
|
|
log.Infof(log.WebsocketMgr, "%s websocket %s %s funding updated %+v",
|
|
exchName,
|
|
m.FormatCurrency(d.CurrencyPair),
|
|
d.AssetType,
|
|
d)
|
|
}
|
|
case *ticker.Price:
|
|
if m.syncer.IsRunning() {
|
|
err := m.syncer.WebsocketUpdate(exchName,
|
|
d.Pair,
|
|
d.AssetType,
|
|
SyncItemTicker,
|
|
nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
err := ticker.ProcessTicker(d)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m.syncer.PrintTickerSummary(d, "websocket", err)
|
|
case []ticker.Price:
|
|
for x := range d {
|
|
if m.syncer.IsRunning() {
|
|
err := m.syncer.WebsocketUpdate(exchName,
|
|
d[x].Pair,
|
|
d[x].AssetType,
|
|
SyncItemTicker,
|
|
nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
err := ticker.ProcessTicker(&d[x])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m.syncer.PrintTickerSummary(&d[x], "websocket", err)
|
|
}
|
|
case stream.KlineData:
|
|
if m.verbose {
|
|
log.Infof(log.WebsocketMgr, "%s websocket %s %s kline updated %+v",
|
|
exchName,
|
|
m.FormatCurrency(d.Pair),
|
|
d.AssetType,
|
|
d)
|
|
}
|
|
case []stream.KlineData:
|
|
for x := range d {
|
|
if m.verbose {
|
|
log.Infof(log.WebsocketMgr, "%s websocket %s %s kline updated %+v",
|
|
exchName,
|
|
m.FormatCurrency(d[x].Pair),
|
|
d[x].AssetType,
|
|
d)
|
|
}
|
|
}
|
|
case *orderbook.Depth:
|
|
base, err := d.Retrieve()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if m.syncer.IsRunning() {
|
|
err := m.syncer.WebsocketUpdate(exchName,
|
|
base.Pair,
|
|
base.Asset,
|
|
SyncItemOrderbook,
|
|
nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
m.syncer.PrintOrderbookSummary(base, "websocket", nil)
|
|
case *order.Detail:
|
|
if !m.orderManager.IsRunning() {
|
|
return nil
|
|
}
|
|
if !m.orderManager.Exists(d) {
|
|
err := m.orderManager.Add(d)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m.printOrderSummary(d, false)
|
|
} else {
|
|
od, err := m.orderManager.GetByExchangeAndID(d.Exchange, d.OrderID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = od.UpdateOrderFromDetail(d)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = m.orderManager.UpdateExistingOrder(od)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m.printOrderSummary(od, true)
|
|
}
|
|
case []order.Detail:
|
|
if !m.orderManager.IsRunning() {
|
|
return nil
|
|
}
|
|
for x := range d {
|
|
if !m.orderManager.Exists(&d[x]) {
|
|
err := m.orderManager.Add(&d[x])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m.printOrderSummary(&d[x], false)
|
|
} else {
|
|
od, err := m.orderManager.GetByExchangeAndID(d[x].Exchange, d[x].OrderID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = od.UpdateOrderFromDetail(&d[x])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = m.orderManager.UpdateExistingOrder(od)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m.printOrderSummary(od, true)
|
|
}
|
|
}
|
|
case order.ClassificationError:
|
|
return fmt.Errorf("%w %s", d.Err, d.Error())
|
|
case stream.UnhandledMessageWarning:
|
|
log.Warnln(log.WebsocketMgr, d.Message)
|
|
case account.Change:
|
|
if m.verbose {
|
|
m.printAccountHoldingsChangeSummary(d)
|
|
}
|
|
case []account.Change:
|
|
if m.verbose {
|
|
for x := range d {
|
|
m.printAccountHoldingsChangeSummary(d[x])
|
|
}
|
|
}
|
|
case []trade.Data:
|
|
if m.verbose {
|
|
log.Infof(log.Trade, "%+v", d)
|
|
}
|
|
case []fill.Data:
|
|
if m.verbose {
|
|
log.Infof(log.Fill, "%+v", d)
|
|
}
|
|
default:
|
|
if m.verbose {
|
|
log.Warnf(log.WebsocketMgr,
|
|
"%s websocket Unknown type: %+v",
|
|
exchName,
|
|
d)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// FormatCurrency is a method that formats and returns a currency pair
|
|
// based on the user currency display preferences
|
|
func (m *WebsocketRoutineManager) FormatCurrency(p currency.Pair) currency.Pair {
|
|
if m == nil || atomic.LoadInt32(&m.state) == stoppedState {
|
|
return p
|
|
}
|
|
return p.Format(*m.currencyConfig.CurrencyPairFormat)
|
|
}
|
|
|
|
// printOrderSummary this function will be deprecated when a order manager
|
|
// update is done.
|
|
func (m *WebsocketRoutineManager) printOrderSummary(o *order.Detail, isUpdate bool) {
|
|
if m == nil || atomic.LoadInt32(&m.state) == stoppedState || o == nil {
|
|
return
|
|
}
|
|
|
|
orderNotif := "New Order:"
|
|
if isUpdate {
|
|
orderNotif = "Order Change:"
|
|
}
|
|
|
|
log.Debugf(log.WebsocketMgr,
|
|
"%s %s %s %s %s %s %s OrderID:%s ClientOrderID:%s Price:%f Amount:%f Executed Amount:%f Remaining Amount:%f",
|
|
orderNotif,
|
|
o.Exchange,
|
|
o.AssetType,
|
|
o.Pair,
|
|
o.Status,
|
|
o.Type,
|
|
o.Side,
|
|
o.OrderID,
|
|
o.ClientOrderID,
|
|
o.Price,
|
|
o.Amount,
|
|
o.ExecutedAmount,
|
|
o.RemainingAmount)
|
|
}
|
|
|
|
// printAccountHoldingsChangeSummary this function will be deprecated when a
|
|
// account holdings update is done.
|
|
func (m *WebsocketRoutineManager) printAccountHoldingsChangeSummary(o account.Change) {
|
|
if m == nil || atomic.LoadInt32(&m.state) == stoppedState {
|
|
return
|
|
}
|
|
log.Debugf(log.WebsocketMgr,
|
|
"Account Holdings Balance Changed: %s %s %s has changed balance by %f for account: %s",
|
|
o.Exchange,
|
|
o.Asset,
|
|
o.Currency,
|
|
o.Amount,
|
|
o.Account)
|
|
}
|
|
|
|
// registerWebsocketDataHandler registers an externally (GCT Library) defined
|
|
// dedicated filter specific data types for internal & external strategy use.
|
|
// InterceptorOnly as true will purge all other registered handlers
|
|
// (including default) bypassing all other handling.
|
|
func (m *WebsocketRoutineManager) registerWebsocketDataHandler(fn WebsocketDataHandler, interceptorOnly bool) error {
|
|
if m == nil {
|
|
return fmt.Errorf("%T %w", m, ErrNilSubsystem)
|
|
}
|
|
|
|
if fn == nil {
|
|
return errNilWebsocketDataHandlerFunction
|
|
}
|
|
|
|
if interceptorOnly {
|
|
return m.setWebsocketDataHandler(fn)
|
|
}
|
|
|
|
m.mu.Lock()
|
|
// Push front so that any registered data handler has first preference
|
|
// over the gct default handler.
|
|
m.dataHandlers = append([]WebsocketDataHandler{fn}, m.dataHandlers...)
|
|
m.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// setWebsocketDataHandler sets a single websocket data handler, removing all
|
|
// pre-existing handlers.
|
|
func (m *WebsocketRoutineManager) setWebsocketDataHandler(fn WebsocketDataHandler) error {
|
|
if m == nil {
|
|
return fmt.Errorf("%T %w", m, ErrNilSubsystem)
|
|
}
|
|
if fn == nil {
|
|
return errNilWebsocketDataHandlerFunction
|
|
}
|
|
m.mu.Lock()
|
|
m.dataHandlers = []WebsocketDataHandler{fn}
|
|
m.mu.Unlock()
|
|
return nil
|
|
}
|