Bugfix: Syncer & websocket circular dependency (#743)

* Fixes cyclical dependency

* why not lint

* Addresses potential wait if websocket subsystem is disabled
This commit is contained in:
Scott
2021-08-11 17:06:01 +10:00
committed by GitHub
parent c7c8cac21f
commit 126187b31b
7 changed files with 112 additions and 153 deletions

View File

@@ -513,8 +513,8 @@ func (bot *Engine) Start() error {
bot.currencyPairSyncer, err = setupSyncManager(
exchangeSyncCfg,
bot.ExchangeManager,
bot.websocketRoutineManager,
&bot.Config.RemoteControl)
&bot.Config.RemoteControl,
bot.Settings.EnableWebsocketRoutine)
if err != nil {
gctlog.Errorf(gctlog.Global, "Unable to initialise exchange currency pair syncer. Err: %s", err)
} else {

View File

@@ -177,10 +177,11 @@ func (bot *Engine) SetSubsystem(subSystemName string, enable bool) error {
SyncTimeoutREST: bot.Settings.SyncTimeoutREST,
SyncTimeoutWebsocket: bot.Settings.SyncTimeoutWebsocket,
}
bot.currencyPairSyncer, err = setupSyncManager(exchangeSyncCfg,
bot.currencyPairSyncer, err = setupSyncManager(
exchangeSyncCfg,
bot.ExchangeManager,
bot.websocketRoutineManager,
&bot.Config.RemoteControl)
&bot.Config.RemoteControl,
bot.Settings.EnableWebsocketRoutine)
if err != nil {
return err
}

View File

@@ -37,10 +37,11 @@ var (
DefaultSyncerTimeoutWebsocket = time.Minute
errNoSyncItemsEnabled = errors.New("no sync items enabled")
errUnknownSyncItem = errors.New("unknown sync item")
errSyncPairNotFound = errors.New("exchange currency pair syncer not found")
)
// setupSyncManager starts a new CurrencyPairSyncer
func setupSyncManager(c *Config, exchangeManager iExchangeManager, websocketDataReceiver iWebsocketDataReceiver, remoteConfig *config.RemoteControlConfig) (*syncManager, error) {
func setupSyncManager(c *Config, exchangeManager iExchangeManager, remoteConfig *config.RemoteControlConfig, websocketRoutineManagerEnabled bool) (*syncManager, error) {
if !c.SyncOrderbook && !c.SyncTicker && !c.SyncTrades {
return nil, errNoSyncItemsEnabled
}
@@ -64,10 +65,10 @@ func setupSyncManager(c *Config, exchangeManager iExchangeManager, websocketData
}
s := &syncManager{
config: *c,
remoteConfig: remoteConfig,
exchangeManager: exchangeManager,
websocketDataReceiver: websocketDataReceiver,
config: *c,
remoteConfig: remoteConfig,
exchangeManager: exchangeManager,
websocketRoutineManagerEnabled: websocketRoutineManagerEnabled,
}
s.tickerBatchLastRequested = make(map[string]time.Time)
@@ -75,7 +76,7 @@ func setupSyncManager(c *Config, exchangeManager iExchangeManager, websocketData
log.Debugf(log.SyncMgr,
"Exchange currency pair syncer config: continuous: %v ticker: %v"+
" orderbook: %v trades: %v workers: %v verbose: %v timeout REST: %v"+
" timeout Websocket: %v\n",
" timeout Websocket: %v",
s.config.SyncContinuously, s.config.SyncTicker, s.config.SyncOrderbook,
s.config.SyncTrades, s.config.NumWorkers, s.config.Verbose, s.config.SyncTimeoutREST,
s.config.SyncTimeoutWebsocket)
@@ -110,44 +111,17 @@ func (m *syncManager) Start() error {
if !supportsREST && !supportsWebsocket {
log.Warnf(log.SyncMgr,
"Loaded exchange %s does not support REST or Websocket.\n",
"Loaded exchange %s does not support REST or Websocket.",
exchangeName)
continue
}
var usingWebsocket bool
var usingREST bool
if supportsWebsocket && exchanges[x].IsWebsocketEnabled() {
ws, err := exchanges[x].GetWebsocket()
if err != nil {
log.Errorf(log.SyncMgr,
"%s failed to get websocket. Err: %s\n",
exchangeName,
err)
usingREST = true
}
if !ws.IsConnected() && !ws.IsConnecting() {
if m.websocketDataReceiver.IsRunning() {
go m.websocketDataReceiver.WebsocketDataReceiver(ws)
}
err = ws.Connect()
if err == nil {
err = ws.FlushChannels()
}
if err != nil {
log.Errorf(log.SyncMgr,
"%s websocket failed to connect. Err: %s\n",
exchangeName,
err)
usingREST = true
} else {
usingWebsocket = true
}
} else {
usingWebsocket = true
}
if m.websocketRoutineManagerEnabled &&
supportsWebsocket &&
exchanges[x].IsWebsocketEnabled() {
usingWebsocket = true
} else if supportsREST {
usingREST = true
}
@@ -172,7 +146,7 @@ func (m *syncManager) Start() error {
enabledPairs, err := exchanges[x].GetEnabledPairs(assetTypes[y])
if err != nil {
log.Errorf(log.SyncMgr,
"%s failed to get enabled pairs. Err: %s\n",
"%s failed to get enabled pairs. Err: %s",
exchangeName,
err)
continue
@@ -182,37 +156,33 @@ func (m *syncManager) Start() error {
continue
}
c := currencyPairSyncAgent{
c := &currencyPairSyncAgent{
AssetType: assetTypes[y],
Exchange: exchangeName,
Pair: enabledPairs[i],
}
sBase := syncBase{
IsUsingREST: usingREST || !wsAssetSupported,
IsUsingWebsocket: usingWebsocket && wsAssetSupported,
}
if m.config.SyncTicker {
c.Ticker = sBase
}
if m.config.SyncOrderbook {
c.Orderbook = sBase
}
if m.config.SyncTrades {
c.Trade = sBase
}
m.add(&c)
m.add(c)
}
}
}
if atomic.CompareAndSwapInt32(&m.initSyncStarted, 0, 1) {
log.Debugf(log.SyncMgr,
"Exchange CurrencyPairSyncer initial sync started. %d items to process.\n",
"Exchange CurrencyPairSyncer initial sync started. %d items to process.",
createdCounter)
m.initSyncStartTime = time.Now()
}
@@ -220,9 +190,9 @@ func (m *syncManager) Start() error {
go func() {
m.initSyncWG.Wait()
if atomic.CompareAndSwapInt32(&m.initSyncCompleted, 0, 1) {
log.Debugf(log.SyncMgr, "Exchange CurrencyPairSyncer initial sync is complete.\n")
log.Debugf(log.SyncMgr, "Exchange CurrencyPairSyncer initial sync is complete.")
completedTime := time.Now()
log.Debugf(log.SyncMgr, "Exchange CurrencyPairSyncer initial sync took %v [%v sync items].\n",
log.Debugf(log.SyncMgr, "Exchange CurrencyPairSyncer initial sync took %v [%v sync items].",
completedTime.Sub(m.initSyncStartTime), createdCounter)
if !m.config.SyncContinuously {
@@ -248,7 +218,6 @@ func (m *syncManager) Start() error {
}
// Stop shuts down the exchange currency pair syncer
// Stop attempts to shutdown the subsystem
func (m *syncManager) Stop() error {
if m == nil {
return fmt.Errorf("exchange CurrencyPairSyncer %w", ErrNilSubsystem)
@@ -273,7 +242,7 @@ func (m *syncManager) get(exchangeName string, p currency.Pair, a asset.Item) (*
}
}
return nil, errors.New("exchange currency pair syncer not found")
return nil, fmt.Errorf("%v %v %v %w", exchangeName, a, p, errSyncPairNotFound)
}
func (m *syncManager) exists(exchangeName string, p currency.Pair, a asset.Item) bool {
@@ -297,7 +266,7 @@ func (m *syncManager) add(c *currencyPairSyncAgent) {
if m.config.SyncTicker {
if m.config.Verbose {
log.Debugf(log.SyncMgr,
"%s: Added ticker sync item %v: using websocket: %v using REST: %v\n",
"%s: Added ticker sync item %v: using websocket: %v using REST: %v",
c.Exchange, m.FormatCurrency(c.Pair).String(), c.Ticker.IsUsingWebsocket,
c.Ticker.IsUsingREST)
}
@@ -310,7 +279,7 @@ func (m *syncManager) add(c *currencyPairSyncAgent) {
if m.config.SyncOrderbook {
if m.config.Verbose {
log.Debugf(log.SyncMgr,
"%s: Added orderbook sync item %v: using websocket: %v using REST: %v\n",
"%s: Added orderbook sync item %v: using websocket: %v using REST: %v",
c.Exchange, m.FormatCurrency(c.Pair).String(), c.Orderbook.IsUsingWebsocket,
c.Orderbook.IsUsingREST)
}
@@ -323,7 +292,7 @@ func (m *syncManager) add(c *currencyPairSyncAgent) {
if m.config.SyncTrades {
if m.config.Verbose {
log.Debugf(log.SyncMgr,
"%s: Added trade sync item %v: using websocket: %v using REST: %v\n",
"%s: Added trade sync item %v: using websocket: %v using REST: %v",
c.Exchange, m.FormatCurrency(c.Pair).String(), c.Trade.IsUsingWebsocket,
c.Trade.IsUsingREST)
}
@@ -427,7 +396,7 @@ func (m *syncManager) Update(exchangeName string, p currency.Pair, a asset.Item,
m.currencyPairs[x].Ticker.IsProcessing = false
if atomic.LoadInt32(&m.initSyncCompleted) != 1 && !origHadData {
removedCounter++
log.Debugf(log.SyncMgr, "%s ticker sync complete %v [%d/%d].\n",
log.Debugf(log.SyncMgr, "%s ticker sync complete %v [%d/%d].",
exchangeName,
m.FormatCurrency(p).String(),
removedCounter,
@@ -445,7 +414,7 @@ func (m *syncManager) Update(exchangeName string, p currency.Pair, a asset.Item,
m.currencyPairs[x].Orderbook.IsProcessing = false
if atomic.LoadInt32(&m.initSyncCompleted) != 1 && !origHadData {
removedCounter++
log.Debugf(log.SyncMgr, "%s orderbook sync complete %v [%d/%d].\n",
log.Debugf(log.SyncMgr, "%s orderbook sync complete %v [%d/%d].",
exchangeName,
m.FormatCurrency(p).String(),
removedCounter,
@@ -463,7 +432,7 @@ func (m *syncManager) Update(exchangeName string, p currency.Pair, a asset.Item,
m.currencyPairs[x].Trade.IsProcessing = false
if atomic.LoadInt32(&m.initSyncCompleted) != 1 && !origHadData {
removedCounter++
log.Debugf(log.SyncMgr, "%s trade sync complete %v [%d/%d].\n",
log.Debugf(log.SyncMgr, "%s trade sync complete %v [%d/%d].",
exchangeName,
m.FormatCurrency(p).String(),
removedCounter,
@@ -496,7 +465,7 @@ func (m *syncManager) worker() {
ws, err := exchanges[x].GetWebsocket()
if err != nil {
log.Errorf(log.SyncMgr,
"%s unable to get websocket pointer. Err: %s\n",
"%s unable to get websocket pointer. Err: %s",
exchangeName,
err)
usingREST = true
@@ -517,7 +486,7 @@ func (m *syncManager) worker() {
enabledPairs, err := exchanges[x].GetEnabledPairs(assetTypes[y])
if err != nil {
log.Errorf(log.SyncMgr,
"%s failed to get enabled pairs. Err: %s\n",
"%s failed to get enabled pairs. Err: %s",
exchangeName,
err)
continue
@@ -527,41 +496,41 @@ func (m *syncManager) worker() {
return
}
if !m.exists(exchangeName, enabledPairs[i], assetTypes[y]) {
c := currencyPairSyncAgent{
AssetType: assetTypes[y],
Exchange: exchangeName,
Pair: enabledPairs[i],
}
sBase := syncBase{
IsUsingREST: usingREST || !wsAssetSupported,
IsUsingWebsocket: usingWebsocket && wsAssetSupported,
}
if m.config.SyncTicker {
c.Ticker = sBase
}
if m.config.SyncOrderbook {
c.Orderbook = sBase
}
if m.config.SyncTrades {
c.Trade = sBase
}
m.add(&c)
}
c, err := m.get(exchangeName, enabledPairs[i], assetTypes[y])
if err != nil {
log.Errorf(log.SyncMgr, "failed to get item. Err: %s\n", err)
continue
if err == errSyncPairNotFound {
c = &currencyPairSyncAgent{
AssetType: assetTypes[y],
Exchange: exchangeName,
Pair: enabledPairs[i],
}
sBase := syncBase{
IsUsingREST: usingREST || !wsAssetSupported,
IsUsingWebsocket: usingWebsocket && wsAssetSupported,
}
if m.config.SyncTicker {
c.Ticker = sBase
}
if m.config.SyncOrderbook {
c.Orderbook = sBase
}
if m.config.SyncTrades {
c.Trade = sBase
}
m.add(c)
} else {
log.Error(log.SyncMgr, err)
continue
}
}
if switchedToRest && usingWebsocket {
log.Warnf(log.SyncMgr,
"%s %s: Websocket re-enabled, switching from rest to websocket\n",
"%s %s: Websocket re-enabled, switching from rest to websocket",
c.Exchange, m.FormatCurrency(enabledPairs[i]).String())
switchedToRest = false
}
@@ -580,7 +549,7 @@ func (m *syncManager) worker() {
c.Orderbook.IsUsingWebsocket = false
c.Orderbook.IsUsingREST = true
log.Warnf(log.SyncMgr,
"%s %s %s: No orderbook update after %s, switching from websocket to rest\n",
"%s %s %s: No orderbook update after %s, switching from websocket to rest",
c.Exchange,
m.FormatCurrency(c.Pair).String(),
strings.ToUpper(c.AssetType.String()),
@@ -623,7 +592,7 @@ func (m *syncManager) worker() {
c.Ticker.IsUsingWebsocket = false
c.Ticker.IsUsingREST = true
log.Warnf(log.SyncMgr,
"%s %s %s: No ticker update after %s, switching from websocket to rest\n",
"%s %s %s: No ticker update after %s, switching from websocket to rest",
c.Exchange,
m.FormatCurrency(enabledPairs[i]).String(),
strings.ToUpper(c.AssetType.String()),
@@ -650,14 +619,14 @@ func (m *syncManager) worker() {
if batchLastDone.IsZero() || time.Since(batchLastDone) > m.config.SyncTimeoutREST {
m.mux.Lock()
if m.config.Verbose {
log.Debugf(log.SyncMgr, "%s Init'ing REST ticker batching\n", exchangeName)
log.Debugf(log.SyncMgr, "Initialising %s REST ticker batching", exchangeName)
}
result, err = exchanges[x].UpdateTicker(c.Pair, c.AssetType)
m.tickerBatchLastRequested[exchangeName] = time.Now()
m.mux.Unlock()
} else {
if m.config.Verbose {
log.Debugf(log.SyncMgr, "%s Using recent batching cache\n", exchangeName)
log.Debugf(log.SyncMgr, "%s Using recent batching cache", exchangeName)
}
result, err = exchanges[x].FetchTicker(c.Pair, c.AssetType)
}
@@ -702,7 +671,7 @@ func (m *syncManager) worker() {
func printCurrencyFormat(price float64, displayCurrency currency.Code) string {
displaySymbol, err := currency.GetSymbolByCurrencyName(displayCurrency)
if err != nil {
log.Errorf(log.SyncMgr, "Failed to get display symbol: %s\n", err)
log.Errorf(log.SyncMgr, "Failed to get display symbol: %s", err)
}
return fmt.Sprintf("%s%.8f", displaySymbol, price)
@@ -713,17 +682,17 @@ func printConvertCurrencyFormat(origCurrency currency.Code, origPrice float64, d
origCurrency,
displayCurrency)
if err != nil {
log.Errorf(log.SyncMgr, "Failed to convert currency: %s\n", err)
log.Errorf(log.SyncMgr, "Failed to convert currency: %s", err)
}
displaySymbol, err := currency.GetSymbolByCurrencyName(displayCurrency)
if err != nil {
log.Errorf(log.SyncMgr, "Failed to get display symbol: %s\n", err)
log.Errorf(log.SyncMgr, "Failed to get display symbol: %s", err)
}
origSymbol, err := currency.GetSymbolByCurrencyName(origCurrency)
if err != nil {
log.Errorf(log.SyncMgr, "Failed to get original currency symbol for %s: %s\n",
log.Errorf(log.SyncMgr, "Failed to get original currency symbol for %s: %s",
origCurrency,
err)
}
@@ -745,12 +714,12 @@ func (m *syncManager) PrintTickerSummary(result *ticker.Price, protocol string,
}
if err != nil {
if err == common.ErrNotYetImplemented {
log.Warnf(log.SyncMgr, "Failed to get %s ticker. Error: %s\n",
log.Warnf(log.SyncMgr, "Failed to get %s ticker. Error: %s",
protocol,
err)
return
}
log.Errorf(log.SyncMgr, "Failed to get %s ticker. Error: %s\n",
log.Errorf(log.SyncMgr, "Failed to get %s ticker. Error: %s",
protocol,
err)
return
@@ -763,7 +732,7 @@ func (m *syncManager) PrintTickerSummary(result *ticker.Price, protocol string,
result.Pair.Quote != m.fiatDisplayCurrency &&
!m.fiatDisplayCurrency.IsEmpty() {
origCurrency := result.Pair.Quote.Upper()
log.Infof(log.Ticker, "%s %s %s %s: TICKER: Last %s Ask %s Bid %s High %s Low %s Volume %.8f\n",
log.Infof(log.Ticker, "%s %s %s %s: TICKER: Last %s Ask %s Bid %s High %s Low %s Volume %.8f",
result.ExchangeName,
protocol,
m.FormatCurrency(result.Pair),
@@ -778,7 +747,7 @@ func (m *syncManager) PrintTickerSummary(result *ticker.Price, protocol string,
if result.Pair.Quote.IsFiatCurrency() &&
result.Pair.Quote == m.fiatDisplayCurrency &&
!m.fiatDisplayCurrency.IsEmpty() {
log.Infof(log.Ticker, "%s %s %s %s: TICKER: Last %s Ask %s Bid %s High %s Low %s Volume %.8f\n",
log.Infof(log.Ticker, "%s %s %s %s: TICKER: Last %s Ask %s Bid %s High %s Low %s Volume %.8f",
result.ExchangeName,
protocol,
m.FormatCurrency(result.Pair),
@@ -790,7 +759,7 @@ func (m *syncManager) PrintTickerSummary(result *ticker.Price, protocol string,
printCurrencyFormat(result.Low, m.fiatDisplayCurrency),
result.Volume)
} else {
log.Infof(log.Ticker, "%s %s %s %s: TICKER: Last %.8f Ask %.8f Bid %.8f High %.8f Low %.8f Volume %.8f\n",
log.Infof(log.Ticker, "%s %s %s %s: TICKER: Last %.8f Ask %.8f Bid %.8f High %.8f Low %.8f Volume %.8f",
result.ExchangeName,
protocol,
m.FormatCurrency(result.Pair),
@@ -815,7 +784,7 @@ func (m *syncManager) FormatCurrency(p currency.Pair) currency.Pair {
}
const (
book = "%s %s %s %s: ORDERBOOK: Bids len: %d Amount: %f %s. Total value: %s Asks len: %d Amount: %f %s. Total value: %s\n"
book = "%s %s %s %s: ORDERBOOK: Bids len: %d Amount: %f %s. Total value: %s Asks len: %d Amount: %f %s. Total value: %s"
)
// PrintOrderbookSummary outputs orderbook results
@@ -825,13 +794,13 @@ func (m *syncManager) PrintOrderbookSummary(result *orderbook.Base, protocol str
}
if err != nil {
if result == nil {
log.Errorf(log.OrderBook, "Failed to get %s orderbook. Error: %s\n",
log.Errorf(log.OrderBook, "Failed to get %s orderbook. Error: %s",
protocol,
err)
return
}
if err == common.ErrNotYetImplemented {
log.Warnf(log.OrderBook, "Failed to get %s orderbook for %s %s %s. Error: %s\n",
log.Warnf(log.OrderBook, "Failed to get %s orderbook for %s %s %s. Error: %s",
protocol,
result.Exchange,
result.Pair,
@@ -839,7 +808,7 @@ func (m *syncManager) PrintOrderbookSummary(result *orderbook.Base, protocol str
err)
return
}
log.Errorf(log.OrderBook, "Failed to get %s orderbook for %s %s %s. Error: %s\n",
log.Errorf(log.OrderBook, "Failed to get %s orderbook for %s %s %s. Error: %s",
protocol,
result.Exchange,
result.Pair,
@@ -907,7 +876,7 @@ func relayWebsocketEvent(result interface{}, event, assetType, exchangeName stri
}
err := BroadcastWebsocketMessage(evt)
if !errors.Is(err, ErrWebsocketServiceNotRunning) {
log.Errorf(log.APIServerMgr, "Failed to broadcast websocket event %v. Error: %s\n",
log.Errorf(log.APIServerMgr, "Failed to broadcast websocket event %v. Error: %s",
event, err)
}
}

View File

@@ -14,22 +14,22 @@ import (
func TestSetupSyncManager(t *testing.T) {
t.Parallel()
_, err := setupSyncManager(&Config{}, nil, nil, nil)
_, err := setupSyncManager(&Config{}, nil, nil, false)
if !errors.Is(err, errNoSyncItemsEnabled) {
t.Errorf("error '%v', expected '%v'", err, errNoSyncItemsEnabled)
}
_, err = setupSyncManager(&Config{SyncTrades: true}, nil, nil, nil)
_, err = setupSyncManager(&Config{SyncTrades: true}, nil, nil, false)
if !errors.Is(err, errNilExchangeManager) {
t.Errorf("error '%v', expected '%v'", err, errNilExchangeManager)
}
_, err = setupSyncManager(&Config{SyncTrades: true}, &ExchangeManager{}, nil, nil)
_, err = setupSyncManager(&Config{SyncTrades: true}, &ExchangeManager{}, nil, false)
if !errors.Is(err, errNilConfig) {
t.Errorf("error '%v', expected '%v'", err, errNilConfig)
}
m, err := setupSyncManager(&Config{SyncTrades: true}, &ExchangeManager{}, nil, &config.RemoteControlConfig{})
m, err := setupSyncManager(&Config{SyncTrades: true}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
@@ -40,7 +40,7 @@ func TestSetupSyncManager(t *testing.T) {
func TestSyncManagerStart(t *testing.T) {
t.Parallel()
m, err := setupSyncManager(&Config{SyncTrades: true}, &ExchangeManager{}, nil, &config.RemoteControlConfig{})
m, err := setupSyncManager(&Config{SyncTrades: true}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
@@ -85,7 +85,7 @@ func TestSyncManagerStop(t *testing.T) {
}
exch.SetDefaults()
em.Add(exch)
m, err = setupSyncManager(&Config{SyncTrades: true, SyncContinuously: true}, em, nil, &config.RemoteControlConfig{})
m, err = setupSyncManager(&Config{SyncTrades: true, SyncContinuously: true}, em, &config.RemoteControlConfig{}, false)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
@@ -133,7 +133,7 @@ func TestPrintTickerSummary(t *testing.T) {
}
exch.SetDefaults()
em.Add(exch)
m, err = setupSyncManager(&Config{SyncTrades: true, SyncContinuously: true}, em, nil, &config.RemoteControlConfig{})
m, err = setupSyncManager(&Config{SyncTrades: true, SyncContinuously: true}, em, &config.RemoteControlConfig{}, false)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
@@ -172,7 +172,7 @@ func TestPrintOrderbookSummary(t *testing.T) {
}
exch.SetDefaults()
em.Add(exch)
m, err = setupSyncManager(&Config{SyncTrades: true, SyncContinuously: true}, em, nil, &config.RemoteControlConfig{})
m, err = setupSyncManager(&Config{SyncTrades: true, SyncContinuously: true}, em, &config.RemoteControlConfig{}, false)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}

View File

@@ -44,22 +44,22 @@ type Config struct {
// syncManager stores the exchange currency pair syncer object
type syncManager struct {
initSyncCompleted int32
initSyncStarted int32
started int32
delimiter string
uppercase bool
initSyncStartTime time.Time
fiatDisplayCurrency currency.Code
mux sync.Mutex
initSyncWG sync.WaitGroup
inService sync.WaitGroup
initSyncCompleted int32
initSyncStarted int32
started int32
delimiter string
uppercase bool
initSyncStartTime time.Time
fiatDisplayCurrency currency.Code
websocketRoutineManagerEnabled bool
mux sync.Mutex
initSyncWG sync.WaitGroup
inService sync.WaitGroup
currencyPairs []currencyPairSyncAgent
tickerBatchLastRequested map[string]time.Time
remoteConfig *config.RemoteControlConfig
config Config
exchangeManager iExchangeManager
websocketDataReceiver iWebsocketDataReceiver
remoteConfig *config.RemoteControlConfig
config Config
exchangeManager iExchangeManager
}

View File

@@ -51,7 +51,7 @@ func (m *websocketRoutineManager) Start() error {
return ErrSubSystemAlreadyStarted
}
m.shutdown = make(chan struct{})
go m.websocketRoutine()
m.websocketRoutine()
return nil
}
@@ -87,7 +87,7 @@ func (m *websocketRoutineManager) websocketRoutine() {
if exchanges[i].SupportsWebsocket() {
if m.verbose {
log.Debugf(log.WebsocketMgr,
"Exchange %s websocket support: Yes Enabled: %v\n",
"Exchange %s websocket support: Yes Enabled: %v",
exchanges[i].GetName(),
common.IsEnabled(exchanges[i].IsWebsocketEnabled()),
)
@@ -97,35 +97,27 @@ func (m *websocketRoutineManager) websocketRoutine() {
if err != nil {
log.Errorf(
log.WebsocketMgr,
"Exchange %s GetWebsocket error: %s\n",
"Exchange %s GetWebsocket error: %s",
exchanges[i].GetName(),
err,
)
return
}
// Exchange sync manager might have already started ws
// service or is in the process of connecting, so check
if ws.IsConnected() || ws.IsConnecting() {
return
}
// Data handler routine
go m.WebsocketDataReceiver(ws)
if ws.IsEnabled() {
err = ws.Connect()
if err != nil {
log.Errorf(log.WebsocketMgr, "%v\n", err)
log.Errorf(log.WebsocketMgr, "%v", err)
}
go m.WebsocketDataReceiver(ws)
err = ws.FlushChannels()
if err != nil {
log.Errorf(log.WebsocketMgr, "Failed to subscribe: %v\n", err)
log.Errorf(log.WebsocketMgr, "Failed to subscribe: %v", err)
}
}
} else if m.verbose {
log.Debugf(log.WebsocketMgr,
"Exchange %s websocket support: No\n",
"Exchange %s websocket support: No",
exchanges[i].GetName(),
)
}

View File

@@ -75,19 +75,16 @@ type IBotExchange interface {
GetHistoricCandlesExtended(p currency.Pair, a asset.Item, timeStart, timeEnd time.Time, interval kline.Interval) (kline.Item, error)
DisableRateLimiter() error
EnableRateLimiter() error
// Websocket specific wrapper functionality
// GetWebsocket returns a pointer to the websocket
GetWebsocket() (*stream.Websocket, error)
IsWebsocketEnabled() bool
SupportsWebsocket() bool
SubscribeToWebsocketChannels(channels []stream.ChannelSubscription) error
UnsubscribeToWebsocketChannels(channels []stream.ChannelSubscription) error
IsAssetWebsocketSupported(aType asset.Item) bool
// FlushWebsocketChannels checks and flushes subscriptions if there is a
// pair,asset, url/proxy or subscription change
FlushWebsocketChannels() error
AuthenticateWebsocket() error
// Exchange order related execution limits
GetOrderExecutionLimits(a asset.Item, cp currency.Pair) (*order.Limits, error)
CheckOrderExecutionLimits(a asset.Item, cp currency.Pair, price, amount float64, orderType order.Type) error
UpdateOrderExecutionLimits(a asset.Item) error