Files
gocryptotrader/engine/websocketroutine_manager.go
Gareth Kirwan 0ecd082254 wsManager: Fix IsRunning true before flushed (#1201)
* Engine: Expose WebsocketRoutineManager

Without exposing the manager users don't know when it's ready to use.

* wsManager: Remove duplicate shutdown chan make

The shutdown channel was already in setup.
Consumers could reasonably expect it to not be replaced in between the
two, and it's not really part of Start to assign it.

* wsManager: Fix IsRunning true before flushed

Consumers must be able to tell when it's safe to start new
subscriptions. Before this fix any new subscriptions would get unsubbed
as part of the flush during `websocketRoutine`.

* WSM: Fix Stop/Start/Stop failing

We previously removed the shutdown channel from Start to avoid duplicate
allocation.
However that will result in a closed channel after the first Stop
So it's better to remove it from the setup.
It's private anyway.

* WSM: Export WebsocketRoutineManager type
2023-05-23 09:19:28 +10:00

403 lines
10 KiB
Go

package engine
import (
"fmt"
"sync"
"sync/atomic"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/fill"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
"github.com/thrasher-corp/gocryptotrader/log"
)
// setupWebsocketRoutineManager creates a new websocket routine manager
func setupWebsocketRoutineManager(exchangeManager iExchangeManager, orderManager iOrderManager, syncer iCurrencyPairSyncer, cfg *currency.Config, verbose bool) (*WebsocketRoutineManager, error) {
if exchangeManager == nil {
return nil, errNilExchangeManager
}
if orderManager == nil {
return nil, errNilOrderManager
}
if syncer == nil {
return nil, errNilCurrencyPairSyncer
}
if cfg == nil {
return nil, errNilCurrencyConfig
}
if cfg.CurrencyPairFormat == nil {
return nil, errNilCurrencyPairFormat
}
man := &WebsocketRoutineManager{
verbose: verbose,
exchangeManager: exchangeManager,
orderManager: orderManager,
syncer: syncer,
currencyConfig: cfg,
}
return man, man.registerWebsocketDataHandler(man.websocketDataHandler, false)
}
// Start runs the subsystem
func (m *WebsocketRoutineManager) Start() error {
if m == nil {
return fmt.Errorf("websocket routine manager %w", ErrNilSubsystem)
}
if m.currencyConfig == nil {
return errNilCurrencyConfig
}
if m.currencyConfig.CurrencyPairFormat == nil {
return errNilCurrencyPairFormat
}
if !atomic.CompareAndSwapInt32(&m.state, stoppedState, startingState) {
return ErrSubSystemAlreadyStarted
}
m.shutdown = make(chan struct{})
go func() {
m.websocketRoutine()
// It's okay for this to fail, just means shutdown has started
atomic.CompareAndSwapInt32(&m.state, startingState, readyState)
}()
return nil
}
// IsRunning safely checks whether the subsystem is running
func (m *WebsocketRoutineManager) IsRunning() bool {
if m == nil {
return false
}
return atomic.LoadInt32(&m.state) == readyState
}
// Stop attempts to shutdown the subsystem
func (m *WebsocketRoutineManager) Stop() error {
if m == nil {
return fmt.Errorf("websocket routine manager %w", ErrNilSubsystem)
}
m.mu.Lock()
if atomic.LoadInt32(&m.state) == stoppedState {
m.mu.Unlock()
return fmt.Errorf("websocket routine manager %w", ErrSubSystemNotStarted)
}
atomic.StoreInt32(&m.state, stoppedState)
m.mu.Unlock()
close(m.shutdown)
m.wg.Wait()
return nil
}
// websocketRoutine Initial routine management system for websocket
func (m *WebsocketRoutineManager) websocketRoutine() {
if m.verbose {
log.Debugln(log.WebsocketMgr, "Connecting exchange websocket services...")
}
exchanges, err := m.exchangeManager.GetExchanges()
if err != nil {
log.Errorf(log.WebsocketMgr, "websocket routine manager cannot get exchanges: %v", err)
}
wg := sync.WaitGroup{}
wg.Add(len(exchanges))
for i := range exchanges {
go func(i int) {
defer wg.Done()
if exchanges[i].SupportsWebsocket() {
if m.verbose {
log.Debugf(log.WebsocketMgr,
"Exchange %s websocket support: Yes Enabled: %v",
exchanges[i].GetName(),
common.IsEnabled(exchanges[i].IsWebsocketEnabled()),
)
}
ws, err := exchanges[i].GetWebsocket()
if err != nil {
log.Errorf(
log.WebsocketMgr,
"Exchange %s GetWebsocket error: %s",
exchanges[i].GetName(),
err,
)
return
}
if ws.IsEnabled() {
err = ws.Connect()
if err != nil {
log.Errorf(log.WebsocketMgr, "%v", err)
}
err = m.websocketDataReceiver(ws)
if err != nil {
log.Errorf(log.WebsocketMgr, "%v", err)
}
err = ws.FlushChannels()
if err != nil {
log.Errorf(log.WebsocketMgr, "Failed to subscribe: %v", err)
}
}
} else if m.verbose {
log.Debugf(log.WebsocketMgr,
"Exchange %s websocket support: No",
exchanges[i].GetName(),
)
}
}(i)
}
wg.Wait()
}
// WebsocketDataReceiver handles websocket data coming from a websocket feed
// associated with an exchange
func (m *WebsocketRoutineManager) websocketDataReceiver(ws *stream.Websocket) error {
if m == nil {
return fmt.Errorf("websocket routine manager %w", ErrNilSubsystem)
}
if ws == nil {
return errNilWebsocket
}
if atomic.LoadInt32(&m.state) == stoppedState {
return errRoutineManagerNotStarted
}
m.wg.Add(1)
go func() {
defer m.wg.Done()
for {
select {
case <-m.shutdown:
return
case data := <-ws.ToRoutine:
if data == nil {
log.Errorf(log.WebsocketMgr, "exchange %s nil data sent to websocket", ws.GetName())
}
m.mu.RLock()
for x := range m.dataHandlers {
err := m.dataHandlers[x](ws.GetName(), data)
if err != nil {
log.Errorln(log.WebsocketMgr, err)
}
}
m.mu.RUnlock()
}
}
}()
return nil
}
// websocketDataHandler is the default central point for exchange websocket
// implementations to send processed data which will then pass that to an
// appropriate handler.
func (m *WebsocketRoutineManager) websocketDataHandler(exchName string, data interface{}) error {
switch d := data.(type) {
case string:
log.Infoln(log.WebsocketMgr, d)
case error:
return fmt.Errorf("exchange %s websocket error - %s", exchName, data)
case stream.FundingData:
if m.verbose {
log.Infof(log.WebsocketMgr, "%s websocket %s %s funding updated %+v",
exchName,
m.FormatCurrency(d.CurrencyPair),
d.AssetType,
d)
}
case *ticker.Price:
if m.syncer.IsRunning() {
err := m.syncer.Update(exchName,
d.Pair,
d.AssetType,
SyncItemTicker,
nil)
if err != nil {
return err
}
}
err := ticker.ProcessTicker(d)
if err != nil {
return err
}
m.syncer.PrintTickerSummary(d, "websocket", err)
case stream.KlineData:
if m.verbose {
log.Infof(log.WebsocketMgr, "%s websocket %s %s kline updated %+v",
exchName,
m.FormatCurrency(d.Pair),
d.AssetType,
d)
}
case *orderbook.Depth:
base, err := d.Retrieve()
if err != nil {
return err
}
if m.syncer.IsRunning() {
err := m.syncer.Update(exchName,
base.Pair,
base.Asset,
SyncItemOrderbook,
nil)
if err != nil {
return err
}
}
m.syncer.PrintOrderbookSummary(base, "websocket", nil)
case *order.Detail:
if !m.orderManager.Exists(d) {
err := m.orderManager.Add(d)
if err != nil {
return err
}
m.printOrderSummary(d, false)
} else {
od, err := m.orderManager.GetByExchangeAndID(d.Exchange, d.OrderID)
if err != nil {
return err
}
err = od.UpdateOrderFromDetail(d)
if err != nil {
return err
}
err = m.orderManager.UpdateExistingOrder(od)
if err != nil {
return err
}
m.printOrderSummary(d, true)
}
case order.ClassificationError:
return fmt.Errorf("%w %s", d.Err, d.Error())
case stream.UnhandledMessageWarning:
log.Warnln(log.WebsocketMgr, d.Message)
case account.Change:
if m.verbose {
m.printAccountHoldingsChangeSummary(d)
}
case []trade.Data:
if m.verbose {
log.Infof(log.Trade, "%+v", d)
}
case []fill.Data:
if m.verbose {
log.Infof(log.Fill, "%+v", d)
}
default:
if m.verbose {
log.Warnf(log.WebsocketMgr,
"%s websocket Unknown type: %+v",
exchName,
d)
}
}
return nil
}
// FormatCurrency is a method that formats and returns a currency pair
// based on the user currency display preferences
func (m *WebsocketRoutineManager) FormatCurrency(p currency.Pair) currency.Pair {
if m == nil || atomic.LoadInt32(&m.state) == stoppedState {
return p
}
return p.Format(*m.currencyConfig.CurrencyPairFormat)
}
// printOrderSummary this function will be deprecated when a order manager
// update is done.
func (m *WebsocketRoutineManager) printOrderSummary(o *order.Detail, isUpdate bool) {
if m == nil || atomic.LoadInt32(&m.state) == stoppedState || o == nil {
return
}
orderNotif := "New Order:"
if isUpdate {
orderNotif = "Order Change:"
}
log.Debugf(log.WebsocketMgr,
"%s %s %s %s %s %s %s OrderID:%s ClientOrderID:%s Price:%f Amount:%f Executed Amount:%f Remaining Amount:%f",
orderNotif,
o.Exchange,
o.AssetType,
o.Pair,
o.Status,
o.Type,
o.Side,
o.OrderID,
o.ClientOrderID,
o.Price,
o.Amount,
o.ExecutedAmount,
o.RemainingAmount)
}
// printAccountHoldingsChangeSummary this function will be deprecated when a
// account holdings update is done.
func (m *WebsocketRoutineManager) printAccountHoldingsChangeSummary(o account.Change) {
if m == nil || atomic.LoadInt32(&m.state) == stoppedState {
return
}
log.Debugf(log.WebsocketMgr,
"Account Holdings Balance Changed: %s %s %s has changed balance by %f for account: %s",
o.Exchange,
o.Asset,
o.Currency,
o.Amount,
o.Account)
}
// registerWebsocketDataHandler registers an externally (GCT Library) defined
// dedicated filter specific data types for internal & external strategy use.
// InterceptorOnly as true will purge all other registered handlers
// (including default) bypassing all other handling.
func (m *WebsocketRoutineManager) registerWebsocketDataHandler(fn WebsocketDataHandler, interceptorOnly bool) error {
if m == nil {
return fmt.Errorf("%T %w", m, ErrNilSubsystem)
}
if fn == nil {
return errNilWebsocketDataHandlerFunction
}
if interceptorOnly {
return m.setWebsocketDataHandler(fn)
}
m.mu.Lock()
// Push front so that any registered data handler has first preference
// over the gct default handler.
m.dataHandlers = append([]WebsocketDataHandler{fn}, m.dataHandlers...)
m.mu.Unlock()
return nil
}
// setWebsocketDataHandler sets a single websocket data handler, removing all
// pre-existing handlers.
func (m *WebsocketRoutineManager) setWebsocketDataHandler(fn WebsocketDataHandler) error {
if m == nil {
return fmt.Errorf("%T %w", m, ErrNilSubsystem)
}
if fn == nil {
return errNilWebsocketDataHandlerFunction
}
m.mu.Lock()
m.dataHandlers = []WebsocketDataHandler{fn}
m.mu.Unlock()
return nil
}