egnine/sync-manager: add config support (#1326)

* allows sync manager customisation for values and logs

* config-example add

* who doesnt like more coverage?

* ensures you can actually disable it via config el oh el

* less ifs, better control

* fix verbose

* sync trades default false

* fix summary being printed when not enabled

* fixes config checker and output

* nits

* I can put this behind me now

* Fixed logCaSiNg

Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>

* combines if statements

---------

Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>
This commit is contained in:
Scott
2023-08-29 15:48:41 +10:00
committed by GitHub
parent 04320f7aee
commit d3102a08dc
10 changed files with 281 additions and 108 deletions

View File

@@ -179,6 +179,12 @@ func validateSettings(b *Engine, s *Settings, flagSet FlagSet) {
flagSet.WithBool("currencystatemanager", &b.Settings.EnableCurrencyStateManager, b.Config.CurrencyStateManager.Enabled != nil && *b.Config.CurrencyStateManager.Enabled)
flagSet.WithBool("gctscriptmanager", &b.Settings.EnableGCTScriptManager, b.Config.GCTScript.Enabled)
flagSet.WithBool("tickersync", &b.Settings.EnableTickerSyncing, b.Config.SyncManagerConfig.SynchronizeTicker)
flagSet.WithBool("orderbooksync", &b.Settings.EnableOrderbookSyncing, b.Config.SyncManagerConfig.SynchronizeOrderbook)
flagSet.WithBool("tradesync", &b.Settings.EnableTradeSyncing, b.Config.SyncManagerConfig.SynchronizeTrades)
flagSet.WithBool("synccontinuously", &b.Settings.SyncContinuously, b.Config.SyncManagerConfig.SynchronizeContinuously)
flagSet.WithBool("syncmanager", &b.Settings.EnableExchangeSyncManager, b.Config.SyncManagerConfig.Enabled)
if b.Settings.EnablePortfolioManager &&
b.Settings.PortfolioManagerDelay <= 0 {
b.Settings.PortfolioManagerDelay = PortfolioSleepDelay
@@ -491,21 +497,27 @@ func (bot *Engine) Start() error {
}
if bot.Settings.EnableExchangeSyncManager {
exchangeSyncCfg := &SyncManagerConfig{
SynchronizeTicker: bot.Settings.EnableTickerSyncing,
SynchronizeOrderbook: bot.Settings.EnableOrderbookSyncing,
SynchronizeTrades: bot.Settings.EnableTradeSyncing,
SynchronizeContinuously: bot.Settings.SyncContinuously,
TimeoutREST: bot.Settings.SyncTimeoutREST,
TimeoutWebsocket: bot.Settings.SyncTimeoutWebsocket,
NumWorkers: bot.Settings.SyncWorkersCount,
Verbose: bot.Settings.Verbose,
FiatDisplayCurrency: bot.Config.Currency.FiatDisplayCurrency,
PairFormatDisplay: bot.Config.Currency.CurrencyPairFormat,
}
cfg := bot.Config.SyncManagerConfig
cfg.SynchronizeTicker = bot.Settings.EnableTickerSyncing
cfg.SynchronizeOrderbook = bot.Settings.EnableOrderbookSyncing
cfg.SynchronizeContinuously = bot.Settings.SyncContinuously
cfg.SynchronizeTrades = bot.Settings.EnableTradeSyncing
cfg.Verbose = bot.Settings.Verbose || cfg.Verbose
if cfg.TimeoutREST != bot.Settings.SyncTimeoutREST &&
bot.Settings.SyncTimeoutREST != config.DefaultSyncerTimeoutREST {
cfg.TimeoutREST = bot.Settings.SyncTimeoutREST
}
if cfg.TimeoutWebsocket != bot.Settings.SyncTimeoutWebsocket &&
bot.Settings.SyncTimeoutWebsocket != config.DefaultSyncerTimeoutWebsocket {
cfg.TimeoutWebsocket = bot.Settings.SyncTimeoutWebsocket
}
if cfg.NumWorkers != bot.Settings.SyncWorkersCount &&
bot.Settings.SyncWorkersCount != config.DefaultSyncerWorkers {
cfg.NumWorkers = bot.Settings.SyncWorkersCount
}
if s, err := setupSyncManager(
exchangeSyncCfg,
&cfg,
bot.ExchangeManager,
&bot.Config.RemoteControl,
bot.Settings.EnableWebsocketRoutine,

View File

@@ -185,19 +185,27 @@ func (bot *Engine) SetSubsystem(subSystemName string, enable bool) error {
case SyncManagerName:
if enable {
if bot.currencyPairSyncer == nil {
exchangeSyncCfg := &SyncManagerConfig{
SynchronizeTicker: bot.Settings.EnableTickerSyncing,
SynchronizeOrderbook: bot.Settings.EnableOrderbookSyncing,
SynchronizeTrades: bot.Settings.EnableTradeSyncing,
SynchronizeContinuously: bot.Settings.SyncContinuously,
TimeoutREST: bot.Settings.SyncTimeoutREST,
TimeoutWebsocket: bot.Settings.SyncTimeoutWebsocket,
NumWorkers: bot.Settings.SyncWorkersCount,
FiatDisplayCurrency: bot.Config.Currency.FiatDisplayCurrency,
Verbose: bot.Settings.Verbose,
cfg := bot.Config.SyncManagerConfig
cfg.SynchronizeTicker = bot.Settings.EnableTickerSyncing
cfg.SynchronizeOrderbook = bot.Settings.EnableOrderbookSyncing
cfg.SynchronizeContinuously = bot.Settings.SyncContinuously
cfg.SynchronizeTrades = bot.Settings.EnableTradeSyncing
cfg.Verbose = bot.Settings.Verbose || cfg.Verbose
if cfg.TimeoutREST != bot.Settings.SyncTimeoutREST &&
bot.Settings.SyncTimeoutREST != config.DefaultSyncerTimeoutREST {
cfg.TimeoutREST = bot.Settings.SyncTimeoutREST
}
if cfg.TimeoutWebsocket != bot.Settings.SyncTimeoutWebsocket &&
bot.Settings.SyncTimeoutWebsocket != config.DefaultSyncerTimeoutWebsocket {
cfg.TimeoutWebsocket = bot.Settings.SyncTimeoutWebsocket
}
if cfg.NumWorkers != bot.Settings.SyncWorkersCount &&
bot.Settings.SyncWorkersCount != config.DefaultSyncerWorkers {
cfg.NumWorkers = bot.Settings.SyncWorkersCount
}
bot.currencyPairSyncer, err = setupSyncManager(
exchangeSyncCfg,
&cfg,
bot.ExchangeManager,
&bot.Config.RemoteControl,
bot.Settings.EnableWebsocketRoutine)

View File

@@ -32,21 +32,15 @@ const (
)
var (
createdCounter = 0
removedCounter = 0
// DefaultSyncerWorkers limits the number of sync workers
DefaultSyncerWorkers = 15
// DefaultSyncerTimeoutREST the default time to switch from REST to websocket protocols without a response
DefaultSyncerTimeoutREST = time.Second * 15
// DefaultSyncerTimeoutWebsocket the default time to switch from websocket to REST protocols without a response
DefaultSyncerTimeoutWebsocket = time.Minute
errNoSyncItemsEnabled = errors.New("no sync items enabled")
errUnknownSyncItem = errors.New("unknown sync item")
errCouldNotSyncNewData = errors.New("could not sync new data")
createdCounter = 0
removedCounter = 0
errNoSyncItemsEnabled = errors.New("no sync items enabled")
errUnknownSyncItem = errors.New("unknown sync item")
errCouldNotSyncNewData = errors.New("could not sync new data")
)
// setupSyncManager starts a new CurrencyPairSyncer
func setupSyncManager(c *SyncManagerConfig, exchangeManager iExchangeManager, remoteConfig *config.RemoteControlConfig, websocketRoutineManagerEnabled bool) (*syncManager, error) {
func setupSyncManager(c *config.SyncManagerConfig, exchangeManager iExchangeManager, remoteConfig *config.RemoteControlConfig, websocketRoutineManagerEnabled bool) (*syncManager, error) {
if c == nil {
return nil, fmt.Errorf("%T %w", c, common.ErrNilPointer)
}
@@ -62,15 +56,15 @@ func setupSyncManager(c *SyncManagerConfig, exchangeManager iExchangeManager, re
}
if c.NumWorkers <= 0 {
c.NumWorkers = DefaultSyncerWorkers
c.NumWorkers = config.DefaultSyncerWorkers
}
if c.TimeoutREST <= time.Duration(0) {
c.TimeoutREST = DefaultSyncerTimeoutREST
c.TimeoutREST = config.DefaultSyncerTimeoutREST
}
if c.TimeoutWebsocket <= time.Duration(0) {
c.TimeoutWebsocket = DefaultSyncerTimeoutWebsocket
c.TimeoutWebsocket = config.DefaultSyncerTimeoutWebsocket
}
if c.FiatDisplayCurrency.IsEmpty() {
@@ -120,6 +114,11 @@ func (m *syncManager) Start() error {
if !atomic.CompareAndSwapInt32(&m.started, 0, 1) {
return ErrSubSystemAlreadyStarted
}
if !m.config.SynchronizeTicker &&
!m.config.SynchronizeOrderbook &&
!m.config.SynchronizeTrades {
return errNoSyncItemsEnabled
}
m.shutdown = make(chan bool)
m.initSyncWG.Add(1)
m.inService.Done()
@@ -196,19 +195,22 @@ func (m *syncManager) Start() error {
}
if atomic.CompareAndSwapInt32(&m.initSyncStarted, 0, 1) {
log.Debugf(log.SyncMgr,
"Exchange CurrencyPairSyncer initial sync started. %d items to process.",
createdCounter)
if m.config.LogInitialSyncEvents {
log.Debugf(log.SyncMgr,
"Exchange CurrencyPairSyncer initial sync started. %d items to process.",
createdCounter)
}
m.initSyncStartTime = time.Now()
}
go func() {
m.initSyncWG.Wait()
if atomic.CompareAndSwapInt32(&m.initSyncCompleted, 0, 1) {
log.Debugf(log.SyncMgr, "Exchange CurrencyPairSyncer initial sync is complete.")
completedTime := time.Now()
log.Debugf(log.SyncMgr, "Exchange CurrencyPairSyncer initial sync took %v [%v sync items].",
completedTime.Sub(m.initSyncStartTime), createdCounter)
if m.config.LogInitialSyncEvents {
log.Debugf(log.SyncMgr, "Exchange CurrencyPairSyncer initial sync is complete.")
log.Debugf(log.SyncMgr, "Exchange CurrencyPairSyncer initial sync took %v [%v sync items].",
time.Since(m.initSyncStartTime), createdCounter)
}
if !m.config.SynchronizeContinuously {
log.Debugln(log.SyncMgr, "Exchange CurrencyPairSyncer stopping.")
@@ -382,13 +384,15 @@ func (m *syncManager) WebsocketUpdate(exchangeName string, p currency.Pair, a as
if !s.IsUsingWebsocket {
s.IsUsingWebsocket = true
s.IsUsingREST = false
log.Warnf(log.SyncMgr,
"%s %s %s: %s Websocket re-enabled, switching from rest to websocket",
c.Exchange,
m.FormatCurrency(c.Pair),
strings.ToUpper(c.AssetType.String()),
syncType,
)
if m.config.LogSwitchProtocolEvents {
log.Warnf(log.SyncMgr,
"%s %s %s: %s Websocket re-enabled, switching from rest to websocket",
c.Exchange,
m.FormatCurrency(c.Pair),
strings.ToUpper(c.AssetType.String()),
syncType,
)
}
}
return m.update(c, syncType, err)
@@ -410,12 +414,14 @@ func (m *syncManager) update(c *currencyPairSyncAgent, syncType syncItemType, er
s.HaveData = true
if atomic.LoadInt32(&m.initSyncCompleted) != 1 && !origHadData {
removedCounter++
log.Debugf(log.SyncMgr, "%s %s sync complete %v [%d/%d].",
c.Exchange,
syncType,
m.FormatCurrency(c.Pair),
removedCounter,
createdCounter)
if m.config.LogInitialSyncEvents {
log.Debugf(log.SyncMgr, "%s %s sync complete %v [%d/%d].",
c.Exchange,
syncType,
m.FormatCurrency(c.Pair),
removedCounter,
createdCounter)
}
m.initSyncWG.Done()
}
@@ -532,13 +538,15 @@ func (m *syncManager) syncTicker(c *currencyPairSyncAgent, e exchange.IBotExchan
// Downgrade to REST
s.IsUsingWebsocket = false
s.IsUsingREST = true
log.Warnf(log.SyncMgr,
"%s %s %s: No ticker update after %s, switching from websocket to rest",
c.Exchange,
m.FormatCurrency(c.Pair),
strings.ToUpper(c.AssetType.String()),
m.config.TimeoutWebsocket,
)
if m.config.LogSwitchProtocolEvents {
log.Warnf(log.SyncMgr,
"%s %s %s: No ticker update after %s, switching from websocket to rest",
c.Exchange,
m.FormatCurrency(c.Pair),
strings.ToUpper(c.AssetType.String()),
m.config.TimeoutWebsocket,
)
}
}
if s.IsUsingREST && time.Since(s.LastUpdated) > m.config.TimeoutREST {
@@ -605,13 +613,15 @@ func (m *syncManager) syncOrderbook(c *currencyPairSyncAgent, e exchange.IBotExc
// Downgrade to REST
s.IsUsingWebsocket = false
s.IsUsingREST = true
log.Warnf(log.SyncMgr,
"%s %s %s: No orderbook update after %s, switching from websocket to rest",
c.Exchange,
m.FormatCurrency(c.Pair).String(),
strings.ToUpper(c.AssetType.String()),
m.config.TimeoutWebsocket,
)
if m.config.LogSwitchProtocolEvents {
log.Warnf(log.SyncMgr,
"%s %s %s: No orderbook update after %s, switching from websocket to rest",
c.Exchange,
m.FormatCurrency(c.Pair).String(),
strings.ToUpper(c.AssetType.String()),
m.config.TimeoutWebsocket,
)
}
}
if s.IsUsingREST && time.Since(s.LastUpdated) > m.config.TimeoutREST {
@@ -691,6 +701,9 @@ func (m *syncManager) PrintTickerSummary(result *ticker.Price, protocol string,
if m == nil || atomic.LoadInt32(&m.started) == 0 {
return
}
if !m.config.SynchronizeTicker {
return
}
if err != nil {
if err == common.ErrNotYetImplemented {
log.Warnf(log.SyncMgr, "Failed to get %s ticker. Error: %s",
@@ -706,6 +719,9 @@ func (m *syncManager) PrintTickerSummary(result *ticker.Price, protocol string,
// ignoring error as not all tickers have volume populated and error is not actionable
_ = stats.Add(result.ExchangeName, result.Pair, result.AssetType, result.Last, result.Volume)
if !m.config.LogSyncUpdateEvents {
return
}
if result.Pair.Quote.IsFiatCurrency() &&
!result.Pair.Quote.Equal(m.fiatDisplayCurrency) &&
@@ -771,6 +787,9 @@ func (m *syncManager) PrintOrderbookSummary(result *orderbook.Base, protocol str
if m == nil || atomic.LoadInt32(&m.started) == 0 {
return
}
if !m.config.SynchronizeOrderbook {
return
}
if err != nil {
if result == nil {
log.Errorf(log.OrderBook, "Failed to get %s orderbook. Error: %s",
@@ -795,7 +814,9 @@ func (m *syncManager) PrintOrderbookSummary(result *orderbook.Base, protocol str
err)
return
}
if !m.config.LogSyncUpdateEvents {
return
}
bidsAmount, bidsValue := result.TotalBidsAmount()
asksAmount, asksValue := result.TotalAsksAmount()

View File

@@ -20,37 +20,37 @@ func TestSetupSyncManager(t *testing.T) {
t.Errorf("error '%v', expected '%v'", err, common.ErrNilPointer)
}
_, err = setupSyncManager(&SyncManagerConfig{}, nil, nil, false)
_, err = setupSyncManager(&config.SyncManagerConfig{}, nil, nil, false)
if !errors.Is(err, errNoSyncItemsEnabled) {
t.Errorf("error '%v', expected '%v'", err, errNoSyncItemsEnabled)
}
_, err = setupSyncManager(&SyncManagerConfig{SynchronizeTrades: true}, nil, nil, false)
_, err = setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true}, nil, nil, false)
if !errors.Is(err, errNilExchangeManager) {
t.Errorf("error '%v', expected '%v'", err, errNilExchangeManager)
}
_, err = setupSyncManager(&SyncManagerConfig{SynchronizeTrades: true}, &ExchangeManager{}, nil, false)
_, err = setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true}, &ExchangeManager{}, nil, false)
if !errors.Is(err, errNilConfig) {
t.Errorf("error '%v', expected '%v'", err, errNilConfig)
}
_, err = setupSyncManager(&SyncManagerConfig{SynchronizeTrades: true}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
_, err = setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
if !errors.Is(err, currency.ErrCurrencyCodeEmpty) {
t.Errorf("error '%v', expected '%v'", err, currency.ErrCurrencyCodeEmpty)
}
_, err = setupSyncManager(&SyncManagerConfig{SynchronizeTrades: true, FiatDisplayCurrency: currency.BTC}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
_, err = setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, FiatDisplayCurrency: currency.BTC}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
if !errors.Is(err, currency.ErrFiatDisplayCurrencyIsNotFiat) {
t.Errorf("error '%v', expected '%v'", err, currency.ErrFiatDisplayCurrencyIsNotFiat)
}
_, err = setupSyncManager(&SyncManagerConfig{SynchronizeTrades: true, FiatDisplayCurrency: currency.USD}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
_, err = setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, FiatDisplayCurrency: currency.USD}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
if !errors.Is(err, common.ErrNilPointer) {
t.Errorf("error '%v', expected '%v'", err, common.ErrNilPointer)
}
m, err := setupSyncManager(&SyncManagerConfig{SynchronizeTrades: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
m, err := setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
@@ -61,7 +61,7 @@ func TestSetupSyncManager(t *testing.T) {
func TestSyncManagerStart(t *testing.T) {
t.Parallel()
m, err := setupSyncManager(&SyncManagerConfig{SynchronizeTrades: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
m, err := setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
@@ -112,7 +112,7 @@ func TestSyncManagerStop(t *testing.T) {
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}
m, err = setupSyncManager(&SyncManagerConfig{SynchronizeTrades: true, SynchronizeContinuously: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, em, &config.RemoteControlConfig{}, false)
m, err = setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, SynchronizeContinuously: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, em, &config.RemoteControlConfig{}, false)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
@@ -163,7 +163,7 @@ func TestPrintTickerSummary(t *testing.T) {
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}
m, err = setupSyncManager(&SyncManagerConfig{SynchronizeTrades: true, SynchronizeContinuously: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, em, &config.RemoteControlConfig{}, false)
m, err = setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, SynchronizeContinuously: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, em, &config.RemoteControlConfig{}, false)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
@@ -205,7 +205,7 @@ func TestPrintOrderbookSummary(t *testing.T) {
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}
m, err = setupSyncManager(&SyncManagerConfig{SynchronizeTrades: true, SynchronizeContinuously: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, em, &config.RemoteControlConfig{}, false)
m, err = setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, SynchronizeContinuously: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, em, &config.RemoteControlConfig{}, false)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}

View File

@@ -33,20 +33,6 @@ type currencyPairSyncAgent struct {
locks []sync.Mutex
}
// SyncManagerConfig stores the currency pair synchronization manager config
type SyncManagerConfig struct {
SynchronizeTicker bool
SynchronizeOrderbook bool
SynchronizeTrades bool
SynchronizeContinuously bool
TimeoutREST time.Duration
TimeoutWebsocket time.Duration
NumWorkers int
FiatDisplayCurrency currency.Code
PairFormatDisplay *currency.PairFormat
Verbose bool
}
// syncManager stores the exchange currency pair syncer object
type syncManager struct {
initSyncCompleted int32
@@ -65,6 +51,6 @@ type syncManager struct {
tickerBatchLastRequested map[string]time.Time
remoteConfig *config.RemoteControlConfig
config SyncManagerConfig
config config.SyncManagerConfig
exchangeManager iExchangeManager
}