From a2381310daa672a55fb4c407b7944dec91ee2604 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Thu, 29 Jul 2021 14:42:28 +1000 Subject: [PATCH] GCT: general updates across codebase (#699) * orderbook: export orderbook nodes for external strategy inspection * orderbook: Add in methods for locking and unlocking multiple books at the same time e.g. book1.LockWith(book2); defer book1.UnlockWith(book2) * include waiting functionality for depth change alert * backtester: add word. * log: include logger changes to impl with downstream integration * engine: reduce params for loading exchange * assort: rm verbose in tests, change wording in ob, expose sync.waitgroup for ext. sync options * ticker: reduce map look ups and contention when using RW mutex when there are over 80% writes adds find last function to get the latest rate * engine/syncmanager: add in waitgroup for step over for external package calls * cleaup * engine: linter fix * currency/fx: include all references to fiat currencies to default * orderbook: Add in fields to Unsafe type for strategies to detect potential out of sync book operations * syncmanager: changed config variable to display correct time * ordermanager: Add time when none provided * currency/manager: update getasset param to get enabled assets for minor optimizations * ftx: use get all wallet balances for a better accounts breakdown * orderbook: unlock in reverse order * bithumb: fixes bug on market buy and sell orders * bithumb: fix bug for nonce is also time window sensitive * bithumb: get orders add required parameter * bithumb: Add asset type to account struct * currency: improve log output when checking currency and it fails * bithumb: Add error return on incomplete pair * ticker:unexport all service related methods * ticker/currency: fixes * orderbook: fix comment * engine: revert variable name in LoadExchange method * sync_manager: fix panic when enabling disabling manager * engine: fix naming convention of exported function and comments * engine: update comment * orderbook: fix comment for unsafe type --- backtester/backtest/backtest.go | 2 +- .../eventhandlers/exchange/exchange_test.go | 1 - backtester/main.go | 2 +- cmd/config_builder/builder.go | 2 +- cmd/exchange_wrapper_coverage/main.go | 4 +- cmd/exchange_wrapper_issues/main.go | 4 +- communications/slack/slack_test.go | 1 - config/config.go | 6 +- config/config_test.go | 2 +- currency/currency_test.go | 2 +- currency/forexprovider/base/base_interface.go | 23 +- currency/manager.go | 7 +- currency/manager_test.go | 47 ++-- currency/storage.go | 11 +- engine/apiserver.go | 6 +- engine/engine.go | 32 ++- engine/engine_test.go | 4 +- engine/helpers.go | 2 +- engine/helpers_test.go | 2 +- engine/order_manager.go | 7 +- engine/portfolio_manager.go | 2 +- engine/rpcserver.go | 12 +- engine/sync_manager.go | 31 ++- engine/sync_manager_test.go | 20 ++ engine/sync_manager_types.go | 1 + exchanges/binance/binance_websocket.go | 2 +- exchanges/binance/binance_wrapper.go | 18 +- exchanges/bitfinex/bitfinex_websocket.go | 2 +- exchanges/bitfinex/bitfinex_wrapper.go | 2 +- exchanges/bitflyer/bitflyer_wrapper.go | 2 +- exchanges/bithumb/bithumb.go | 27 +- exchanges/bithumb/bithumb_test.go | 6 +- exchanges/bithumb/bithumb_wrapper.go | 146 ++++++----- exchanges/bitmex/bitmex_websocket.go | 2 +- exchanges/btse/btse_test.go | 2 +- exchanges/btse/btse_wrapper.go | 2 +- exchanges/coinbene/coinbene_wrapper.go | 2 +- exchanges/exchange.go | 17 +- exchanges/exchange_test.go | 2 +- exchanges/ftx/ftx_test.go | 9 +- exchanges/ftx/ftx_websocket.go | 2 +- exchanges/ftx/ftx_wrapper.go | 35 +-- exchanges/interfaces.go | 2 +- exchanges/kraken/kraken_wrapper.go | 2 +- exchanges/okex/okex_wrapper.go | 2 +- exchanges/okgroup/okgroup_websocket.go | 2 +- exchanges/orderbook/linked_list.go | 242 +++++++++--------- exchanges/orderbook/linked_list_test.go | 92 +++---- exchanges/orderbook/node.go | 26 +- exchanges/orderbook/node_test.go | 14 +- exchanges/orderbook/orderbook.go | 3 + exchanges/orderbook/unsafe.go | 62 +++++ exchanges/orderbook/unsafe_test.go | 28 ++ exchanges/stream/websocket_test.go | 2 - exchanges/ticker/ticker.go | 220 +++++++++------- exchanges/ticker/ticker_test.go | 67 ++--- exchanges/ticker/ticker_types.go | 2 +- .../wrappers/gct/exchange/exchange_test.go | 2 +- log/logger.go | 21 +- log/logger_setup.go | 4 +- log/logger_test.go | 22 +- log/logger_types.go | 4 +- log/loggers.go | 26 +- log/sublogger_types.go | 48 ++-- 64 files changed, 842 insertions(+), 562 deletions(-) create mode 100644 exchanges/orderbook/unsafe.go create mode 100644 exchanges/orderbook/unsafe_test.go diff --git a/backtester/backtest/backtest.go b/backtester/backtest/backtest.go index 37e73263..6c0e0536 100644 --- a/backtester/backtest/backtest.go +++ b/backtester/backtest/backtest.go @@ -373,7 +373,7 @@ func (bt *BackTest) setupBot(cfg *config.Config, bot *engine.Engine) error { } bt.Bot.ExchangeManager = engine.SetupExchangeManager() for i := range cfg.CurrencySettings { - err = bt.Bot.LoadExchange(cfg.CurrencySettings[i].ExchangeName, false, nil) + err = bt.Bot.LoadExchange(cfg.CurrencySettings[i].ExchangeName, nil) if err != nil && !errors.Is(err, engine.ErrExchangeAlreadyLoaded) { return err } diff --git a/backtester/eventhandlers/exchange/exchange_test.go b/backtester/eventhandlers/exchange/exchange_test.go index 152a7b0e..5eb414e4 100644 --- a/backtester/eventhandlers/exchange/exchange_test.go +++ b/backtester/eventhandlers/exchange/exchange_test.go @@ -150,7 +150,6 @@ func TestPlaceOrder(t *testing.T) { if err != nil { t.Error(err) } - e := Exchange{} _, err = e.placeOrder(1, 1, false, true, nil, nil) if !errors.Is(err, common.ErrNilEvent) { diff --git a/backtester/main.go b/backtester/main.go index 4c21539a..7903d898 100644 --- a/backtester/main.go +++ b/backtester/main.go @@ -20,7 +20,7 @@ func main() { var printLogo, generateReport bool wd, err := os.Getwd() if err != nil { - fmt.Printf("Could get working directory. Error: %v.\n", err) + fmt.Printf("Could not get working directory. Error: %v.\n", err) os.Exit(1) } flag.StringVar( diff --git a/cmd/config_builder/builder.go b/cmd/config_builder/builder.go index 6d6074ca..03af1cca 100644 --- a/cmd/config_builder/builder.go +++ b/cmd/config_builder/builder.go @@ -21,7 +21,7 @@ func main() { var wg sync.WaitGroup for x := range exchange.Exchanges { name := exchange.Exchanges[x] - err = engine.Bot.LoadExchange(name, true, &wg) + err = engine.Bot.LoadExchange(name, &wg) if err != nil { log.Printf("Failed to load exchange %s. Err: %s", name, err) continue diff --git a/cmd/exchange_wrapper_coverage/main.go b/cmd/exchange_wrapper_coverage/main.go index 729124e5..efb14b81 100644 --- a/cmd/exchange_wrapper_coverage/main.go +++ b/cmd/exchange_wrapper_coverage/main.go @@ -33,7 +33,7 @@ func main() { log.Printf("Loading exchanges..") var wg sync.WaitGroup for x := range exchange.Exchanges { - err := engine.Bot.LoadExchange(exchange.Exchanges[x], true, &wg) + err := engine.Bot.LoadExchange(exchange.Exchanges[x], &wg) if err != nil { log.Printf("Failed to load exchange %s. Err: %s", exchange.Exchanges[x], @@ -76,7 +76,7 @@ func testWrappers(e exchange.IBotExchange) []string { p := currency.NewPair(currency.BTC, currency.USD) assetType := asset.Spot if !e.SupportsAsset(assetType) { - assets := e.GetAssetTypes() + assets := e.GetAssetTypes(false) rand.Seed(time.Now().Unix()) assetType = assets[rand.Intn(len(assets))] // nolint:gosec // basic number generation required, no need for crypo/rand } diff --git a/cmd/exchange_wrapper_issues/main.go b/cmd/exchange_wrapper_issues/main.go index 52ab56dc..321bb4e4 100644 --- a/cmd/exchange_wrapper_issues/main.go +++ b/cmd/exchange_wrapper_issues/main.go @@ -65,7 +65,7 @@ func main() { wrapperConfig.Exchanges[strings.ToLower(name)] = &config.APICredentialsConfig{} } if shouldLoadExchange(name) { - err = bot.LoadExchange(name, true, &wg) + err = bot.LoadExchange(name, &wg) if err != nil { log.Printf("Failed to load exchange %s. Err: %s", name, err) continue @@ -285,7 +285,7 @@ func testWrappers(e exchange.IBotExchange, base *exchange.Base, config *Config) var response []ExchangeAssetPairResponses testOrderSide := parseOrderSide(config.OrderSubmission.OrderSide) testOrderType := parseOrderType(config.OrderSubmission.OrderType) - assetTypes := base.GetAssetTypes() + assetTypes := base.GetAssetTypes(false) if assetTypeOverride != "" { a, err := asset.New(assetTypeOverride) if err != nil { diff --git a/communications/slack/slack_test.go b/communications/slack/slack_test.go index 465d1ada..352cc89c 100644 --- a/communications/slack/slack_test.go +++ b/communications/slack/slack_test.go @@ -20,7 +20,6 @@ func TestSetup(t *testing.T) { cfg := &config.Config{Communications: base.CommunicationsConfig{}} commsCfg := cfg.GetCommunicationsConfig() s.Setup(&commsCfg) - s.Verbose = true } func TestConnect(t *testing.T) { diff --git a/config/config.go b/config/config.go index 6ed252be..212fab37 100644 --- a/config/config.go +++ b/config/config.go @@ -347,7 +347,7 @@ func (c *Config) GetExchangeAssetTypes(exchName string) (asset.Items, error) { return nil, fmt.Errorf("exchange %s currency pairs is nil", exchName) } - return exchCfg.CurrencyPairs.GetAssetTypes(), nil + return exchCfg.CurrencyPairs.GetAssetTypes(false), nil } // SupportsExchangeAssetType returns whether or not the exchange supports the supplied asset type @@ -367,7 +367,7 @@ func (c *Config) SupportsExchangeAssetType(exchName string, assetType asset.Item assetType) } - if !exchCfg.CurrencyPairs.GetAssetTypes().Contains(assetType) { + if !exchCfg.CurrencyPairs.GetAssetTypes(false).Contains(assetType) { return fmt.Errorf("exchange %s unsupported asset type %s", exchName, assetType) @@ -899,7 +899,7 @@ func (c *Config) CheckExchangeConfigValues() error { c.Exchanges[i].AvailablePairs = nil c.Exchanges[i].EnabledPairs = nil } else { - assets := c.Exchanges[i].CurrencyPairs.GetAssetTypes() + assets := c.Exchanges[i].CurrencyPairs.GetAssetTypes(false) var atLeastOne bool for index := range assets { err := c.Exchanges[i].CurrencyPairs.IsAssetEnabled(assets[index]) diff --git a/config/config_test.go b/config/config_test.go index 15233761..cfdd76cc 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -1478,7 +1478,7 @@ func TestCheckExchangeConfigValues(t *testing.T) { t.Error("unexpected request format values") } - if !cfg.Exchanges[0].CurrencyPairs.GetAssetTypes().Contains(asset.Spot) || + if !cfg.Exchanges[0].CurrencyPairs.GetAssetTypes(false).Contains(asset.Spot) || !cfg.Exchanges[0].CurrencyPairs.UseGlobalFormat { t.Error("unexpected results") } diff --git a/currency/currency_test.go b/currency/currency_test.go index 73568ba5..64977a26 100644 --- a/currency/currency_test.go +++ b/currency/currency_test.go @@ -65,7 +65,7 @@ func TestGetDefaulCryptoCurrencies(t *testing.T) { } func TestGetDefaultFiatCurrencies(t *testing.T) { - expected := Currencies{USD, AUD, EUR, CNY} + expected := Currencies{BZD, KYD, LRD, SAR, MKD, SRD, BMD, KHR, COP, CRC, GIP, NIO, CHF, VEF, ILS, BSD, CUP, HKD, IDR, SYP, AWG, TTD, DOP, JPY, PAB, SHP, BGN, JEP, AZN, JMD, MXN, CAD, GGP, RUR, GBP, GTQ, LBP, THB, MZN, RSD, ARS, BYN, HRK, GHS, MUR, ANG, QAR, ZWD, CLP, INR, IRR, NOK, PHP, LKR, TRY, BAM, EGP, TVD, SVC, FJD, PEN, RUB, SOS, XCD, KZT, BWP, ISK, KPW, KRW, PKR, UYU, BND, MNT, SEK, UAH, BBD, GYD, NZD, SCR, ZAR, FKP, HUF, RON, AFN, PLN, OMR, USD, CZK, YER, AUD, EUR, TWD, BRL, DKK, KGS, PYG, SBD, UZS, IMP, MYR, NAD, NPR, LAK, VND, ALL, BOB, HNL, SGD, CNY, NGN} if !GetDefaultFiatCurrencies().Match(expected) { t.Errorf("GetDefaultFiatCurrencies() expected %s but received %s", expected, GetDefaultFiatCurrencies()) diff --git a/currency/forexprovider/base/base_interface.go b/currency/forexprovider/base/base_interface.go index 18e9bbf4..5300a8a8 100644 --- a/currency/forexprovider/base/base_interface.go +++ b/currency/forexprovider/base/base_interface.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/thrasher-corp/gocryptotrader/common" + "github.com/thrasher-corp/gocryptotrader/log" ) // IFXProvider enforces standard functions for all foreign exchange providers @@ -89,20 +90,20 @@ func (f *FXHandler) GetCurrencyData(baseCurrency string, currencies []string) (m } if len(shunt) != 0 { - rateNew, err := f.backupGetRate(baseCurrency, shunt) - if err != nil { - return nil, fmt.Errorf("failed to update rate map for currencies %v %v", - shunt, - err) - } - - for key, val := range rateNew { - rates[key] = val - } - return rates, nil } + rateNew, err := f.backupGetRate(baseCurrency, shunt) + if err != nil { + log.Warnf(log.Global, "%s and subsequent providers, failed to update rate map for currencies %v %v", + f.Primary.Provider.GetName(), + shunt, + err) + } + + for key, val := range rateNew { + rates[key] = val + } return rates, nil } diff --git a/currency/manager.go b/currency/manager.go index b5fd2d64..37165f5c 100644 --- a/currency/manager.go +++ b/currency/manager.go @@ -9,11 +9,14 @@ import ( ) // GetAssetTypes returns a list of stored asset types -func (p *PairsManager) GetAssetTypes() asset.Items { +func (p *PairsManager) GetAssetTypes(enabled bool) asset.Items { p.m.RLock() defer p.m.RUnlock() var assetTypes asset.Items - for k := range p.Pairs { + for k, ps := range p.Pairs { + if enabled && (ps.AssetEnabled == nil || !*ps.AssetEnabled) { + continue + } assetTypes = append(assetTypes, k) } return assetTypes diff --git a/currency/manager_test.go b/currency/manager_test.go index 554c4f00..3e96a61f 100644 --- a/currency/manager_test.go +++ b/currency/manager_test.go @@ -3,6 +3,7 @@ package currency import ( "testing" + "github.com/thrasher-corp/gocryptotrader/common/convert" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" ) @@ -19,26 +20,36 @@ func initTest(t *testing.T) { t.Fatal(err) } - p.Store(asset.Spot, - PairStore{ - Available: spotAvailable, - Enabled: spotEnabled, - RequestFormat: &PairFormat{ - Uppercase: true, - }, - ConfigFormat: &PairFormat{ - Uppercase: true, - Delimiter: "-", - }, - }, - ) + spot := PairStore{ + AssetEnabled: convert.BoolPtr(true), + Available: spotAvailable, + Enabled: spotEnabled, + RequestFormat: &PairFormat{Uppercase: true}, + ConfigFormat: &PairFormat{Uppercase: true, Delimiter: "-"}, + } + + futures := PairStore{ + AssetEnabled: convert.BoolPtr(false), + Available: spotAvailable, + Enabled: spotEnabled, + RequestFormat: &PairFormat{Uppercase: true}, + ConfigFormat: &PairFormat{Uppercase: true, Delimiter: "-"}, + } + + p.Store(asset.Spot, spot) + p.Store(asset.Futures, futures) } func TestGetAssetTypes(t *testing.T) { initTest(t) - a := p.GetAssetTypes() - if len(a) == 0 { + a := p.GetAssetTypes(false) + if len(a) != 2 { + t.Errorf("expected 2 but received: %d", len(a)) + } + + a = p.GetAssetTypes(true) + if len(a) != 1 { t.Errorf("GetAssetTypes shouldn't be nil") } @@ -55,9 +66,9 @@ func TestGet(t *testing.T) { t.Error(err) } - _, err = p.Get(asset.Futures) + _, err = p.Get(asset.CoinMarginedFutures) if err == nil { - t.Error("Futures should be nil") + t.Error("CoinMarginedFutures should be nil") } } @@ -312,6 +323,8 @@ func TestIsAssetEnabled_SetAssetEnabled(t *testing.T) { // Test asset type which doesn't exist initTest(t) + p.Pairs[asset.Spot].AssetEnabled = nil + err = p.IsAssetEnabled(asset.Spot) if err == nil { t.Error("unexpected result") diff --git a/currency/storage.go b/currency/storage.go index 8819446c..eb67ece8 100644 --- a/currency/storage.go +++ b/currency/storage.go @@ -23,10 +23,19 @@ func init() { func (s *Storage) SetDefaults() { s.defaultBaseCurrency = USD s.baseCurrency = s.defaultBaseCurrency - err := s.SetDefaultFiatCurrencies(USD, AUD, EUR, CNY) + var fiatCurrencies []Code + for item := range symbols { + if item == USDT.Item { + continue + } + fiatCurrencies = append(fiatCurrencies, Code{Item: item, UpperCase: true}) + } + + err := s.SetDefaultFiatCurrencies(fiatCurrencies...) if err != nil { log.Errorf(log.Global, "Currency Storage: Setting default fiat currencies error: %s", err) } + err = s.SetDefaultCryptocurrencies(BTC, LTC, ETH, DOGE, DASH, XRP, XMR) if err != nil { log.Errorf(log.Global, "Currency Storage: Setting default cryptocurrencies error: %s", err) diff --git a/engine/apiserver.go b/engine/apiserver.go index 3e8286ab..0bee8f68 100644 --- a/engine/apiserver.go +++ b/engine/apiserver.go @@ -306,7 +306,7 @@ func getAllActiveOrderbooks(m iExchangeManager) []EnabledExchangeOrderbooks { var orderbookData []EnabledExchangeOrderbooks exchanges := m.GetExchanges() for x := range exchanges { - assets := exchanges[x].GetAssetTypes() + assets := exchanges[x].GetAssetTypes(true) exchName := exchanges[x].GetName() var exchangeOB EnabledExchangeOrderbooks exchangeOB.ExchangeName = exchName @@ -343,7 +343,7 @@ func getAllActiveTickers(m iExchangeManager) []EnabledExchangeCurrencies { var tickers []EnabledExchangeCurrencies exchanges := m.GetExchanges() for x := range exchanges { - assets := exchanges[x].GetAssetTypes() + assets := exchanges[x].GetAssetTypes(true) exchName := exchanges[x].GetName() var exchangeTickers EnabledExchangeCurrencies exchangeTickers.ExchangeName = exchName @@ -380,7 +380,7 @@ func getAllActiveAccounts(m iExchangeManager) []AllEnabledExchangeAccounts { var accounts []AllEnabledExchangeAccounts exchanges := m.GetExchanges() for x := range exchanges { - assets := exchanges[x].GetAssetTypes() + assets := exchanges[x].GetAssetTypes(true) exchName := exchanges[x].GetName() var exchangeAccounts AllEnabledExchangeAccounts for y := range assets { diff --git a/engine/engine.go b/engine/engine.go index 44b1fa3d..2d46d393 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -705,8 +705,9 @@ func (bot *Engine) GetExchanges() []exchange.IBotExchange { return bot.ExchangeManager.GetExchanges() } -// LoadExchange loads an exchange by name -func (bot *Engine) LoadExchange(name string, useWG bool, wg *sync.WaitGroup) error { +// LoadExchange loads an exchange by name. Optional wait group can be added for +// external synchronization. +func (bot *Engine) LoadExchange(name string, wg *sync.WaitGroup) error { exch, err := bot.ExchangeManager.NewExchangeByName(name) if err != nil { return err @@ -728,7 +729,7 @@ func (bot *Engine) LoadExchange(name string, useWG bool, wg *sync.WaitGroup) err if bot.Settings.EnableAllPairs && exchCfg.CurrencyPairs != nil { - assets := exchCfg.CurrencyPairs.GetAssetTypes() + assets := exchCfg.CurrencyPairs.GetAssetTypes(false) for x := range assets { var pairs currency.Pairs pairs, err = exchCfg.CurrencyPairs.GetPairs(assets[x], false) @@ -797,7 +798,7 @@ func (bot *Engine) LoadExchange(name string, useWG bool, wg *sync.WaitGroup) err base := exch.GetBase() if base.API.AuthenticatedSupport || base.API.AuthenticatedWebsocketSupport { - assetTypes := base.GetAssetTypes() + assetTypes := base.GetAssetTypes(false) var useAsset asset.Item for a := range assetTypes { err = base.CurrencyPairs.IsAssetEnabled(assetTypes[a]) @@ -820,7 +821,7 @@ func (bot *Engine) LoadExchange(name string, useWG bool, wg *sync.WaitGroup) err } } - if useWG { + if wg != nil { exch.Start(wg) } else { tempWG := sync.WaitGroup{} @@ -886,21 +887,20 @@ func (bot *Engine) SetupExchanges() error { continue } wg.Add(1) - cfg := configs[x] - go func(currCfg config.ExchangeConfig) { + go func(c config.ExchangeConfig) { defer wg.Done() - err := bot.LoadExchange(currCfg.Name, true, &wg) + err := bot.LoadExchange(c.Name, &wg) if err != nil { - gctlog.Errorf(gctlog.ExchangeSys, "LoadExchange %s failed: %s\n", currCfg.Name, err) + gctlog.Errorf(gctlog.ExchangeSys, "LoadExchange %s failed: %s\n", c.Name, err) return } gctlog.Debugf(gctlog.ExchangeSys, "%s: Exchange support: Enabled (Authenticated API support: %s - Verbose mode: %s).\n", - currCfg.Name, - common.IsEnabled(currCfg.API.AuthenticatedSupport), - common.IsEnabled(currCfg.Verbose), + c.Name, + common.IsEnabled(c.API.AuthenticatedSupport), + common.IsEnabled(c.Verbose), ) - }(cfg) + }(configs[x]) } wg.Wait() if len(bot.ExchangeManager.GetExchanges()) == 0 { @@ -908,3 +908,9 @@ func (bot *Engine) SetupExchanges() error { } return nil } + +// WaitForInitialCurrencySync allows for a routine to wait for the initial sync +// of the currency pair syncer management system. +func (bot *Engine) WaitForInitialCurrencySync() error { + return bot.currencyPairSyncer.WaitForInitialSync() +} diff --git a/engine/engine_test.go b/engine/engine_test.go index d2988370..fa5d5e00 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -260,7 +260,7 @@ func TestDryRunParamInteraction(t *testing.T) { }, }, } - if err := bot.LoadExchange(testExchange, false, nil); err != nil { + if err := bot.LoadExchange(testExchange, nil); err != nil { t.Error(err) } exchCfg, err := bot.Config.GetExchangeConfig(testExchange) @@ -280,7 +280,7 @@ func TestDryRunParamInteraction(t *testing.T) { bot.Settings.EnableDryRun = true bot.Settings.CheckParamInteraction = true bot.Settings.EnableExchangeVerbose = true - if err = bot.LoadExchange(testExchange, false, nil); err != nil { + if err = bot.LoadExchange(testExchange, nil); err != nil { t.Error(err) } diff --git a/engine/helpers.go b/engine/helpers.go index d7f0342f..d42f8877 100644 --- a/engine/helpers.go +++ b/engine/helpers.go @@ -718,7 +718,7 @@ func (bot *Engine) GetAllActiveTickers() []EnabledExchangeCurrencies { var tickerData []EnabledExchangeCurrencies exchanges := bot.GetExchanges() for x := range exchanges { - assets := exchanges[x].GetAssetTypes() + assets := exchanges[x].GetAssetTypes(true) exchName := exchanges[x].GetName() var exchangeTicker EnabledExchangeCurrencies exchangeTicker.ExchangeName = exchName diff --git a/engine/helpers_test.go b/engine/helpers_test.go index 1c736eb8..34047372 100644 --- a/engine/helpers_test.go +++ b/engine/helpers_test.go @@ -80,7 +80,7 @@ func CreateTestBot(t *testing.T) *Engine { }, }, }}} - err := bot.LoadExchange(testExchange, false, nil) + err := bot.LoadExchange(testExchange, nil) if err != nil { t.Fatalf("SetupTest: Failed to load exchange: %s", err) } diff --git a/engine/order_manager.go b/engine/order_manager.go index 82a345a5..34e04359 100644 --- a/engine/order_manager.go +++ b/engine/order_manager.go @@ -186,7 +186,7 @@ func (m *OrderManager) Cancel(cancel *order.Cancel) error { return err } - if cancel.AssetType.String() != "" && !exch.GetAssetTypes().Contains(cancel.AssetType) { + if cancel.AssetType.String() != "" && !exch.GetAssetTypes(false).Contains(cancel.AssetType) { err = errors.New("order asset type not supported by exchange") return err } @@ -413,6 +413,9 @@ func (m *OrderManager) processSubmittedOrder(newOrder *order.Submit, result orde "Order manager: Unable to generate UUID. Err: %s", err) } + if newOrder.Date.IsZero() { + newOrder.Date = time.Now() + } msg := fmt.Sprintf("Order manager: Exchange %s submitted order ID=%v [Ours: %v] pair=%v price=%v amount=%v side=%v type=%v for time %v.", newOrder.Exchange, result.OrderID, @@ -488,7 +491,7 @@ func (m *OrderManager) processOrders() { "Order manager: Processing orders for exchange %v.", exchanges[i].GetName()) - supportedAssets := exchanges[i].GetAssetTypes() + supportedAssets := exchanges[i].GetAssetTypes(true) for y := range supportedAssets { pairs, err := exchanges[i].GetEnabledPairs(supportedAssets[y]) if err != nil { diff --git a/engine/portfolio_manager.go b/engine/portfolio_manager.go index 86505aad..4568cbf0 100644 --- a/engine/portfolio_manager.go +++ b/engine/portfolio_manager.go @@ -243,7 +243,7 @@ func (m *portfolioManager) getExchangeAccountInfo(exchanges []exchange.IBotExcha } continue } - assetTypes := exchanges[x].GetAssetTypes() + assetTypes := exchanges[x].GetAssetTypes(false) // left as available for now, to sync the full spectrum var exchangeHoldings account.Holdings for y := range assetTypes { accountHoldings, err := exchanges[x].FetchAccountInfo(assetTypes[y]) diff --git a/engine/rpcserver.go b/engine/rpcserver.go index 06839969..ae5bdbc0 100644 --- a/engine/rpcserver.go +++ b/engine/rpcserver.go @@ -277,7 +277,7 @@ func (s *RPCServer) DisableExchange(_ context.Context, r *gctrpc.GenericExchange // EnableExchange enables an exchange func (s *RPCServer) EnableExchange(_ context.Context, r *gctrpc.GenericExchangeNameRequest) (*gctrpc.GenericResponse, error) { - err := s.LoadExchange(r.Exchange, false, nil) + err := s.LoadExchange(r.Exchange, nil) if err != nil { return nil, err } @@ -316,7 +316,7 @@ func (s *RPCServer) GetExchangeInfo(_ context.Context, r *gctrpc.GenericExchange } resp.SupportedAssets = make(map[string]*gctrpc.PairsSupported) - assets := exchCfg.CurrencyPairs.GetAssetTypes() + assets := exchCfg.CurrencyPairs.GetAssetTypes(false) for i := range assets { ps, err := exchCfg.CurrencyPairs.Get(assets[i]) if err != nil { @@ -473,7 +473,7 @@ func (s *RPCServer) GetOrderbooks(_ context.Context, _ *gctrpc.GetOrderbooksRequ if !exchanges[x].IsEnabled() { continue } - assets := exchanges[x].GetAssetTypes() + assets := exchanges[x].GetAssetTypes(true) exchName := exchanges[x].GetName() for y := range assets { currencies, err := exchanges[x].GetEnabledPairs(assets[y]) @@ -1658,7 +1658,7 @@ func (s *RPCServer) GetExchangePairs(_ context.Context, r *gctrpc.GetExchangePai if err != nil { return nil, err } - assetTypes := exchCfg.CurrencyPairs.GetAssetTypes() + assetTypes := exchCfg.CurrencyPairs.GetAssetTypes(false) var a asset.Item if r.Asset != "" { @@ -2545,7 +2545,7 @@ func (s *RPCServer) SetAllExchangePairs(_ context.Context, r *gctrpc.SetExchange return nil, errExchangeBaseNotFound } - assets := base.CurrencyPairs.GetAssetTypes() + assets := base.CurrencyPairs.GetAssetTypes(false) if r.Enable { for i := range assets { @@ -2615,7 +2615,7 @@ func (s *RPCServer) GetExchangeAssets(_ context.Context, r *gctrpc.GetExchangeAs } return &gctrpc.GetExchangeAssetsResponse{ - Assets: exch.GetAssetTypes().JoinToString(","), + Assets: exch.GetAssetTypes(false).JoinToString(","), }, nil } diff --git a/engine/sync_manager.go b/engine/sync_manager.go index 500253d3..71d760a9 100644 --- a/engine/sync_manager.go +++ b/engine/sync_manager.go @@ -79,6 +79,7 @@ func setupSyncManager(c *Config, exchangeManager iExchangeManager, websocketData s.config.SyncContinuously, s.config.SyncTicker, s.config.SyncOrderbook, s.config.SyncTrades, s.config.NumWorkers, s.config.Verbose, s.config.SyncTimeoutREST, s.config.SyncTimeoutWebsocket) + s.inService.Add(1) return s, nil } @@ -98,12 +99,13 @@ func (m *syncManager) Start() error { if !atomic.CompareAndSwapInt32(&m.started, 0, 1) { return ErrSubSystemAlreadyStarted } + m.initSyncWG.Add(1) + m.inService.Done() log.Debugln(log.SyncMgr, "Exchange CurrencyPairSyncer started.") exchanges := m.exchangeManager.GetExchanges() for x := range exchanges { exchangeName := exchanges[x].GetName() supportsWebsocket := exchanges[x].SupportsWebsocket() - assetTypes := exchanges[x].GetAssetTypes() supportsREST := exchanges[x].SupportsREST() if !supportsREST && !supportsWebsocket { @@ -150,6 +152,7 @@ func (m *syncManager) Start() error { usingREST = true } + assetTypes := exchanges[x].GetAssetTypes(false) for y := range assetTypes { if exchanges[x].GetBase().CurrencyPairs.IsAssetEnabled(assetTypes[y]) != nil { log.Warnf(log.SyncMgr, @@ -240,6 +243,7 @@ func (m *syncManager) Start() error { for i := 0; i < m.config.NumWorkers; i++ { go m.worker() } + m.initSyncWG.Done() return nil } @@ -252,6 +256,7 @@ func (m *syncManager) Stop() error { if !atomic.CompareAndSwapInt32(&m.started, 1, 0) { return fmt.Errorf("exchange CurrencyPairSyncer %w", ErrSubSystemNotStarted) } + m.inService.Add(1) log.Debugln(log.SyncMgr, "Exchange CurrencyPairSyncer stopped.") return nil } @@ -482,7 +487,6 @@ func (m *syncManager) worker() { exchanges := m.exchangeManager.GetExchanges() for x := range exchanges { exchangeName := exchanges[x].GetName() - assetTypes := exchanges[x].GetAssetTypes() supportsREST := exchanges[x].SupportsREST() supportsRESTTickerBatching := exchanges[x].SupportsRESTTickerBatchUpdates() var usingREST bool @@ -507,10 +511,8 @@ func (m *syncManager) worker() { usingREST = true } + assetTypes := exchanges[x].GetAssetTypes(true) for y := range assetTypes { - if exchanges[x].GetBase().CurrencyPairs.IsAssetEnabled(assetTypes[y]) != nil { - continue - } wsAssetSupported := exchanges[x].IsAssetWebsocketSupported(assetTypes[y]) enabledPairs, err := exchanges[x].GetEnabledPairs(assetTypes[y]) if err != nil { @@ -582,7 +584,7 @@ func (m *syncManager) worker() { c.Exchange, m.FormatCurrency(c.Pair).String(), strings.ToUpper(c.AssetType.String()), - m.config.SyncTimeoutREST, + m.config.SyncTimeoutWebsocket, ) switchedToRest = true m.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemOrderbook, false) @@ -879,6 +881,23 @@ func (m *syncManager) PrintOrderbookSummary(result *orderbook.Base, protocol str ) } +// WaitForInitialSync allows for a routine to wait for an initial sync to be +// completed without exposing the underlying type. This needs to be called in a +// separate routine. +func (m *syncManager) WaitForInitialSync() error { + if m == nil { + return fmt.Errorf("sync manager %w", ErrNilSubsystem) + } + + m.inService.Wait() + if atomic.LoadInt32(&m.started) == 0 { + return fmt.Errorf("sync manager %w", ErrSubSystemNotStarted) + } + + m.initSyncWG.Wait() + return nil +} + func relayWebsocketEvent(result interface{}, event, assetType, exchangeName string) { evt := WebsocketEvent{ Data: result, diff --git a/engine/sync_manager_test.go b/engine/sync_manager_test.go index 4c546418..4e0cb4b4 100644 --- a/engine/sync_manager_test.go +++ b/engine/sync_manager_test.go @@ -207,3 +207,23 @@ func TestRelayWebsocketEvent(t *testing.T) { relayWebsocketEvent(nil, "", "", "") } + +func TestWaitForInitialSync(t *testing.T) { + var m *syncManager + err := m.WaitForInitialSync() + if !errors.Is(err, ErrNilSubsystem) { + t.Fatalf("received %v, but expected: %v", err, ErrNilSubsystem) + } + + m = &syncManager{} + err = m.WaitForInitialSync() + if !errors.Is(err, ErrSubSystemNotStarted) { + t.Fatalf("received %v, but expected: %v", err, ErrSubSystemNotStarted) + } + + m.started = 1 + err = m.WaitForInitialSync() + if !errors.Is(err, nil) { + t.Fatalf("received %v, but expected: %v", err, nil) + } +} diff --git a/engine/sync_manager_types.go b/engine/sync_manager_types.go index 38121ad2..9beb1f9e 100644 --- a/engine/sync_manager_types.go +++ b/engine/sync_manager_types.go @@ -53,6 +53,7 @@ type syncManager struct { fiatDisplayCurrency currency.Code mux sync.Mutex initSyncWG sync.WaitGroup + inService sync.WaitGroup currencyPairs []currencyPairSyncAgent tickerBatchLastRequested map[string]time.Time diff --git a/exchanges/binance/binance_websocket.go b/exchanges/binance/binance_websocket.go index 0a7762ce..d250e25d 100644 --- a/exchanges/binance/binance_websocket.go +++ b/exchanges/binance/binance_websocket.go @@ -516,7 +516,7 @@ func (b *Binance) UpdateLocalBuffer(wsdp *WebsocketDepthStream) (bool, error) { func (b *Binance) GenerateSubscriptions() ([]stream.ChannelSubscription, error) { var channels = []string{"@ticker", "@trade", "@kline_1m", "@depth@100ms"} var subscriptions []stream.ChannelSubscription - assets := b.GetAssetTypes() + assets := b.GetAssetTypes(true) for x := range assets { if assets[x] == asset.Spot { pairs, err := b.GetEnabledPairs(assets[x]) diff --git a/exchanges/binance/binance_wrapper.go b/exchanges/binance/binance_wrapper.go index 171ba89c..0e7d65aa 100644 --- a/exchanges/binance/binance_wrapper.go +++ b/exchanges/binance/binance_wrapper.go @@ -315,16 +315,14 @@ func (b *Binance) Run() { } } - a := b.GetAssetTypes() + a := b.GetAssetTypes(true) for x := range a { - if err = b.CurrencyPairs.IsAssetEnabled(a[x]); err == nil { - err = b.UpdateOrderExecutionLimits(a[x]) - if err != nil { - log.Errorf(log.ExchangeSys, - "%s failed to set exchange order execution limits. Err: %v", - b.Name, - err) - } + err = b.UpdateOrderExecutionLimits(a[x]) + if err != nil { + log.Errorf(log.ExchangeSys, + "%s failed to set exchange order execution limits. Err: %v", + b.Name, + err) } } @@ -404,7 +402,7 @@ func (b *Binance) FetchTradablePairs(a asset.Item) ([]string, error) { // UpdateTradablePairs updates the exchanges available pairs and stores // them in the exchanges config func (b *Binance) UpdateTradablePairs(forceUpdate bool) error { - assetTypes := b.GetAssetTypes() + assetTypes := b.GetAssetTypes(false) for i := range assetTypes { p, err := b.FetchTradablePairs(assetTypes[i]) if err != nil { diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index 23d8918b..cd1cea44 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -1062,7 +1062,7 @@ func (b *Bitfinex) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, } var subscriptions []stream.ChannelSubscription - assets := b.GetAssetTypes() + assets := b.GetAssetTypes(true) for i := range assets { enabledPairs, err := b.GetEnabledPairs(assets[i]) if err != nil { diff --git a/exchanges/bitfinex/bitfinex_wrapper.go b/exchanges/bitfinex/bitfinex_wrapper.go index 8dbf9ab0..e73cd368 100644 --- a/exchanges/bitfinex/bitfinex_wrapper.go +++ b/exchanges/bitfinex/bitfinex_wrapper.go @@ -303,7 +303,7 @@ func (b *Bitfinex) FetchTradablePairs(a asset.Item) ([]string, error) { // UpdateTradablePairs updates the exchanges available pairs and stores // them in the exchanges config func (b *Bitfinex) UpdateTradablePairs(forceUpdate bool) error { - assets := b.CurrencyPairs.GetAssetTypes() + assets := b.CurrencyPairs.GetAssetTypes(false) for i := range assets { pairs, err := b.FetchTradablePairs(assets[i]) if err != nil { diff --git a/exchanges/bitflyer/bitflyer_wrapper.go b/exchanges/bitflyer/bitflyer_wrapper.go index b3cccd36..392ddfbf 100644 --- a/exchanges/bitflyer/bitflyer_wrapper.go +++ b/exchanges/bitflyer/bitflyer_wrapper.go @@ -167,7 +167,7 @@ func (b *Bitflyer) FetchTradablePairs(assetType asset.Item) ([]string, error) { // UpdateTradablePairs updates the exchanges available pairs and stores // them in the exchanges config func (b *Bitflyer) UpdateTradablePairs(forceUpdate bool) error { - assets := b.CurrencyPairs.GetAssetTypes() + assets := b.CurrencyPairs.GetAssetTypes(false) for x := range assets { pairs, err := b.FetchTradablePairs(assets[x]) if err != nil { diff --git a/exchanges/bithumb/bithumb.go b/exchanges/bithumb/bithumb.go index e89b839e..1c1a0def 100644 --- a/exchanges/bithumb/bithumb.go +++ b/exchanges/bithumb/bithumb.go @@ -11,6 +11,7 @@ import ( "reflect" "strconv" "strings" + "time" "github.com/thrasher-corp/gocryptotrader/common/crypto" "github.com/thrasher-corp/gocryptotrader/currency" @@ -262,8 +263,14 @@ func (b *Bithumb) GetLastTransaction() (LastTransactionTicker, error) { // (2014-11-28 16:40:01 = 1417160401000) func (b *Bithumb) GetOrders(orderID, transactionType, count, after, currency string) (Orders, error) { response := Orders{} - params := url.Values{} + + if currency == "" { + return response, errors.New("order currency is required") + } + + params.Set("order_currency", strings.ToUpper(currency)) + if len(orderID) > 0 { params.Set("order_id", orderID) } @@ -280,10 +287,6 @@ func (b *Bithumb) GetOrders(orderID, transactionType, count, after, currency str params.Set("after", after) } - if len(currency) > 0 { - params.Set("currency", strings.ToUpper(currency)) - } - return response, b.SendAuthenticatedHTTPRequest(exchange.RestSpot, privateOrders, params, &response) } @@ -422,11 +425,12 @@ func (b *Bithumb) RequestKRWWithdraw(bank, account string, price int64) (ActionS // currency: BTC, ETH, DASH, LTC, ETC, XRP, BCH, XMR, ZEC, QTUM, BTG, EOS // (default value: BTC) // units: Order quantity -func (b *Bithumb) MarketBuyOrder(currency string, units float64) (MarketBuy, error) { +func (b *Bithumb) MarketBuyOrder(pair currency.Pair, units float64) (MarketBuy, error) { response := MarketBuy{} params := url.Values{} - params.Set("currency", strings.ToUpper(currency)) + params.Set("order_currency", strings.ToUpper(pair.Base.String())) + params.Set("payment_currency", strings.ToUpper(pair.Quote.String())) params.Set("units", strconv.FormatFloat(units, 'f', -1, 64)) return response, @@ -438,11 +442,12 @@ func (b *Bithumb) MarketBuyOrder(currency string, units float64) (MarketBuy, err // currency: BTC, ETH, DASH, LTC, ETC, XRP, BCH, XMR, ZEC, QTUM, BTG, EOS // (default value: BTC) // units: Order quantity -func (b *Bithumb) MarketSellOrder(currency string, units float64) (MarketSell, error) { +func (b *Bithumb) MarketSellOrder(pair currency.Pair, units float64) (MarketSell, error) { response := MarketSell{} params := url.Values{} - params.Set("currency", strings.ToUpper(currency)) + params.Set("order_currency", strings.ToUpper(pair.Base.String())) + params.Set("payment_currency", strings.ToUpper(pair.Quote.String())) params.Set("units", strconv.FormatFloat(units, 'f', -1, 64)) return response, @@ -478,7 +483,9 @@ func (b *Bithumb) SendAuthenticatedHTTPRequest(ep exchange.URL, path string, par params = url.Values{} } - n := b.Requester.GetNonceMilli().String() + // This is time window sensitive + tnMS := time.Now().UnixNano() / int64(time.Millisecond) + n := strconv.FormatInt(tnMS, 10) params.Set("endpoint", path) payload := params.Encode() diff --git a/exchanges/bithumb/bithumb_test.go b/exchanges/bithumb/bithumb_test.go index 61224713..181ff49c 100644 --- a/exchanges/bithumb/bithumb_test.go +++ b/exchanges/bithumb/bithumb_test.go @@ -211,7 +211,8 @@ func TestRequestKRWWithdraw(t *testing.T) { func TestMarketBuyOrder(t *testing.T) { t.Parallel() - _, err := b.MarketBuyOrder(testCurrency, 0) + p := currency.NewPair(currency.BTC, currency.KRW) + _, err := b.MarketBuyOrder(p, 0) if err == nil { t.Error("Bithumb MarketBuyOrder() Expected error") } @@ -219,7 +220,8 @@ func TestMarketBuyOrder(t *testing.T) { func TestMarketSellOrder(t *testing.T) { t.Parallel() - _, err := b.MarketSellOrder(testCurrency, 0) + p := currency.NewPair(currency.BTC, currency.KRW) + _, err := b.MarketSellOrder(p, 0) if err == nil { t.Error("Bithumb MarketSellOrder() Expected error") } diff --git a/exchanges/bithumb/bithumb_wrapper.go b/exchanges/bithumb/bithumb_wrapper.go index 8028507e..a98ec08b 100644 --- a/exchanges/bithumb/bithumb_wrapper.go +++ b/exchanges/bithumb/bithumb_wrapper.go @@ -26,6 +26,8 @@ import ( "github.com/thrasher-corp/gocryptotrader/portfolio/withdraw" ) +var errNotEnoughPairs = errors.New("at least one currency is required to fetch order history") + // GetDefaultConfig returns a default exchange config func (b *Bithumb) GetDefaultConfig() (*config.ExchangeConfig, error) { b.SetDefaults() @@ -307,6 +309,7 @@ func (b *Bithumb) UpdateAccountInfo(assetType asset.Item) (account.Holdings, err info.Accounts = append(info.Accounts, account.SubAccount{ Currencies: exchangeBalances, + AssetType: assetType, }) info.Exchange = b.Name @@ -403,14 +406,14 @@ func (b *Bithumb) SubmitOrder(s *order.Submit) (order.SubmitResponse, error) { var orderID string if s.Side == order.Buy { var result MarketBuy - result, err = b.MarketBuyOrder(fPair.Base.String(), s.Amount) + result, err = b.MarketBuyOrder(fPair, s.Amount) if err != nil { return submitOrderResponse, err } orderID = result.OrderID } else if s.Side == order.Sell { var result MarketSell - result, err = b.MarketSellOrder(fPair.Base.String(), s.Amount) + result, err = b.MarketSellOrder(fPair, s.Amount) if err != nil { return submitOrderResponse, err } @@ -583,43 +586,50 @@ func (b *Bithumb) GetActiveOrders(req *order.GetOrdersRequest) ([]order.Detail, if err := req.Validate(); err != nil { return nil, err } + + if len(req.Pairs) == 0 { + return nil, errNotEnoughPairs + } + + format, err := b.GetPairFormat(req.AssetType, false) + if err != nil { + return nil, err + } + var orders []order.Detail - resp, err := b.GetOrders("", "", "1000", "", "") - if err != nil { - return nil, err - } - - format, err := b.GetPairFormat(asset.Spot, false) - if err != nil { - return nil, err - } - - for i := range resp.Data { - if resp.Data[i].Status != "placed" { - continue + for x := range req.Pairs { + resp, err := b.GetOrders("", "", "1000", "", req.Pairs[x].Base.String()) + if err != nil { + return nil, err } - orderDate := time.Unix(resp.Data[i].OrderDate, 0) - orderDetail := order.Detail{ - Amount: resp.Data[i].Units, - Exchange: b.Name, - ID: resp.Data[i].OrderID, - Date: orderDate, - Price: resp.Data[i].Price, - RemainingAmount: resp.Data[i].UnitsRemaining, - Status: order.Active, - Pair: currency.NewPairWithDelimiter(resp.Data[i].OrderCurrency, - resp.Data[i].PaymentCurrency, - format.Delimiter), - } + for i := range resp.Data { + if resp.Data[i].Status != "placed" { + continue + } - if resp.Data[i].Type == "bid" { - orderDetail.Side = order.Buy - } else if resp.Data[i].Type == "ask" { - orderDetail.Side = order.Sell - } + orderDate := time.Unix(resp.Data[i].OrderDate, 0) + orderDetail := order.Detail{ + Amount: resp.Data[i].Units, + Exchange: b.Name, + ID: resp.Data[i].OrderID, + Date: orderDate, + Price: resp.Data[i].Price, + RemainingAmount: resp.Data[i].UnitsRemaining, + Status: order.Active, + Pair: currency.NewPairWithDelimiter(resp.Data[i].OrderCurrency, + resp.Data[i].PaymentCurrency, + format.Delimiter), + } - orders = append(orders, orderDetail) + if resp.Data[i].Type == "bid" { + orderDetail.Side = order.Buy + } else if resp.Data[i].Type == "ask" { + orderDetail.Side = order.Sell + } + + orders = append(orders, orderDetail) + } } order.FilterOrdersBySide(&orders, req.Side) @@ -635,42 +645,48 @@ func (b *Bithumb) GetOrderHistory(req *order.GetOrdersRequest) ([]order.Detail, return nil, err } + if len(req.Pairs) == 0 { + return nil, errNotEnoughPairs + } + + format, err := b.GetPairFormat(req.AssetType, false) + if err != nil { + return nil, err + } + var orders []order.Detail - resp, err := b.GetOrders("", "", "1000", "", "") - if err != nil { - return nil, err - } - - format, err := b.GetPairFormat(asset.Spot, false) - if err != nil { - return nil, err - } - - for i := range resp.Data { - if resp.Data[i].Status == "placed" { - continue + for x := range req.Pairs { + resp, err := b.GetOrders("", "", "1000", "", req.Pairs[x].Base.String()) + if err != nil { + return nil, err } - orderDate := time.Unix(resp.Data[i].OrderDate, 0) - orderDetail := order.Detail{ - Amount: resp.Data[i].Units, - Exchange: b.Name, - ID: resp.Data[i].OrderID, - Date: orderDate, - Price: resp.Data[i].Price, - RemainingAmount: resp.Data[i].UnitsRemaining, - Pair: currency.NewPairWithDelimiter(resp.Data[i].OrderCurrency, - resp.Data[i].PaymentCurrency, - format.Delimiter), - } + for i := range resp.Data { + if resp.Data[i].Status == "placed" { + continue + } - if resp.Data[i].Type == "bid" { - orderDetail.Side = order.Buy - } else if resp.Data[i].Type == "ask" { - orderDetail.Side = order.Sell - } + orderDate := time.Unix(resp.Data[i].OrderDate, 0) + orderDetail := order.Detail{ + Amount: resp.Data[i].Units, + Exchange: b.Name, + ID: resp.Data[i].OrderID, + Date: orderDate, + Price: resp.Data[i].Price, + RemainingAmount: resp.Data[i].UnitsRemaining, + Pair: currency.NewPairWithDelimiter(resp.Data[i].OrderCurrency, + resp.Data[i].PaymentCurrency, + format.Delimiter), + } - orders = append(orders, orderDetail) + if resp.Data[i].Type == "bid" { + orderDetail.Side = order.Buy + } else if resp.Data[i].Type == "ask" { + orderDetail.Side = order.Sell + } + + orders = append(orders, orderDetail) + } } order.FilterOrdersBySide(&orders, req.Side) diff --git a/exchanges/bitmex/bitmex_websocket.go b/exchanges/bitmex/bitmex_websocket.go index 198ace92..8c65f9f9 100644 --- a/exchanges/bitmex/bitmex_websocket.go +++ b/exchanges/bitmex/bitmex_websocket.go @@ -557,7 +557,7 @@ func (b *Bitmex) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, e }, } - assets := b.GetAssetTypes() + assets := b.GetAssetTypes(true) for x := range assets { contracts, err := b.GetEnabledPairs(assets[x]) if err != nil { diff --git a/exchanges/btse/btse_test.go b/exchanges/btse/btse_test.go index e6cd2ec8..7d7d6585 100644 --- a/exchanges/btse/btse_test.go +++ b/exchanges/btse/btse_test.go @@ -678,7 +678,7 @@ func TestStatusToStandardStatus(t *testing.T) { func TestFetchTradablePairs(t *testing.T) { t.Parallel() - assets := b.GetAssetTypes() + assets := b.GetAssetTypes(false) for i := range assets { data, err := b.FetchTradablePairs(assets[i]) if err != nil { diff --git a/exchanges/btse/btse_wrapper.go b/exchanges/btse/btse_wrapper.go index a6175f5e..fa910196 100644 --- a/exchanges/btse/btse_wrapper.go +++ b/exchanges/btse/btse_wrapper.go @@ -264,7 +264,7 @@ func (b *BTSE) FetchTradablePairs(a asset.Item) ([]string, error) { // UpdateTradablePairs updates the exchanges available pairs and stores // them in the exchanges config func (b *BTSE) UpdateTradablePairs(forceUpdate bool) error { - a := b.GetAssetTypes() + a := b.GetAssetTypes(false) for i := range a { pairs, err := b.FetchTradablePairs(a[i]) if err != nil { diff --git a/exchanges/coinbene/coinbene_wrapper.go b/exchanges/coinbene/coinbene_wrapper.go index 4ebcaaaa..9fdb2635 100644 --- a/exchanges/coinbene/coinbene_wrapper.go +++ b/exchanges/coinbene/coinbene_wrapper.go @@ -277,7 +277,7 @@ func (c *Coinbene) FetchTradablePairs(a asset.Item) ([]string, error) { // UpdateTradablePairs updates the exchanges available pairs and stores // them func (c *Coinbene) UpdateTradablePairs(forceUpdate bool) error { - assets := c.GetAssetTypes() + assets := c.GetAssetTypes(false) for x := range assets { pairs, err := c.FetchTradablePairs(assets[x]) if err != nil { diff --git a/exchanges/exchange.go b/exchanges/exchange.go index b4b2c82e..af81acd8 100644 --- a/exchanges/exchange.go +++ b/exchanges/exchange.go @@ -214,16 +214,17 @@ func (b *Base) GetLastPairsUpdateTime() int64 { return b.CurrencyPairs.LastUpdated } -// GetAssetTypes returns the available asset types for an individual exchange -func (b *Base) GetAssetTypes() asset.Items { - return b.CurrencyPairs.GetAssetTypes() +// GetAssetTypes returns the either the enabled or available asset types for an +// individual exchange +func (b *Base) GetAssetTypes(enabled bool) asset.Items { + return b.CurrencyPairs.GetAssetTypes(enabled) } // GetPairAssetType returns the associated asset type for the currency pair // This method is only useful for exchanges that have pair names with multiple delimiters (BTC-USD-0626) // Helpful if the exchange has only a single asset type but in that case the asset type can be hard coded func (b *Base) GetPairAssetType(c currency.Pair) (asset.Item, error) { - assetTypes := b.GetAssetTypes() + assetTypes := b.GetAssetTypes(false) for i := range assetTypes { avail, err := b.GetAvailablePairs(assetTypes[i]) if err != nil { @@ -271,7 +272,7 @@ func (b *Base) SetCurrencyPairFormat() { b.Config.CurrencyPairs.RequestFormat = nil } - assetTypes := b.GetAssetTypes() + assetTypes := b.GetAssetTypes(false) for x := range assetTypes { if _, err := b.Config.CurrencyPairs.Get(assetTypes[x]); err != nil { ps, err := b.CurrencyPairs.Get(assetTypes[x]) @@ -285,8 +286,8 @@ func (b *Base) SetCurrencyPairFormat() { // SetConfigPairs sets the exchanges currency pairs to the pairs set in the config func (b *Base) SetConfigPairs() error { - assetTypes := b.Config.CurrencyPairs.GetAssetTypes() - exchangeAssets := b.CurrencyPairs.GetAssetTypes() + assetTypes := b.Config.CurrencyPairs.GetAssetTypes(false) + exchangeAssets := b.CurrencyPairs.GetAssetTypes(false) for x := range assetTypes { if !exchangeAssets.Contains(assetTypes[x]) { log.Warnf(log.ExchangeSys, @@ -413,7 +414,7 @@ func (b *Base) GetEnabledPairs(a asset.Item) (currency.Pairs, error) { // GetRequestFormattedPairAndAssetType is a method that returns the enabled currency pair of // along with its asset type. Only use when there is no chance of the same name crossing over func (b *Base) GetRequestFormattedPairAndAssetType(p string) (currency.Pair, asset.Item, error) { - assetTypes := b.GetAssetTypes() + assetTypes := b.GetAssetTypes(false) var response currency.Pair for i := range assetTypes { format, err := b.GetPairFormat(assetTypes[i], true) diff --git a/exchanges/exchange_test.go b/exchanges/exchange_test.go index b0ac321c..6f417e4e 100644 --- a/exchanges/exchange_test.go +++ b/exchanges/exchange_test.go @@ -431,7 +431,7 @@ func TestGetAssetTypes(t *testing.T) { }, } - aT := testExchange.GetAssetTypes() + aT := testExchange.GetAssetTypes(false) if len(aT) != 3 { t.Error("TestGetAssetTypes failed") } diff --git a/exchanges/ftx/ftx_test.go b/exchanges/ftx/ftx_test.go index 21d76636..c314eca1 100644 --- a/exchanges/ftx/ftx_test.go +++ b/exchanges/ftx/ftx_test.go @@ -1358,13 +1358,16 @@ func TestAcceptOTCQuote(t *testing.T) { func TestGetHistoricTrades(t *testing.T) { t.Parallel() - assets := f.GetAssetTypes() + assets := f.GetAssetTypes(false) for i := range assets { enabledPairs, err := f.GetEnabledPairs(assets[i]) if err != nil { t.Fatal(err) } - _, err = f.GetHistoricTrades(enabledPairs.GetRandomPair(), assets[i], time.Now().Add(-time.Minute*15), time.Now()) + _, err = f.GetHistoricTrades(enabledPairs.GetRandomPair(), + assets[i], + time.Now().Add(-time.Minute*15), + time.Now()) if err != nil { t.Error(err) } @@ -1373,7 +1376,7 @@ func TestGetHistoricTrades(t *testing.T) { func TestGetRecentTrades(t *testing.T) { t.Parallel() - assets := f.GetAssetTypes() + assets := f.GetAssetTypes(false) for i := range assets { enabledPairs, err := f.GetEnabledPairs(assets[i]) if err != nil { diff --git a/exchanges/ftx/ftx_websocket.go b/exchanges/ftx/ftx_websocket.go index 4a10e541..93a70189 100644 --- a/exchanges/ftx/ftx_websocket.go +++ b/exchanges/ftx/ftx_websocket.go @@ -174,7 +174,7 @@ func (f *FTX) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, erro Channel: wsMarkets, }) var channels = []string{wsTicker, wsTrades, wsOrderbook} - assets := f.GetAssetTypes() + assets := f.GetAssetTypes(true) for a := range assets { pairs, err := f.GetEnabledPairs(assets[a]) if err != nil { diff --git a/exchanges/ftx/ftx_wrapper.go b/exchanges/ftx/ftx_wrapper.go index 55f36694..4b1cdfb3 100644 --- a/exchanges/ftx/ftx_wrapper.go +++ b/exchanges/ftx/ftx_wrapper.go @@ -281,7 +281,7 @@ func (f *FTX) FetchTradablePairs(a asset.Item) ([]string, error) { // UpdateTradablePairs updates the exchanges available pairs and stores // them in the exchanges config func (f *FTX) UpdateTradablePairs(forceUpdate bool) error { - assets := f.GetAssetTypes() + assets := f.GetAssetTypes(false) for x := range assets { pairs, err := f.FetchTradablePairs(assets[x]) if err != nil { @@ -396,25 +396,30 @@ func (f *FTX) UpdateOrderbook(p currency.Pair, assetType asset.Item) (*orderbook } // UpdateAccountInfo retrieves balances for all enabled currencies -func (f *FTX) UpdateAccountInfo(assetType asset.Item) (account.Holdings, error) { +func (f *FTX) UpdateAccountInfo(a asset.Item) (account.Holdings, error) { var resp account.Holdings - data, err := f.GetBalances() + // Get all wallet balances used so we can transfer between accounts if + // needed. + data, err := f.GetAllWalletBalances() if err != nil { return resp, err } - var acc account.SubAccount - for i := range data { - c := currency.NewCode(data[i].Coin) - hold := data[i].Total - data[i].Free - total := data[i].Total - acc.Currencies = append(acc.Currencies, - account.Balance{CurrencyName: c, - TotalValue: total, - Hold: hold}) - } - resp.Accounts = append(resp.Accounts, acc) - resp.Exchange = f.Name + for subName, balances := range data { + // "main" defines the main account in the sub account list + var acc = account.SubAccount{ID: subName, AssetType: a} + for x := range balances { + c := currency.NewCode(balances[x].Coin) + hold := balances[x].Total - balances[x].Free + acc.Currencies = append(acc.Currencies, + account.Balance{CurrencyName: c, + TotalValue: balances[x].Total, + Hold: hold}) + } + resp.Accounts = append(resp.Accounts, acc) + } + + resp.Exchange = f.Name err = account.Process(&resp) if err != nil { return account.Holdings{}, err diff --git a/exchanges/interfaces.go b/exchanges/interfaces.go index a956ba00..a98adf27 100644 --- a/exchanges/interfaces.go +++ b/exchanges/interfaces.go @@ -39,7 +39,7 @@ type IBotExchange interface { UpdateAccountInfo(a asset.Item) (account.Holdings, error) GetAuthenticatedAPISupport(endpoint uint8) bool SetPairs(pairs currency.Pairs, a asset.Item, enabled bool) error - GetAssetTypes() asset.Items + GetAssetTypes(enabled bool) asset.Items GetRecentTrades(p currency.Pair, a asset.Item) ([]trade.Data, error) GetHistoricTrades(p currency.Pair, a asset.Item, startTime, endTime time.Time) ([]trade.Data, error) SupportsAutoPairUpdates() bool diff --git a/exchanges/kraken/kraken_wrapper.go b/exchanges/kraken/kraken_wrapper.go index b8fa2259..a8b78333 100644 --- a/exchanges/kraken/kraken_wrapper.go +++ b/exchanges/kraken/kraken_wrapper.go @@ -388,7 +388,7 @@ func (k *Kraken) FetchTradablePairs(assetType asset.Item) ([]string, error) { // UpdateTradablePairs updates the exchanges available pairs and stores // them in the exchanges config func (k *Kraken) UpdateTradablePairs(forceUpdate bool) error { - assets := k.GetAssetTypes() + assets := k.GetAssetTypes(false) for x := range assets { pairs, err := k.FetchTradablePairs(assets[x]) if err != nil { diff --git a/exchanges/okex/okex_wrapper.go b/exchanges/okex/okex_wrapper.go index 71ec57c4..69d9879b 100644 --- a/exchanges/okex/okex_wrapper.go +++ b/exchanges/okex/okex_wrapper.go @@ -361,7 +361,7 @@ func (o *OKEX) FetchTradablePairs(i asset.Item) ([]string, error) { // UpdateTradablePairs updates the exchanges available pairs and stores // them in the exchanges config func (o *OKEX) UpdateTradablePairs(forceUpdate bool) error { - assets := o.CurrencyPairs.GetAssetTypes() + assets := o.CurrencyPairs.GetAssetTypes(false) for x := range assets { if assets[x] == asset.Index { // Update from futures diff --git a/exchanges/okgroup/okgroup_websocket.go b/exchanges/okgroup/okgroup_websocket.go index 8e1a161a..a5b11dfd 100644 --- a/exchanges/okgroup/okgroup_websocket.go +++ b/exchanges/okgroup/okgroup_websocket.go @@ -782,7 +782,7 @@ func (o *OKGroup) CalculateUpdateOrderbookChecksum(orderbookData *orderbook.Base // handled by ManageSubscriptions() func (o *OKGroup) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, error) { var subscriptions []stream.ChannelSubscription - assets := o.GetAssetTypes() + assets := o.GetAssetTypes(true) for x := range assets { pairs, err := o.GetEnabledPairs(assets[x]) if err != nil { diff --git a/exchanges/orderbook/linked_list.go b/exchanges/orderbook/linked_list.go index d0050311..acfe4ff8 100644 --- a/exchanges/orderbook/linked_list.go +++ b/exchanges/orderbook/linked_list.go @@ -13,7 +13,7 @@ var errAmountCannotBeLessOrEqualToZero = errors.New("amount cannot be less or eq // to and from a stack. type linkedList struct { length int - head *node + head *Node } // comparison defines expected functionality to compare between two reference @@ -27,43 +27,43 @@ func (ll *linkedList) load(items Items, stack *stack) { // Tip sets up a pointer to a struct field variable pointer. This is used // so when a node is popped from the stack we can reference that current // nodes' struct 'next' field and set on next iteration without utilising - // assignment for example `prev.next = *node`. + // assignment for example `prev.Next = *node`. var tip = &ll.head // Prev denotes a place holder to node and all of its next references need // to be pushed back onto stack. - var prev *node + var prev *Node for i := range items { if *tip == nil { // Extend node chain *tip = stack.Pop() // Set current node prev to last node - (*tip).prev = prev + (*tip).Prev = prev ll.length++ } // Set item value - (*tip).value = items[i] + (*tip).Value = items[i] // Set previous to current node prev = *tip // Set tip to next node - tip = &(*tip).next + tip = &(*tip).Next } // Push has references to dangling nodes that need to be removed and pushed // back onto stack for re-use - var push *node + var push *Node // Cleave unused reference chain from main chain if prev == nil { // The entire chain will need to be pushed back on to stack push = *tip ll.head = nil } else { - push = prev.next - prev.next = nil + push = prev.Next + prev.Next = nil } // Push unused pointers back on stack for push != nil { - pending := push.next + pending := push.Next stack.Push(push, getNow()) ll.length-- push = pending @@ -74,16 +74,16 @@ func (ll *linkedList) load(items Items, stack *stack) { func (ll *linkedList) updateByID(updts []Item) error { updates: for x := range updts { - for tip := ll.head; tip != nil; tip = tip.next { - if updts[x].ID != tip.value.ID { // Filter IDs that don't match + for tip := ll.head; tip != nil; tip = tip.Next { + if updts[x].ID != tip.Value.ID { // Filter IDs that don't match continue } if updts[x].Price > 0 { // Only apply changes when zero values are not present, Bitmex // for example sends 0 price values. - tip.value.Price = updts[x].Price + tip.Value.Price = updts[x].Price } - tip.value.Amount = updts[x].Amount + tip.Value.Amount = updts[x].Amount continue updates } return fmt.Errorf("update error: %w %d not found", @@ -97,8 +97,8 @@ updates: func (ll *linkedList) deleteByID(updts Items, stack *stack, bypassErr bool) error { updates: for x := range updts { - for tip := &ll.head; *tip != nil; tip = &(*tip).next { - if updts[x].ID != (*tip).value.ID { + for tip := &ll.head; *tip != nil; tip = &(*tip).Next { + if updts[x].ID != (*tip).Value.ID { continue } stack.Push(deleteAtTip(ll, tip), getNow()) @@ -122,15 +122,15 @@ func (ll *linkedList) cleanup(maxChainLength int, stack *stack) { // cleaved after that update, new update might not applied correctly. n := ll.head for i := 0; i < maxChainLength; i++ { - if n.next == nil { + if n.Next == nil { return } - n = n.next + n = n.Next } // cleave reference to current node - if n.prev != nil { - n.prev.next = nil + if n.Prev != nil { + n.Prev.Next = nil } else { ll.head = nil } @@ -138,7 +138,7 @@ func (ll *linkedList) cleanup(maxChainLength int, stack *stack) { var pruned int for n != nil { pruned++ - pending := n.next + pending := n.Next stack.Push(n, getNow()) n = pending } @@ -147,9 +147,9 @@ func (ll *linkedList) cleanup(maxChainLength int, stack *stack) { // amount returns total depth liquidity and value func (ll *linkedList) amount() (liquidity, value float64) { - for tip := ll.head; tip != nil; tip = tip.next { - liquidity += tip.value.Amount - value += tip.value.Amount * tip.value.Price + for tip := ll.head; tip != nil; tip = tip.Next { + liquidity += tip.Value.Amount + value += tip.Value.Amount * tip.Value.Price } return } @@ -158,8 +158,8 @@ func (ll *linkedList) amount() (liquidity, value float64) { func (ll *linkedList) retrieve() Items { depth := make(Items, ll.length) iterator := 0 - for tip := ll.head; tip != nil; tip = tip.next { - depth[iterator] = tip.value + for tip := ll.head; tip != nil; tip = tip.Next { + depth[iterator] = tip.Value iterator++ } return depth @@ -169,21 +169,21 @@ func (ll *linkedList) retrieve() Items { // updates func (ll *linkedList) updateInsertByPrice(updts Items, stack *stack, maxChainLength int, compare func(float64, float64) bool, tn now) { for x := range updts { - for tip := &ll.head; ; tip = &(*tip).next { + for tip := &ll.head; ; tip = &(*tip).Next { if *tip == nil { insertHeadSpecific(ll, updts[x], stack) break } - if (*tip).value.Price == updts[x].Price { // Match check + if (*tip).Value.Price == updts[x].Price { // Match check if updts[x].Amount <= 0 { // Capture delete update stack.Push(deleteAtTip(ll, tip), tn) } else { // Amend current amount value - (*tip).value.Amount = updts[x].Amount + (*tip).Value.Amount = updts[x].Amount } break // Continue updates } - if compare((*tip).value.Price, updts[x].Price) { // Insert + if compare((*tip).Value.Price, updts[x].Price) { // Insert // This check below filters zero values and provides an // optimisation for when select exchanges send a delete update // to a non-existent price level (OTC/Hidden order) so we can @@ -194,7 +194,7 @@ func (ll *linkedList) updateInsertByPrice(updts Items, stack *stack, maxChainLen break // Continue updates } - if (*tip).next == nil { // Tip is at tail + if (*tip).Next == nil { // Tip is at tail // This check below is just a catch all in the event the above // zero value check fails if updts[x].Amount > 0 { @@ -229,14 +229,14 @@ updates: // from the stack and then pushing to the stack later for cleanup. // If the ID is not found we can pop from stack then insert into that // price level - var bookmark *node - for tip := ll.head; tip != nil; tip = tip.next { - if tip.value.ID == updts[x].ID { - if tip.value.Price != updts[x].Price { // Price level change - if tip.next == nil { + var bookmark *Node + for tip := ll.head; tip != nil; tip = tip.Next { + if tip.Value.ID == updts[x].ID { + if tip.Value.Price != updts[x].Price { // Price level change + if tip.Next == nil { // no movement needed just a re-adjustment - tip.value.Price = updts[x].Price - tip.value.Amount = updts[x].Amount + tip.Value.Price = updts[x].Price + tip.Value.Amount = updts[x].Amount continue updates } // bookmark tip to move this node to correct price level @@ -244,55 +244,55 @@ updates: continue // continue through node depth } // no price change, amend amount and continue update - tip.value.Amount = updts[x].Amount + tip.Value.Amount = updts[x].Amount continue updates // continue to next update } - if compare(tip.value.Price, updts[x].Price) { + if compare(tip.Value.Price, updts[x].Price) { if bookmark != nil { // shift bookmarked node to current tip - bookmark.value = updts[x] + bookmark.Value = updts[x] move(&ll.head, bookmark, tip) continue updates } // search for ID - for n := tip.next; n != nil; n = n.next { - if n.value.ID == updts[x].ID { - n.value = updts[x] + for n := tip.Next; n != nil; n = n.Next { + if n.Value.ID == updts[x].ID { + n.Value = updts[x] // inserting before the tip move(&ll.head, n, tip) continue updates } } // ID not matched in depth so add correct level for insert - if tip.next == nil { + if tip.Next == nil { n := stack.Pop() - n.value = updts[x] + n.Value = updts[x] ll.length++ - if tip.prev == nil { - tip.prev = n - n.next = tip + if tip.Prev == nil { + tip.Prev = n + n.Next = tip ll.head = n continue updates } - tip.prev.next = n - n.prev = tip.prev - tip.prev = n - n.next = tip + tip.Prev.Next = n + n.Prev = tip.Prev + tip.Prev = n + n.Next = tip continue updates } bookmark = tip break } - if tip.next == nil { + if tip.Next == nil { if shiftBookmark(tip, &bookmark, &ll.head, updts[x]) { continue updates } } } n := stack.Pop() - n.value = updts[x] + n.Value = updts[x] insertNodeAtBookmark(ll, bookmark, n) // Won't inline with stack } return nil @@ -301,38 +301,38 @@ updates: // insertUpdates inserts new updates for bids or asks based on price level func (ll *linkedList) insertUpdates(updts Items, stack *stack, comp comparison) error { for x := range updts { - var prev *node - for tip := &ll.head; ; tip = &(*tip).next { + var prev *Node + for tip := &ll.head; ; tip = &(*tip).Next { if *tip == nil { // Head n := stack.Pop() - n.value = updts[x] - n.prev = prev + n.Value = updts[x] + n.Prev = prev ll.length++ *tip = n break // Continue updates } - if (*tip).value.Price == updts[x].Price { // Price already found + if (*tip).Value.Price == updts[x].Price { // Price already found return fmt.Errorf("%w for price %f", errCollisionDetected, updts[x].Price) } - if comp((*tip).value.Price, updts[x].Price) { // Alignment + if comp((*tip).Value.Price, updts[x].Price) { // Alignment n := stack.Pop() - n.value = updts[x] - n.prev = prev + n.Value = updts[x] + n.Prev = prev ll.length++ // Reference current with new node - (*tip).prev = n + (*tip).Prev = n // Push tip to the right - n.next = *tip - // This is the same as prev.next = n + n.Next = *tip + // This is the same as prev.Next = n *tip = n break // Continue updates } - if (*tip).next == nil { // Tail + if (*tip).Next == nil { // Tail insertAtTail(ll, tip, updts[x], stack) break // Continue updates } @@ -399,80 +399,80 @@ func (ll *asks) insertUpdates(updts Items, stack *stack) error { // move moves a node from a point in a node chain to another node position, // this left justified towards head as element zero is the top of the depth // side. (can inline) -func move(head **node, from, to *node) { - if from.next != nil { // From is at tail - from.next.prev = from.prev +func move(head **Node, from, to *Node) { + if from.Next != nil { // From is at tail + from.Next.Prev = from.Prev } - if from.prev == nil { // From is at head - (*head).next.prev = nil - *head = (*head).next + if from.Prev == nil { // From is at head + (*head).Next.Prev = nil + *head = (*head).Next } else { - from.prev.next = from.next + from.Prev.Next = from.Next } // insert from node next to 'to' node - if to.prev == nil { // Destination is at head position + if to.Prev == nil { // Destination is at head position *head = from } else { - to.prev.next = from + to.Prev.Next = from } - from.prev = to.prev - to.prev = from - from.next = to + from.Prev = to.Prev + to.Prev = from + from.Next = to } // deleteAtTip removes a node from tip target returns old node (can inline) -func deleteAtTip(ll *linkedList, tip **node) *node { +func deleteAtTip(ll *linkedList, tip **Node) *Node { // Old is a placeholder for current tips node value to push // back on to the stack. old := *tip switch { - case old.prev == nil: // At head position + case old.Prev == nil: // At head position // shift current tip head to the right - *tip = old.next + *tip = old.Next // Remove reference to node from chain - if old.next != nil { // This is when liquidity hits zero - old.next.prev = nil + if old.Next != nil { // This is when liquidity hits zero + old.Next.Prev = nil } - case old.next == nil: // At tail position + case old.Next == nil: // At tail position // Remove reference to node from chain - old.prev.next = nil + old.Prev.Next = nil default: // Reference prior node in chain to next node in chain // bypassing current node - old.prev.next = old.next - old.next.prev = old.prev + old.Prev.Next = old.Next + old.Next.Prev = old.Prev } ll.length-- return old } // insertAtTip inserts at a tip target (can inline) -func insertAtTip(ll *linkedList, tip **node, updt Item, stack *stack) { +func insertAtTip(ll *linkedList, tip **Node, updt Item, stack *stack) { n := stack.Pop() - n.value = updt - n.next = *tip - n.prev = (*tip).prev - if (*tip).prev == nil { // Tip is at head + n.Value = updt + n.Next = *tip + n.Prev = (*tip).Prev + if (*tip).Prev == nil { // Tip is at head // Replace head which will push everything to the right // when this node will reference new node below *tip = n } else { // Reference new node to previous node - (*tip).prev.next = n + (*tip).Prev.Next = n } // Reference next node to new node - n.next.prev = n + n.Next.Prev = n ll.length++ } // insertAtTail inserts at tail end of node chain (can inline) -func insertAtTail(ll *linkedList, tip **node, updt Item, stack *stack) { +func insertAtTail(ll *linkedList, tip **Node, updt Item, stack *stack) { n := stack.Pop() - n.value = updt + n.Value = updt // Reference tip to new node - (*tip).next = n + (*tip).Next = n // Reference new node with current tip - n.prev = *tip + n.Prev = *tip ll.length++ } @@ -481,50 +481,50 @@ func insertAtTail(ll *linkedList, tip **node, updt Item, stack *stack) { // endpoint then it comes back online. (can inline) func insertHeadSpecific(ll *linkedList, updt Item, stack *stack) { n := stack.Pop() - n.value = updt + n.Value = updt ll.head = n ll.length++ } // insertNodeAtBookmark inserts a new node at a bookmarked node position // returns if a node needs to replace head (can inline) -func insertNodeAtBookmark(ll *linkedList, bookmark, n *node) { +func insertNodeAtBookmark(ll *linkedList, bookmark, n *Node) { switch { case bookmark == nil: // Zero liquidity and we are rebuilding from scratch ll.head = n - case bookmark.prev == nil: - n.prev = bookmark.prev - bookmark.prev = n - n.next = bookmark + case bookmark.Prev == nil: + n.Prev = bookmark.Prev + bookmark.Prev = n + n.Next = bookmark ll.head = n - case bookmark.next == nil: - n.prev = bookmark - bookmark.next = n + case bookmark.Next == nil: + n.Prev = bookmark + bookmark.Next = n default: - bookmark.prev.next = n - n.prev = bookmark.prev - bookmark.prev = n - n.next = bookmark + bookmark.Prev.Next = n + n.Prev = bookmark.Prev + bookmark.Prev = n + n.Next = bookmark } ll.length++ } // shiftBookmark moves a bookmarked node to the tip's next position or if nil, // sets tip as bookmark (can inline) -func shiftBookmark(tip *node, bookmark, head **node, updt Item) bool { +func shiftBookmark(tip *Node, bookmark, head **Node, updt Item) bool { if *bookmark == nil { // End of the chain and no bookmark set *bookmark = tip // Set tip to bookmark so we can set a new node there return false } - (*bookmark).value = updt - (*bookmark).next.prev = (*bookmark).prev - if (*bookmark).prev == nil { // Bookmark is at head - *head = (*bookmark).next + (*bookmark).Value = updt + (*bookmark).Next.Prev = (*bookmark).Prev + if (*bookmark).Prev == nil { // Bookmark is at head + *head = (*bookmark).Next } else { - (*bookmark).prev.next = (*bookmark).next + (*bookmark).Prev.Next = (*bookmark).Next } - tip.next = *bookmark - (*bookmark).prev = tip - (*bookmark).next = nil + tip.Next = *bookmark + (*bookmark).Prev = tip + (*bookmark).Next = nil return true } diff --git a/exchanges/orderbook/linked_list_test.go b/exchanges/orderbook/linked_list_test.go index 21d7df69..588548f9 100644 --- a/exchanges/orderbook/linked_list_test.go +++ b/exchanges/orderbook/linked_list_test.go @@ -32,7 +32,7 @@ var ask = Items{ // Display displays depth content for tests func (ll *linkedList) display() { - for tip := ll.head; tip != nil; tip = tip.next { + for tip := ll.head; tip != nil; tip = tip.Next { fmt.Printf("NODE: %+v %p \n", tip, tip) } fmt.Println() @@ -1261,23 +1261,23 @@ func Check(depth interface{}, liquidity, value float64, nodeCount int, t *testin return } - var tail *node + var tail *Node var price float64 - for tip := ll.head; ; tip = tip.next { + for tip := ll.head; ; tip = tip.Next { switch { case price == 0: - price = tip.value.Price - case isBid && price < tip.value.Price: + price = tip.Value.Price + case isBid && price < tip.Value.Price: ll.display() t.Fatal("Bid pricing out of order should be descending") - case isAsk && price > tip.value.Price: + case isAsk && price > tip.Value.Price: ll.display() t.Fatal("Ask pricing out of order should be ascending") default: - price = tip.value.Price + price = tip.Value.Price } - if tip.next == nil { + if tip.Next == nil { tail = tip break } @@ -1285,9 +1285,9 @@ func Check(depth interface{}, liquidity, value float64, nodeCount int, t *testin var liqReversed, valReversed float64 var nodeReversed int - for tip := tail; tip != nil; tip = tip.prev { - liqReversed += tip.value.Amount - valReversed += tip.value.Amount * tip.value.Price + for tip := tail; tip != nil; tip = tip.Prev { + liqReversed += tip.Value.Amount + valReversed += tip.Value.Amount * tip.Value.Price nodeReversed++ } @@ -1339,92 +1339,92 @@ func TestAmount(t *testing.T) { } func TestShiftBookmark(t *testing.T) { - bookmarkedNode := &node{ - value: Item{ + bookmarkedNode := &Node{ + Value: Item{ ID: 1337, Amount: 1, Price: 2, }, - next: nil, - prev: nil, + Next: nil, + Prev: nil, shelved: time.Time{}, } - originalBookmarkPrev := &node{ - value: Item{ + originalBookmarkPrev := &Node{ + Value: Item{ ID: 1336, }, - next: bookmarkedNode, - prev: nil, // At head + Next: bookmarkedNode, + Prev: nil, // At head shelved: time.Time{}, } - originalBookmarkNext := &node{ - value: Item{ + originalBookmarkNext := &Node{ + Value: Item{ ID: 1338, }, - next: nil, // This can be left nil in actuality this will be + Next: nil, // This can be left nil in actuality this will be // populated - prev: bookmarkedNode, + Prev: bookmarkedNode, shelved: time.Time{}, } // associate previous and next nodes to bookmarked node - bookmarkedNode.prev = originalBookmarkPrev - bookmarkedNode.next = originalBookmarkNext + bookmarkedNode.Prev = originalBookmarkPrev + bookmarkedNode.Next = originalBookmarkNext - tip := &node{ - value: Item{ + tip := &Node{ + Value: Item{ ID: 69420, }, - next: nil, // In this case tip will be at tail - prev: nil, + Next: nil, // In this case tip will be at tail + Prev: nil, shelved: time.Time{}, } - tipprev := &node{ - value: Item{ + tipprev := &Node{ + Value: Item{ ID: 69419, }, - next: tip, - prev: nil, // This can be left nil in actuality this will be + Next: tip, + Prev: nil, // This can be left nil in actuality this will be // populated shelved: time.Time{}, } // associate tips prev field with the correct prev node - tip.prev = tipprev + tip.Prev = tipprev if !shiftBookmark(tip, &bookmarkedNode, nil, Item{Amount: 1336, ID: 1337, Price: 9999}) { t.Fatal("There should be liquidity so we don't need to set tip to bookmark") } - if bookmarkedNode.value.Price != 9999 || - bookmarkedNode.value.Amount != 1336 || - bookmarkedNode.value.ID != 1337 { + if bookmarkedNode.Value.Price != 9999 || + bookmarkedNode.Value.Amount != 1336 || + bookmarkedNode.Value.ID != 1337 { t.Fatal("bookmarked details are not set correctly with shift") } - if bookmarkedNode.prev != tip { + if bookmarkedNode.Prev != tip { t.Fatal("bookmarked prev memory address does not point to tip") } - if bookmarkedNode.next != nil { + if bookmarkedNode.Next != nil { t.Fatal("bookmarked next is at tail and should be nil") } - if bookmarkedNode.next != nil { + if bookmarkedNode.Next != nil { t.Fatal("bookmarked next is at tail and should be nil") } - if originalBookmarkPrev.next != originalBookmarkNext { + if originalBookmarkPrev.Next != originalBookmarkNext { t.Fatal("original bookmarked prev node should be associated with original bookmarked next node") } - if originalBookmarkNext.prev != originalBookmarkPrev { + if originalBookmarkNext.Prev != originalBookmarkPrev { t.Fatal("original bookmarked next node should be associated with original bookmarked prev node") } - var nilBookmark *node + var nilBookmark *Node if shiftBookmark(tip, &nilBookmark, nil, Item{Amount: 1336, ID: 1337, Price: 9999}) { t.Fatal("there should not be a bookmarked node") @@ -1435,9 +1435,9 @@ func TestShiftBookmark(t *testing.T) { } head := bookmarkedNode - bookmarkedNode.prev = nil - bookmarkedNode.next = originalBookmarkNext - tip.next = nil + bookmarkedNode.Prev = nil + bookmarkedNode.Next = originalBookmarkNext + tip.Next = nil if !shiftBookmark(tip, &bookmarkedNode, &head, Item{Amount: 1336, ID: 1337, Price: 9999}) { t.Fatal("There should be liquidity so we don't need to set tip to bookmark") diff --git a/exchanges/orderbook/node.go b/exchanges/orderbook/node.go index c35b72bc..005ecce8 100644 --- a/exchanges/orderbook/node.go +++ b/exchanges/orderbook/node.go @@ -15,11 +15,11 @@ var ( defaultAllowance = time.Second * 30 ) -// node defines a linked list node for an orderbook item -type node struct { - value Item - next *node - prev *node +// Node defines a linked list node for an orderbook item +type Node struct { + Value Item + Next *Node + Prev *Node // Denotes time pushed to stack, this will influence cleanup routine when // there is a pause or minimal actions during period shelved time.Time @@ -27,7 +27,7 @@ type node struct { // stack defines a FILO list of reusable nodes type stack struct { - nodes []*node + nodes []*Node sema uint32 count int32 } @@ -51,7 +51,7 @@ func getNow() now { // Push pushes a node pointer into the stack to be reused the time is passed in // to allow for inlining which sets the time at which the node is theoretically // pushed to a stack. -func (s *stack) Push(n *node, tn now) { +func (s *stack) Push(n *Node, tn now) { if !atomic.CompareAndSwapUint32(&s.sema, neutral, active) { // Stack is in use, for now we can dereference pointer n = nil @@ -59,9 +59,9 @@ func (s *stack) Push(n *node, tn now) { } // Adds a time when its placed back on to stack. n.shelved = time.Time(tn) - n.next = nil - n.prev = nil - n.value = Item{} + n.Next = nil + n.Prev = nil + n.Value = Item{} // Allows for resize when overflow TODO: rethink this s.nodes = append(s.nodes[:s.count], n) @@ -71,17 +71,17 @@ func (s *stack) Push(n *node, tn now) { // Pop returns the last pointer off the stack and reduces the count and if empty // will produce a lovely fresh node -func (s *stack) Pop() *node { +func (s *stack) Pop() *Node { if !atomic.CompareAndSwapUint32(&s.sema, neutral, active) { // Stack is in use, for now we can allocate a new node pointer - return &node{} + return &Node{} } if s.count == 0 { // Create an empty node when no nodes are in slice or when cleaning // service is running atomic.StoreUint32(&s.sema, neutral) - return &node{} + return &Node{} } s.count-- n := s.nodes[s.count] diff --git a/exchanges/orderbook/node_test.go b/exchanges/orderbook/node_test.go index faebd86f..5baa7ed1 100644 --- a/exchanges/orderbook/node_test.go +++ b/exchanges/orderbook/node_test.go @@ -9,7 +9,7 @@ import ( func TestPushPop(t *testing.T) { s := newStack() - var nSlice [100]*node + var nSlice [100]*Node for i := 0; i < 100; i++ { nSlice[i] = s.Pop() } @@ -29,7 +29,7 @@ func TestPushPop(t *testing.T) { func TestCleaner(t *testing.T) { s := newStack() - var nSlice [100]*node + var nSlice [100]*Node for i := 0; i < 100; i++ { nSlice[i] = s.Pop() } @@ -64,20 +64,20 @@ func (s *stack) Display() { // 158 9,521,717 ns/op 9600104 B/op 100001 allocs/op func BenchmarkWithoutStack(b *testing.B) { - var n *node + var n *Node b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { for j := 0; j < 100000; j++ { - n = new(node) - n.value.Price = 1337 + n = new(Node) + n.Value.Price = 1337 } } } // 316 3,485,211 ns/op 1 B/op 0 allocs/op func BenchmarkWithStack(b *testing.B) { - var n *node + var n *Node stack := newStack() b.ReportAllocs() b.ResetTimer() @@ -85,7 +85,7 @@ func BenchmarkWithStack(b *testing.B) { for i := 0; i < b.N; i++ { for j := 0; j < 100000; j++ { n = stack.Pop() - n.value.Price = 1337 + n.Value.Price = 1337 stack.Push(n, tn) } } diff --git a/exchanges/orderbook/orderbook.go b/exchanges/orderbook/orderbook.go index d15768c0..c245df5d 100644 --- a/exchanges/orderbook/orderbook.go +++ b/exchanges/orderbook/orderbook.go @@ -124,6 +124,9 @@ func (s *Service) DeployDepth(exchange string, p currency.Pair, a asset.Item) (* book, ok := m3[p.Quote.Item] if !ok { book = newDepth(m1.ID) + book.exchange = exchange + book.pair = p + book.asset = a m3[p.Quote.Item] = book } return book, nil diff --git a/exchanges/orderbook/unsafe.go b/exchanges/orderbook/unsafe.go new file mode 100644 index 00000000..08545c0d --- /dev/null +++ b/exchanges/orderbook/unsafe.go @@ -0,0 +1,62 @@ +package orderbook + +import ( + "sync" + "time" +) + +// Unsafe is an exported linked list reference to the current bid/ask heads and +// a reference to the underlying depth mutex. This allows for the exposure of +// the internal list to an external strategy or subsystem. The bid and ask +// fields point to the actual head fields contained on both linked list structs, +// so that this struct can be reusable and not needed to be called on each +// inspection. +type Unsafe struct { + BidHead **Node + AskHead **Node + m *sync.Mutex + + // UpdatedViaREST defines if sync manager is updating this book via the REST + // protocol then this book is not considered live and cannot be trusted. + UpdatedViaREST *bool + LastUpdated *time.Time + *Alert +} + +// Lock locks down the underlying linked list which inhibits all pending updates +// for strategy inspection. +func (src *Unsafe) Lock() { + src.m.Lock() +} + +// Unlock unlocks the underlying linked list after inspection by a strategy to +// resume normal operations +func (src *Unsafe) Unlock() { + src.m.Unlock() +} + +// LockWith locks both books for the context of cross orderbook inspection. +// WARNING: When inspecting diametrically opposed books a higher order mutex +// MUST be used or a dead lock will occur. +func (src *Unsafe) LockWith(dst sync.Locker) { + src.m.Lock() + dst.Lock() +} + +// UnlockWith unlocks both books for the context of cross orderbook inspection +func (src *Unsafe) UnlockWith(dst sync.Locker) { + dst.Unlock() // Unlock in reverse order + src.m.Unlock() +} + +// GetUnsafe returns an unsafe orderbook with pointers to the linked list heads. +func (d *Depth) GetUnsafe() Unsafe { + return Unsafe{ + BidHead: &d.bids.linkedList.head, + AskHead: &d.asks.linkedList.head, + m: &d.m, + Alert: &d.Alert, + UpdatedViaREST: &d.options.restSnapshot, + LastUpdated: &d.options.lastUpdated, + } +} diff --git a/exchanges/orderbook/unsafe_test.go b/exchanges/orderbook/unsafe_test.go new file mode 100644 index 00000000..60abd9f4 --- /dev/null +++ b/exchanges/orderbook/unsafe_test.go @@ -0,0 +1,28 @@ +package orderbook + +import ( + "testing" + + "github.com/gofrs/uuid" +) + +var unsafeID, _ = uuid.NewV4() + +type externalBook struct{} + +func (e *externalBook) Lock() {} +func (e *externalBook) Unlock() {} + +func TestUnsafe(t *testing.T) { + d := newDepth(unsafeID) + ob := d.GetUnsafe() + if ob.AskHead == nil || ob.BidHead == nil || ob.m == nil { + t.Fatal("these items should not be nil") + } + + ob2 := &externalBook{} + ob.Lock() + ob.Unlock() // nolint:go-staticcheck // Not needed in test + ob.LockWith(ob2) + ob.UnlockWith(ob2) +} diff --git a/exchanges/stream/websocket_test.go b/exchanges/stream/websocket_test.go index e6fcf775..c45486e2 100644 --- a/exchanges/stream/websocket_test.go +++ b/exchanges/stream/websocket_test.go @@ -375,7 +375,6 @@ func TestWebsocket(t *testing.T) { t.Fatal("should not be connected to able to shut down") } - ws.verbose = true ws.setConnectedStatus(true) ws.Conn = &dodgyConnection{} err = ws.Shutdown() @@ -546,7 +545,6 @@ func TestConnectionMonitorNoConnection(t *testing.T) { ws.ShutdownC = make(chan struct{}, 1) ws.exchangeName = "hello" ws.trafficTimeout = 1 - ws.verbose = true ws.Wg = &sync.WaitGroup{} ws.connectionMonitor() if !ws.IsConnectionMonitorRunning() { diff --git a/exchanges/ticker/ticker.go b/exchanges/ticker/ticker.go index be37acd5..373fb704 100644 --- a/exchanges/ticker/ticker.go +++ b/exchanges/ticker/ticker.go @@ -12,6 +12,12 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" ) +var ( + errInvalidTicker = errors.New("invalid ticker") + errTickerNotFound = errors.New("ticker not found") + errExchangeNameIsEmpty = errors.New("exchange name is empty") +) + func init() { service = new(Service) service.Tickers = make(map[string]map[*currency.Item]map[*currency.Item]map[asset.Item]*Ticker) @@ -23,8 +29,8 @@ func init() { // stream new ticker updates func SubscribeTicker(exchange string, p currency.Pair, a asset.Item) (dispatch.Pipe, error) { exchange = strings.ToLower(exchange) - service.RLock() - defer service.RUnlock() + service.Lock() + defer service.Unlock() tick, ok := service.Tickers[exchange][p.Base.Item][p.Quote.Item][a] if !ok { @@ -39,8 +45,8 @@ func SubscribeTicker(exchange string, p currency.Pair, a asset.Item) (dispatch.P // SubscribeToExchangeTickers subcribes to all tickers on an exchange func SubscribeToExchangeTickers(exchange string) (dispatch.Pipe, error) { exchange = strings.ToLower(exchange) - service.RLock() - defer service.RUnlock() + service.Lock() + defer service.Unlock() id, ok := service.Exchange[exchange] if !ok { return dispatch.Pipe{}, fmt.Errorf("%s exchange tickers not found", @@ -51,108 +57,137 @@ func SubscribeToExchangeTickers(exchange string) (dispatch.Pipe, error) { } // GetTicker checks and returns a requested ticker if it exists -func GetTicker(exchange string, p currency.Pair, tickerType asset.Item) (*Price, error) { +func GetTicker(exchange string, p currency.Pair, a asset.Item) (*Price, error) { exchange = strings.ToLower(exchange) - service.RLock() - defer service.RUnlock() - if service.Tickers[exchange] == nil { + service.Lock() + defer service.Unlock() + m1, ok := service.Tickers[exchange] + if !ok { return nil, fmt.Errorf("no tickers for %s exchange", exchange) } - if service.Tickers[exchange][p.Base.Item] == nil { + m2, ok := m1[p.Base.Item] + if !ok { return nil, fmt.Errorf("no tickers associated with base currency %s", p.Base) } - if service.Tickers[exchange][p.Base.Item][p.Quote.Item] == nil { + m3, ok := m2[p.Quote.Item] + if !ok { return nil, fmt.Errorf("no tickers associated with quote currency %s", p.Quote) } - if service.Tickers[exchange][p.Base.Item][p.Quote.Item][tickerType] == nil { + t, ok := m3[a] + if !ok { return nil, fmt.Errorf("no tickers associated with asset type %s", - tickerType) + a) } - return &service.Tickers[exchange][p.Base.Item][p.Quote.Item][tickerType].Price, nil + cpy := t.Price // Don't let external functions have access to underlying + return &cpy, nil +} + +// FindLast searches for a currency pair and returns the first available +func FindLast(p currency.Pair, a asset.Item) (float64, error) { + service.Lock() + defer service.Unlock() + for _, m1 := range service.Tickers { + m2, ok := m1[p.Base.Item] + if !ok { + continue + } + m3, ok := m2[p.Quote.Item] + if !ok { + continue + } + t, ok := m3[a] + if !ok { + continue + } + + if t.Last == 0 { + return 0, errInvalidTicker + } + return t.Last, nil + } + return 0, fmt.Errorf("%w %s %s", errTickerNotFound, p, a) } // ProcessTicker processes incoming tickers, creating or updating the Tickers // list -func ProcessTicker(tickerNew *Price) error { - if tickerNew.ExchangeName == "" { - return fmt.Errorf(ErrExchangeNameUnset) - } - - if tickerNew.Pair.IsEmpty() { - return fmt.Errorf("%s %s", tickerNew.ExchangeName, errPairNotSet) - } - - if tickerNew.AssetType == "" { - return fmt.Errorf("%s %s %s", - tickerNew.ExchangeName, - tickerNew.Pair, - errAssetTypeNotSet) - } - - if tickerNew.LastUpdated.IsZero() { - tickerNew.LastUpdated = time.Now() - } - - return service.Update(tickerNew) -} - -// Update updates ticker price -func (s *Service) Update(p *Price) error { - name := strings.ToLower(p.ExchangeName) - s.Lock() - - ticker, ok := s.Tickers[name][p.Pair.Base.Item][p.Pair.Quote.Item][p.AssetType] - if ok { - ticker.Last = p.Last - ticker.High = p.High - ticker.Low = p.Low - ticker.Bid = p.Bid - ticker.Ask = p.Ask - ticker.Volume = p.Volume - ticker.QuoteVolume = p.QuoteVolume - ticker.PriceATH = p.PriceATH - ticker.Open = p.Open - ticker.Close = p.Close - ticker.LastUpdated = p.LastUpdated - ids := append(ticker.Assoc, ticker.Main) - s.Unlock() - return s.mux.Publish(ids, p) - } - - switch { - case s.Tickers[name] == nil: - s.Tickers[name] = make(map[*currency.Item]map[*currency.Item]map[asset.Item]*Ticker) - fallthrough - case s.Tickers[name][p.Pair.Base.Item] == nil: - s.Tickers[name][p.Pair.Base.Item] = make(map[*currency.Item]map[asset.Item]*Ticker) - fallthrough - case s.Tickers[name][p.Pair.Base.Item][p.Pair.Quote.Item] == nil: - s.Tickers[name][p.Pair.Base.Item][p.Pair.Quote.Item] = make(map[asset.Item]*Ticker) - } - - err := s.SetItemID(p, name) - if err != nil { - s.Unlock() - return err - } - - s.Unlock() - return nil -} - -// SetItemID retrieves and sets dispatch mux publish IDs -func (s *Service) SetItemID(p *Price, fmtName string) error { +func ProcessTicker(p *Price) error { if p == nil { return errors.New(errTickerPriceIsNil) } - ids, err := s.GetAssociations(p, fmtName) + if p.ExchangeName == "" { + return fmt.Errorf(ErrExchangeNameUnset) + } + + if p.Pair.IsEmpty() { + return fmt.Errorf("%s %s", p.ExchangeName, errPairNotSet) + } + + if p.AssetType == "" { + return fmt.Errorf("%s %s %s", + p.ExchangeName, + p.Pair, + errAssetTypeNotSet) + } + + if p.LastUpdated.IsZero() { + p.LastUpdated = time.Now() + } + + return service.update(p) +} + +// update updates ticker price +func (s *Service) update(p *Price) error { + name := strings.ToLower(p.ExchangeName) + s.Lock() + + m1, ok := service.Tickers[name] + if !ok { + m1 = make(map[*currency.Item]map[*currency.Item]map[asset.Item]*Ticker) + service.Tickers[name] = m1 + } + + m2, ok := m1[p.Pair.Base.Item] + if !ok { + m2 = make(map[*currency.Item]map[asset.Item]*Ticker) + m1[p.Pair.Base.Item] = m2 + } + + m3, ok := m2[p.Pair.Quote.Item] + if !ok { + m3 = make(map[asset.Item]*Ticker) + m2[p.Pair.Quote.Item] = m3 + } + + t, ok := m3[p.AssetType] + if !ok || t == nil { + newTicker := &Ticker{} + err := s.setItemID(newTicker, p, name) + if err != nil { + s.Unlock() + return err + } + m3[p.AssetType] = newTicker + s.Unlock() + return nil + } + + t.Price = *p + ids := append(t.Assoc, t.Main) + s.Unlock() + return s.mux.Publish(ids, p) +} + +// setItemID retrieves and sets dispatch mux publish IDs +func (s *Service) setItemID(t *Ticker, p *Price, exch string) error { + ids, err := s.getAssociations(exch) if err != nil { return err } @@ -161,28 +196,27 @@ func (s *Service) SetItemID(p *Price, fmtName string) error { return err } - s.Tickers[fmtName][p.Pair.Base.Item][p.Pair.Quote.Item][p.AssetType] = &Ticker{Price: *p, - Main: singleID, - Assoc: ids} + t.Price = *p + t.Main = singleID + t.Assoc = ids return nil } -// GetAssociations links a singular book with it's dispatch associations -func (s *Service) GetAssociations(p *Price, fmtName string) ([]uuid.UUID, error) { - if p == nil || *p == (Price{}) { - return nil, errors.New(errTickerPriceIsNil) +// getAssociations links a singular book with it's dispatch associations +func (s *Service) getAssociations(exch string) ([]uuid.UUID, error) { + if exch == "" { + return nil, errExchangeNameIsEmpty } var ids []uuid.UUID - exchangeID, ok := s.Exchange[fmtName] + exchangeID, ok := s.Exchange[exch] if !ok { var err error exchangeID, err = s.mux.GetID() if err != nil { return nil, err } - s.Exchange[fmtName] = exchangeID + s.Exchange[exch] = exchangeID } - ids = append(ids, exchangeID) return ids, nil } diff --git a/exchanges/ticker/ticker_test.go b/exchanges/ticker/ticker_test.go index 3cd5eb3b..311abca1 100644 --- a/exchanges/ticker/ticker_test.go +++ b/exchanges/ticker/ticker_test.go @@ -1,6 +1,7 @@ package ticker import ( + "errors" "log" "math/rand" "os" @@ -80,7 +81,7 @@ func TestSubscribeTicker(t *testing.T) { ExchangeName: "subscribetest", AssetType: asset.Spot}) if err != nil { - t.Error("error cannot be nil") + t.Fatal(err) } _, err = SubscribeTicker("subscribetest", p, asset.Spot) @@ -198,6 +199,38 @@ func TestGetTicker(t *testing.T) { } } +func TestFindLast(t *testing.T) { + cp := currency.NewPair(currency.BTC, currency.XRP) + _, err := FindLast(cp, asset.Spot) + if !errors.Is(err, errTickerNotFound) { + t.Errorf("received: %v but expected: %v", err, errTickerNotFound) + } + + err = service.update(&Price{Last: 0, ExchangeName: "testerinos", Pair: cp, AssetType: asset.Spot}) + if err != nil { + t.Fatal(err) + } + + _, err = FindLast(cp, asset.Spot) + if !errors.Is(err, errInvalidTicker) { + t.Errorf("received: %v but expected: %v", err, errInvalidTicker) + } + + err = service.update(&Price{Last: 1337, ExchangeName: "testerinos", Pair: cp, AssetType: asset.Spot}) + if err != nil { + t.Fatal(err) + } + + last, err := FindLast(cp, asset.Spot) + if !errors.Is(err, nil) { + t.Errorf("received: %v but expected: %v", err, nil) + } + + if last != 1337 { + t.Fatal("unexpected value") + } +} + func TestProcessTicker(t *testing.T) { // non-appending function to tickers exchName := "bitstamp" newPair, err := currency.NewPairFromStrings("BTC", "USD") @@ -368,39 +401,15 @@ func TestProcessTicker(t *testing.T) { // non-appending function to tickers wg.Wait() } -func TestSetItemID(t *testing.T) { - err := service.SetItemID(nil, "") - if err == nil { - t.Error("error cannot be nil") - } - - err = service.SetItemID(&Price{}, "") - if err == nil { - t.Error("error cannot be nil") - } - - p := currency.NewPair(currency.CYC, currency.CYG) - - service.mux = nil - err = service.SetItemID(&Price{Pair: p, ExchangeName: "SetItemID"}, "setitemid") - if err == nil { - t.Error("error cannot be nil") - } - - service.mux = cpyMux -} - func TestGetAssociation(t *testing.T) { - _, err := service.GetAssociations(nil, "") - if err == nil { - t.Error("error cannot be nil") + _, err := service.getAssociations("") + if !errors.Is(err, errExchangeNameIsEmpty) { + t.Errorf("received: %v but expected: %v", err, errExchangeNameIsEmpty) } - p := currency.NewPair(currency.CYC, currency.CYG) - service.mux = nil - _, err = service.GetAssociations(&Price{Pair: p, ExchangeName: "GetAssociation"}, "getassociation") + _, err = service.getAssociations("getassociation") if err == nil { t.Error("error cannot be nil") } diff --git a/exchanges/ticker/ticker_types.go b/exchanges/ticker/ticker_types.go index ce30358e..78354727 100644 --- a/exchanges/ticker/ticker_types.go +++ b/exchanges/ticker/ticker_types.go @@ -28,7 +28,7 @@ type Service struct { Tickers map[string]map[*currency.Item]map[*currency.Item]map[asset.Item]*Ticker Exchange map[string]uuid.UUID mux *dispatch.Mux - sync.RWMutex + sync.Mutex } // Price struct stores the currency pair and pricing information diff --git a/gctscript/wrappers/gct/exchange/exchange_test.go b/gctscript/wrappers/gct/exchange/exchange_test.go index 4060d377..fa2872f6 100644 --- a/gctscript/wrappers/gct/exchange/exchange_test.go +++ b/gctscript/wrappers/gct/exchange/exchange_test.go @@ -209,7 +209,7 @@ func setupEngine() (err error) { em := engine.SetupExchangeManager() engine.Bot.ExchangeManager = em - return engine.Bot.LoadExchange(exchName, false, nil) + return engine.Bot.LoadExchange(exchName, nil) } func cleanup() { diff --git a/log/logger.go b/log/logger.go index 66ffaa95..93f7fa32 100644 --- a/log/logger.go +++ b/log/logger.go @@ -4,9 +4,28 @@ import ( "errors" "fmt" "io" + "strings" "time" ) +var ( + errEmptyLoggerName = errors.New("cannot have empty logger name") + errSubLoggerAlreadyregistered = errors.New("sub logger already registered") +) + +// NewSubLogger allows for a new sub logger to be registered. +func NewSubLogger(name string) (*SubLogger, error) { + if name == "" { + return nil, errEmptyLoggerName + } + name = strings.ToUpper(name) + _, ok := subLoggers[name] + if ok { + return nil, errSubLoggerAlreadyregistered + } + return registerNewSubLogger(name), nil +} + func newLogger(c *Config) *Logger { return &Logger{ Timestamp: c.AdvancedSettings.TimeStampFormat, @@ -53,7 +72,7 @@ func CloseLogger() error { return GlobalLogFile.Close() } -func validSubLogger(s string) (bool, *subLogger) { +func validSubLogger(s string) (bool, *SubLogger) { if v, found := subLoggers[s]; found { return true, v } diff --git a/log/logger_setup.go b/log/logger_setup.go index 2503a31f..2a53574d 100644 --- a/log/logger_setup.go +++ b/log/logger_setup.go @@ -122,8 +122,8 @@ func splitLevel(level string) (l Levels) { return } -func registerNewSubLogger(logger string) *subLogger { - temp := subLogger{ +func registerNewSubLogger(logger string) *SubLogger { + temp := SubLogger{ name: strings.ToUpper(logger), output: os.Stdout, } diff --git a/log/logger_test.go b/log/logger_test.go index 4ebef873..22b0299a 100644 --- a/log/logger_test.go +++ b/log/logger_test.go @@ -2,6 +2,7 @@ package log import ( "bytes" + "errors" "io/ioutil" "os" "strings" @@ -224,7 +225,7 @@ func TestNewLogEvent(t *testing.T) { func TestInfo(t *testing.T) { w := &bytes.Buffer{} - tempSL := subLogger{ + tempSL := SubLogger{ "TESTYMCTESTALOT", splitLevel("INFO|WARN|DEBUG|ERROR"), w, @@ -262,3 +263,22 @@ func TestSubLoggerName(t *testing.T) { t.Error("Unexpected SUBLOGGER in output") } } + +func TestNewSubLogger(t *testing.T) { + _, err := NewSubLogger("") + if !errors.Is(err, errEmptyLoggerName) { + t.Fatalf("received: %v but expected: %v", err, errEmptyLoggerName) + } + + sl, err := NewSubLogger("TESTERINOS") + if !errors.Is(err, nil) { + t.Fatalf("received: %v but expected: %v", err, nil) + } + + Debug(sl, "testerinos") + + _, err = NewSubLogger("TESTERINOS") + if !errors.Is(err, errSubLoggerAlreadyregistered) { + t.Fatalf("received: %v but expected: %v", err, errSubLoggerAlreadyregistered) + } +} diff --git a/log/logger_types.go b/log/logger_types.go index c18664aa..2a671a86 100644 --- a/log/logger_types.go +++ b/log/logger_types.go @@ -85,7 +85,9 @@ type Levels struct { Info, Debug, Warn, Error bool } -type subLogger struct { +// SubLogger defines a sub logger can be used externally for packages wanted to +// leverage GCT library logger features. +type SubLogger struct { name string Levels output io.Writer diff --git a/log/loggers.go b/log/loggers.go index fcb356d1..22a40fa7 100644 --- a/log/loggers.go +++ b/log/loggers.go @@ -6,7 +6,7 @@ import ( ) // Info takes a pointer subLogger struct and string sends to newLogEvent -func Info(sl *subLogger, data string) { +func Info(sl *SubLogger, data string) { fields := getFields(sl) if fields == nil { return @@ -19,7 +19,7 @@ func Info(sl *subLogger, data string) { } // Infoln takes a pointer subLogger struct and interface sends to newLogEvent -func Infoln(sl *subLogger, v ...interface{}) { +func Infoln(sl *SubLogger, v ...interface{}) { fields := getFields(sl) if fields == nil { return @@ -32,12 +32,12 @@ func Infoln(sl *subLogger, v ...interface{}) { } // Infof takes a pointer subLogger struct, string & interface formats and sends to Info() -func Infof(sl *subLogger, data string, v ...interface{}) { +func Infof(sl *SubLogger, data string, v ...interface{}) { Info(sl, fmt.Sprintf(data, v...)) } // Debug takes a pointer subLogger struct and string sends to multiwriter -func Debug(sl *subLogger, data string) { +func Debug(sl *SubLogger, data string) { fields := getFields(sl) if fields == nil { return @@ -50,7 +50,7 @@ func Debug(sl *subLogger, data string) { } // Debugln takes a pointer subLogger struct, string and interface sends to newLogEvent -func Debugln(sl *subLogger, v ...interface{}) { +func Debugln(sl *SubLogger, v ...interface{}) { fields := getFields(sl) if fields == nil { return @@ -63,12 +63,12 @@ func Debugln(sl *subLogger, v ...interface{}) { } // Debugf takes a pointer subLogger struct, string & interface formats and sends to Info() -func Debugf(sl *subLogger, data string, v ...interface{}) { +func Debugf(sl *SubLogger, data string, v ...interface{}) { Debug(sl, fmt.Sprintf(data, v...)) } // Warn takes a pointer subLogger struct & string and sends to newLogEvent() -func Warn(sl *subLogger, data string) { +func Warn(sl *SubLogger, data string) { fields := getFields(sl) if fields == nil { return @@ -81,7 +81,7 @@ func Warn(sl *subLogger, data string) { } // Warnln takes a pointer subLogger struct & interface formats and sends to newLogEvent() -func Warnln(sl *subLogger, v ...interface{}) { +func Warnln(sl *SubLogger, v ...interface{}) { fields := getFields(sl) if fields == nil { return @@ -94,12 +94,12 @@ func Warnln(sl *subLogger, v ...interface{}) { } // Warnf takes a pointer subLogger struct, string & interface formats and sends to Warn() -func Warnf(sl *subLogger, data string, v ...interface{}) { +func Warnf(sl *SubLogger, data string, v ...interface{}) { Warn(sl, fmt.Sprintf(data, v...)) } // Error takes a pointer subLogger struct & interface formats and sends to newLogEvent() -func Error(sl *subLogger, data ...interface{}) { +func Error(sl *SubLogger, data ...interface{}) { fields := getFields(sl) if fields == nil { return @@ -112,7 +112,7 @@ func Error(sl *subLogger, data ...interface{}) { } // Errorln takes a pointer subLogger struct, string & interface formats and sends to newLogEvent() -func Errorln(sl *subLogger, v ...interface{}) { +func Errorln(sl *SubLogger, v ...interface{}) { fields := getFields(sl) if fields == nil { return @@ -125,7 +125,7 @@ func Errorln(sl *subLogger, v ...interface{}) { } // Errorf takes a pointer subLogger struct, string & interface formats and sends to Debug() -func Errorf(sl *subLogger, data string, v ...interface{}) { +func Errorf(sl *SubLogger, data string, v ...interface{}) { Error(sl, fmt.Sprintf(data, v...)) } @@ -147,7 +147,7 @@ func enabled() bool { return false } -func getFields(sl *subLogger) *logFields { +func getFields(sl *SubLogger) *logFields { if !enabled() { return nil } diff --git a/log/sublogger_types.go b/log/sublogger_types.go index 90611ea8..52e6e57f 100644 --- a/log/sublogger_types.go +++ b/log/sublogger_types.go @@ -4,33 +4,33 @@ import "io" // Global vars related to the logger package var ( - subLoggers = map[string]*subLogger{} + subLoggers = map[string]*SubLogger{} - Global *subLogger - BackTester *subLogger - ConnectionMgr *subLogger - CommunicationMgr *subLogger - APIServerMgr *subLogger - ConfigMgr *subLogger - DatabaseMgr *subLogger - DataHistory *subLogger - GCTScriptMgr *subLogger - OrderMgr *subLogger - PortfolioMgr *subLogger - SyncMgr *subLogger - TimeMgr *subLogger - WebsocketMgr *subLogger - EventMgr *subLogger - DispatchMgr *subLogger + Global *SubLogger + BackTester *SubLogger + ConnectionMgr *SubLogger + CommunicationMgr *SubLogger + APIServerMgr *SubLogger + ConfigMgr *SubLogger + DatabaseMgr *SubLogger + DataHistory *SubLogger + GCTScriptMgr *SubLogger + OrderMgr *SubLogger + PortfolioMgr *SubLogger + SyncMgr *SubLogger + TimeMgr *SubLogger + WebsocketMgr *SubLogger + EventMgr *SubLogger + DispatchMgr *SubLogger - RequestSys *subLogger - ExchangeSys *subLogger - GRPCSys *subLogger - RESTSys *subLogger + RequestSys *SubLogger + ExchangeSys *SubLogger + GRPCSys *SubLogger + RESTSys *SubLogger - Ticker *subLogger - OrderBook *subLogger - Trade *subLogger + Ticker *SubLogger + OrderBook *SubLogger + Trade *SubLogger ) // logFields is used to store data in a non-global and thread-safe manner