diff --git a/engine/engine.go b/engine/engine.go index 2d46d393..0879d40d 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -513,8 +513,8 @@ func (bot *Engine) Start() error { bot.currencyPairSyncer, err = setupSyncManager( exchangeSyncCfg, bot.ExchangeManager, - bot.websocketRoutineManager, - &bot.Config.RemoteControl) + &bot.Config.RemoteControl, + bot.Settings.EnableWebsocketRoutine) if err != nil { gctlog.Errorf(gctlog.Global, "Unable to initialise exchange currency pair syncer. Err: %s", err) } else { diff --git a/engine/helpers.go b/engine/helpers.go index d42f8877..18b24992 100644 --- a/engine/helpers.go +++ b/engine/helpers.go @@ -177,10 +177,11 @@ func (bot *Engine) SetSubsystem(subSystemName string, enable bool) error { SyncTimeoutREST: bot.Settings.SyncTimeoutREST, SyncTimeoutWebsocket: bot.Settings.SyncTimeoutWebsocket, } - bot.currencyPairSyncer, err = setupSyncManager(exchangeSyncCfg, + bot.currencyPairSyncer, err = setupSyncManager( + exchangeSyncCfg, bot.ExchangeManager, - bot.websocketRoutineManager, - &bot.Config.RemoteControl) + &bot.Config.RemoteControl, + bot.Settings.EnableWebsocketRoutine) if err != nil { return err } diff --git a/engine/sync_manager.go b/engine/sync_manager.go index 71d760a9..2e869329 100644 --- a/engine/sync_manager.go +++ b/engine/sync_manager.go @@ -37,10 +37,11 @@ var ( DefaultSyncerTimeoutWebsocket = time.Minute errNoSyncItemsEnabled = errors.New("no sync items enabled") errUnknownSyncItem = errors.New("unknown sync item") + errSyncPairNotFound = errors.New("exchange currency pair syncer not found") ) // setupSyncManager starts a new CurrencyPairSyncer -func setupSyncManager(c *Config, exchangeManager iExchangeManager, websocketDataReceiver iWebsocketDataReceiver, remoteConfig *config.RemoteControlConfig) (*syncManager, error) { +func setupSyncManager(c *Config, exchangeManager iExchangeManager, remoteConfig *config.RemoteControlConfig, websocketRoutineManagerEnabled bool) (*syncManager, error) { if !c.SyncOrderbook && !c.SyncTicker && !c.SyncTrades { return nil, errNoSyncItemsEnabled } @@ -64,10 +65,10 @@ func setupSyncManager(c *Config, exchangeManager iExchangeManager, websocketData } s := &syncManager{ - config: *c, - remoteConfig: remoteConfig, - exchangeManager: exchangeManager, - websocketDataReceiver: websocketDataReceiver, + config: *c, + remoteConfig: remoteConfig, + exchangeManager: exchangeManager, + websocketRoutineManagerEnabled: websocketRoutineManagerEnabled, } s.tickerBatchLastRequested = make(map[string]time.Time) @@ -75,7 +76,7 @@ func setupSyncManager(c *Config, exchangeManager iExchangeManager, websocketData log.Debugf(log.SyncMgr, "Exchange currency pair syncer config: continuous: %v ticker: %v"+ " orderbook: %v trades: %v workers: %v verbose: %v timeout REST: %v"+ - " timeout Websocket: %v\n", + " timeout Websocket: %v", s.config.SyncContinuously, s.config.SyncTicker, s.config.SyncOrderbook, s.config.SyncTrades, s.config.NumWorkers, s.config.Verbose, s.config.SyncTimeoutREST, s.config.SyncTimeoutWebsocket) @@ -110,44 +111,17 @@ func (m *syncManager) Start() error { if !supportsREST && !supportsWebsocket { log.Warnf(log.SyncMgr, - "Loaded exchange %s does not support REST or Websocket.\n", + "Loaded exchange %s does not support REST or Websocket.", exchangeName) continue } var usingWebsocket bool var usingREST bool - if supportsWebsocket && exchanges[x].IsWebsocketEnabled() { - ws, err := exchanges[x].GetWebsocket() - if err != nil { - log.Errorf(log.SyncMgr, - "%s failed to get websocket. Err: %s\n", - exchangeName, - err) - usingREST = true - } - - if !ws.IsConnected() && !ws.IsConnecting() { - if m.websocketDataReceiver.IsRunning() { - go m.websocketDataReceiver.WebsocketDataReceiver(ws) - } - - err = ws.Connect() - if err == nil { - err = ws.FlushChannels() - } - if err != nil { - log.Errorf(log.SyncMgr, - "%s websocket failed to connect. Err: %s\n", - exchangeName, - err) - usingREST = true - } else { - usingWebsocket = true - } - } else { - usingWebsocket = true - } + if m.websocketRoutineManagerEnabled && + supportsWebsocket && + exchanges[x].IsWebsocketEnabled() { + usingWebsocket = true } else if supportsREST { usingREST = true } @@ -172,7 +146,7 @@ func (m *syncManager) Start() error { enabledPairs, err := exchanges[x].GetEnabledPairs(assetTypes[y]) if err != nil { log.Errorf(log.SyncMgr, - "%s failed to get enabled pairs. Err: %s\n", + "%s failed to get enabled pairs. Err: %s", exchangeName, err) continue @@ -182,37 +156,33 @@ func (m *syncManager) Start() error { continue } - c := currencyPairSyncAgent{ + c := ¤cyPairSyncAgent{ AssetType: assetTypes[y], Exchange: exchangeName, Pair: enabledPairs[i], } - sBase := syncBase{ IsUsingREST: usingREST || !wsAssetSupported, IsUsingWebsocket: usingWebsocket && wsAssetSupported, } - if m.config.SyncTicker { c.Ticker = sBase } - if m.config.SyncOrderbook { c.Orderbook = sBase } - if m.config.SyncTrades { c.Trade = sBase } - m.add(&c) + m.add(c) } } } if atomic.CompareAndSwapInt32(&m.initSyncStarted, 0, 1) { log.Debugf(log.SyncMgr, - "Exchange CurrencyPairSyncer initial sync started. %d items to process.\n", + "Exchange CurrencyPairSyncer initial sync started. %d items to process.", createdCounter) m.initSyncStartTime = time.Now() } @@ -220,9 +190,9 @@ func (m *syncManager) Start() error { go func() { m.initSyncWG.Wait() if atomic.CompareAndSwapInt32(&m.initSyncCompleted, 0, 1) { - log.Debugf(log.SyncMgr, "Exchange CurrencyPairSyncer initial sync is complete.\n") + log.Debugf(log.SyncMgr, "Exchange CurrencyPairSyncer initial sync is complete.") completedTime := time.Now() - log.Debugf(log.SyncMgr, "Exchange CurrencyPairSyncer initial sync took %v [%v sync items].\n", + log.Debugf(log.SyncMgr, "Exchange CurrencyPairSyncer initial sync took %v [%v sync items].", completedTime.Sub(m.initSyncStartTime), createdCounter) if !m.config.SyncContinuously { @@ -248,7 +218,6 @@ func (m *syncManager) Start() error { } // Stop shuts down the exchange currency pair syncer -// Stop attempts to shutdown the subsystem func (m *syncManager) Stop() error { if m == nil { return fmt.Errorf("exchange CurrencyPairSyncer %w", ErrNilSubsystem) @@ -273,7 +242,7 @@ func (m *syncManager) get(exchangeName string, p currency.Pair, a asset.Item) (* } } - return nil, errors.New("exchange currency pair syncer not found") + return nil, fmt.Errorf("%v %v %v %w", exchangeName, a, p, errSyncPairNotFound) } func (m *syncManager) exists(exchangeName string, p currency.Pair, a asset.Item) bool { @@ -297,7 +266,7 @@ func (m *syncManager) add(c *currencyPairSyncAgent) { if m.config.SyncTicker { if m.config.Verbose { log.Debugf(log.SyncMgr, - "%s: Added ticker sync item %v: using websocket: %v using REST: %v\n", + "%s: Added ticker sync item %v: using websocket: %v using REST: %v", c.Exchange, m.FormatCurrency(c.Pair).String(), c.Ticker.IsUsingWebsocket, c.Ticker.IsUsingREST) } @@ -310,7 +279,7 @@ func (m *syncManager) add(c *currencyPairSyncAgent) { if m.config.SyncOrderbook { if m.config.Verbose { log.Debugf(log.SyncMgr, - "%s: Added orderbook sync item %v: using websocket: %v using REST: %v\n", + "%s: Added orderbook sync item %v: using websocket: %v using REST: %v", c.Exchange, m.FormatCurrency(c.Pair).String(), c.Orderbook.IsUsingWebsocket, c.Orderbook.IsUsingREST) } @@ -323,7 +292,7 @@ func (m *syncManager) add(c *currencyPairSyncAgent) { if m.config.SyncTrades { if m.config.Verbose { log.Debugf(log.SyncMgr, - "%s: Added trade sync item %v: using websocket: %v using REST: %v\n", + "%s: Added trade sync item %v: using websocket: %v using REST: %v", c.Exchange, m.FormatCurrency(c.Pair).String(), c.Trade.IsUsingWebsocket, c.Trade.IsUsingREST) } @@ -427,7 +396,7 @@ func (m *syncManager) Update(exchangeName string, p currency.Pair, a asset.Item, m.currencyPairs[x].Ticker.IsProcessing = false if atomic.LoadInt32(&m.initSyncCompleted) != 1 && !origHadData { removedCounter++ - log.Debugf(log.SyncMgr, "%s ticker sync complete %v [%d/%d].\n", + log.Debugf(log.SyncMgr, "%s ticker sync complete %v [%d/%d].", exchangeName, m.FormatCurrency(p).String(), removedCounter, @@ -445,7 +414,7 @@ func (m *syncManager) Update(exchangeName string, p currency.Pair, a asset.Item, m.currencyPairs[x].Orderbook.IsProcessing = false if atomic.LoadInt32(&m.initSyncCompleted) != 1 && !origHadData { removedCounter++ - log.Debugf(log.SyncMgr, "%s orderbook sync complete %v [%d/%d].\n", + log.Debugf(log.SyncMgr, "%s orderbook sync complete %v [%d/%d].", exchangeName, m.FormatCurrency(p).String(), removedCounter, @@ -463,7 +432,7 @@ func (m *syncManager) Update(exchangeName string, p currency.Pair, a asset.Item, m.currencyPairs[x].Trade.IsProcessing = false if atomic.LoadInt32(&m.initSyncCompleted) != 1 && !origHadData { removedCounter++ - log.Debugf(log.SyncMgr, "%s trade sync complete %v [%d/%d].\n", + log.Debugf(log.SyncMgr, "%s trade sync complete %v [%d/%d].", exchangeName, m.FormatCurrency(p).String(), removedCounter, @@ -496,7 +465,7 @@ func (m *syncManager) worker() { ws, err := exchanges[x].GetWebsocket() if err != nil { log.Errorf(log.SyncMgr, - "%s unable to get websocket pointer. Err: %s\n", + "%s unable to get websocket pointer. Err: %s", exchangeName, err) usingREST = true @@ -517,7 +486,7 @@ func (m *syncManager) worker() { enabledPairs, err := exchanges[x].GetEnabledPairs(assetTypes[y]) if err != nil { log.Errorf(log.SyncMgr, - "%s failed to get enabled pairs. Err: %s\n", + "%s failed to get enabled pairs. Err: %s", exchangeName, err) continue @@ -527,41 +496,41 @@ func (m *syncManager) worker() { return } - if !m.exists(exchangeName, enabledPairs[i], assetTypes[y]) { - c := currencyPairSyncAgent{ - AssetType: assetTypes[y], - Exchange: exchangeName, - Pair: enabledPairs[i], - } - - sBase := syncBase{ - IsUsingREST: usingREST || !wsAssetSupported, - IsUsingWebsocket: usingWebsocket && wsAssetSupported, - } - - if m.config.SyncTicker { - c.Ticker = sBase - } - - if m.config.SyncOrderbook { - c.Orderbook = sBase - } - - if m.config.SyncTrades { - c.Trade = sBase - } - - m.add(&c) - } - c, err := m.get(exchangeName, enabledPairs[i], assetTypes[y]) if err != nil { - log.Errorf(log.SyncMgr, "failed to get item. Err: %s\n", err) - continue + if err == errSyncPairNotFound { + c = ¤cyPairSyncAgent{ + AssetType: assetTypes[y], + Exchange: exchangeName, + Pair: enabledPairs[i], + } + + sBase := syncBase{ + IsUsingREST: usingREST || !wsAssetSupported, + IsUsingWebsocket: usingWebsocket && wsAssetSupported, + } + + if m.config.SyncTicker { + c.Ticker = sBase + } + + if m.config.SyncOrderbook { + c.Orderbook = sBase + } + + if m.config.SyncTrades { + c.Trade = sBase + } + + m.add(c) + } else { + log.Error(log.SyncMgr, err) + continue + } } if switchedToRest && usingWebsocket { log.Warnf(log.SyncMgr, - "%s %s: Websocket re-enabled, switching from rest to websocket\n", + "%s %s: Websocket re-enabled, switching from rest to websocket", c.Exchange, m.FormatCurrency(enabledPairs[i]).String()) switchedToRest = false } @@ -580,7 +549,7 @@ func (m *syncManager) worker() { c.Orderbook.IsUsingWebsocket = false c.Orderbook.IsUsingREST = true log.Warnf(log.SyncMgr, - "%s %s %s: No orderbook update after %s, switching from websocket to rest\n", + "%s %s %s: No orderbook update after %s, switching from websocket to rest", c.Exchange, m.FormatCurrency(c.Pair).String(), strings.ToUpper(c.AssetType.String()), @@ -623,7 +592,7 @@ func (m *syncManager) worker() { c.Ticker.IsUsingWebsocket = false c.Ticker.IsUsingREST = true log.Warnf(log.SyncMgr, - "%s %s %s: No ticker update after %s, switching from websocket to rest\n", + "%s %s %s: No ticker update after %s, switching from websocket to rest", c.Exchange, m.FormatCurrency(enabledPairs[i]).String(), strings.ToUpper(c.AssetType.String()), @@ -650,14 +619,14 @@ func (m *syncManager) worker() { if batchLastDone.IsZero() || time.Since(batchLastDone) > m.config.SyncTimeoutREST { m.mux.Lock() if m.config.Verbose { - log.Debugf(log.SyncMgr, "%s Init'ing REST ticker batching\n", exchangeName) + log.Debugf(log.SyncMgr, "Initialising %s REST ticker batching", exchangeName) } result, err = exchanges[x].UpdateTicker(c.Pair, c.AssetType) m.tickerBatchLastRequested[exchangeName] = time.Now() m.mux.Unlock() } else { if m.config.Verbose { - log.Debugf(log.SyncMgr, "%s Using recent batching cache\n", exchangeName) + log.Debugf(log.SyncMgr, "%s Using recent batching cache", exchangeName) } result, err = exchanges[x].FetchTicker(c.Pair, c.AssetType) } @@ -702,7 +671,7 @@ func (m *syncManager) worker() { func printCurrencyFormat(price float64, displayCurrency currency.Code) string { displaySymbol, err := currency.GetSymbolByCurrencyName(displayCurrency) if err != nil { - log.Errorf(log.SyncMgr, "Failed to get display symbol: %s\n", err) + log.Errorf(log.SyncMgr, "Failed to get display symbol: %s", err) } return fmt.Sprintf("%s%.8f", displaySymbol, price) @@ -713,17 +682,17 @@ func printConvertCurrencyFormat(origCurrency currency.Code, origPrice float64, d origCurrency, displayCurrency) if err != nil { - log.Errorf(log.SyncMgr, "Failed to convert currency: %s\n", err) + log.Errorf(log.SyncMgr, "Failed to convert currency: %s", err) } displaySymbol, err := currency.GetSymbolByCurrencyName(displayCurrency) if err != nil { - log.Errorf(log.SyncMgr, "Failed to get display symbol: %s\n", err) + log.Errorf(log.SyncMgr, "Failed to get display symbol: %s", err) } origSymbol, err := currency.GetSymbolByCurrencyName(origCurrency) if err != nil { - log.Errorf(log.SyncMgr, "Failed to get original currency symbol for %s: %s\n", + log.Errorf(log.SyncMgr, "Failed to get original currency symbol for %s: %s", origCurrency, err) } @@ -745,12 +714,12 @@ func (m *syncManager) PrintTickerSummary(result *ticker.Price, protocol string, } if err != nil { if err == common.ErrNotYetImplemented { - log.Warnf(log.SyncMgr, "Failed to get %s ticker. Error: %s\n", + log.Warnf(log.SyncMgr, "Failed to get %s ticker. Error: %s", protocol, err) return } - log.Errorf(log.SyncMgr, "Failed to get %s ticker. Error: %s\n", + log.Errorf(log.SyncMgr, "Failed to get %s ticker. Error: %s", protocol, err) return @@ -763,7 +732,7 @@ func (m *syncManager) PrintTickerSummary(result *ticker.Price, protocol string, result.Pair.Quote != m.fiatDisplayCurrency && !m.fiatDisplayCurrency.IsEmpty() { origCurrency := result.Pair.Quote.Upper() - log.Infof(log.Ticker, "%s %s %s %s: TICKER: Last %s Ask %s Bid %s High %s Low %s Volume %.8f\n", + log.Infof(log.Ticker, "%s %s %s %s: TICKER: Last %s Ask %s Bid %s High %s Low %s Volume %.8f", result.ExchangeName, protocol, m.FormatCurrency(result.Pair), @@ -778,7 +747,7 @@ func (m *syncManager) PrintTickerSummary(result *ticker.Price, protocol string, if result.Pair.Quote.IsFiatCurrency() && result.Pair.Quote == m.fiatDisplayCurrency && !m.fiatDisplayCurrency.IsEmpty() { - log.Infof(log.Ticker, "%s %s %s %s: TICKER: Last %s Ask %s Bid %s High %s Low %s Volume %.8f\n", + log.Infof(log.Ticker, "%s %s %s %s: TICKER: Last %s Ask %s Bid %s High %s Low %s Volume %.8f", result.ExchangeName, protocol, m.FormatCurrency(result.Pair), @@ -790,7 +759,7 @@ func (m *syncManager) PrintTickerSummary(result *ticker.Price, protocol string, printCurrencyFormat(result.Low, m.fiatDisplayCurrency), result.Volume) } else { - log.Infof(log.Ticker, "%s %s %s %s: TICKER: Last %.8f Ask %.8f Bid %.8f High %.8f Low %.8f Volume %.8f\n", + log.Infof(log.Ticker, "%s %s %s %s: TICKER: Last %.8f Ask %.8f Bid %.8f High %.8f Low %.8f Volume %.8f", result.ExchangeName, protocol, m.FormatCurrency(result.Pair), @@ -815,7 +784,7 @@ func (m *syncManager) FormatCurrency(p currency.Pair) currency.Pair { } const ( - book = "%s %s %s %s: ORDERBOOK: Bids len: %d Amount: %f %s. Total value: %s Asks len: %d Amount: %f %s. Total value: %s\n" + book = "%s %s %s %s: ORDERBOOK: Bids len: %d Amount: %f %s. Total value: %s Asks len: %d Amount: %f %s. Total value: %s" ) // PrintOrderbookSummary outputs orderbook results @@ -825,13 +794,13 @@ func (m *syncManager) PrintOrderbookSummary(result *orderbook.Base, protocol str } if err != nil { if result == nil { - log.Errorf(log.OrderBook, "Failed to get %s orderbook. Error: %s\n", + log.Errorf(log.OrderBook, "Failed to get %s orderbook. Error: %s", protocol, err) return } if err == common.ErrNotYetImplemented { - log.Warnf(log.OrderBook, "Failed to get %s orderbook for %s %s %s. Error: %s\n", + log.Warnf(log.OrderBook, "Failed to get %s orderbook for %s %s %s. Error: %s", protocol, result.Exchange, result.Pair, @@ -839,7 +808,7 @@ func (m *syncManager) PrintOrderbookSummary(result *orderbook.Base, protocol str err) return } - log.Errorf(log.OrderBook, "Failed to get %s orderbook for %s %s %s. Error: %s\n", + log.Errorf(log.OrderBook, "Failed to get %s orderbook for %s %s %s. Error: %s", protocol, result.Exchange, result.Pair, @@ -907,7 +876,7 @@ func relayWebsocketEvent(result interface{}, event, assetType, exchangeName stri } err := BroadcastWebsocketMessage(evt) if !errors.Is(err, ErrWebsocketServiceNotRunning) { - log.Errorf(log.APIServerMgr, "Failed to broadcast websocket event %v. Error: %s\n", + log.Errorf(log.APIServerMgr, "Failed to broadcast websocket event %v. Error: %s", event, err) } } diff --git a/engine/sync_manager_test.go b/engine/sync_manager_test.go index 4e0cb4b4..ac963591 100644 --- a/engine/sync_manager_test.go +++ b/engine/sync_manager_test.go @@ -14,22 +14,22 @@ import ( func TestSetupSyncManager(t *testing.T) { t.Parallel() - _, err := setupSyncManager(&Config{}, nil, nil, nil) + _, err := setupSyncManager(&Config{}, nil, nil, false) if !errors.Is(err, errNoSyncItemsEnabled) { t.Errorf("error '%v', expected '%v'", err, errNoSyncItemsEnabled) } - _, err = setupSyncManager(&Config{SyncTrades: true}, nil, nil, nil) + _, err = setupSyncManager(&Config{SyncTrades: true}, nil, nil, false) if !errors.Is(err, errNilExchangeManager) { t.Errorf("error '%v', expected '%v'", err, errNilExchangeManager) } - _, err = setupSyncManager(&Config{SyncTrades: true}, &ExchangeManager{}, nil, nil) + _, err = setupSyncManager(&Config{SyncTrades: true}, &ExchangeManager{}, nil, false) if !errors.Is(err, errNilConfig) { t.Errorf("error '%v', expected '%v'", err, errNilConfig) } - m, err := setupSyncManager(&Config{SyncTrades: true}, &ExchangeManager{}, nil, &config.RemoteControlConfig{}) + m, err := setupSyncManager(&Config{SyncTrades: true}, &ExchangeManager{}, &config.RemoteControlConfig{}, true) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } @@ -40,7 +40,7 @@ func TestSetupSyncManager(t *testing.T) { func TestSyncManagerStart(t *testing.T) { t.Parallel() - m, err := setupSyncManager(&Config{SyncTrades: true}, &ExchangeManager{}, nil, &config.RemoteControlConfig{}) + m, err := setupSyncManager(&Config{SyncTrades: true}, &ExchangeManager{}, &config.RemoteControlConfig{}, true) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } @@ -85,7 +85,7 @@ func TestSyncManagerStop(t *testing.T) { } exch.SetDefaults() em.Add(exch) - m, err = setupSyncManager(&Config{SyncTrades: true, SyncContinuously: true}, em, nil, &config.RemoteControlConfig{}) + m, err = setupSyncManager(&Config{SyncTrades: true, SyncContinuously: true}, em, &config.RemoteControlConfig{}, false) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } @@ -133,7 +133,7 @@ func TestPrintTickerSummary(t *testing.T) { } exch.SetDefaults() em.Add(exch) - m, err = setupSyncManager(&Config{SyncTrades: true, SyncContinuously: true}, em, nil, &config.RemoteControlConfig{}) + m, err = setupSyncManager(&Config{SyncTrades: true, SyncContinuously: true}, em, &config.RemoteControlConfig{}, false) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } @@ -172,7 +172,7 @@ func TestPrintOrderbookSummary(t *testing.T) { } exch.SetDefaults() em.Add(exch) - m, err = setupSyncManager(&Config{SyncTrades: true, SyncContinuously: true}, em, nil, &config.RemoteControlConfig{}) + m, err = setupSyncManager(&Config{SyncTrades: true, SyncContinuously: true}, em, &config.RemoteControlConfig{}, false) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } diff --git a/engine/sync_manager_types.go b/engine/sync_manager_types.go index 9beb1f9e..854eac89 100644 --- a/engine/sync_manager_types.go +++ b/engine/sync_manager_types.go @@ -44,22 +44,22 @@ type Config struct { // syncManager stores the exchange currency pair syncer object type syncManager struct { - initSyncCompleted int32 - initSyncStarted int32 - started int32 - delimiter string - uppercase bool - initSyncStartTime time.Time - fiatDisplayCurrency currency.Code - mux sync.Mutex - initSyncWG sync.WaitGroup - inService sync.WaitGroup + initSyncCompleted int32 + initSyncStarted int32 + started int32 + delimiter string + uppercase bool + initSyncStartTime time.Time + fiatDisplayCurrency currency.Code + websocketRoutineManagerEnabled bool + mux sync.Mutex + initSyncWG sync.WaitGroup + inService sync.WaitGroup currencyPairs []currencyPairSyncAgent tickerBatchLastRequested map[string]time.Time - remoteConfig *config.RemoteControlConfig - config Config - exchangeManager iExchangeManager - websocketDataReceiver iWebsocketDataReceiver + remoteConfig *config.RemoteControlConfig + config Config + exchangeManager iExchangeManager } diff --git a/engine/websocketroutine_manager.go b/engine/websocketroutine_manager.go index 0702005d..8074da8e 100644 --- a/engine/websocketroutine_manager.go +++ b/engine/websocketroutine_manager.go @@ -51,7 +51,7 @@ func (m *websocketRoutineManager) Start() error { return ErrSubSystemAlreadyStarted } m.shutdown = make(chan struct{}) - go m.websocketRoutine() + m.websocketRoutine() return nil } @@ -87,7 +87,7 @@ func (m *websocketRoutineManager) websocketRoutine() { if exchanges[i].SupportsWebsocket() { if m.verbose { log.Debugf(log.WebsocketMgr, - "Exchange %s websocket support: Yes Enabled: %v\n", + "Exchange %s websocket support: Yes Enabled: %v", exchanges[i].GetName(), common.IsEnabled(exchanges[i].IsWebsocketEnabled()), ) @@ -97,35 +97,27 @@ func (m *websocketRoutineManager) websocketRoutine() { if err != nil { log.Errorf( log.WebsocketMgr, - "Exchange %s GetWebsocket error: %s\n", + "Exchange %s GetWebsocket error: %s", exchanges[i].GetName(), err, ) return } - // Exchange sync manager might have already started ws - // service or is in the process of connecting, so check - if ws.IsConnected() || ws.IsConnecting() { - return - } - - // Data handler routine - go m.WebsocketDataReceiver(ws) - if ws.IsEnabled() { err = ws.Connect() if err != nil { - log.Errorf(log.WebsocketMgr, "%v\n", err) + log.Errorf(log.WebsocketMgr, "%v", err) } + go m.WebsocketDataReceiver(ws) err = ws.FlushChannels() if err != nil { - log.Errorf(log.WebsocketMgr, "Failed to subscribe: %v\n", err) + log.Errorf(log.WebsocketMgr, "Failed to subscribe: %v", err) } } } else if m.verbose { log.Debugf(log.WebsocketMgr, - "Exchange %s websocket support: No\n", + "Exchange %s websocket support: No", exchanges[i].GetName(), ) } diff --git a/exchanges/interfaces.go b/exchanges/interfaces.go index da5b0c9e..5c7fd737 100644 --- a/exchanges/interfaces.go +++ b/exchanges/interfaces.go @@ -75,19 +75,16 @@ type IBotExchange interface { GetHistoricCandlesExtended(p currency.Pair, a asset.Item, timeStart, timeEnd time.Time, interval kline.Interval) (kline.Item, error) DisableRateLimiter() error EnableRateLimiter() error - // Websocket specific wrapper functionality - // GetWebsocket returns a pointer to the websocket + GetWebsocket() (*stream.Websocket, error) IsWebsocketEnabled() bool SupportsWebsocket() bool SubscribeToWebsocketChannels(channels []stream.ChannelSubscription) error UnsubscribeToWebsocketChannels(channels []stream.ChannelSubscription) error IsAssetWebsocketSupported(aType asset.Item) bool - // FlushWebsocketChannels checks and flushes subscriptions if there is a - // pair,asset, url/proxy or subscription change FlushWebsocketChannels() error AuthenticateWebsocket() error - // Exchange order related execution limits + GetOrderExecutionLimits(a asset.Item, cp currency.Pair) (*order.Limits, error) CheckOrderExecutionLimits(a asset.Item, cp currency.Pair, price, amount float64, orderType order.Type) error UpdateOrderExecutionLimits(a asset.Item) error