GCT: general updates across codebase (#699)

* orderbook: export orderbook nodes for external strategy inspection

* orderbook: Add in methods for locking and unlocking multiple books at the same time e.g. book1.LockWith(book2); defer book1.UnlockWith(book2)

* include waiting functionality for depth change alert

* backtester: add word.

* log: include logger changes to impl with downstream integration

* engine: reduce params for loading exchange

* assort: rm verbose in tests, change wording in ob, expose sync.waitgroup for ext. sync options

* ticker: reduce map look ups and contention when using RW mutex when there are over 80% writes adds find last function to get the latest rate

* engine/syncmanager: add in waitgroup for step over for external package calls

* cleaup

* engine: linter fix

* currency/fx: include all references to fiat currencies to default

* orderbook: Add in fields to Unsafe type for strategies to detect potential out of sync book operations

* syncmanager: changed config variable to display correct time

* ordermanager: Add time when none provided

* currency/manager: update getasset param to get enabled assets for minor optimizations

* ftx: use get all wallet balances for a better accounts breakdown

* orderbook: unlock in reverse order

* bithumb: fixes bug on market buy and sell orders

* bithumb: fix bug for nonce is also time window sensitive

* bithumb: get orders add required parameter

* bithumb: Add asset type to account struct

* currency: improve log output when checking currency and it fails

* bithumb: Add error return on incomplete pair

* ticker:unexport all service related methods

* ticker/currency: fixes

* orderbook: fix comment

* engine: revert variable name in LoadExchange method

* sync_manager: fix panic when enabling disabling manager

* engine: fix naming convention of exported function and comments

* engine: update comment

* orderbook: fix comment for unsafe type
This commit is contained in:
Ryan O'Hara-Reid
2021-07-29 14:42:28 +10:00
committed by GitHub
parent 4f5ab42bd8
commit a2381310da
64 changed files with 842 additions and 562 deletions

View File

@@ -306,7 +306,7 @@ func getAllActiveOrderbooks(m iExchangeManager) []EnabledExchangeOrderbooks {
var orderbookData []EnabledExchangeOrderbooks
exchanges := m.GetExchanges()
for x := range exchanges {
assets := exchanges[x].GetAssetTypes()
assets := exchanges[x].GetAssetTypes(true)
exchName := exchanges[x].GetName()
var exchangeOB EnabledExchangeOrderbooks
exchangeOB.ExchangeName = exchName
@@ -343,7 +343,7 @@ func getAllActiveTickers(m iExchangeManager) []EnabledExchangeCurrencies {
var tickers []EnabledExchangeCurrencies
exchanges := m.GetExchanges()
for x := range exchanges {
assets := exchanges[x].GetAssetTypes()
assets := exchanges[x].GetAssetTypes(true)
exchName := exchanges[x].GetName()
var exchangeTickers EnabledExchangeCurrencies
exchangeTickers.ExchangeName = exchName
@@ -380,7 +380,7 @@ func getAllActiveAccounts(m iExchangeManager) []AllEnabledExchangeAccounts {
var accounts []AllEnabledExchangeAccounts
exchanges := m.GetExchanges()
for x := range exchanges {
assets := exchanges[x].GetAssetTypes()
assets := exchanges[x].GetAssetTypes(true)
exchName := exchanges[x].GetName()
var exchangeAccounts AllEnabledExchangeAccounts
for y := range assets {

View File

@@ -705,8 +705,9 @@ func (bot *Engine) GetExchanges() []exchange.IBotExchange {
return bot.ExchangeManager.GetExchanges()
}
// LoadExchange loads an exchange by name
func (bot *Engine) LoadExchange(name string, useWG bool, wg *sync.WaitGroup) error {
// LoadExchange loads an exchange by name. Optional wait group can be added for
// external synchronization.
func (bot *Engine) LoadExchange(name string, wg *sync.WaitGroup) error {
exch, err := bot.ExchangeManager.NewExchangeByName(name)
if err != nil {
return err
@@ -728,7 +729,7 @@ func (bot *Engine) LoadExchange(name string, useWG bool, wg *sync.WaitGroup) err
if bot.Settings.EnableAllPairs &&
exchCfg.CurrencyPairs != nil {
assets := exchCfg.CurrencyPairs.GetAssetTypes()
assets := exchCfg.CurrencyPairs.GetAssetTypes(false)
for x := range assets {
var pairs currency.Pairs
pairs, err = exchCfg.CurrencyPairs.GetPairs(assets[x], false)
@@ -797,7 +798,7 @@ func (bot *Engine) LoadExchange(name string, useWG bool, wg *sync.WaitGroup) err
base := exch.GetBase()
if base.API.AuthenticatedSupport ||
base.API.AuthenticatedWebsocketSupport {
assetTypes := base.GetAssetTypes()
assetTypes := base.GetAssetTypes(false)
var useAsset asset.Item
for a := range assetTypes {
err = base.CurrencyPairs.IsAssetEnabled(assetTypes[a])
@@ -820,7 +821,7 @@ func (bot *Engine) LoadExchange(name string, useWG bool, wg *sync.WaitGroup) err
}
}
if useWG {
if wg != nil {
exch.Start(wg)
} else {
tempWG := sync.WaitGroup{}
@@ -886,21 +887,20 @@ func (bot *Engine) SetupExchanges() error {
continue
}
wg.Add(1)
cfg := configs[x]
go func(currCfg config.ExchangeConfig) {
go func(c config.ExchangeConfig) {
defer wg.Done()
err := bot.LoadExchange(currCfg.Name, true, &wg)
err := bot.LoadExchange(c.Name, &wg)
if err != nil {
gctlog.Errorf(gctlog.ExchangeSys, "LoadExchange %s failed: %s\n", currCfg.Name, err)
gctlog.Errorf(gctlog.ExchangeSys, "LoadExchange %s failed: %s\n", c.Name, err)
return
}
gctlog.Debugf(gctlog.ExchangeSys,
"%s: Exchange support: Enabled (Authenticated API support: %s - Verbose mode: %s).\n",
currCfg.Name,
common.IsEnabled(currCfg.API.AuthenticatedSupport),
common.IsEnabled(currCfg.Verbose),
c.Name,
common.IsEnabled(c.API.AuthenticatedSupport),
common.IsEnabled(c.Verbose),
)
}(cfg)
}(configs[x])
}
wg.Wait()
if len(bot.ExchangeManager.GetExchanges()) == 0 {
@@ -908,3 +908,9 @@ func (bot *Engine) SetupExchanges() error {
}
return nil
}
// WaitForInitialCurrencySync allows for a routine to wait for the initial sync
// of the currency pair syncer management system.
func (bot *Engine) WaitForInitialCurrencySync() error {
return bot.currencyPairSyncer.WaitForInitialSync()
}

View File

@@ -260,7 +260,7 @@ func TestDryRunParamInteraction(t *testing.T) {
},
},
}
if err := bot.LoadExchange(testExchange, false, nil); err != nil {
if err := bot.LoadExchange(testExchange, nil); err != nil {
t.Error(err)
}
exchCfg, err := bot.Config.GetExchangeConfig(testExchange)
@@ -280,7 +280,7 @@ func TestDryRunParamInteraction(t *testing.T) {
bot.Settings.EnableDryRun = true
bot.Settings.CheckParamInteraction = true
bot.Settings.EnableExchangeVerbose = true
if err = bot.LoadExchange(testExchange, false, nil); err != nil {
if err = bot.LoadExchange(testExchange, nil); err != nil {
t.Error(err)
}

View File

@@ -718,7 +718,7 @@ func (bot *Engine) GetAllActiveTickers() []EnabledExchangeCurrencies {
var tickerData []EnabledExchangeCurrencies
exchanges := bot.GetExchanges()
for x := range exchanges {
assets := exchanges[x].GetAssetTypes()
assets := exchanges[x].GetAssetTypes(true)
exchName := exchanges[x].GetName()
var exchangeTicker EnabledExchangeCurrencies
exchangeTicker.ExchangeName = exchName

View File

@@ -80,7 +80,7 @@ func CreateTestBot(t *testing.T) *Engine {
},
},
}}}
err := bot.LoadExchange(testExchange, false, nil)
err := bot.LoadExchange(testExchange, nil)
if err != nil {
t.Fatalf("SetupTest: Failed to load exchange: %s", err)
}

View File

@@ -186,7 +186,7 @@ func (m *OrderManager) Cancel(cancel *order.Cancel) error {
return err
}
if cancel.AssetType.String() != "" && !exch.GetAssetTypes().Contains(cancel.AssetType) {
if cancel.AssetType.String() != "" && !exch.GetAssetTypes(false).Contains(cancel.AssetType) {
err = errors.New("order asset type not supported by exchange")
return err
}
@@ -413,6 +413,9 @@ func (m *OrderManager) processSubmittedOrder(newOrder *order.Submit, result orde
"Order manager: Unable to generate UUID. Err: %s",
err)
}
if newOrder.Date.IsZero() {
newOrder.Date = time.Now()
}
msg := fmt.Sprintf("Order manager: Exchange %s submitted order ID=%v [Ours: %v] pair=%v price=%v amount=%v side=%v type=%v for time %v.",
newOrder.Exchange,
result.OrderID,
@@ -488,7 +491,7 @@ func (m *OrderManager) processOrders() {
"Order manager: Processing orders for exchange %v.",
exchanges[i].GetName())
supportedAssets := exchanges[i].GetAssetTypes()
supportedAssets := exchanges[i].GetAssetTypes(true)
for y := range supportedAssets {
pairs, err := exchanges[i].GetEnabledPairs(supportedAssets[y])
if err != nil {

View File

@@ -243,7 +243,7 @@ func (m *portfolioManager) getExchangeAccountInfo(exchanges []exchange.IBotExcha
}
continue
}
assetTypes := exchanges[x].GetAssetTypes()
assetTypes := exchanges[x].GetAssetTypes(false) // left as available for now, to sync the full spectrum
var exchangeHoldings account.Holdings
for y := range assetTypes {
accountHoldings, err := exchanges[x].FetchAccountInfo(assetTypes[y])

View File

@@ -277,7 +277,7 @@ func (s *RPCServer) DisableExchange(_ context.Context, r *gctrpc.GenericExchange
// EnableExchange enables an exchange
func (s *RPCServer) EnableExchange(_ context.Context, r *gctrpc.GenericExchangeNameRequest) (*gctrpc.GenericResponse, error) {
err := s.LoadExchange(r.Exchange, false, nil)
err := s.LoadExchange(r.Exchange, nil)
if err != nil {
return nil, err
}
@@ -316,7 +316,7 @@ func (s *RPCServer) GetExchangeInfo(_ context.Context, r *gctrpc.GenericExchange
}
resp.SupportedAssets = make(map[string]*gctrpc.PairsSupported)
assets := exchCfg.CurrencyPairs.GetAssetTypes()
assets := exchCfg.CurrencyPairs.GetAssetTypes(false)
for i := range assets {
ps, err := exchCfg.CurrencyPairs.Get(assets[i])
if err != nil {
@@ -473,7 +473,7 @@ func (s *RPCServer) GetOrderbooks(_ context.Context, _ *gctrpc.GetOrderbooksRequ
if !exchanges[x].IsEnabled() {
continue
}
assets := exchanges[x].GetAssetTypes()
assets := exchanges[x].GetAssetTypes(true)
exchName := exchanges[x].GetName()
for y := range assets {
currencies, err := exchanges[x].GetEnabledPairs(assets[y])
@@ -1658,7 +1658,7 @@ func (s *RPCServer) GetExchangePairs(_ context.Context, r *gctrpc.GetExchangePai
if err != nil {
return nil, err
}
assetTypes := exchCfg.CurrencyPairs.GetAssetTypes()
assetTypes := exchCfg.CurrencyPairs.GetAssetTypes(false)
var a asset.Item
if r.Asset != "" {
@@ -2545,7 +2545,7 @@ func (s *RPCServer) SetAllExchangePairs(_ context.Context, r *gctrpc.SetExchange
return nil, errExchangeBaseNotFound
}
assets := base.CurrencyPairs.GetAssetTypes()
assets := base.CurrencyPairs.GetAssetTypes(false)
if r.Enable {
for i := range assets {
@@ -2615,7 +2615,7 @@ func (s *RPCServer) GetExchangeAssets(_ context.Context, r *gctrpc.GetExchangeAs
}
return &gctrpc.GetExchangeAssetsResponse{
Assets: exch.GetAssetTypes().JoinToString(","),
Assets: exch.GetAssetTypes(false).JoinToString(","),
}, nil
}

View File

@@ -79,6 +79,7 @@ func setupSyncManager(c *Config, exchangeManager iExchangeManager, websocketData
s.config.SyncContinuously, s.config.SyncTicker, s.config.SyncOrderbook,
s.config.SyncTrades, s.config.NumWorkers, s.config.Verbose, s.config.SyncTimeoutREST,
s.config.SyncTimeoutWebsocket)
s.inService.Add(1)
return s, nil
}
@@ -98,12 +99,13 @@ func (m *syncManager) Start() error {
if !atomic.CompareAndSwapInt32(&m.started, 0, 1) {
return ErrSubSystemAlreadyStarted
}
m.initSyncWG.Add(1)
m.inService.Done()
log.Debugln(log.SyncMgr, "Exchange CurrencyPairSyncer started.")
exchanges := m.exchangeManager.GetExchanges()
for x := range exchanges {
exchangeName := exchanges[x].GetName()
supportsWebsocket := exchanges[x].SupportsWebsocket()
assetTypes := exchanges[x].GetAssetTypes()
supportsREST := exchanges[x].SupportsREST()
if !supportsREST && !supportsWebsocket {
@@ -150,6 +152,7 @@ func (m *syncManager) Start() error {
usingREST = true
}
assetTypes := exchanges[x].GetAssetTypes(false)
for y := range assetTypes {
if exchanges[x].GetBase().CurrencyPairs.IsAssetEnabled(assetTypes[y]) != nil {
log.Warnf(log.SyncMgr,
@@ -240,6 +243,7 @@ func (m *syncManager) Start() error {
for i := 0; i < m.config.NumWorkers; i++ {
go m.worker()
}
m.initSyncWG.Done()
return nil
}
@@ -252,6 +256,7 @@ func (m *syncManager) Stop() error {
if !atomic.CompareAndSwapInt32(&m.started, 1, 0) {
return fmt.Errorf("exchange CurrencyPairSyncer %w", ErrSubSystemNotStarted)
}
m.inService.Add(1)
log.Debugln(log.SyncMgr, "Exchange CurrencyPairSyncer stopped.")
return nil
}
@@ -482,7 +487,6 @@ func (m *syncManager) worker() {
exchanges := m.exchangeManager.GetExchanges()
for x := range exchanges {
exchangeName := exchanges[x].GetName()
assetTypes := exchanges[x].GetAssetTypes()
supportsREST := exchanges[x].SupportsREST()
supportsRESTTickerBatching := exchanges[x].SupportsRESTTickerBatchUpdates()
var usingREST bool
@@ -507,10 +511,8 @@ func (m *syncManager) worker() {
usingREST = true
}
assetTypes := exchanges[x].GetAssetTypes(true)
for y := range assetTypes {
if exchanges[x].GetBase().CurrencyPairs.IsAssetEnabled(assetTypes[y]) != nil {
continue
}
wsAssetSupported := exchanges[x].IsAssetWebsocketSupported(assetTypes[y])
enabledPairs, err := exchanges[x].GetEnabledPairs(assetTypes[y])
if err != nil {
@@ -582,7 +584,7 @@ func (m *syncManager) worker() {
c.Exchange,
m.FormatCurrency(c.Pair).String(),
strings.ToUpper(c.AssetType.String()),
m.config.SyncTimeoutREST,
m.config.SyncTimeoutWebsocket,
)
switchedToRest = true
m.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemOrderbook, false)
@@ -879,6 +881,23 @@ func (m *syncManager) PrintOrderbookSummary(result *orderbook.Base, protocol str
)
}
// WaitForInitialSync allows for a routine to wait for an initial sync to be
// completed without exposing the underlying type. This needs to be called in a
// separate routine.
func (m *syncManager) WaitForInitialSync() error {
if m == nil {
return fmt.Errorf("sync manager %w", ErrNilSubsystem)
}
m.inService.Wait()
if atomic.LoadInt32(&m.started) == 0 {
return fmt.Errorf("sync manager %w", ErrSubSystemNotStarted)
}
m.initSyncWG.Wait()
return nil
}
func relayWebsocketEvent(result interface{}, event, assetType, exchangeName string) {
evt := WebsocketEvent{
Data: result,

View File

@@ -207,3 +207,23 @@ func TestRelayWebsocketEvent(t *testing.T) {
relayWebsocketEvent(nil, "", "", "")
}
func TestWaitForInitialSync(t *testing.T) {
var m *syncManager
err := m.WaitForInitialSync()
if !errors.Is(err, ErrNilSubsystem) {
t.Fatalf("received %v, but expected: %v", err, ErrNilSubsystem)
}
m = &syncManager{}
err = m.WaitForInitialSync()
if !errors.Is(err, ErrSubSystemNotStarted) {
t.Fatalf("received %v, but expected: %v", err, ErrSubSystemNotStarted)
}
m.started = 1
err = m.WaitForInitialSync()
if !errors.Is(err, nil) {
t.Fatalf("received %v, but expected: %v", err, nil)
}
}

View File

@@ -53,6 +53,7 @@ type syncManager struct {
fiatDisplayCurrency currency.Code
mux sync.Mutex
initSyncWG sync.WaitGroup
inService sync.WaitGroup
currencyPairs []currencyPairSyncAgent
tickerBatchLastRequested map[string]time.Time