From d23898e63ae04f9f06d94273f33f39cc416f5715 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Wed, 5 Apr 2023 13:07:35 +1000 Subject: [PATCH] engine: Adds shutdown method to exchange manager and unload all exchanges when engine is stopped (#1112) * engine: shutdown and unload exchange when engine is stopped * linter: fixes * engine/exchMan: add nil check * engine/exchanges: add shutdown method to exchanges, rm len check lock not needed, expanded code coverage, address some nits * exchMan: report all failed shutdowns across exchanges, implement timer and monitoring routines. * exchMan: improve shutdown sequence and aloc. * further improvement * exchman: log from warn to error * websockconnection: Suppress error return when closure is caused by library * linter: fix * fix racies * add note on why not parallel tests * glorious: nits * spelling kween * thrasher: nits * engine: change print of setting using reflection, I keep forgetting to implement this so program around forgetfulness * engine/exchange_management: remove wait group and just rely on intermediary lock * glorious: nits * Update common/common.go Co-authored-by: Adrian Gallagher * Update main.go Co-authored-by: Adrian Gallagher --------- Co-authored-by: Ryan O'Hara-Reid Co-authored-by: Adrian Gallagher --- backtester/data/kline/api/api_test.go | 4 +- backtester/data/kline/live/live_test.go | 4 +- backtester/engine/backtest_test.go | 16 +- backtester/engine/live_test.go | 2 +- backtester/engine/setup.go | 16 +- .../eventhandlers/exchange/exchange_test.go | 21 ++- backtester/funding/funding_test.go | 21 ++- .../trackingcurrencies_test.go | 7 +- cmd/exchange_wrapper_coverage/main.go | 8 +- cmd/exchange_wrapper_issues/main.go | 10 +- common/common.go | 32 ++++ common/common_test.go | 31 ++++ engine/apiserver_test.go | 21 ++- engine/datahistory_manager_test.go | 48 +++--- engine/engine.go | 108 +++++------- engine/engine_test.go | 39 +++-- engine/engine_types.go | 68 +++++--- engine/event_manager_test.go | 28 +++- engine/exchange_manager.go | 156 +++++++++++++----- engine/exchange_manager_test.go | 153 +++++++++++++++-- engine/helpers_test.go | 9 +- engine/order_manager_test.go | 55 +++--- engine/portfolio_manager_test.go | 15 +- engine/rpcserver_test.go | 138 +++++++++++----- engine/sync_manager_test.go | 28 +++- engine/websocketroutine_manager_test.go | 24 +-- engine/withdraw_manager_test.go | 7 +- exchanges/binanceus/binanceus_websocket.go | 7 +- exchanges/exchange.go | 12 ++ exchanges/interfaces.go | 1 + exchanges/stream/websocket.go | 8 +- exchanges/stream/websocket_connection.go | 36 ++-- .../wrappers/gct/exchange/exchange_test.go | 4 +- gctscript/wrappers/gct/gctwrapper_test.go | 17 +- main.go | 5 +- 35 files changed, 803 insertions(+), 356 deletions(-) diff --git a/backtester/data/kline/api/api_test.go b/backtester/data/kline/api/api_test.go index e8d2ff39..68ac3be7 100644 --- a/backtester/data/kline/api/api_test.go +++ b/backtester/data/kline/api/api_test.go @@ -18,7 +18,7 @@ const testExchange = "binanceus" func TestLoadCandles(t *testing.T) { t.Parallel() - em := engine.SetupExchangeManager() + em := engine.NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) @@ -56,7 +56,7 @@ func TestLoadCandles(t *testing.T) { func TestLoadTrades(t *testing.T) { t.Parallel() - em := engine.SetupExchangeManager() + em := engine.NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) diff --git a/backtester/data/kline/live/live_test.go b/backtester/data/kline/live/live_test.go index 772be680..6938f5a0 100644 --- a/backtester/data/kline/live/live_test.go +++ b/backtester/data/kline/live/live_test.go @@ -21,7 +21,7 @@ func TestLoadCandles(t *testing.T) { interval := gctkline.OneHour cp := currency.NewPair(currency.BTC, currency.USDT) a := asset.Spot - em := engine.SetupExchangeManager() + em := engine.NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) @@ -56,7 +56,7 @@ func TestLoadTrades(t *testing.T) { interval := gctkline.OneMin cp := currency.NewPair(currency.BTC, currency.USDT) a := asset.Spot - em := engine.SetupExchangeManager() + em := engine.NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) diff --git a/backtester/engine/backtest_test.go b/backtester/engine/backtest_test.go index b8bf3554..871d7087 100644 --- a/backtester/engine/backtest_test.go +++ b/backtester/engine/backtest_test.go @@ -331,7 +331,7 @@ func TestLoadDataLive(t *testing.T) { Funding: &funding.FundManager{}, DataHolder: &data.HandlerHolder{}, Statistic: &fakeStats{}, - exchangeManager: engine.SetupExchangeManager(), + exchangeManager: engine.NewExchangeManager(), shutdown: make(chan struct{}), } @@ -1103,13 +1103,16 @@ func TestProcessFillEvent(t *testing.T) { ev := &fill.Fill{ Base: de.Base, } - em := engine.SetupExchangeManager() + em := engine.NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) } exch.SetDefaults() - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } b, err := funding.CreateItem(testExchange, a, cp.Base, decimal.Zero, decimal.Zero) if !errors.Is(err, nil) { t.Errorf("received '%v' expected '%v'", err, nil) @@ -1210,13 +1213,16 @@ func TestProcessFuturesFillEvent(t *testing.T) { ev := &fill.Fill{ Base: de.Base, } - em := engine.SetupExchangeManager() + em := engine.NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) } exch.SetDefaults() - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } b, err := funding.CreateItem(testExchange, a, cp.Base, decimal.Zero, decimal.Zero) if !errors.Is(err, expectedError) { t.Errorf("received '%v' expected '%v'", err, expectedError) diff --git a/backtester/engine/live_test.go b/backtester/engine/live_test.go index 8d63b884..32fd2885 100644 --- a/backtester/engine/live_test.go +++ b/backtester/engine/live_test.go @@ -33,7 +33,7 @@ func TestSetupLiveDataHandler(t *testing.T) { t.Errorf("received '%v' expected '%v'", err, gctcommon.ErrNilPointer) } - bt.exchangeManager = engine.SetupExchangeManager() + bt.exchangeManager = engine.NewExchangeManager() err = bt.SetupLiveDataHandler(-1, -1, false, false) if !errors.Is(err, gctcommon.ErrNilPointer) { t.Errorf("received '%v' expected '%v'", err, gctcommon.ErrNilPointer) diff --git a/backtester/engine/setup.go b/backtester/engine/setup.go index ae992183..73420981 100644 --- a/backtester/engine/setup.go +++ b/backtester/engine/setup.go @@ -56,7 +56,7 @@ func NewBacktester() (*BackTest, error) { if err != nil { return nil, err } - bt.exchangeManager = engine.SetupExchangeManager() + bt.exchangeManager = engine.NewExchangeManager() return bt, nil } @@ -166,7 +166,10 @@ func (bt *BackTest) SetupFromConfig(cfg *config.Config, templatePath, output str return err } } - bt.exchangeManager.Add(exch) + err = bt.exchangeManager.Add(exch) + if err != nil { + return err + } } else { return err } @@ -824,9 +827,14 @@ func (bt *BackTest) loadData(cfg *config.Config, exch gctexchange.IBotExchange, } case cfg.DataSettings.LiveData != nil: if !b.Features.Enabled.Kline.Intervals.ExchangeSupported(cfg.DataSettings.Interval) { - return nil, fmt.Errorf("%w don't trade live on custom candle interval of %v", gctkline.ErrCannotConstructInterval, cfg.DataSettings.Interval) + return nil, fmt.Errorf("%w don't trade live on custom candle interval of %v", + gctkline.ErrCannotConstructInterval, + cfg.DataSettings.Interval) + } + err = bt.exchangeManager.Add(exch) + if err != nil { + return nil, err } - bt.exchangeManager.Add(exch) err = bt.LiveDataHandler.AppendDataSource(&liveDataSourceSetup{ exchange: exch, interval: cfg.DataSettings.Interval, diff --git a/backtester/eventhandlers/exchange/exchange_test.go b/backtester/eventhandlers/exchange/exchange_test.go index fe000fc4..3e514dea 100644 --- a/backtester/eventhandlers/exchange/exchange_test.go +++ b/backtester/eventhandlers/exchange/exchange_test.go @@ -184,7 +184,7 @@ func TestPlaceOrder(t *testing.T) { t.Parallel() bot := &engine.Engine{} var err error - em := engine.SetupExchangeManager() + em := engine.NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) @@ -192,7 +192,10 @@ func TestPlaceOrder(t *testing.T) { exch.SetDefaults() exchB := exch.GetBase() exchB.States = currencystate.NewCurrencyStates() - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } bot.ExchangeManager = em bot.OrderManager, err = engine.SetupOrderManager(em, &engine.CommunicationManager{}, &bot.ServicesWG, false, false, 0) if !errors.Is(err, nil) { @@ -238,7 +241,7 @@ func TestExecuteOrder(t *testing.T) { t.Parallel() bot := &engine.Engine{} var err error - em := engine.SetupExchangeManager() + em := engine.NewExchangeManager() const testExchange = "binanceus" exch, err := em.NewExchangeByName(testExchange) if err != nil { @@ -247,7 +250,10 @@ func TestExecuteOrder(t *testing.T) { exch.SetDefaults() exchB := exch.GetBase() exchB.States = currencystate.NewCurrencyStates() - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } bot.ExchangeManager = em bot.OrderManager, err = engine.SetupOrderManager(em, &engine.CommunicationManager{}, &bot.ServicesWG, false, false, 0) if !errors.Is(err, nil) { @@ -359,7 +365,7 @@ func TestExecuteOrderBuySellSizeLimit(t *testing.T) { t.Parallel() bot := &engine.Engine{} var err error - em := engine.SetupExchangeManager() + em := engine.NewExchangeManager() const testExchange = "BTC Markets" exch, err := em.NewExchangeByName(testExchange) if err != nil { @@ -368,7 +374,10 @@ func TestExecuteOrderBuySellSizeLimit(t *testing.T) { exch.SetDefaults() exchB := exch.GetBase() exchB.States = currencystate.NewCurrencyStates() - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } bot.ExchangeManager = em bot.OrderManager, err = engine.SetupOrderManager(em, &engine.CommunicationManager{}, &bot.ServicesWG, false, false, 0) if !errors.Is(err, nil) { diff --git a/backtester/funding/funding_test.go b/backtester/funding/funding_test.go index a4d99494..6a0eccfd 100644 --- a/backtester/funding/funding_test.go +++ b/backtester/funding/funding_test.go @@ -779,13 +779,16 @@ func TestUpdateCollateral(t *testing.T) { currency: currency.BTC, available: decimal.NewFromInt(1336), }) - em := engine.SetupExchangeManager() + em := engine.NewExchangeManager() exch, err := em.NewExchangeByName(exchName) if err != nil { t.Fatal(err) } exch.SetDefaults() - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } f.exchangeManager = em expectedError = nil @@ -917,7 +920,7 @@ func TestUpdateFundingFromLiveData(t *testing.T) { t.Errorf("received '%v', expected '%v'", err, engine.ErrNilSubsystem) } - f.exchangeManager = engine.SetupExchangeManager() + f.exchangeManager = engine.NewExchangeManager() err = f.UpdateFundingFromLiveData(false) if !errors.Is(err, nil) { t.Errorf("received '%v', expected '%v'", err, nil) @@ -925,7 +928,10 @@ func TestUpdateFundingFromLiveData(t *testing.T) { ff := &binance.Binance{} ff.SetDefaults() - f.exchangeManager.Add(ff) + err = f.exchangeManager.Add(ff) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } err = f.UpdateFundingFromLiveData(false) if !errors.Is(err, exchange.ErrCredentialsAreEmpty) { t.Errorf("received '%v', expected '%v'", err, exchange.ErrCredentialsAreEmpty) @@ -960,7 +966,7 @@ func TestUpdateAllCollateral(t *testing.T) { t.Errorf("received '%v', expected '%v'", err, engine.ErrNilSubsystem) } - f.exchangeManager = engine.SetupExchangeManager() + f.exchangeManager = engine.NewExchangeManager() err = f.UpdateAllCollateral(false, false) if !errors.Is(err, nil) { t.Errorf("received '%v', expected '%v'", err, nil) @@ -968,7 +974,10 @@ func TestUpdateAllCollateral(t *testing.T) { ff := &binance.Binance{} ff.SetDefaults() - f.exchangeManager.Add(ff) + err = f.exchangeManager.Add(ff) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } err = f.UpdateAllCollateral(false, false) if !errors.Is(err, gctcommon.ErrNotYetImplemented) { t.Errorf("received '%v', expected '%v'", err, gctcommon.ErrNotYetImplemented) diff --git a/backtester/funding/trackingcurrencies/trackingcurrencies_test.go b/backtester/funding/trackingcurrencies/trackingcurrencies_test.go index 4ba70df5..75108915 100644 --- a/backtester/funding/trackingcurrencies/trackingcurrencies_test.go +++ b/backtester/funding/trackingcurrencies/trackingcurrencies_test.go @@ -30,7 +30,7 @@ func TestCreateUSDTrackingPairs(t *testing.T) { t.Errorf("received '%v' expected '%v'", err, errExchangeManagerRequired) } - em := engine.SetupExchangeManager() + em := engine.NewExchangeManager() _, err = CreateUSDTrackingPairs([]TrackingPair{{Exchange: eName}}, em) if !errors.Is(err, engine.ErrExchangeNotFound) { t.Errorf("received '%v' expected '%v'", err, engine.ErrExchangeNotFound) @@ -61,7 +61,10 @@ func TestCreateUSDTrackingPairs(t *testing.T) { eba.Enabled = eba.Enabled.Add(cp3) eba.AssetEnabled = convert.BoolPtr(true) - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } resp, err := CreateUSDTrackingPairs([]TrackingPair{s1}, em) if !errors.Is(err, nil) { t.Errorf("received '%v' expected '%v'", err, nil) diff --git a/cmd/exchange_wrapper_coverage/main.go b/cmd/exchange_wrapper_coverage/main.go index 58b5b453..c5cc3d9c 100644 --- a/cmd/exchange_wrapper_coverage/main.go +++ b/cmd/exchange_wrapper_coverage/main.go @@ -21,12 +21,14 @@ func main() { } engine.Bot.Settings = engine.Settings{ - DisableExchangeAutoPairUpdates: true, - EnableDryRun: true, + CoreSettings: engine.CoreSettings{EnableDryRun: true}, + ExchangeTuningSettings: engine.ExchangeTuningSettings{ + DisableExchangeAutoPairUpdates: true, + }, } engine.Bot.Config.PurgeExchangeAPICredentials() - engine.Bot.ExchangeManager = engine.SetupExchangeManager() + engine.Bot.ExchangeManager = engine.NewExchangeManager() log.Printf("Loading exchanges..") var wg sync.WaitGroup diff --git a/cmd/exchange_wrapper_issues/main.go b/cmd/exchange_wrapper_issues/main.go index 2ab22a29..1fd6461f 100644 --- a/cmd/exchange_wrapper_issues/main.go +++ b/cmd/exchange_wrapper_issues/main.go @@ -43,12 +43,14 @@ func main() { log.Fatalf("Failed to initialise engine. Err: %s", err) } engine.Bot = bot - bot.ExchangeManager = engine.SetupExchangeManager() + bot.ExchangeManager = engine.NewExchangeManager() bot.Settings = engine.Settings{ - DisableExchangeAutoPairUpdates: true, - Verbose: verboseOverride, - EnableExchangeHTTPRateLimiter: true, + CoreSettings: engine.CoreSettings{Verbose: verboseOverride}, + ExchangeTuningSettings: engine.ExchangeTuningSettings{ + DisableExchangeAutoPairUpdates: true, + EnableExchangeHTTPRateLimiter: true, + }, } log.Println("Loading config...") diff --git a/common/common.go b/common/common.go index a15033ff..d4373922 100644 --- a/common/common.go +++ b/common/common.go @@ -19,6 +19,7 @@ import ( "strings" "sync" "time" + "unicode" "github.com/thrasher-corp/gocryptotrader/common/file" "github.com/thrasher-corp/gocryptotrader/log" @@ -412,6 +413,37 @@ func SplitStringSliceByLimit(in []string, limit uint) [][]string { return sliceSlice } +// AddPaddingOnUpperCase adds padding to a string when detecting an upper case letter. If +// there are multiple upper case items like `ThisIsHTTPExample`, it will only +// pad between like this `This Is HTTP Example`. +func AddPaddingOnUpperCase(s string) string { + if s == "" { + return "" + } + var result []string + left := 0 + for x := 0; x < len(s); x++ { + if x == 0 { + continue + } + + if unicode.IsUpper(rune(s[x])) { + if !unicode.IsUpper(rune(s[x-1])) { + result = append(result, s[left:x]) + left = x + } + } else if x > 1 && unicode.IsUpper(rune(s[x-1])) { + if s[left:x-1] == "" { + continue + } + result = append(result, s[left:x-1]) + left = x - 1 + } + } + result = append(result, s[left:]) + return strings.Join(result, " ") +} + // InArray checks if _val_ belongs to _array_ func InArray(val, array interface{}) (exists bool, index int) { exists = false diff --git a/common/common_test.go b/common/common_test.go index aee8dd5b..c10a5c2e 100644 --- a/common/common_test.go +++ b/common/common_test.go @@ -583,6 +583,37 @@ func TestSplitStringSliceByLimit(t *testing.T) { } } +func TestAddPaddingOnUpperCase(t *testing.T) { + t.Parallel() + + testCases := []struct { + Supplied string + Expected string + }{ + { + // empty + }, + { + Supplied: "ExpectedHTTPRainbow", + Expected: "Expected HTTP Rainbow", + }, + { + Supplied: "SmellyCatSmellsBad", + Expected: "Smelly Cat Smells Bad", + }, + { + Supplied: "Gronk", + Expected: "Gronk", + }, + } + + for x := range testCases { + if received := AddPaddingOnUpperCase(testCases[x].Supplied); received != testCases[x].Expected { + t.Fatalf("received '%v' but expected '%v'", received, testCases[x].Expected) + } + } +} + func TestInArray(t *testing.T) { t.Parallel() InArray(nil, nil) diff --git a/engine/apiserver_test.go b/engine/apiserver_test.go index ca955713..f10d1993 100644 --- a/engine/apiserver_test.go +++ b/engine/apiserver_test.go @@ -190,13 +190,16 @@ func TestIsWebsocketServerRunning(t *testing.T) { } func TestGetAllActiveOrderbooks(t *testing.T) { - man := SetupExchangeManager() + man := NewExchangeManager() bs, err := man.NewExchangeByName("Bitstamp") if err != nil { t.Fatal(err) } bs.SetDefaults() - man.Add(bs) + err = man.Add(bs) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } resp := getAllActiveOrderbooks(man) if resp == nil { t.Error("expected not nil") @@ -205,13 +208,16 @@ func TestGetAllActiveOrderbooks(t *testing.T) { func TestGetAllActiveTickers(t *testing.T) { t.Parallel() - man := SetupExchangeManager() + man := NewExchangeManager() bs, err := man.NewExchangeByName("Bitstamp") if err != nil { t.Fatal(err) } bs.SetDefaults() - man.Add(bs) + err = man.Add(bs) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } resp := getAllActiveTickers(man) if resp == nil { t.Error("expected not nil") @@ -220,13 +226,16 @@ func TestGetAllActiveTickers(t *testing.T) { func TestGetAllActiveAccounts(t *testing.T) { t.Parallel() - man := SetupExchangeManager() + man := NewExchangeManager() bs, err := man.NewExchangeByName("Bitstamp") if err != nil { t.Fatal(err) } bs.SetDefaults() - man.Add(bs) + err = man.Add(bs) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } resp := getAllActiveAccounts(man) if resp == nil { t.Error("expected not nil") diff --git a/engine/datahistory_manager_test.go b/engine/datahistory_manager_test.go index fbfdd780..d39760e3 100644 --- a/engine/datahistory_manager_test.go +++ b/engine/datahistory_manager_test.go @@ -31,17 +31,17 @@ func TestSetupDataHistoryManager(t *testing.T) { t.Errorf("error '%v', expected '%v'", err, errNilConfig) } - _, err = SetupDataHistoryManager(SetupExchangeManager(), nil, nil) + _, err = SetupDataHistoryManager(NewExchangeManager(), nil, nil) if !errors.Is(err, errNilDatabaseConnectionManager) { t.Errorf("error '%v', expected '%v'", err, errNilDatabaseConnectionManager) } - _, err = SetupDataHistoryManager(SetupExchangeManager(), &DatabaseConnectionManager{}, nil) + _, err = SetupDataHistoryManager(NewExchangeManager(), &DatabaseConnectionManager{}, nil) if !errors.Is(err, errNilConfig) { t.Errorf("error '%v', expected '%v'", err, errNilConfig) } - _, err = SetupDataHistoryManager(SetupExchangeManager(), &DatabaseConnectionManager{}, &config.DataHistoryManager{}) + _, err = SetupDataHistoryManager(NewExchangeManager(), &DatabaseConnectionManager{}, &config.DataHistoryManager{}) if !errors.Is(err, database.ErrNilInstance) { t.Errorf("error '%v', expected '%v'", err, database.ErrNilInstance) } @@ -60,7 +60,7 @@ func TestSetupDataHistoryManager(t *testing.T) { if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } - m, err := SetupDataHistoryManager(SetupExchangeManager(), dbCM, &config.DataHistoryManager{}) + m, err := SetupDataHistoryManager(NewExchangeManager(), dbCM, &config.DataHistoryManager{}) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } @@ -655,7 +655,7 @@ func TestCompareJobsToData(t *testing.T) { } } -func TestRunJob(t *testing.T) { +func TestRunJob(t *testing.T) { //nolint:tparallel // There is a race condition caused by the DataHistoryJob and it's a big change to fix. t.Parallel() tt := time.Now().Truncate(kline.OneHour.Duration()) testCases := []*DataHistoryJob{ @@ -731,7 +731,6 @@ func TestRunJob(t *testing.T) { for x := range testCases { test := testCases[x] t.Run(test.Nickname, func(t *testing.T) { - t.Parallel() err := m.UpsertJob(test, false) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) @@ -912,7 +911,7 @@ func TestConverters(t *testing.T) { // test helper functions func createDHM(t *testing.T) (*DataHistoryManager, *datahistoryjob.DataHistoryJob) { t.Helper() - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if !errors.Is(err, nil) { t.Fatalf("error '%v', expected '%v'", err, nil) @@ -926,7 +925,10 @@ func createDHM(t *testing.T) (*DataHistoryManager, *datahistoryjob.DataHistoryJo Available: currency.Pairs{cp, cp2}, Enabled: currency.Pairs{cp, cp2}, AssetEnabled: convert.BoolPtr(true)} - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } exch2, err := em.NewExchangeByName("Binance") if !errors.Is(err, nil) { @@ -943,7 +945,11 @@ func createDHM(t *testing.T) (*DataHistoryManager, *datahistoryjob.DataHistoryJo RequestFormat: ¤cy.PairFormat{Uppercase: true}, } - em.Add(exch2) + err = em.Add(exch2) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } + j := &datahistoryjob.DataHistoryJob{ ID: jobID, Nickname: "datahistoryjob", @@ -968,16 +974,14 @@ func createDHM(t *testing.T) (*DataHistoryManager, *datahistoryjob.DataHistoryJo } m := &DataHistoryManager{ databaseConnectionInstance: &dataBaseConnection{}, - jobDB: dataHistoryJobService{ - job: j, - }, - jobResultDB: dataHistoryJobResultService{}, - started: 1, - exchangeManager: em, - candleLoader: dataHistoryCandleLoader, - interval: time.NewTicker(time.Minute), - verbose: true, - maxResultInsertions: defaultMaxResultInsertions, + jobDB: &dataHistoryJobService{job: j}, + jobResultDB: dataHistoryJobResultService{}, + started: 1, + exchangeManager: em, + candleLoader: dataHistoryCandleLoader, + interval: time.NewTicker(time.Minute), + verbose: true, + maxResultInsertions: defaultMaxResultInsertions, } return m, j } @@ -1017,7 +1021,7 @@ func TestProcessCandleData(t *testing.T) { t.Errorf("received %v expected %v", err, ErrExchangeNotFound) } - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) @@ -1073,7 +1077,7 @@ func TestProcessTradeData(t *testing.T) { t.Errorf("received %v expected %v", err, ErrExchangeNotFound) } - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) @@ -1192,7 +1196,7 @@ func TestValidateCandles(t *testing.T) { t.Errorf("received %v expected %v", err, ErrExchangeNotFound) } - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) diff --git a/engine/engine.go b/engine/engine.go index f8ab5312..e98a38b2 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -7,6 +7,7 @@ import ( "log" "os" "path/filepath" + "reflect" "runtime" "strings" "sync" @@ -114,7 +115,7 @@ func NewFromSettings(settings *Settings, flagSet map[string]bool) (*Engine, erro return nil, fmt.Errorf("failed to create script manager. Err: %w", err) } - b.ExchangeManager = SetupExchangeManager() + b.ExchangeManager = NewExchangeManager() validateSettings(&b, settings, flagSet) @@ -263,76 +264,31 @@ func validateSettings(b *Engine, s *Settings, flagSet FlagSet) { } } -// PrintSettings returns the engine settings -func PrintSettings(s *Settings) { +// PrintLoadedSettings logs loaded settings. +func (s *Settings) PrintLoadedSettings() { + if s == nil { + return + } gctlog.Debugln(gctlog.Global) gctlog.Debugf(gctlog.Global, "ENGINE SETTINGS") - gctlog.Debugf(gctlog.Global, "- CORE SETTINGS:") - gctlog.Debugf(gctlog.Global, "\t Verbose mode: %v", s.Verbose) - gctlog.Debugf(gctlog.Global, "\t Enable dry run mode: %v", s.EnableDryRun) - gctlog.Debugf(gctlog.Global, "\t Enable all exchanges: %v", s.EnableAllExchanges) - gctlog.Debugf(gctlog.Global, "\t Enable all pairs: %v", s.EnableAllPairs) - gctlog.Debugf(gctlog.Global, "\t Enable CoinMarketCap analysis: %v", s.EnableCoinmarketcapAnalysis) - gctlog.Debugf(gctlog.Global, "\t Enable portfolio manager: %v", s.EnablePortfolioManager) - gctlog.Debugf(gctlog.Global, "\t Enable data history manager: %v", s.EnableDataHistoryManager) - gctlog.Debugf(gctlog.Global, "\t Enable currency state manager: %v", s.EnableCurrencyStateManager) - gctlog.Debugf(gctlog.Global, "\t Portfolio manager sleep delay: %v\n", s.PortfolioManagerDelay) - gctlog.Debugf(gctlog.Global, "\t Enable gPRC: %v", s.EnableGRPC) - gctlog.Debugf(gctlog.Global, "\t Enable gRPC Proxy: %v", s.EnableGRPCProxy) - gctlog.Debugf(gctlog.Global, "\t Enable gRPC shutdown of bot instance: %v", s.EnableGRPCShutdown) - gctlog.Debugf(gctlog.Global, "\t Enable websocket RPC: %v", s.EnableWebsocketRPC) - gctlog.Debugf(gctlog.Global, "\t Enable deprecated RPC: %v", s.EnableDeprecatedRPC) - gctlog.Debugf(gctlog.Global, "\t Enable comms relayer: %v", s.EnableCommsRelayer) - gctlog.Debugf(gctlog.Global, "\t Enable event manager: %v", s.EnableEventManager) - gctlog.Debugf(gctlog.Global, "\t Event manager sleep delay: %v", s.EventManagerDelay) - gctlog.Debugf(gctlog.Global, "\t Enable order manager: %v", s.EnableOrderManager) - gctlog.Debugf(gctlog.Global, "\t Enable exchange sync manager: %v", s.EnableExchangeSyncManager) - gctlog.Debugf(gctlog.Global, "\t Enable deposit address manager: %v\n", s.EnableDepositAddressManager) - gctlog.Debugf(gctlog.Global, "\t Enable websocket routine: %v\n", s.EnableWebsocketRoutine) - gctlog.Debugf(gctlog.Global, "\t Enable NTP client: %v", s.EnableNTPClient) - gctlog.Debugf(gctlog.Global, "\t Enable Database manager: %v", s.EnableDatabaseManager) - gctlog.Debugf(gctlog.Global, "\t Enable dispatcher: %v", s.EnableDispatcher) - gctlog.Debugf(gctlog.Global, "\t Dispatch package max worker amount: %d", s.DispatchMaxWorkerAmount) - gctlog.Debugf(gctlog.Global, "\t Dispatch package jobs limit: %d", s.DispatchJobsLimit) - gctlog.Debugf(gctlog.Global, "\t Futures PNL tracking: %v", s.EnableFuturesTracking) - gctlog.Debugf(gctlog.Global, "- EXCHANGE SYNCER SETTINGS:\n") - gctlog.Debugf(gctlog.Global, "\t Exchange sync continuously: %v\n", s.SyncContinuously) - gctlog.Debugf(gctlog.Global, "\t Exchange sync workers count: %v\n", s.SyncWorkersCount) - gctlog.Debugf(gctlog.Global, "\t Enable ticker syncing: %v\n", s.EnableTickerSyncing) - gctlog.Debugf(gctlog.Global, "\t Enable orderbook syncing: %v\n", s.EnableOrderbookSyncing) - gctlog.Debugf(gctlog.Global, "\t Enable trade syncing: %v\n", s.EnableTradeSyncing) - gctlog.Debugf(gctlog.Global, "\t Exchange REST sync timeout: %v\n", s.SyncTimeoutREST) - gctlog.Debugf(gctlog.Global, "\t Exchange Websocket sync timeout: %v\n", s.SyncTimeoutWebsocket) - gctlog.Debugf(gctlog.Global, "- FOREX SETTINGS:") - gctlog.Debugf(gctlog.Global, "\t Enable Currency Converter: %v", s.EnableCurrencyConverter) - gctlog.Debugf(gctlog.Global, "\t Enable Currency Layer: %v", s.EnableCurrencyLayer) - gctlog.Debugf(gctlog.Global, "\t Enable ExchangeRatesAPI.io: %v", s.EnableExchangeRates) - gctlog.Debugf(gctlog.Global, "\t Enable Fixer: %v", s.EnableFixer) - gctlog.Debugf(gctlog.Global, "\t Enable OpenExchangeRates: %v", s.EnableOpenExchangeRates) - gctlog.Debugf(gctlog.Global, "\t Enable ExchangeRateHost: %v", s.EnableExchangeRateHost) - gctlog.Debugf(gctlog.Global, "- EXCHANGE SETTINGS:") - gctlog.Debugf(gctlog.Global, "\t Enable exchange auto pair updates: %v", s.EnableExchangeAutoPairUpdates) - gctlog.Debugf(gctlog.Global, "\t Disable all exchange auto pair updates: %v", s.DisableExchangeAutoPairUpdates) - gctlog.Debugf(gctlog.Global, "\t Enable exchange websocket support: %v", s.EnableExchangeWebsocketSupport) - gctlog.Debugf(gctlog.Global, "\t Enable exchange verbose mode: %v", s.EnableExchangeVerbose) - gctlog.Debugf(gctlog.Global, "\t Enable exchange HTTP rate limiter: %v", s.EnableExchangeHTTPRateLimiter) - gctlog.Debugf(gctlog.Global, "\t Enable exchange HTTP debugging: %v", s.EnableExchangeHTTPDebugging) - gctlog.Debugf(gctlog.Global, "\t Max HTTP request jobs: %v", s.MaxHTTPRequestJobsLimit) - gctlog.Debugf(gctlog.Global, "\t HTTP request max retry attempts: %v", s.RequestMaxRetryAttempts) - gctlog.Debugf(gctlog.Global, "\t Trade buffer processing interval: %v", s.TradeBufferProcessingInterval) - gctlog.Debugf(gctlog.Global, "\t Alert communications channel pre-allocation buffer size: %v", s.AlertSystemPreAllocationCommsBuffer) - gctlog.Debugf(gctlog.Global, "\t HTTP timeout: %v", s.HTTPTimeout) - gctlog.Debugf(gctlog.Global, "\t HTTP user agent: %v", s.HTTPUserAgent) - gctlog.Debugf(gctlog.Global, "- GCTSCRIPT SETTINGS: ") - gctlog.Debugf(gctlog.Global, "\t Enable GCTScript manager: %v", s.EnableGCTScriptManager) - gctlog.Debugf(gctlog.Global, "\t GCTScript max virtual machines: %v", s.MaxVirtualMachines) - gctlog.Debugf(gctlog.Global, "- WITHDRAW SETTINGS: ") - gctlog.Debugf(gctlog.Global, "\t Withdraw Cache size: %v", s.WithdrawCacheSize) - gctlog.Debugf(gctlog.Global, "- COMMON SETTINGS:") - gctlog.Debugf(gctlog.Global, "\t Global HTTP timeout: %v", s.GlobalHTTPTimeout) - gctlog.Debugf(gctlog.Global, "\t Global HTTP user agent: %v", s.GlobalHTTPUserAgent) - gctlog.Debugf(gctlog.Global, "\t Global HTTP proxy: %v", s.GlobalHTTPProxy) + settings := reflect.ValueOf(*s) + for x := 0; x < settings.NumField(); x++ { + field := settings.Field(x) + if field.Kind() != reflect.Struct { + continue + } + fieldName := field.Type().Name() + gctlog.Debugln(gctlog.Global, "- "+common.AddPaddingOnUpperCase(fieldName)+":") + for y := 0; y < field.NumField(); y++ { + indvSetting := field.Field(y) + indvName := field.Type().Field(y).Name + if indvSetting.Kind() == reflect.String && indvSetting.IsZero() { + indvSetting = reflect.ValueOf("Undefined") + } + gctlog.Debugln(gctlog.Global, "\t", common.AddPaddingOnUpperCase(indvName)+":", indvSetting) + } + } gctlog.Debugln(gctlog.Global) } @@ -713,12 +669,18 @@ func (bot *Engine) Stop() { } } - if err := currency.ShutdownStorageUpdater(); err != nil { + err := bot.ExchangeManager.Shutdown(bot.Settings.ExchangeShutdownTimeout) + if err != nil { + gctlog.Errorf(gctlog.Global, "Exchange manager unable to stop. Error: %v", err) + } + + err = currency.ShutdownStorageUpdater() + if err != nil { gctlog.Errorf(gctlog.Global, "ExchangeSettings storage system. Error: %v", err) } if !bot.Settings.EnableDryRun { - err := bot.Config.SaveConfigToFile(bot.Settings.ConfigFile) + err = bot.Config.SaveConfigToFile(bot.Settings.ConfigFile) if err != nil { gctlog.Errorln(gctlog.Global, "Unable to save config.") } else { @@ -860,7 +822,11 @@ func (bot *Engine) LoadExchange(name string, wg *sync.WaitGroup) error { return err } - bot.ExchangeManager.Add(exch) + err = bot.ExchangeManager.Add(exch) + if err != nil { + return err + } + base := exch.GetBase() if base.API.AuthenticatedSupport || base.API.AuthenticatedWebsocketSupport { diff --git a/engine/engine_test.go b/engine/engine_test.go index 145468c9..92d59c75 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -32,7 +32,7 @@ func TestLoadConfigWithSettings(t *testing.T) { name: "test file", settings: &Settings{ ConfigFile: config.TestFile, - EnableDryRun: true, + CoreSettings: CoreSettings{EnableDryRun: true}, }, want: &empty, wantErr: false, @@ -43,7 +43,7 @@ func TestLoadConfigWithSettings(t *testing.T) { settings: &Settings{ ConfigFile: config.TestFile, DataDir: somePath, - EnableDryRun: true, + CoreSettings: CoreSettings{EnableDryRun: true}, }, want: &somePath, wantErr: false, @@ -79,7 +79,7 @@ func TestStartStopDoesNotCausePanic(t *testing.T) { tempDir := t.TempDir() botOne, err := NewFromSettings(&Settings{ ConfigFile: config.TestFile, - EnableDryRun: true, + CoreSettings: CoreSettings{EnableDryRun: true}, DataDir: tempDir, }, nil) if err != nil { @@ -110,7 +110,7 @@ func TestStartStopTwoDoesNotCausePanic(t *testing.T) { tempDir2 := t.TempDir() botOne, err := NewFromSettings(&Settings{ ConfigFile: config.TestFile, - EnableDryRun: true, + CoreSettings: CoreSettings{EnableDryRun: true}, DataDir: tempDir, }, nil) if err != nil { @@ -120,7 +120,7 @@ func TestStartStopTwoDoesNotCausePanic(t *testing.T) { botTwo, err := NewFromSettings(&Settings{ ConfigFile: config.TestFile, - EnableDryRun: true, + CoreSettings: CoreSettings{EnableDryRun: true}, DataDir: tempDir2, }, nil) if err != nil { @@ -146,14 +146,17 @@ func TestGetExchangeByName(t *testing.T) { t.Errorf("received: %v expected: %v", err, ErrNilSubsystem) } - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if !errors.Is(err, nil) { t.Fatalf("received '%v' expected '%v'", err, nil) } exch.SetDefaults() exch.SetEnabled(true) - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } e := &Engine{ExchangeManager: em} if !exch.IsEnabled() { @@ -180,14 +183,17 @@ func TestGetExchangeByName(t *testing.T) { func TestUnloadExchange(t *testing.T) { t.Parallel() - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if !errors.Is(err, nil) { t.Fatalf("received '%v' expected '%v'", err, nil) } exch.SetDefaults() exch.SetEnabled(true) - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } e := &Engine{ExchangeManager: em, Config: &config.Config{Exchanges: []config.Exchange{{Name: testExchange}}}, } @@ -203,15 +209,15 @@ func TestUnloadExchange(t *testing.T) { } err = e.UnloadExchange(testExchange) - if !errors.Is(err, ErrNoExchangesLoaded) { - t.Errorf("error '%v', expected '%v'", err, ErrNoExchangesLoaded) + if !errors.Is(err, ErrExchangeNotFound) { + t.Errorf("error '%v', expected '%v'", err, ErrExchangeNotFound) } } func TestDryRunParamInteraction(t *testing.T) { t.Parallel() bot := &Engine{ - ExchangeManager: SetupExchangeManager(), + ExchangeManager: NewExchangeManager(), Settings: Settings{}, Config: &config.Config{ Exchanges: []config.Exchange{ @@ -323,3 +329,12 @@ func TestSetDefaultWebsocketDataHandler(t *testing.T) { t.Fatalf("received: '%v' but expected: '%v'", err, nil) } } + +func TestSettingsPrint(t *testing.T) { + t.Parallel() + var s *Settings + s.PrintLoadedSettings() + + s = &Settings{} + s.PrintLoadedSettings() +} diff --git a/engine/engine_types.go b/engine/engine_types.go index 49b380ea..211c7140 100644 --- a/engine/engine_types.go +++ b/engine/engine_types.go @@ -5,7 +5,11 @@ import ( "time" ) -// Settings stores engine params +// Settings stores engine params. Please define a settings struct for automatic +// display of instance settings. For example, if you define a struct named +// ManagerSettings, it will be displayed as a subheading "Manager Settings" +// and individual field names such as 'EnableManager' will be displayed +// as "Enable Manager: true/false". type Settings struct { ConfigFile string DataDir string @@ -14,7 +18,19 @@ type Settings struct { GoMaxProcs int CheckParamInteraction bool - // Core Settings + CoreSettings + ExchangeSyncerSettings + ForexSettings + ExchangeTuningSettings + GCTScriptSettings + WithdrawSettings + + // Main shutdown channel + Shutdown chan struct{} +} + +// CoreSettings defines settings related to core engine operations +type CoreSettings struct { EnableDryRun bool EnableAllExchanges bool EnableAllPairs bool @@ -41,8 +57,13 @@ type Settings struct { EventManagerDelay time.Duration EnableFuturesTracking bool Verbose bool + EnableDispatcher bool + DispatchMaxWorkerAmount int + DispatchJobsLimit int +} - // Exchange syncer settings +// ExchangeSyncerSettings defines settings for the exchange pair synchronisation +type ExchangeSyncerSettings struct { EnableTickerSyncing bool EnableOrderbookSyncing bool EnableTradeSyncing bool @@ -50,16 +71,20 @@ type Settings struct { SyncContinuously bool SyncTimeoutREST time.Duration SyncTimeoutWebsocket time.Duration +} - // Forex settings +// ForexSettings defines settings related to the foreign exchange services +type ForexSettings struct { EnableCurrencyConverter bool EnableCurrencyLayer bool EnableExchangeRates bool EnableFixer bool EnableOpenExchangeRates bool EnableExchangeRateHost bool +} - // Exchange tuning settings +// ExchangeTuningSettings defines settings related to an exchange +type ExchangeTuningSettings struct { EnableExchangeHTTPRateLimiter bool EnableExchangeHTTPDebugging bool EnableExchangeVerbose bool @@ -72,30 +97,23 @@ type Settings struct { TradeBufferProcessingInterval time.Duration RequestMaxRetryAttempts int AlertSystemPreAllocationCommsBuffer int // See exchanges/alert.go + ExchangeShutdownTimeout time.Duration + HTTPTimeout time.Duration + HTTPUserAgent string + HTTPProxy string + GlobalHTTPTimeout time.Duration + GlobalHTTPUserAgent string + GlobalHTTPProxy string +} - // Global HTTP related settings - GlobalHTTPTimeout time.Duration - GlobalHTTPUserAgent string - GlobalHTTPProxy string - - // Exchange HTTP related settings - HTTPTimeout time.Duration - HTTPUserAgent string - HTTPProxy string - - // Dispatch system settings - EnableDispatcher bool - DispatchMaxWorkerAmount int - DispatchJobsLimit int - - // GCTscript settings +// GCTScriptSettings defines settings related to the GCTScript virtual machine +type GCTScriptSettings struct { MaxVirtualMachines uint +} - // Withdraw settings +// WithdrawSettings defines settings related to Withdrawing cryptocurrency +type WithdrawSettings struct { WithdrawCacheSize uint64 - - // Main shutdown channel - Shutdown chan struct{} } const ( diff --git a/engine/event_manager_test.go b/engine/event_manager_test.go index fc2ff3d2..545e2359 100644 --- a/engine/event_manager_test.go +++ b/engine/event_manager_test.go @@ -107,7 +107,7 @@ func TestEventManagerStop(t *testing.T) { func TestEventManagerAdd(t *testing.T) { t.Parallel() - em := SetupExchangeManager() + em := NewExchangeManager() m, err := setupEventManager(&CommunicationManager{}, em, 0, false) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) @@ -129,7 +129,10 @@ func TestEventManagerAdd(t *testing.T) { t.Fatal(err) } exch.SetDefaults() - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } _, err = m.Add(testExchange, "", EventConditionParams{}, currency.NewPair(currency.BTC, currency.USDC), asset.Spot, "") if !errors.Is(err, errInvalidItem) { t.Errorf("error '%v', expected '%v'", err, errInvalidItem) @@ -159,7 +162,7 @@ func TestEventManagerAdd(t *testing.T) { func TestEventManagerRemove(t *testing.T) { t.Parallel() - em := SetupExchangeManager() + em := NewExchangeManager() m, err := setupEventManager(&CommunicationManager{}, em, 0, false) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) @@ -185,7 +188,10 @@ func TestEventManagerRemove(t *testing.T) { t.Fatal(err) } exch.SetDefaults() - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } id, err := m.Add(testExchange, ItemPrice, cond, currency.NewPair(currency.BTC, currency.USDC), asset.Spot, action) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) @@ -198,7 +204,7 @@ func TestEventManagerRemove(t *testing.T) { func TestGetEventCounter(t *testing.T) { t.Parallel() - em := SetupExchangeManager() + em := NewExchangeManager() m, err := setupEventManager(&CommunicationManager{}, em, 0, false) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) @@ -226,7 +232,10 @@ func TestGetEventCounter(t *testing.T) { t.Fatal(err) } exch.SetDefaults() - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } _, err = m.Add(testExchange, ItemPrice, cond, currency.NewPair(currency.BTC, currency.USDC), asset.Spot, action) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) @@ -239,7 +248,7 @@ func TestGetEventCounter(t *testing.T) { } func TestCheckEventCondition(t *testing.T) { - em := SetupExchangeManager() + em := NewExchangeManager() m, err := setupEventManager(&CommunicationManager{}, em, 0, false) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) @@ -272,7 +281,10 @@ func TestCheckEventCondition(t *testing.T) { t.Fatal(err) } exch.SetDefaults() - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } _, err = m.Add(testExchange, ItemPrice, cond, currency.NewPair(currency.BTC, currency.USD), asset.Spot, action) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) diff --git a/engine/exchange_manager.go b/engine/exchange_manager.go index c1c37674..c84b6ae4 100644 --- a/engine/exchange_manager.go +++ b/engine/exchange_manager.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" "sync" + "time" exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/binance" @@ -43,6 +44,9 @@ var ( ErrExchangeAlreadyLoaded = errors.New("exchange already loaded") ErrExchangeFailedToLoad = errors.New("exchange failed to load") ErrExchangeNameIsEmpty = errors.New("exchange name is empty") + + errExchangeIsNil = errors.New("exchange is nil") + errExchangeAlreadyLoaded = errors.New("exchange already loaded") ) // CustomExchangeBuilder interface allows external applications to create @@ -53,26 +57,34 @@ type CustomExchangeBuilder interface { // ExchangeManager manages what exchanges are loaded type ExchangeManager struct { - m sync.Mutex + mtx sync.Mutex exchanges map[string]exchange.IBotExchange Builder CustomExchangeBuilder } -// SetupExchangeManager creates a new exchange manager -func SetupExchangeManager() *ExchangeManager { +// NewExchangeManager creates a new exchange manager +func NewExchangeManager() *ExchangeManager { return &ExchangeManager{ exchanges: make(map[string]exchange.IBotExchange), } } -// Add adds or replaces an exchange -func (m *ExchangeManager) Add(exch exchange.IBotExchange) { - if exch == nil { - return +// Add adds an exchange +func (m *ExchangeManager) Add(exch exchange.IBotExchange) error { + if m == nil { + return fmt.Errorf("exchange manager: %w", ErrNilSubsystem) + } + if exch == nil { + return fmt.Errorf("exchange manager: %w", errExchangeIsNil) + } + m.mtx.Lock() + defer m.mtx.Unlock() + _, ok := m.exchanges[strings.ToLower(exch.GetName())] + if ok { + return fmt.Errorf("exchange manager: %s %w", exch.GetName(), errExchangeAlreadyLoaded) } - m.m.Lock() m.exchanges[strings.ToLower(exch.GetName())] = exch - m.m.Unlock() + return nil } // GetExchanges returns all stored exchanges @@ -80,32 +92,37 @@ func (m *ExchangeManager) GetExchanges() ([]exchange.IBotExchange, error) { if m == nil { return nil, fmt.Errorf("exchange manager: %w", ErrNilSubsystem) } - m.m.Lock() - defer m.m.Unlock() + m.mtx.Lock() + defer m.mtx.Unlock() exchs := make([]exchange.IBotExchange, 0, len(m.exchanges)) - for _, x := range m.exchanges { - exchs = append(exchs, x) + for _, exch := range m.exchanges { + exchs = append(exchs, exch) } return exchs, nil } // RemoveExchange removes an exchange from the manager -func (m *ExchangeManager) RemoveExchange(exchName string) error { - if m.Len() == 0 { - return ErrNoExchangesLoaded +func (m *ExchangeManager) RemoveExchange(exchangeName string) error { + if m == nil { + return fmt.Errorf("exchange manager: %w", ErrNilSubsystem) } - exch, err := m.GetExchangeByName(exchName) + + if exchangeName == "" { + return fmt.Errorf("exchange manager: %w", ErrExchangeNameIsEmpty) + } + + m.mtx.Lock() + defer m.mtx.Unlock() + exch, ok := m.exchanges[strings.ToLower(exchangeName)] + if !ok { + return fmt.Errorf("exchange manager: %s %w", exchangeName, ErrExchangeNotFound) + } + err := exch.Shutdown() if err != nil { - return err + return fmt.Errorf("exchange manager: %w", err) } - m.m.Lock() - defer m.m.Unlock() - err = exch.GetBase().Requester.Shutdown() - if err != nil { - return err - } - delete(m.exchanges, strings.ToLower(exchName)) - log.Infof(log.ExchangeSys, "%s exchange unloaded successfully.\n", exchName) + delete(m.exchanges, strings.ToLower(exchangeName)) + log.Infof(log.ExchangeSys, "%s exchange unloaded successfully.\n", exchangeName) return nil } @@ -117,33 +134,27 @@ func (m *ExchangeManager) GetExchangeByName(exchangeName string) (exchange.IBotE if exchangeName == "" { return nil, fmt.Errorf("exchange manager: %w", ErrExchangeNameIsEmpty) } - m.m.Lock() - defer m.m.Unlock() + m.mtx.Lock() + defer m.mtx.Unlock() exch, ok := m.exchanges[strings.ToLower(exchangeName)] if !ok { - return nil, fmt.Errorf("%s %w", exchangeName, ErrExchangeNotFound) + return nil, fmt.Errorf("exchange manager: %s %w", exchangeName, ErrExchangeNotFound) } return exch, nil } -// Len says how many exchanges are loaded -func (m *ExchangeManager) Len() int { - m.m.Lock() - defer m.m.Unlock() - return len(m.exchanges) -} - // NewExchangeByName helps create a new exchange to be loaded func (m *ExchangeManager) NewExchangeByName(name string) (exchange.IBotExchange, error) { - if m == nil { - return nil, fmt.Errorf("exchange manager %w", ErrNilSubsystem) - } nameLower := strings.ToLower(name) - if exch, _ := m.GetExchangeByName(nameLower); exch != nil { - return nil, fmt.Errorf("%s %w", name, ErrExchangeAlreadyLoaded) + _, err := m.GetExchangeByName(nameLower) + if err != nil && !errors.Is(err, ErrExchangeNotFound) { + return nil, fmt.Errorf("exchange manager: %s %w", name, err) + } + if err == nil { + return nil, fmt.Errorf("exchange manager: %s %w", name, ErrExchangeAlreadyLoaded) } - var exch exchange.IBotExchange + var exch exchange.IBotExchange switch nameLower { case "binanceus": exch = new(binanceus.Binanceus) @@ -201,7 +212,66 @@ func (m *ExchangeManager) NewExchangeByName(name string) (exchange.IBotExchange, if m.Builder != nil { return m.Builder.NewExchangeByName(nameLower) } - return nil, fmt.Errorf("%s, %w", nameLower, ErrExchangeNotFound) + return nil, fmt.Errorf("exchange manager: %s, %w", nameLower, ErrExchangeNotFound) } return exch, nil } + +// Shutdown shuts down all exchanges and unloads them +func (m *ExchangeManager) Shutdown(shutdownTimeout time.Duration) error { + if m == nil { + return fmt.Errorf("exchange manager: %w", ErrNilSubsystem) + } + + if shutdownTimeout < 0 { + shutdownTimeout = 0 + } + + var lockout sync.Mutex + timer := time.NewTimer(shutdownTimeout) + var wg sync.WaitGroup + + m.mtx.Lock() + defer m.mtx.Unlock() + + lockout.Lock() + for _, exch := range m.exchanges { + wg.Add(1) + go func(wg *sync.WaitGroup, mtx *sync.Mutex, exch exchange.IBotExchange) { + err := exch.Shutdown() + if err != nil { + log.Errorf(log.ExchangeSys, "%s failed to shutdown %v.\n", exch.GetName(), err) + } else { + mtx.Lock() + delete(m.exchanges, strings.ToLower(exch.GetName())) + mtx.Unlock() + } + wg.Done() + }(&wg, &lockout, exch) + } + lockout.Unlock() + + ch := make(chan struct{}) + go func(wg *sync.WaitGroup, finish chan<- struct{}) { + wg.Wait() + finish <- struct{}{} + }(&wg, ch) + + select { + case <-timer.C: + // Possible deadlock in a number of operating exchanges. + lockout.Lock() + for name := range m.exchanges { + log.Warnf(log.ExchangeSys, "%s has failed to shutdown within %s, please review.\n", name, shutdownTimeout) + } + lockout.Unlock() + case <-ch: + // Every exchange has finished their shutdown call. + lockout.Lock() + for name := range m.exchanges { + log.Errorf(log.ExchangeSys, "%s has failed to shutdown due to error, please review.\n", name) + } + lockout.Unlock() + } + return nil +} diff --git a/engine/exchange_manager_test.go b/engine/exchange_manager_test.go index 25415f78..f5a63f91 100644 --- a/engine/exchange_manager_test.go +++ b/engine/exchange_manager_test.go @@ -11,9 +11,15 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues" ) -func TestSetupExchangeManager(t *testing.T) { +type broken struct { + bitfinex.Bitfinex +} + +func (b *broken) Shutdown() error { return errExpectedTestError } + +func TestNewExchangeManager(t *testing.T) { t.Parallel() - m := SetupExchangeManager() + m := NewExchangeManager() if m == nil { //nolint:staticcheck,nolintlint // SA5011 Ignore the nil warnings t.Fatalf("unexpected response") } @@ -24,10 +30,27 @@ func TestSetupExchangeManager(t *testing.T) { func TestExchangeManagerAdd(t *testing.T) { t.Parallel() - m := SetupExchangeManager() + var m *ExchangeManager + err := m.Add(nil) + if !errors.Is(err, ErrNilSubsystem) { + t.Fatalf("received: '%v' but expected: '%v'", err, ErrNilSubsystem) + } + + m = NewExchangeManager() + err = m.Add(nil) + if !errors.Is(err, errExchangeIsNil) { + t.Fatalf("received: '%v' but expected: '%v'", err, errExchangeIsNil) + } b := new(bitfinex.Bitfinex) b.SetDefaults() - m.Add(b) + err = m.Add(b) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } + err = m.Add(b) + if !errors.Is(err, errExchangeAlreadyLoaded) { + t.Fatalf("received: '%v' but expected: '%v'", err, errExchangeAlreadyLoaded) + } exchanges, err := m.GetExchanges() if err != nil { t.Error("no exchange manager found") @@ -39,7 +62,13 @@ func TestExchangeManagerAdd(t *testing.T) { func TestExchangeManagerGetExchanges(t *testing.T) { t.Parallel() - m := SetupExchangeManager() + var m *ExchangeManager + _, err := m.GetExchanges() + if !errors.Is(err, ErrNilSubsystem) { + t.Fatalf("received: '%v' but expected: '%v'", err, ErrNilSubsystem) + } + + m = NewExchangeManager() exchanges, err := m.GetExchanges() if err != nil { t.Error("no exchange manager found") @@ -49,7 +78,10 @@ func TestExchangeManagerGetExchanges(t *testing.T) { } b := new(bitfinex.Bitfinex) b.SetDefaults() - m.Add(b) + err = m.Add(b) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } exchanges, err = m.GetExchanges() if err != nil { t.Error("no exchange manager found") @@ -61,30 +93,76 @@ func TestExchangeManagerGetExchanges(t *testing.T) { func TestExchangeManagerRemoveExchange(t *testing.T) { t.Parallel() - m := SetupExchangeManager() - if err := m.RemoveExchange("Bitfinex"); err != ErrNoExchangesLoaded { - t.Error("no exchanges should be loaded") + var m *ExchangeManager + err := m.RemoveExchange("") + if !errors.Is(err, ErrNilSubsystem) { + t.Fatalf("received: '%v' but expected: '%v'", err, ErrNilSubsystem) } + + m = NewExchangeManager() + + err = m.RemoveExchange("") + if !errors.Is(err, ErrExchangeNameIsEmpty) { + t.Fatalf("received: '%v' but expected: '%v'", err, ErrExchangeNameIsEmpty) + } + + err = m.RemoveExchange("Bitfinex") + if !errors.Is(err, ErrExchangeNotFound) { + t.Fatalf("received: '%v' but expected: '%v'", err, ErrExchangeNotFound) + } + b := new(bitfinex.Bitfinex) b.SetDefaults() - m.Add(b) - err := m.RemoveExchange("Bitstamp") + err = m.Add(b) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } + + err = m.RemoveExchange("Bitstamp") if !errors.Is(err, ErrExchangeNotFound) { t.Errorf("received: %v but expected: %v", err, ErrExchangeNotFound) } - if err := m.RemoveExchange("BiTFiNeX"); err != nil { - t.Error("exchange should have been removed") + + err = m.RemoveExchange("BiTFiNeX") + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) } - if m.Len() != 0 { + + if len(m.exchanges) != 0 { t.Error("exchange manager len should be 0") } + + brokenExch := &broken{} + brokenExch.SetDefaults() + + err = m.Add(brokenExch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } + + err = m.RemoveExchange("BiTFiNeX") + if !errors.Is(err, errExpectedTestError) { + t.Fatalf("received: '%v' but expected: '%v'", err, errExpectedTestError) + } } func TestNewExchangeByName(t *testing.T) { - m := SetupExchangeManager() + var m *ExchangeManager + _, err := m.NewExchangeByName("") + if !errors.Is(err, ErrNilSubsystem) { + t.Fatalf("received: '%v' but expected: '%v'", err, ErrNilSubsystem) + } + + m = NewExchangeManager() + _, err = m.NewExchangeByName("") + if !errors.Is(err, ErrExchangeNameIsEmpty) { + t.Fatalf("received: '%v' but expected: '%v'", err, ErrExchangeNameIsEmpty) + } + exchanges := []string{"binanceus", "binance", "bitfinex", "bitflyer", "bithumb", "bitmex", "bitstamp", "bittrex", "btc markets", "btse", "bybit", "coinut", "exmo", "coinbasepro", "gateio", "gemini", "hitbtc", "huobi", "itbit", "kraken", "lbank", "okcoin international", "okx", "poloniex", "yobit", "zb", "fake"} for i := range exchanges { - exch, err := m.NewExchangeByName(exchanges[i]) + var exch exchange.IBotExchange + exch, err = m.NewExchangeByName(exchanges[i]) if err != nil && exchanges[i] != "fake" { t.Fatal(err) } @@ -95,6 +173,19 @@ func TestNewExchangeByName(t *testing.T) { } } } + + load := &bitfinex.Bitfinex{} + load.SetDefaults() + + err = m.Add(load) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } + + _, err = m.NewExchangeByName("bitfinex") + if !errors.Is(err, ErrExchangeAlreadyLoaded) { + t.Fatalf("received: '%v' but expected: '%v'", err, ErrExchangeAlreadyLoaded) + } } type ExchangeBuilder struct{} @@ -113,7 +204,7 @@ func (n ExchangeBuilder) NewExchangeByName(name string) (exchange.IBotExchange, } func TestNewCustomExchangeByName(t *testing.T) { - m := SetupExchangeManager() + m := NewExchangeManager() m.Builder = ExchangeBuilder{} name := "customex" exch, err := m.NewExchangeByName(name) @@ -127,3 +218,31 @@ func TestNewCustomExchangeByName(t *testing.T) { } } } + +func TestExchangeManagerShutdown(t *testing.T) { + t.Parallel() + var m *ExchangeManager + err := m.Shutdown(-1) + if !errors.Is(err, ErrNilSubsystem) { + t.Fatalf("received: '%v' but expected: '%v'", err, ErrNilSubsystem) + } + + m = NewExchangeManager() + err = m.Shutdown(-1) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } + + brokenExch := &broken{} + brokenExch.SetDefaults() + + err = m.Add(brokenExch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } + + err = m.Shutdown(-1) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } +} diff --git a/engine/helpers_test.go b/engine/helpers_test.go index d6028aa8..8b1c643a 100644 --- a/engine/helpers_test.go +++ b/engine/helpers_test.go @@ -59,7 +59,7 @@ func CreateTestBot(t *testing.T) *Engine { }, } bot := &Engine{ - ExchangeManager: SetupExchangeManager(), + ExchangeManager: NewExchangeManager(), Config: &config.Config{Exchanges: []config.Exchange{ { Name: testExchange, @@ -1057,7 +1057,7 @@ func createDepositEngine(opts *fakeDepositExchangeOpts) *Engine { ps.Available = nil } return &Engine{ - Settings: Settings{Verbose: true}, + Settings: Settings{CoreSettings: CoreSettings{Verbose: true}}, Config: &config.Config{ Exchanges: []config.Exchange{ { @@ -1189,7 +1189,10 @@ func TestGetExchangeNames(t *testing.T) { } if exch != nil { exch.SetDefaults() - bot.ExchangeManager.Add(exch) + err = bot.ExchangeManager.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } } } if e := bot.GetExchangeNames(false); len(e) != len(bot.Config.Exchanges) { diff --git a/engine/order_manager_test.go b/engine/order_manager_test.go index 14ab5ac4..2daf865b 100644 --- a/engine/order_manager_test.go +++ b/engine/order_manager_test.go @@ -160,24 +160,24 @@ func TestSetupOrderManager(t *testing.T) { if !errors.Is(err, errNilExchangeManager) { t.Errorf("error '%v', expected '%v'", err, errNilExchangeManager) } - _, err = SetupOrderManager(SetupExchangeManager(), nil, nil, false, false, 0) + _, err = SetupOrderManager(NewExchangeManager(), nil, nil, false, false, 0) if !errors.Is(err, errNilCommunicationsManager) { t.Errorf("error '%v', expected '%v'", err, errNilCommunicationsManager) } - _, err = SetupOrderManager(SetupExchangeManager(), &CommunicationManager{}, nil, false, false, 0) + _, err = SetupOrderManager(NewExchangeManager(), &CommunicationManager{}, nil, false, false, 0) if !errors.Is(err, errNilWaitGroup) { t.Errorf("error '%v', expected '%v'", err, errNilWaitGroup) } var wg sync.WaitGroup - _, err = SetupOrderManager(SetupExchangeManager(), &CommunicationManager{}, &wg, false, false, 0) + _, err = SetupOrderManager(NewExchangeManager(), &CommunicationManager{}, &wg, false, false, 0) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } - _, err = SetupOrderManager(SetupExchangeManager(), &CommunicationManager{}, &wg, false, true, 0) + _, err = SetupOrderManager(NewExchangeManager(), &CommunicationManager{}, &wg, false, true, 0) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } - _, err = SetupOrderManager(SetupExchangeManager(), &CommunicationManager{}, &wg, false, true, 1337) + _, err = SetupOrderManager(NewExchangeManager(), &CommunicationManager{}, &wg, false, true, 1337) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } @@ -190,7 +190,7 @@ func TestOrderManagerStart(t *testing.T) { t.Errorf("error '%v', expected '%v'", err, ErrNilSubsystem) } var wg sync.WaitGroup - m, err = SetupOrderManager(SetupExchangeManager(), &CommunicationManager{}, &wg, false, false, 0) + m, err = SetupOrderManager(NewExchangeManager(), &CommunicationManager{}, &wg, false, false, 0) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } @@ -211,7 +211,7 @@ func TestOrderManagerIsRunning(t *testing.T) { } var wg sync.WaitGroup - m, err := SetupOrderManager(SetupExchangeManager(), &CommunicationManager{}, &wg, false, false, 0) + m, err := SetupOrderManager(NewExchangeManager(), &CommunicationManager{}, &wg, false, false, 0) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } @@ -236,7 +236,7 @@ func TestOrderManagerStop(t *testing.T) { } var wg sync.WaitGroup - m, err = SetupOrderManager(SetupExchangeManager(), &CommunicationManager{}, &wg, false, false, 0) + m, err = SetupOrderManager(NewExchangeManager(), &CommunicationManager{}, &wg, false, false, 0) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } @@ -258,7 +258,7 @@ func TestOrderManagerStop(t *testing.T) { func OrdersSetup(t *testing.T) *OrderManager { t.Helper() var wg sync.WaitGroup - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) @@ -277,7 +277,10 @@ func OrdersSetup(t *testing.T) *OrderManager { fakeExchange := omfExchange{ IBotExchange: exch, } - em.Add(fakeExchange) + err = em.Add(fakeExchange) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } m, err := SetupOrderManager(em, &CommunicationManager{}, &wg, false, false, 0) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) @@ -733,7 +736,7 @@ func TestOrderManager_Modify(t *testing.T) { func TestProcessOrders(t *testing.T) { var wg sync.WaitGroup - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) @@ -742,7 +745,10 @@ func TestProcessOrders(t *testing.T) { fakeExchange := omfExchange{ IBotExchange: exch, } - em.Add(fakeExchange) + err = em.Add(fakeExchange) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } m, err := SetupOrderManager(em, &CommunicationManager{}, &wg, false, false, 0) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) @@ -1305,13 +1311,16 @@ func TestSubmitFakeOrder(t *testing.T) { ord.Side = order.Buy ord.Type = order.Market ord.Amount = 1337 - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) } exch.SetDefaults() - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } o.orderStore.exchangeManager = em resp, err = ord.DeriveSubmitResponse("1234") @@ -1436,7 +1445,7 @@ func TestOrderManagerAdd(t *testing.T) { func TestGetAllOpenFuturesPositions(t *testing.T) { t.Parallel() wg := &sync.WaitGroup{} - o, err := SetupOrderManager(SetupExchangeManager(), &CommunicationManager{}, wg, false, false, time.Hour) + o, err := SetupOrderManager(NewExchangeManager(), &CommunicationManager{}, wg, false, false, time.Hour) if !errors.Is(err, nil) { t.Errorf("received '%v', expected '%v'", err, nil) } @@ -1464,7 +1473,7 @@ func TestGetAllOpenFuturesPositions(t *testing.T) { func TestGetOpenFuturesPosition(t *testing.T) { t.Parallel() wg := &sync.WaitGroup{} - o, err := SetupOrderManager(SetupExchangeManager(), &CommunicationManager{}, wg, false, false, time.Hour) + o, err := SetupOrderManager(NewExchangeManager(), &CommunicationManager{}, wg, false, false, time.Hour) if !errors.Is(err, nil) { t.Errorf("received '%v', expected '%v'", err, nil) } @@ -1481,7 +1490,7 @@ func TestGetOpenFuturesPosition(t *testing.T) { t.Errorf("received '%v', expected '%v'", err, order.ErrNotFuturesAsset) } - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName("binance") if err != nil { t.Fatal(err) @@ -1508,7 +1517,10 @@ func TestGetOpenFuturesPosition(t *testing.T) { fakeExchange := fExchange{ IBotExchange: exch, } - em.Add(fakeExchange) + err = em.Add(fakeExchange) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } o, err = SetupOrderManager(em, &CommunicationManager{}, wg, false, true, time.Hour) if !errors.Is(err, nil) { t.Errorf("received '%v', expected '%v'", err, nil) @@ -1557,7 +1569,7 @@ func TestProcessFuturesPositions(t *testing.T) { if !errors.Is(err, errFuturesTrackingDisabled) { t.Errorf("received '%v', expected '%v'", err, errFuturesTrackingDisabled) } - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName("binance") if err != nil { t.Fatal(err) @@ -1594,7 +1606,10 @@ func TestProcessFuturesPositions(t *testing.T) { fakeExchange := fExchange{ IBotExchange: exch, } - em.Add(fakeExchange) + err = em.Add(fakeExchange) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } var wg sync.WaitGroup o, err = SetupOrderManager(em, &CommunicationManager{}, &wg, false, true, time.Hour) if !errors.Is(err, nil) { diff --git a/engine/portfolio_manager_test.go b/engine/portfolio_manager_test.go index 05724eb8..6ebecae7 100644 --- a/engine/portfolio_manager_test.go +++ b/engine/portfolio_manager_test.go @@ -12,7 +12,7 @@ func TestSetupPortfolioManager(t *testing.T) { t.Errorf("error '%v', expected '%v'", err, errNilExchangeManager) } - m, err := setupPortfolioManager(SetupExchangeManager(), 0, nil) + m, err := setupPortfolioManager(NewExchangeManager(), 0, nil) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } @@ -27,7 +27,7 @@ func TestIsPortfolioManagerRunning(t *testing.T) { t.Error("expected false") } - m, err := setupPortfolioManager(SetupExchangeManager(), 0, nil) + m, err := setupPortfolioManager(NewExchangeManager(), 0, nil) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } @@ -52,7 +52,7 @@ func TestPortfolioManagerStart(t *testing.T) { t.Errorf("error '%v', expected '%v'", err, ErrNilSubsystem) } - m, err = setupPortfolioManager(SetupExchangeManager(), 0, nil) + m, err = setupPortfolioManager(NewExchangeManager(), 0, nil) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } @@ -81,7 +81,7 @@ func TestPortfolioManagerStop(t *testing.T) { t.Errorf("error '%v', expected '%v'", err, ErrNilSubsystem) } - m, err = setupPortfolioManager(SetupExchangeManager(), 0, nil) + m, err = setupPortfolioManager(NewExchangeManager(), 0, nil) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } @@ -101,13 +101,16 @@ func TestPortfolioManagerStop(t *testing.T) { } func TestProcessPortfolio(t *testing.T) { - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName("Bitstamp") if !errors.Is(err, nil) { t.Fatalf("error '%v', expected '%v'", err, nil) } exch.SetDefaults() - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } m, err := setupPortfolioManager(em, 0, nil) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) diff --git a/engine/rpcserver_test.go b/engine/rpcserver_test.go index 5199bf65..70c78a88 100644 --- a/engine/rpcserver_test.go +++ b/engine/rpcserver_test.go @@ -390,7 +390,7 @@ func RPCTestSetup(t *testing.T) *Engine { }) engerino.Config = &config.Config{} - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) @@ -405,7 +405,10 @@ func RPCTestSetup(t *testing.T) *Engine { AssetEnabled: convert.BoolPtr(true), ConfigFormat: ¤cy.PairFormat{Uppercase: true}, RequestFormat: ¤cy.PairFormat{Uppercase: true}} - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } exch, err = em.NewExchangeByName("Binance") if err != nil { @@ -421,7 +424,10 @@ func RPCTestSetup(t *testing.T) *Engine { AssetEnabled: convert.BoolPtr(true), ConfigFormat: ¤cy.PairFormat{Uppercase: true}, RequestFormat: ¤cy.PairFormat{Uppercase: true}} - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } engerino.ExchangeManager = em engerino.Config.Database = dbConf @@ -1048,7 +1054,7 @@ func TestFindMissingSavedCandleIntervals(t *testing.T) { func TestSetExchangeTradeProcessing(t *testing.T) { t.Parallel() - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) @@ -1058,7 +1064,10 @@ func TestSetExchangeTradeProcessing(t *testing.T) { b.Config = &config.Exchange{ Features: &config.FeaturesConfig{Enabled: config.FeaturesEnabledConfig{SaveTradeData: false}}, } - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } s := RPCServer{Engine: &Engine{ExchangeManager: em}} _, err = s.SetExchangeTradeProcessing(context.Background(), &gctrpc.SetExchangeTradeProcessingRequest{Exchange: testExchange, Status: true}) if err != nil { @@ -1166,7 +1175,7 @@ func TestGetHistoricTrades(t *testing.T) { func TestGetAccountInfo(t *testing.T) { t.Parallel() - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) @@ -1181,7 +1190,10 @@ func TestGetAccountInfo(t *testing.T) { fakeExchange := fExchange{ IBotExchange: exch, } - em.Add(fakeExchange) + err = em.Add(fakeExchange) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } s := RPCServer{Engine: &Engine{ExchangeManager: em}} _, err = s.GetAccountInfo(context.Background(), &gctrpc.GetAccountInfoRequest{Exchange: fakeExchangeName, AssetType: asset.Spot.String()}) if !errors.Is(err, nil) { @@ -1191,7 +1203,7 @@ func TestGetAccountInfo(t *testing.T) { func TestUpdateAccountInfo(t *testing.T) { t.Parallel() - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) @@ -1206,7 +1218,10 @@ func TestUpdateAccountInfo(t *testing.T) { fakeExchange := fExchange{ IBotExchange: exch, } - em.Add(fakeExchange) + err = em.Add(fakeExchange) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } s := RPCServer{Engine: &Engine{ExchangeManager: em}} _, err = s.GetAccountInfo(context.Background(), &gctrpc.GetAccountInfoRequest{Exchange: fakeExchangeName, AssetType: asset.Spot.String()}) @@ -1232,7 +1247,7 @@ func TestGetOrders(t *testing.T) { t.Parallel() exchName := "Binance" engerino := &Engine{} - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(exchName) if err != nil { t.Fatal(err) @@ -1247,7 +1262,10 @@ func TestGetOrders(t *testing.T) { AssetEnabled: convert.BoolPtr(true), ConfigFormat: ¤cy.PairFormat{Uppercase: true}, RequestFormat: ¤cy.PairFormat{Uppercase: true}} - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } var wg sync.WaitGroup om, err := SetupOrderManager(em, engerino.CommunicationsManager, &wg, false, false, 0) if !errors.Is(err, nil) { @@ -1339,7 +1357,7 @@ func TestGetOrder(t *testing.T) { t.Parallel() exchName := "Binance" engerino := &Engine{} - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(exchName) if err != nil { t.Fatal(err) @@ -1354,7 +1372,10 @@ func TestGetOrder(t *testing.T) { AssetEnabled: convert.BoolPtr(true), ConfigFormat: ¤cy.PairFormat{Uppercase: true}, RequestFormat: ¤cy.PairFormat{Uppercase: true}} - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } var wg sync.WaitGroup om, err := SetupOrderManager(em, engerino.CommunicationsManager, &wg, false, false, 0) if !errors.Is(err, nil) { @@ -1588,7 +1609,7 @@ func TestParseEvents(t *testing.T) { func TestRPCServerUpsertDataHistoryJob(t *testing.T) { t.Parallel() m, _ := createDHM(t) - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) @@ -1601,7 +1622,10 @@ func TestRPCServerUpsertDataHistoryJob(t *testing.T) { Available: currency.Pairs{cp}, Enabled: currency.Pairs{cp}, AssetEnabled: convert.BoolPtr(true)} - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } s := RPCServer{Engine: &Engine{dataHistoryManager: m, ExchangeManager: em}} _, err = s.UpsertDataHistoryJob(context.Background(), nil) if !errors.Is(err, errNilRequestData) { @@ -1872,7 +1896,7 @@ func TestGetDataHistoryJobSummary(t *testing.T) { func TestGetManagedOrders(t *testing.T) { exchName := "Binance" engerino := &Engine{} - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(exchName) if err != nil { t.Fatal(err) @@ -1887,7 +1911,10 @@ func TestGetManagedOrders(t *testing.T) { AssetEnabled: convert.BoolPtr(true), ConfigFormat: ¤cy.PairFormat{Uppercase: true}, RequestFormat: ¤cy.PairFormat{Uppercase: true}} - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } var wg sync.WaitGroup om, err := SetupOrderManager(em, engerino.CommunicationsManager, &wg, false, false, 0) if !errors.Is(err, nil) { @@ -2168,7 +2195,7 @@ func TestCurrencyStateTrading(t *testing.T) { func TestCurrencyStateTradingPair(t *testing.T) { t.Parallel() - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) @@ -2192,7 +2219,10 @@ func TestCurrencyStateTradingPair(t *testing.T) { fakeExchange := fExchange{ IBotExchange: exch, } - em.Add(fakeExchange) + err = em.Add(fakeExchange) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } s := RPCServer{Engine: &Engine{ExchangeManager: em, currencyStateManager: &CurrencyStateManager{started: 1, iExchangeManager: em}}} @@ -2209,7 +2239,7 @@ func TestCurrencyStateTradingPair(t *testing.T) { func TestGetFuturesPositions(t *testing.T) { t.Parallel() - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName("binance") if err != nil { t.Fatal(err) @@ -2243,7 +2273,10 @@ func TestGetFuturesPositions(t *testing.T) { fakeExchange := fExchange{ IBotExchange: exch, } - em.Add(fakeExchange) + err = em.Add(fakeExchange) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } var wg sync.WaitGroup om, err := SetupOrderManager(em, &CommunicationManager{}, &wg, false, false, time.Hour) if !errors.Is(err, nil) { @@ -2345,7 +2378,7 @@ func TestGetFuturesPositions(t *testing.T) { func TestGetCollateral(t *testing.T) { t.Parallel() - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) @@ -2375,7 +2408,10 @@ func TestGetCollateral(t *testing.T) { fakeExchange := fExchange{ IBotExchange: exch, } - em.Add(fakeExchange) + err = em.Add(fakeExchange) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } s := RPCServer{ Engine: &Engine{ ExchangeManager: em, @@ -2466,7 +2502,7 @@ func TestShutdown(t *testing.T) { func TestGetTechnicalAnalysis(t *testing.T) { t.Parallel() - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) @@ -2495,7 +2531,10 @@ func TestGetTechnicalAnalysis(t *testing.T) { } b.Features.Enabled.Kline.Intervals = kline.DeployExchangeIntervals(kline.OneDay) - em.Add(fExchange{IBotExchange: exch}) + err = em.Add(fExchange{IBotExchange: exch}) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } s := RPCServer{ Engine: &Engine{ ExchangeManager: em, @@ -2735,7 +2774,7 @@ func TestGetTechnicalAnalysis(t *testing.T) { func TestGetMarginRatesHistory(t *testing.T) { t.Parallel() - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(testExchange) if err != nil { t.Fatal(err) @@ -2759,7 +2798,10 @@ func TestGetMarginRatesHistory(t *testing.T) { fakeExchange := fExchange{ IBotExchange: exch, } - em.Add(fakeExchange) + err = em.Add(fakeExchange) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } s := RPCServer{ Engine: &Engine{ ExchangeManager: em, @@ -2874,7 +2916,7 @@ func TestGetMarginRatesHistory(t *testing.T) { func TestGetFundingRates(t *testing.T) { t.Parallel() - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName("binance") if err != nil { t.Fatal(err) @@ -2907,7 +2949,10 @@ func TestGetFundingRates(t *testing.T) { fakeExchange := fExchange{ IBotExchange: exch, } - em.Add(fakeExchange) + err = em.Add(fakeExchange) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } var wg sync.WaitGroup om, err := SetupOrderManager(em, &CommunicationManager{}, &wg, false, false, time.Hour) if !errors.Is(err, nil) { @@ -2966,7 +3011,7 @@ func TestGetFundingRates(t *testing.T) { func TestGetManagedPosition(t *testing.T) { t.Parallel() - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName("binance") if err != nil { t.Fatal(err) @@ -3003,7 +3048,10 @@ func TestGetManagedPosition(t *testing.T) { fakeExchange := fExchange{ IBotExchange: exch, } - em.Add(fakeExchange) + err = em.Add(fakeExchange) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } var wg sync.WaitGroup om, err := SetupOrderManager(em, &CommunicationManager{}, &wg, false, false, time.Hour) if !errors.Is(err, nil) { @@ -3105,7 +3153,7 @@ func TestGetManagedPosition(t *testing.T) { func TestGetAllManagedPositions(t *testing.T) { t.Parallel() - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName("binance") if err != nil { t.Fatal(err) @@ -3142,7 +3190,10 @@ func TestGetAllManagedPositions(t *testing.T) { fakeExchange := fExchange{ IBotExchange: exch, } - em.Add(fakeExchange) + err = em.Add(fakeExchange) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } var wg sync.WaitGroup om, err := SetupOrderManager(em, &CommunicationManager{}, &wg, false, false, time.Hour) if !errors.Is(err, nil) { @@ -3212,7 +3263,7 @@ func TestGetAllManagedPositions(t *testing.T) { func TestGetOrderbookMovement(t *testing.T) { t.Parallel() - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName("binance") if err != nil { t.Fatal(err) @@ -3239,7 +3290,10 @@ func TestGetOrderbookMovement(t *testing.T) { fakeExchange := fExchange{ IBotExchange: exch, } - em.Add(fakeExchange) + err = em.Add(fakeExchange) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } s := RPCServer{Engine: &Engine{ExchangeManager: em}} @@ -3319,7 +3373,7 @@ func TestGetOrderbookMovement(t *testing.T) { func TestGetOrderbookAmountByNominal(t *testing.T) { t.Parallel() - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName("binance") if err != nil { t.Fatal(err) @@ -3346,7 +3400,10 @@ func TestGetOrderbookAmountByNominal(t *testing.T) { fakeExchange := fExchange{ IBotExchange: exch, } - em.Add(fakeExchange) + err = em.Add(fakeExchange) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } s := RPCServer{Engine: &Engine{ExchangeManager: em}} @@ -3419,7 +3476,7 @@ func TestGetOrderbookAmountByNominal(t *testing.T) { func TestGetOrderbookAmountByImpact(t *testing.T) { t.Parallel() - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName("binance") if err != nil { t.Fatal(err) @@ -3446,7 +3503,10 @@ func TestGetOrderbookAmountByImpact(t *testing.T) { fakeExchange := fExchange{ IBotExchange: exch, } - em.Add(fakeExchange) + err = em.Add(fakeExchange) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } s := RPCServer{Engine: &Engine{ExchangeManager: em}} diff --git a/engine/sync_manager_test.go b/engine/sync_manager_test.go index 89d8e2f0..167c96c0 100644 --- a/engine/sync_manager_test.go +++ b/engine/sync_manager_test.go @@ -64,13 +64,16 @@ func TestSyncManagerStart(t *testing.T) { if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName("Bitstamp") if err != nil { t.Fatal(err) } exch.SetDefaults() - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } m.exchangeManager = em m.config.SynchronizeContinuously = true err = m.Start() @@ -98,13 +101,16 @@ func TestSyncManagerStop(t *testing.T) { t.Errorf("error '%v', expected '%v'", err, ErrNilSubsystem) } - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName("Bitstamp") if err != nil { t.Fatal(err) } exch.SetDefaults() - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } m, err = setupSyncManager(&SyncManagerConfig{SynchronizeTrades: true, SynchronizeContinuously: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: ¤cy.EMPTYFORMAT}, em, &config.RemoteControlConfig{}, false) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) @@ -146,13 +152,16 @@ func TestPrintTickerSummary(t *testing.T) { var m *syncManager m.PrintTickerSummary(&ticker.Price{}, "REST", nil) - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName("Bitstamp") if err != nil { t.Fatal(err) } exch.SetDefaults() - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } m, err = setupSyncManager(&SyncManagerConfig{SynchronizeTrades: true, SynchronizeContinuously: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: ¤cy.EMPTYFORMAT}, em, &config.RemoteControlConfig{}, false) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) @@ -185,13 +194,16 @@ func TestPrintOrderbookSummary(t *testing.T) { var m *syncManager m.PrintOrderbookSummary(nil, "REST", nil) - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName("Bitstamp") if err != nil { t.Fatal(err) } exch.SetDefaults() - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } m, err = setupSyncManager(&SyncManagerConfig{SynchronizeTrades: true, SynchronizeContinuously: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: ¤cy.EMPTYFORMAT}, em, &config.RemoteControlConfig{}, false) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) diff --git a/engine/websocketroutine_manager_test.go b/engine/websocketroutine_manager_test.go index 5d9df8e6..45eca731 100644 --- a/engine/websocketroutine_manager_test.go +++ b/engine/websocketroutine_manager_test.go @@ -19,26 +19,26 @@ func TestWebsocketRoutineManagerSetup(t *testing.T) { t.Errorf("error '%v', expected '%v'", err, errNilExchangeManager) } - _, err = setupWebsocketRoutineManager(SetupExchangeManager(), nil, nil, nil, false) + _, err = setupWebsocketRoutineManager(NewExchangeManager(), nil, nil, nil, false) if !errors.Is(err, errNilOrderManager) { t.Errorf("error '%v', expected '%v'", err, errNilOrderManager) } - _, err = setupWebsocketRoutineManager(SetupExchangeManager(), &OrderManager{}, nil, nil, false) + _, err = setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, nil, nil, false) if !errors.Is(err, errNilCurrencyPairSyncer) { t.Errorf("error '%v', expected '%v'", err, errNilCurrencyPairSyncer) } - _, err = setupWebsocketRoutineManager(SetupExchangeManager(), &OrderManager{}, &syncManager{}, nil, false) + _, err = setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &syncManager{}, nil, false) if !errors.Is(err, errNilCurrencyConfig) { t.Errorf("error '%v', expected '%v'", err, errNilCurrencyConfig) } - _, err = setupWebsocketRoutineManager(SetupExchangeManager(), &OrderManager{}, &syncManager{}, ¤cy.Config{}, true) + _, err = setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &syncManager{}, ¤cy.Config{}, true) if !errors.Is(err, errNilCurrencyPairFormat) { t.Errorf("error '%v', expected '%v'", err, errNilCurrencyPairFormat) } - m, err := setupWebsocketRoutineManager(SetupExchangeManager(), &OrderManager{}, &syncManager{}, ¤cy.Config{CurrencyPairFormat: ¤cy.PairFormat{}}, false) + m, err := setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &syncManager{}, ¤cy.Config{CurrencyPairFormat: ¤cy.PairFormat{}}, false) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } @@ -57,7 +57,7 @@ func TestWebsocketRoutineManagerStart(t *testing.T) { Uppercase: false, Delimiter: "-", }} - m, err = setupWebsocketRoutineManager(SetupExchangeManager(), &OrderManager{}, &syncManager{}, cfg, true) + m, err = setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &syncManager{}, cfg, true) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } @@ -77,7 +77,7 @@ func TestWebsocketRoutineManagerIsRunning(t *testing.T) { t.Error("expected false") } - m, err := setupWebsocketRoutineManager(SetupExchangeManager(), &OrderManager{}, &syncManager{}, ¤cy.Config{CurrencyPairFormat: ¤cy.PairFormat{}}, false) + m, err := setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &syncManager{}, ¤cy.Config{CurrencyPairFormat: ¤cy.PairFormat{}}, false) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } @@ -101,7 +101,7 @@ func TestWebsocketRoutineManagerStop(t *testing.T) { t.Errorf("error '%v', expected '%v'", err, ErrNilSubsystem) } - m, err = setupWebsocketRoutineManager(SetupExchangeManager(), &OrderManager{}, &syncManager{}, ¤cy.Config{CurrencyPairFormat: ¤cy.PairFormat{}}, false) + m, err = setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &syncManager{}, ¤cy.Config{CurrencyPairFormat: ¤cy.PairFormat{}}, false) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) } @@ -123,14 +123,16 @@ func TestWebsocketRoutineManagerStop(t *testing.T) { func TestWebsocketRoutineManagerHandleData(t *testing.T) { var exchName = "Bitstamp" var wg sync.WaitGroup - em := SetupExchangeManager() + em := NewExchangeManager() exch, err := em.NewExchangeByName(exchName) if !errors.Is(err, nil) { t.Fatalf("error '%v', expected '%v'", err, nil) } exch.SetDefaults() - em.Add(exch) - + err = em.Add(exch) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } om, err := SetupOrderManager(em, &CommunicationManager{}, &wg, false, false, 0) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) diff --git a/engine/withdraw_manager_test.go b/engine/withdraw_manager_test.go index 57ebd402..bb51596f 100644 --- a/engine/withdraw_manager_test.go +++ b/engine/withdraw_manager_test.go @@ -22,7 +22,7 @@ const ( func withdrawManagerTestHelper(t *testing.T) (*ExchangeManager, *portfolioManager) { t.Helper() - em := SetupExchangeManager() + em := NewExchangeManager() b := new(bybit.Bybit) b.SetDefaults() cfg, err := b.GetDefaultConfig(context.Background()) @@ -33,7 +33,10 @@ func withdrawManagerTestHelper(t *testing.T) (*ExchangeManager, *portfolioManage if err != nil { t.Fatal(err) } - em.Add(b) + err = em.Add(b) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } pm, err := setupPortfolioManager(em, 0, &portfolio.Base{Addresses: []portfolio.Address{}}) if err != nil { t.Fatal(err) diff --git a/exchanges/binanceus/binanceus_websocket.go b/exchanges/binanceus/binanceus_websocket.go index cc50cbfa..bfd75426 100644 --- a/exchanges/binanceus/binanceus_websocket.go +++ b/exchanges/binanceus/binanceus_websocket.go @@ -100,9 +100,10 @@ func (bi *Binanceus) KeepAuthKeyAlive() { // ClosUserDataStream closes the User data stream and remove the listen key when closing the websocket. defer func() { er := bi.CloseUserDataStream(context.Background()) - log.Errorf(log.WebsocketMgr, - "%s closing user data stream error %v", - bi.Name, er) + if er != nil { + log.Errorf(log.WebsocketMgr, "%s closing user data stream error %v", + bi.Name, er) + } }() // Looping in 30 Minutes and updating the listenKey ticks := time.NewTicker(time.Minute * 30) diff --git a/exchanges/exchange.go b/exchanges/exchange.go index 8fbf537c..c6abb6b8 100644 --- a/exchanges/exchange.go +++ b/exchanges/exchange.go @@ -1578,3 +1578,15 @@ func (b *Base) GetKlineExtendedRequest(pair currency.Pair, a asset.Item, interva return &kline.ExtendedRequest{Request: r, RangeHolder: dates}, nil } + +// Shutdown closes active websocket connections if available and then cleans up +// a REST requester instance. +func (b *Base) Shutdown() error { + if b.Websocket != nil { + err := b.Websocket.Shutdown() + if err != nil && !errors.Is(err, stream.ErrNotConnected) { + return err + } + } + return b.Requester.Shutdown() +} diff --git a/exchanges/interfaces.go b/exchanges/interfaces.go index 9308b9b7..7fbb608f 100644 --- a/exchanges/interfaces.go +++ b/exchanges/interfaces.go @@ -27,6 +27,7 @@ type IBotExchange interface { Setup(exch *config.Exchange) error Start(ctx context.Context, wg *sync.WaitGroup) error SetDefaults() + Shutdown() error GetName() string SetEnabled(bool) FetchTicker(ctx context.Context, p currency.Pair, a asset.Item) (*ticker.Price, error) diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index e43ddbc4..e56fcdee 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -26,6 +26,8 @@ const ( var ( // ErrSubscriptionFailure defines an error when a subscription fails ErrSubscriptionFailure = errors.New("subscription failure") + // ErrNotConnected defines an error when websocket is not connected + ErrNotConnected = errors.New("websocket is not connected") errAlreadyRunning = errors.New("connection monitor is already running") errExchangeConfigIsNil = errors.New("exchange config is nil") @@ -423,10 +425,12 @@ func (w *Websocket) Shutdown() error { defer w.m.Unlock() if !w.IsConnected() { - return fmt.Errorf("%v websocket: cannot shutdown a disconnected websocket", - w.exchangeName) + return fmt.Errorf("%v websocket: cannot shutdown %w", + w.exchangeName, + ErrNotConnected) } + // TODO: Interrupt connection and or close connection when it is re-established. if w.IsConnecting() { return fmt.Errorf("%v websocket: cannot shutdown, in the process of reconnection", w.exchangeName) diff --git a/exchanges/stream/websocket_connection.go b/exchanges/stream/websocket_connection.go index e1ea5af7..0c372599 100644 --- a/exchanges/stream/websocket_connection.go +++ b/exchanges/stream/websocket_connection.go @@ -198,12 +198,13 @@ func (w *WebsocketConnection) SetupPingHandler(handler PingHandler) { }() } -func (w *WebsocketConnection) setConnectedStatus(b bool) { +// setConnectedStatus sets connection status if changed it will return true. +// TODO: Swap out these atomic switches and opt for sync.RWMutex. +func (w *WebsocketConnection) setConnectedStatus(b bool) bool { if b { - atomic.StoreInt32(&w.connected, 1) - return + return atomic.SwapInt32(&w.connected, 1) == 0 } - atomic.StoreInt32(&w.connected, 0) + return atomic.SwapInt32(&w.connected, 0) == 1 } // IsConnected exposes websocket connection status @@ -216,16 +217,22 @@ func (w *WebsocketConnection) ReadMessage() Response { mType, resp, err := w.Connection.ReadMessage() if err != nil { if isDisconnectionError(err) { - w.setConnectedStatus(false) - select { - case w.readMessageErrors <- err: - default: - // bypass if there is no receiver, as this stops it returning - // when shutdown is called. - log.Warnf(log.WebsocketMgr, - "%s failed to relay error: %v", - w.ExchangeName, - err) + if w.setConnectedStatus(false) { + // NOTE: When w.setConnectedStatus() returns true the underlying + // state was changed and this infers that the connection was + // externally closed and an error is reported else Shutdown() + // method on WebsocketConnection type has been called and can + // be skipped. + select { + case w.readMessageErrors <- err: + default: + // bypass if there is no receiver, as this stops it returning + // when shutdown is called. + log.Warnf(log.WebsocketMgr, + "%s failed to relay error: %v", + w.ExchangeName, + err) + } } } return Response{} @@ -315,6 +322,7 @@ func (w *WebsocketConnection) Shutdown() error { if w == nil || w.Connection == nil { return nil } + w.setConnectedStatus(false) return w.Connection.UnderlyingConn().Close() } diff --git a/gctscript/wrappers/gct/exchange/exchange_test.go b/gctscript/wrappers/gct/exchange/exchange_test.go index 7dea04c1..8342a54a 100644 --- a/gctscript/wrappers/gct/exchange/exchange_test.go +++ b/gctscript/wrappers/gct/exchange/exchange_test.go @@ -34,8 +34,8 @@ const ( var ( settings = engine.Settings{ + CoreSettings: engine.CoreSettings{EnableDryRun: true}, ConfigFile: filepath.Join("..", "..", "..", "..", "testdata", "configtest.json"), - EnableDryRun: true, DataDir: filepath.Join("..", "..", "..", "..", "testdata", "gocryptotrader"), } exchangeTest = Exchange{} @@ -201,7 +201,7 @@ func setupEngine() (err error) { return err } - em := engine.SetupExchangeManager() + em := engine.NewExchangeManager() engine.Bot.ExchangeManager = em return engine.Bot.LoadExchange(exchName, nil) diff --git a/gctscript/wrappers/gct/gctwrapper_test.go b/gctscript/wrappers/gct/gctwrapper_test.go index 84ab0300..6dd87b73 100644 --- a/gctscript/wrappers/gct/gctwrapper_test.go +++ b/gctscript/wrappers/gct/gctwrapper_test.go @@ -19,10 +19,12 @@ import ( func TestMain(m *testing.M) { settings := engine.Settings{ - ConfigFile: filepath.Join("..", "..", "..", "testdata", "configtest.json"), - EnableDryRun: true, - DataDir: filepath.Join("..", "..", "..", "testdata", "gocryptotrader"), - EnableDepositAddressManager: true, + CoreSettings: engine.CoreSettings{ + EnableDryRun: true, + EnableDepositAddressManager: true, + }, + ConfigFile: filepath.Join("..", "..", "..", "testdata", "configtest.json"), + DataDir: filepath.Join("..", "..", "..", "testdata", "gocryptotrader"), } var err error engine.Bot, err = engine.NewFromSettings(&settings, nil) @@ -30,7 +32,7 @@ func TestMain(m *testing.M) { log.Print(err) os.Exit(1) } - em := engine.SetupExchangeManager() + em := engine.NewExchangeManager() exch, err := em.NewExchangeByName(exch.Value) if err != nil { log.Print(err) @@ -45,7 +47,10 @@ func TestMain(m *testing.M) { if err != nil { log.Fatal(err) } - em.Add(exch) + err = em.Add(exch) + if !errors.Is(err, nil) { + log.Fatalf("received: '%v' but expected: '%v'", err, nil) + } engine.Bot.ExchangeManager = em engine.Bot.WithdrawManager, err = engine.SetupWithdrawManager(em, nil, true) if err != nil { diff --git a/main.go b/main.go index 8ade9ab4..d0b1f545 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "log" "os" "runtime" + "time" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/config" @@ -96,6 +97,7 @@ func main() { flag.BoolVar(&settings.EnableExchangeHTTPDebugging, "exchangehttpdebugging", false, "sets the exchanges HTTP debugging") flag.DurationVar(&settings.TradeBufferProcessingInterval, "tradeprocessinginterval", trade.DefaultProcessorIntervalTime, "sets the interval to save trade buffer data to the database") flag.IntVar(&settings.AlertSystemPreAllocationCommsBuffer, "alertbuffer", alert.PreAllocCommsDefaultBuffer, "sets the size of the pre-allocation communications buffer") + flag.DurationVar(&settings.ExchangeShutdownTimeout, "exchangeshutdowntimeout", time.Second*10, "sets the maximum amount of time the program will wait for an exchange to shut down gracefully") // Common tuning settings flag.DurationVar(&settings.GlobalHTTPTimeout, "globalhttptimeout", 0, "sets common HTTP timeout value for HTTP requests") @@ -139,7 +141,8 @@ func main() { gctscript.Setup() - engine.PrintSettings(&engine.Bot.Settings) + engine.Bot.Settings.PrintLoadedSettings() + if err = engine.Bot.Start(); err != nil { errClose := gctlog.CloseLogger() if errClose != nil {