From a54c5107f4fb60e7b9eb59f0251afa43ccf44c2a Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Fri, 3 Sep 2021 11:59:52 +1000 Subject: [PATCH] engine: GetSubsystemsStatus fix (#773) * engine: GetSubsystemsStatus fix * engine: force map literal to stop doubling up on keys, expanded test coverage * engine: Deploy default for migration requirement. * glorious: nits addr * glorious: suggestion * tests: fix --- communications/communications.go | 6 +- dispatch/dispatch.go | 5 +- engine/communication_manager_test.go | 4 +- engine/connection_manager.go | 10 +- engine/connection_manager_test.go | 4 + engine/helpers.go | 92 +++++++++-------- engine/helpers_test.go | 146 +++++++++++++++++++++++++++ engine/ntp_manager.go | 2 +- engine/ntp_manager_test.go | 4 +- engine/ntp_manager_types.go | 2 +- engine/rpcserver.go | 43 ++++---- gctscript/vm/manager.go | 5 + 12 files changed, 251 insertions(+), 72 deletions(-) diff --git a/communications/communications.go b/communications/communications.go index a117e56c..06644896 100644 --- a/communications/communications.go +++ b/communications/communications.go @@ -15,13 +15,13 @@ type Communications struct { base.IComm } -// ErrNoCommunicationRelayersEnabled returns when no relayers enabled -var ErrNoCommunicationRelayersEnabled = errors.New("no communication relayers enabled") +// ErrNoRelayersEnabled returns when no communication relayers are enabled +var ErrNoRelayersEnabled = errors.New("no communication relayers are enabled") // NewComm sets up and returns a pointer to a Communications object func NewComm(cfg *base.CommunicationsConfig) (*Communications, error) { if !cfg.IsAnyEnabled() { - return nil, ErrNoCommunicationRelayersEnabled + return nil, ErrNoRelayersEnabled } var comm Communications diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index d5e79ebd..291116a8 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -11,6 +11,9 @@ import ( "github.com/thrasher-corp/gocryptotrader/log" ) +// ErrNotRunning defines an error when the dispatcher is not running +var ErrNotRunning = errors.New("dispatcher not running") + // Name is an exported subsystem name const Name = "dispatch" @@ -117,7 +120,7 @@ func (d *Dispatcher) start(workers, channelCapacity int) error { // stop stops the service and shuts down all worker routines func (d *Dispatcher) stop() error { if !atomic.CompareAndSwapUint32(&d.running, 1, 0) { - return errors.New("dispatcher not running") + return ErrNotRunning } close(d.shutdown) ch := make(chan struct{}) diff --git a/engine/communication_manager_test.go b/engine/communication_manager_test.go index dab3ccdf..e97eac53 100644 --- a/engine/communication_manager_test.go +++ b/engine/communication_manager_test.go @@ -16,8 +16,8 @@ func TestSetup(t *testing.T) { } _, err = SetupCommunicationManager(&base.CommunicationsConfig{}) - if !errors.Is(err, communications.ErrNoCommunicationRelayersEnabled) { - t.Errorf("error '%v', expected '%v'", err, communications.ErrNoCommunicationRelayersEnabled) + if !errors.Is(err, communications.ErrNoRelayersEnabled) { + t.Errorf("error '%v', expected '%v'", err, communications.ErrNoRelayersEnabled) } m, err := SetupCommunicationManager(&base.CommunicationsConfig{ diff --git a/engine/connection_manager.go b/engine/connection_manager.go index cf8b7ed8..ce261db2 100644 --- a/engine/connection_manager.go +++ b/engine/connection_manager.go @@ -1,6 +1,7 @@ package engine import ( + "errors" "fmt" "sync/atomic" @@ -12,6 +13,8 @@ import ( // ConnectionManagerName is an exported subsystem name const ConnectionManagerName = "internet_monitor" +var errConnectionCheckerIsNil = errors.New("connection checker is nil") + // connectionManager manages the connchecker type connectionManager struct { started int32 @@ -72,14 +75,17 @@ func (m *connectionManager) Start() error { // Stop stops the connection manager func (m *connectionManager) Stop() error { if m == nil { - return fmt.Errorf("connection manager %w", ErrNilSubsystem) + return fmt.Errorf("connection manager: %w", ErrNilSubsystem) } if atomic.LoadInt32(&m.started) == 0 { - return fmt.Errorf("connection manager %w", ErrSubSystemNotStarted) + return fmt.Errorf("connection manager: %w", ErrSubSystemNotStarted) } defer func() { atomic.CompareAndSwapInt32(&m.started, 1, 0) }() + if m.conn == nil { + return fmt.Errorf("connection manager: %w", errConnectionCheckerIsNil) + } log.Debugln(log.ConnectionMgr, "Connection manager shutting down...") m.conn.Shutdown() log.Debugln(log.ConnectionMgr, "Connection manager stopped.") diff --git a/engine/connection_manager_test.go b/engine/connection_manager_test.go index a66b054d..f1c0095b 100644 --- a/engine/connection_manager_test.go +++ b/engine/connection_manager_test.go @@ -69,6 +69,10 @@ func TestConnectionMonitorStart(t *testing.T) { func TestConnectionMonitorStop(t *testing.T) { t.Parallel() + err := (&connectionManager{started: 1}).Stop() + if !errors.Is(err, errConnectionCheckerIsNil) { + t.Errorf("error '%v', expected '%v'", err, errConnectionCheckerIsNil) + } m, err := setupConnectionManager(&config.ConnectionMonitorConfig{}) if !errors.Is(err, nil) { t.Errorf("error '%v', expected '%v'", err, nil) diff --git a/engine/helpers.go b/engine/helpers.go index 0607a8bb..91a41c37 100644 --- a/engine/helpers.go +++ b/engine/helpers.go @@ -34,29 +34,31 @@ import ( ) var ( - errCertExpired = errors.New("gRPC TLS certificate has expired") - errCertDataIsNil = errors.New("gRPC TLS certificate PEM data is nil") - errCertTypeInvalid = errors.New("gRPC TLS certificate type is invalid") + errCertExpired = errors.New("gRPC TLS certificate has expired") + errCertDataIsNil = errors.New("gRPC TLS certificate PEM data is nil") + errCertTypeInvalid = errors.New("gRPC TLS certificate type is invalid") + errSubsystemNotFound = errors.New("subsystem not found") + errGRPCManagementFault = errors.New("cannot manage GRPC subsystem via GRPC. Please manually change your config") ) // GetSubsystemsStatus returns the status of various subsystems func (bot *Engine) GetSubsystemsStatus() map[string]bool { - systems := make(map[string]bool) - systems[SyncManagerName] = bot.CommunicationsManager.IsRunning() - systems[ConnectionManagerName] = bot.connectionManager.IsRunning() - systems[OrderManagerName] = bot.OrderManager.IsRunning() - systems[PortfolioManagerName] = bot.portfolioManager.IsRunning() - systems[NTPManagerName] = bot.ntpManager.IsRunning() - systems[DatabaseConnectionManagerName] = bot.DatabaseManager.IsRunning() - systems[SyncManagerName] = bot.Settings.EnableExchangeSyncManager - systems[grpcName] = bot.Settings.EnableGRPC - systems[grpcProxyName] = bot.Settings.EnableGRPCProxy - systems[vm.Name] = bot.gctScriptManager.IsRunning() - systems[DeprecatedName] = bot.Settings.EnableDeprecatedRPC - systems[WebsocketName] = bot.Settings.EnableWebsocketRPC - systems[dispatch.Name] = dispatch.IsRunning() - systems[dataHistoryManagerName] = bot.dataHistoryManager.IsRunning() - return systems + return map[string]bool{ + CommunicationsManagerName: bot.CommunicationsManager.IsRunning(), + ConnectionManagerName: bot.connectionManager.IsRunning(), + OrderManagerName: bot.OrderManager.IsRunning(), + PortfolioManagerName: bot.portfolioManager.IsRunning(), + NTPManagerName: bot.ntpManager.IsRunning(), + DatabaseConnectionManagerName: bot.DatabaseManager.IsRunning(), + SyncManagerName: bot.Settings.EnableExchangeSyncManager, + grpcName: bot.Settings.EnableGRPC, + grpcProxyName: bot.Settings.EnableGRPCProxy, + vm.Name: bot.gctScriptManager.IsRunning(), + DeprecatedName: bot.Settings.EnableDeprecatedRPC, + WebsocketName: bot.Settings.EnableWebsocketRPC, + dispatch.Name: dispatch.IsRunning(), + dataHistoryManagerName: bot.dataHistoryManager.IsRunning(), + } } // RPCEndpoint stores an RPC endpoint status and addr @@ -66,29 +68,40 @@ type RPCEndpoint struct { } // GetRPCEndpoints returns a list of RPC endpoints and their listen addrs -func GetRPCEndpoints() map[string]RPCEndpoint { - endpoints := make(map[string]RPCEndpoint) - endpoints[grpcName] = RPCEndpoint{ - Started: Bot.Settings.EnableGRPC, - ListenAddr: "grpc://" + Bot.Config.RemoteControl.GRPC.ListenAddress, +func (bot *Engine) GetRPCEndpoints() (map[string]RPCEndpoint, error) { + if bot.Config == nil { + return nil, errNilConfig } - endpoints[grpcProxyName] = RPCEndpoint{ - Started: Bot.Settings.EnableGRPCProxy, - ListenAddr: "http://" + Bot.Config.RemoteControl.GRPC.GRPCProxyListenAddress, - } - endpoints[DeprecatedName] = RPCEndpoint{ - Started: Bot.Settings.EnableDeprecatedRPC, - ListenAddr: "http://" + Bot.Config.RemoteControl.DeprecatedRPC.ListenAddress, - } - endpoints[WebsocketName] = RPCEndpoint{ - Started: Bot.Settings.EnableWebsocketRPC, - ListenAddr: "ws://" + Bot.Config.RemoteControl.WebsocketRPC.ListenAddress, - } - return endpoints + return map[string]RPCEndpoint{ + grpcName: { + Started: bot.Settings.EnableGRPC, + ListenAddr: "grpc://" + bot.Config.RemoteControl.GRPC.ListenAddress, + }, + grpcProxyName: { + Started: bot.Settings.EnableGRPCProxy, + ListenAddr: "http://" + bot.Config.RemoteControl.GRPC.GRPCProxyListenAddress, + }, + DeprecatedName: { + Started: bot.Settings.EnableDeprecatedRPC, + ListenAddr: "http://" + bot.Config.RemoteControl.DeprecatedRPC.ListenAddress, + }, + WebsocketName: { + Started: bot.Settings.EnableWebsocketRPC, + ListenAddr: "ws://" + bot.Config.RemoteControl.WebsocketRPC.ListenAddress, + }, + }, nil } // SetSubsystem enables or disables an engine subsystem func (bot *Engine) SetSubsystem(subSystemName string, enable bool) error { + if bot == nil { + return errNilBot + } + + if bot.Config == nil { + return errNilConfig + } + var err error switch strings.ToLower(subSystemName) { case CommunicationsManagerName: @@ -227,7 +240,7 @@ func (bot *Engine) SetSubsystem(subSystemName string, enable bool) error { } return bot.apiServer.StopWebsocketServer() case grpcName, grpcProxyName: - return errors.New("cannot manage GRPC subsystem via GRPC. Please manually change your config") + return errGRPCManagementFault case dataHistoryManagerName: if enable { if bot.dataHistoryManager == nil { @@ -251,8 +264,7 @@ func (bot *Engine) SetSubsystem(subSystemName string, enable bool) error { } return bot.gctScriptManager.Stop() } - - return errors.New("subsystem not found") + return fmt.Errorf("%s: %w", subSystemName, errSubsystemNotFound) } // GetExchangeOTPs returns OTP codes for all exchanges which have a otpsecret diff --git a/engine/helpers_test.go b/engine/helpers_test.go index 6dfcc58a..f86261af 100644 --- a/engine/helpers_test.go +++ b/engine/helpers_test.go @@ -18,13 +18,18 @@ import ( "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/convert" "github.com/thrasher-corp/gocryptotrader/common/file" + "github.com/thrasher-corp/gocryptotrader/communications" "github.com/thrasher-corp/gocryptotrader/config" "github.com/thrasher-corp/gocryptotrader/currency" + "github.com/thrasher-corp/gocryptotrader/database" + "github.com/thrasher-corp/gocryptotrader/dispatch" "github.com/thrasher-corp/gocryptotrader/exchanges/account" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" "github.com/thrasher-corp/gocryptotrader/exchanges/stats" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" + "github.com/thrasher-corp/gocryptotrader/gctscript/vm" + "github.com/thrasher-corp/gocryptotrader/log" ) var testExchange = "Bitstamp" @@ -88,6 +93,147 @@ func CreateTestBot(t *testing.T) *Engine { return bot } +func TestGetSubsystemsStatus(t *testing.T) { + m := (&Engine{}).GetSubsystemsStatus() + if len(m) != 14 { + t.Fatalf("subsystem count is wrong expecting: %d but received: %d", 14, len(m)) + } +} + +func TestGetRPCEndpoints(t *testing.T) { + _, err := (&Engine{}).GetRPCEndpoints() + if !errors.Is(err, errNilConfig) { + t.Fatalf("received: %v, but expected: %v", err, errNilConfig) + } + + m, err := (&Engine{Config: &config.Config{}}).GetRPCEndpoints() + if !errors.Is(err, nil) { + t.Fatalf("received: %v, but expected: %v", err, nil) + } + if len(m) != 4 { + t.Fatalf("expected length: %d but received: %d", 4, len(m)) + } +} + +func TestSetSubsystem(t *testing.T) { + testCases := []struct { + Subsystem string + Engine *Engine + EnableError error + DisableError error + }{ + {Subsystem: "sillyBilly", EnableError: errNilBot, DisableError: errNilBot}, + {Subsystem: "sillyBilly", Engine: &Engine{}, EnableError: errNilConfig, DisableError: errNilConfig}, + {Subsystem: "sillyBilly", Engine: &Engine{Config: &config.Config{}}, EnableError: errSubsystemNotFound, DisableError: errSubsystemNotFound}, + { + Subsystem: CommunicationsManagerName, + Engine: &Engine{Config: &config.Config{}}, + EnableError: communications.ErrNoRelayersEnabled, + DisableError: ErrNilSubsystem, + }, + { + Subsystem: ConnectionManagerName, + Engine: &Engine{Config: &config.Config{}}, + EnableError: nil, + DisableError: nil, + }, + { + Subsystem: OrderManagerName, + Engine: &Engine{Config: &config.Config{}}, + EnableError: nil, + DisableError: nil, + }, + { + Subsystem: PortfolioManagerName, + Engine: &Engine{Config: &config.Config{}}, + EnableError: errNilExchangeManager, + DisableError: ErrNilSubsystem, + }, + { + Subsystem: NTPManagerName, + Engine: &Engine{Config: &config.Config{Logging: log.Config{Enabled: convert.BoolPtr(false)}}}, + EnableError: errNilNTPConfigValues, + DisableError: ErrNilSubsystem, + }, + { + Subsystem: DatabaseConnectionManagerName, + Engine: &Engine{Config: &config.Config{}}, + EnableError: database.ErrDatabaseSupportDisabled, + DisableError: ErrSubSystemNotStarted, + }, + { + Subsystem: SyncManagerName, + Engine: &Engine{Config: &config.Config{}}, + EnableError: errNoSyncItemsEnabled, + DisableError: ErrNilSubsystem, + }, + { + Subsystem: dispatch.Name, + Engine: &Engine{Config: &config.Config{}}, + EnableError: nil, + DisableError: nil, + }, + + { + Subsystem: DeprecatedName, + Engine: &Engine{Config: &config.Config{}, Settings: Settings{ConfigFile: config.DefaultFilePath()}}, + EnableError: errServerDisabled, + DisableError: ErrSubSystemNotStarted, + }, + { + Subsystem: WebsocketName, + Engine: &Engine{Config: &config.Config{}, Settings: Settings{ConfigFile: config.DefaultFilePath()}}, + EnableError: errServerDisabled, + DisableError: ErrSubSystemNotStarted, + }, + { + Subsystem: grpcName, + Engine: &Engine{Config: &config.Config{}}, + EnableError: errGRPCManagementFault, + DisableError: errGRPCManagementFault}, + { + Subsystem: grpcProxyName, + Engine: &Engine{Config: &config.Config{}}, + EnableError: errGRPCManagementFault, + DisableError: errGRPCManagementFault}, + { + Subsystem: dataHistoryManagerName, + Engine: &Engine{Config: &config.Config{}}, + EnableError: database.ErrNilInstance, + DisableError: ErrNilSubsystem, + }, + { + Subsystem: vm.Name, + Engine: &Engine{Config: &config.Config{}}, + EnableError: nil, + DisableError: nil, + }, + } + + for _, tt := range testCases { + tt := tt + t.Run(tt.Subsystem, func(t *testing.T) { + t.Parallel() + err := tt.Engine.SetSubsystem(tt.Subsystem, true) + if !errors.Is(err, tt.EnableError) { + t.Fatalf( + "while enabled %s subsystem received: %#v, but expected: %v", + tt.Subsystem, + err, + tt.EnableError) + } + err = tt.Engine.SetSubsystem(tt.Subsystem, false) + if !errors.Is(err, tt.DisableError) { + t.Fatalf( + "while disabling %s subsystem received: %#v, but expected: %v", + tt.Subsystem, + err, + tt.DisableError) + } + }) + } +} + func TestGetExchangeOTPs(t *testing.T) { t.Parallel() bot := CreateTestBot(t) diff --git a/engine/ntp_manager.go b/engine/ntp_manager.go index 56bb98d0..dcf0d445 100644 --- a/engine/ntp_manager.go +++ b/engine/ntp_manager.go @@ -18,7 +18,7 @@ func setupNTPManager(cfg *config.NTPClientConfig, loggingEnabled bool) (*ntpMana } if cfg.AllowedNegativeDifference == nil || cfg.AllowedDifference == nil { - return nil, errNilConfigValues + return nil, errNilNTPConfigValues } return &ntpManager{ shutdown: make(chan struct{}), diff --git a/engine/ntp_manager_test.go b/engine/ntp_manager_test.go index f1bc491d..94cde679 100644 --- a/engine/ntp_manager_test.go +++ b/engine/ntp_manager_test.go @@ -14,8 +14,8 @@ func TestSetupNTPManager(t *testing.T) { t.Errorf("error '%v', expected '%v'", err, errNilConfig) } _, err = setupNTPManager(&config.NTPClientConfig{}, false) - if !errors.Is(err, errNilConfigValues) { - t.Errorf("error '%v', expected '%v'", err, errNilConfigValues) + if !errors.Is(err, errNilNTPConfigValues) { + t.Errorf("error '%v', expected '%v'", err, errNilNTPConfigValues) } sec := time.Second cfg := &config.NTPClientConfig{ diff --git a/engine/ntp_manager_types.go b/engine/ntp_manager_types.go index d286b3d4..aaac2321 100644 --- a/engine/ntp_manager_types.go +++ b/engine/ntp_manager_types.go @@ -13,7 +13,7 @@ const ( ) var ( - errNilConfigValues = errors.New("nil allowed time differences received") + errNilNTPConfigValues = errors.New("nil allowed time differences received") errNTPManagerDisabled = errors.New("NTP manager disabled") ) diff --git a/engine/rpcserver.go b/engine/rpcserver.go index ab86fffd..fc4f9a88 100644 --- a/engine/rpcserver.go +++ b/engine/rpcserver.go @@ -186,24 +186,35 @@ func (s *RPCServer) StartRPCRESTProxy() { // GetInfo returns info about the current GoCryptoTrader session func (s *RPCServer) GetInfo(_ context.Context, _ *gctrpc.GetInfoRequest) (*gctrpc.GetInfoResponse, error) { - d := time.Since(s.uptime) - resp := gctrpc.GetInfoResponse{ - Uptime: d.String(), + rpcEndpoints, err := s.getRPCEndpoints() + if err != nil { + return nil, err + } + + return &gctrpc.GetInfoResponse{ + Uptime: time.Since(s.uptime).String(), EnabledExchanges: int64(s.Config.CountEnabledExchanges()), AvailableExchanges: int64(len(s.Config.Exchanges)), DefaultFiatCurrency: s.Config.Currency.FiatDisplayCurrency.String(), DefaultForexProvider: s.Config.GetPrimaryForexProvider(), SubsystemStatus: s.GetSubsystemsStatus(), + RpcEndpoints: rpcEndpoints, + }, nil +} + +func (s *RPCServer) getRPCEndpoints() (map[string]*gctrpc.RPCEndpoint, error) { + endpoints, err := s.Engine.GetRPCEndpoints() + if err != nil { + return nil, err } - endpoints := GetRPCEndpoints() - resp.RpcEndpoints = make(map[string]*gctrpc.RPCEndpoint) - for k, v := range endpoints { - resp.RpcEndpoints[k] = &gctrpc.RPCEndpoint{ - Started: v.Started, - ListenAddress: v.ListenAddr, + rpcEndpoints := make(map[string]*gctrpc.RPCEndpoint) + for key, val := range endpoints { + rpcEndpoints[key] = &gctrpc.RPCEndpoint{ + Started: val.Started, + ListenAddress: val.ListenAddr, } } - return &resp, nil + return rpcEndpoints, nil } // GetSubsystems returns a list of subsystems and their status @@ -233,16 +244,8 @@ func (s *RPCServer) DisableSubsystem(_ context.Context, r *gctrpc.GenericSubsyst // GetRPCEndpoints returns a list of API endpoints func (s *RPCServer) GetRPCEndpoints(_ context.Context, _ *gctrpc.GetRPCEndpointsRequest) (*gctrpc.GetRPCEndpointsResponse, error) { - endpoints := GetRPCEndpoints() - var resp gctrpc.GetRPCEndpointsResponse - resp.Endpoints = make(map[string]*gctrpc.RPCEndpoint) - for k, v := range endpoints { - resp.Endpoints[k] = &gctrpc.RPCEndpoint{ - Started: v.Started, - ListenAddress: v.ListenAddr, - } - } - return &resp, nil + endpoint, err := s.getRPCEndpoints() + return &gctrpc.GetRPCEndpointsResponse{Endpoints: endpoint}, err } // GetCommunicationRelayers returns the status of the engines communication relayers diff --git a/gctscript/vm/manager.go b/gctscript/vm/manager.go index 477de66e..ea6a3742 100644 --- a/gctscript/vm/manager.go +++ b/gctscript/vm/manager.go @@ -15,6 +15,8 @@ const ( Name = "gctscript" ) +var ErrNilSubsystem = errors.New("gct script has not been set up") + // GctScriptManager loads and runs GCT Tengo scripts type GctScriptManager struct { config *Config @@ -61,6 +63,9 @@ func (g *GctScriptManager) Start(wg *sync.WaitGroup) (err error) { // Stop stops gctscript subsystem along with all running Virtual Machines func (g *GctScriptManager) Stop() error { + if g == nil { + return fmt.Errorf("%s %w", caseName, ErrNilSubsystem) + } if atomic.LoadInt32(&g.started) == 0 { return fmt.Errorf("%s not running", caseName) }