mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-05-13 23:16:45 +00:00
Engine/GCTScript: Refactor script manager (#580)
* refactor script manager
* remove singleton GCTScriptConfig
* create constant for ".gct" extension
* move GctScriptManager into vm package
* reduce script manager global dependencies
* use manager struct to store runtime override values
* enable/disable scripting subsystem now doesn't store the setting in
config (aligned with other subsystems)
* setting max VMs via start option doesn't change config
* instantiate scriptmanager as part of creating a new Engine
* script manager config is now set during instantiation
* run script manager when enabled in conf or explicitly enabled
* use the Started() method to check if script manager is running
* in tests set script manager as running
* script manager adjustments
* create manager before attempting overrides
* check for nil config when creating script manager
* fix script manager waitgroup counter increased too late
* move autoload() function to autoload.go
* add tests to script manager
This commit is contained in:
@@ -33,7 +33,7 @@ type Engine struct {
|
||||
NTPManager ntpManager
|
||||
ConnectionManager connectionManager
|
||||
DatabaseManager databaseManager
|
||||
GctScriptManager gctScriptManager
|
||||
GctScriptManager *gctscript.GctScriptManager
|
||||
OrderManager orderManager
|
||||
PortfolioManager portfolioManager
|
||||
CommsManager commsManager
|
||||
@@ -58,6 +58,10 @@ func New() (*Engine, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load config. Err: %s", err)
|
||||
}
|
||||
b.GctScriptManager, err = gctscript.NewManager(&b.Config.GCTScript)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create script manager. Err: %s", err)
|
||||
}
|
||||
|
||||
return &b, nil
|
||||
}
|
||||
@@ -91,7 +95,13 @@ func NewFromSettings(settings *Settings, flagSet map[string]bool) (*Engine, erro
|
||||
return nil, fmt.Errorf("unable to adjust runtime GOMAXPROCS value. Err: %s", err)
|
||||
}
|
||||
|
||||
b.GctScriptManager, err = gctscript.NewManager(&b.Config.GCTScript)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create script manager. Err: %s", err)
|
||||
}
|
||||
|
||||
validateSettings(&b, settings, flagSet)
|
||||
|
||||
return &b, nil
|
||||
}
|
||||
|
||||
@@ -129,7 +139,7 @@ func validateSettings(b *Engine, s *Settings, flagSet map[string]bool) {
|
||||
b.Settings.EnableAllPairs = s.EnableAllPairs
|
||||
b.Settings.EnableCoinmarketcapAnalysis = s.EnableCoinmarketcapAnalysis
|
||||
b.Settings.EnableDatabaseManager = s.EnableDatabaseManager
|
||||
b.Settings.EnableGCTScriptManager = s.EnableGCTScriptManager
|
||||
b.Settings.EnableGCTScriptManager = s.EnableGCTScriptManager && (flagSet["gctscriptmanager"] || b.Config.GCTScript.Enabled)
|
||||
b.Settings.MaxVirtualMachines = s.MaxVirtualMachines
|
||||
b.Settings.EnableDispatcher = s.EnableDispatcher
|
||||
b.Settings.EnablePortfolioManager = s.EnablePortfolioManager
|
||||
@@ -166,12 +176,9 @@ func validateSettings(b *Engine, s *Settings, flagSet map[string]bool) {
|
||||
b.Settings.EnableDeprecatedRPC = b.Config.RemoteControl.DeprecatedRPC.Enabled
|
||||
}
|
||||
|
||||
if flagSet["gctscriptmanager"] {
|
||||
gctscript.GCTScriptConfig.Enabled = s.EnableGCTScriptManager
|
||||
}
|
||||
|
||||
if flagSet["maxvirtualmachines"] {
|
||||
gctscript.GCTScriptConfig.MaxVirtualMachines = uint8(s.MaxVirtualMachines)
|
||||
maxMachines := uint8(s.MaxVirtualMachines)
|
||||
b.GctScriptManager.MaxVirtualMachines = &maxMachines
|
||||
}
|
||||
|
||||
if flagSet["withdrawcachesize"] {
|
||||
@@ -471,10 +478,8 @@ func (bot *Engine) Start() error {
|
||||
}
|
||||
|
||||
if bot.Settings.EnableGCTScriptManager {
|
||||
if bot.Config.GCTScript.Enabled {
|
||||
if err := bot.GctScriptManager.Start(); err != nil {
|
||||
gctlog.Errorf(gctlog.Global, "GCTScript manager unable to start: %v", err)
|
||||
}
|
||||
if err := bot.GctScriptManager.Start(&bot.ServicesWG); err != nil {
|
||||
gctlog.Errorf(gctlog.Global, "GCTScript manager unable to start: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -85,25 +85,6 @@ type Settings struct {
|
||||
}
|
||||
|
||||
const (
|
||||
// ErrSubSystemAlreadyStarted message to return when a subsystem is already started
|
||||
ErrSubSystemAlreadyStarted = "manager already started"
|
||||
// ErrSubSystemAlreadyStopped message to return when a subsystem is already stopped
|
||||
ErrSubSystemAlreadyStopped = "already stopped"
|
||||
// ErrSubSystemNotStarted message to return when subsystem not started
|
||||
ErrSubSystemNotStarted = "not started"
|
||||
|
||||
// ErrScriptFailedValidation message to display when a script fails its validation
|
||||
ErrScriptFailedValidation string = "validation failed"
|
||||
// MsgSubSystemStarting message to return when subsystem is starting up
|
||||
MsgSubSystemStarting = "manager starting..."
|
||||
// MsgSubSystemStarted message to return when subsystem has started
|
||||
MsgSubSystemStarted = "started."
|
||||
|
||||
// MsgSubSystemShuttingDown message to return when a subsystem is shutting down
|
||||
MsgSubSystemShuttingDown = "shutting down..."
|
||||
// MsgSubSystemShutdown message to return when a subsystem has shutdown
|
||||
MsgSubSystemShutdown = "manager shutdown."
|
||||
|
||||
// MsgStatusOK message to display when status is "OK"
|
||||
MsgStatusOK string = "ok"
|
||||
// MsgStatusSuccess message to display when status is successful
|
||||
|
||||
@@ -1,99 +0,0 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/thrasher-corp/gocryptotrader/gctscript/vm"
|
||||
"github.com/thrasher-corp/gocryptotrader/log"
|
||||
)
|
||||
|
||||
const gctscriptManagerName = "GCTScript"
|
||||
|
||||
type gctScriptManager struct {
|
||||
started int32
|
||||
stopped int32
|
||||
shutdown chan struct{}
|
||||
}
|
||||
|
||||
// Started returns if gctscript manager subsystem is started
|
||||
func (g *gctScriptManager) Started() bool {
|
||||
return atomic.LoadInt32(&g.started) == 1
|
||||
}
|
||||
|
||||
// Start starts gctscript subsystem and creates shutdown channel
|
||||
func (g *gctScriptManager) Start() (err error) {
|
||||
if atomic.AddInt32(&g.started, 1) != 1 {
|
||||
return fmt.Errorf("%s %s", gctscriptManagerName, ErrSubSystemAlreadyStarted)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
atomic.CompareAndSwapInt32(&g.started, 1, 0)
|
||||
}
|
||||
}()
|
||||
|
||||
log.Debugln(log.Global, gctscriptManagerName, MsgSubSystemStarting)
|
||||
|
||||
g.shutdown = make(chan struct{})
|
||||
go g.run()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop stops gctscript subsystem along with all running Virtual Machines
|
||||
func (g *gctScriptManager) Stop() error {
|
||||
if atomic.LoadInt32(&g.started) == 0 {
|
||||
return fmt.Errorf("%s %s", gctscriptManagerName, ErrSubSystemNotStarted)
|
||||
}
|
||||
|
||||
if atomic.AddInt32(&g.stopped, 1) != 1 {
|
||||
return fmt.Errorf("%s %s", gctscriptManagerName, ErrSubSystemAlreadyStopped)
|
||||
}
|
||||
|
||||
log.Debugln(log.GCTScriptMgr, gctscriptManagerName, MsgSubSystemShuttingDown)
|
||||
close(g.shutdown)
|
||||
err := vm.ShutdownAll()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *gctScriptManager) run() {
|
||||
log.Debugln(log.Global, gctscriptManagerName, MsgSubSystemStarted)
|
||||
|
||||
Bot.ServicesWG.Add(1)
|
||||
vm.SetDefaultScriptOutput()
|
||||
g.autoLoad()
|
||||
defer func() {
|
||||
atomic.CompareAndSwapInt32(&g.stopped, 1, 0)
|
||||
atomic.CompareAndSwapInt32(&g.started, 1, 0)
|
||||
Bot.ServicesWG.Done()
|
||||
log.Debugln(log.GCTScriptMgr, gctscriptManagerName, MsgSubSystemShutdown)
|
||||
}()
|
||||
|
||||
<-g.shutdown
|
||||
}
|
||||
|
||||
func (g *gctScriptManager) autoLoad() {
|
||||
for x := range Bot.Config.GCTScript.AutoLoad {
|
||||
temp := vm.New()
|
||||
if temp == nil {
|
||||
log.Errorf(log.GCTScriptMgr, "Unable to create Virtual Machine, autoload failed for: %v",
|
||||
Bot.Config.GCTScript.AutoLoad[x])
|
||||
continue
|
||||
}
|
||||
var name = Bot.Config.GCTScript.AutoLoad[x]
|
||||
if filepath.Ext(name) != ".gct" {
|
||||
name += ".gct"
|
||||
}
|
||||
scriptPath := filepath.Join(vm.ScriptPath, name)
|
||||
err := temp.Load(scriptPath)
|
||||
if err != nil {
|
||||
log.Errorf(log.GCTScriptMgr, "%v failed to load: %v", filepath.Base(scriptPath), err)
|
||||
continue
|
||||
}
|
||||
go temp.CompileAndRun()
|
||||
}
|
||||
}
|
||||
@@ -28,7 +28,6 @@ import (
|
||||
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
|
||||
"github.com/thrasher-corp/gocryptotrader/exchanges/stats"
|
||||
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
|
||||
"github.com/thrasher-corp/gocryptotrader/gctscript/vm"
|
||||
"github.com/thrasher-corp/gocryptotrader/log"
|
||||
"github.com/thrasher-corp/gocryptotrader/portfolio"
|
||||
)
|
||||
@@ -131,10 +130,8 @@ func (bot *Engine) SetSubsystem(subsys string, enable bool) error {
|
||||
return dispatch.Stop()
|
||||
case "gctscript":
|
||||
if enable {
|
||||
vm.GCTScriptConfig.Enabled = true
|
||||
return bot.GctScriptManager.Start()
|
||||
return bot.GctScriptManager.Start(&bot.ServicesWG)
|
||||
}
|
||||
vm.GCTScriptConfig.Enabled = false
|
||||
return bot.GctScriptManager.Stop()
|
||||
}
|
||||
|
||||
|
||||
@@ -1844,7 +1844,7 @@ func fillMissingCandlesWithStoredTrades(startTime, endTime time.Time, klineItem
|
||||
|
||||
// GCTScriptStatus returns a slice of current running scripts that includes next run time and uuid
|
||||
func (s *RPCServer) GCTScriptStatus(_ context.Context, r *gctrpc.GCTScriptStatusRequest) (*gctrpc.GCTScriptStatusResponse, error) {
|
||||
if !gctscript.GCTScriptConfig.Enabled {
|
||||
if !s.GctScriptManager.Started() {
|
||||
return &gctrpc.GCTScriptStatusResponse{Status: gctscript.ErrScriptingDisabled.Error()}, nil
|
||||
}
|
||||
|
||||
@@ -1853,7 +1853,7 @@ func (s *RPCServer) GCTScriptStatus(_ context.Context, r *gctrpc.GCTScriptStatus
|
||||
}
|
||||
|
||||
resp := &gctrpc.GCTScriptStatusResponse{
|
||||
Status: fmt.Sprintf("%v of %v virtual machines running", gctscript.VMSCount.Len(), gctscript.GCTScriptConfig.MaxVirtualMachines),
|
||||
Status: fmt.Sprintf("%v of %v virtual machines running", gctscript.VMSCount.Len(), s.GctScriptManager.GetMaxVirtualMachines()),
|
||||
}
|
||||
|
||||
gctscript.AllVMSync.Range(func(k, v interface{}) bool {
|
||||
@@ -1872,7 +1872,7 @@ func (s *RPCServer) GCTScriptStatus(_ context.Context, r *gctrpc.GCTScriptStatus
|
||||
|
||||
// GCTScriptQuery queries a running script and returns script running information
|
||||
func (s *RPCServer) GCTScriptQuery(_ context.Context, r *gctrpc.GCTScriptQueryRequest) (*gctrpc.GCTScriptQueryResponse, error) {
|
||||
if !gctscript.GCTScriptConfig.Enabled {
|
||||
if !s.GctScriptManager.Started() {
|
||||
return &gctrpc.GCTScriptQueryResponse{Status: gctscript.ErrScriptingDisabled.Error()}, nil
|
||||
}
|
||||
|
||||
@@ -1903,7 +1903,7 @@ func (s *RPCServer) GCTScriptQuery(_ context.Context, r *gctrpc.GCTScriptQueryRe
|
||||
|
||||
// GCTScriptExecute execute a script
|
||||
func (s *RPCServer) GCTScriptExecute(_ context.Context, r *gctrpc.GCTScriptExecuteRequest) (*gctrpc.GenericResponse, error) {
|
||||
if !gctscript.GCTScriptConfig.Enabled {
|
||||
if !s.GctScriptManager.Started() {
|
||||
return &gctrpc.GenericResponse{Status: gctscript.ErrScriptingDisabled.Error()}, nil
|
||||
}
|
||||
|
||||
@@ -1911,7 +1911,7 @@ func (s *RPCServer) GCTScriptExecute(_ context.Context, r *gctrpc.GCTScriptExecu
|
||||
r.Script.Path = gctscript.ScriptPath
|
||||
}
|
||||
|
||||
gctVM := gctscript.New()
|
||||
gctVM := s.GctScriptManager.New()
|
||||
if gctVM == nil {
|
||||
return &gctrpc.GenericResponse{Status: MsgStatusError, Data: "unable to create VM instance"}, nil
|
||||
}
|
||||
@@ -1935,7 +1935,7 @@ func (s *RPCServer) GCTScriptExecute(_ context.Context, r *gctrpc.GCTScriptExecu
|
||||
|
||||
// GCTScriptStop terminate a running script
|
||||
func (s *RPCServer) GCTScriptStop(_ context.Context, r *gctrpc.GCTScriptStopRequest) (*gctrpc.GenericResponse, error) {
|
||||
if !gctscript.GCTScriptConfig.Enabled {
|
||||
if !s.GctScriptManager.Started() {
|
||||
return &gctrpc.GenericResponse{Status: gctscript.ErrScriptingDisabled.Error()}, nil
|
||||
}
|
||||
|
||||
@@ -1957,7 +1957,7 @@ func (s *RPCServer) GCTScriptStop(_ context.Context, r *gctrpc.GCTScriptStopRequ
|
||||
|
||||
// GCTScriptUpload upload a new script to ScriptPath
|
||||
func (s *RPCServer) GCTScriptUpload(_ context.Context, r *gctrpc.GCTScriptUploadRequest) (*gctrpc.GenericResponse, error) {
|
||||
if !gctscript.GCTScriptConfig.Enabled {
|
||||
if !s.GctScriptManager.Started() {
|
||||
return &gctrpc.GenericResponse{Status: gctscript.ErrScriptingDisabled.Error()}, nil
|
||||
}
|
||||
|
||||
@@ -2013,7 +2013,7 @@ func (s *RPCServer) GCTScriptUpload(_ context.Context, r *gctrpc.GCTScriptUpload
|
||||
}
|
||||
var failedFiles []string
|
||||
for x := range files {
|
||||
err = gctscript.Validate(files[x])
|
||||
err = s.GctScriptManager.Validate(files[x])
|
||||
if err != nil {
|
||||
failedFiles = append(failedFiles, files[x])
|
||||
}
|
||||
@@ -2027,16 +2027,16 @@ func (s *RPCServer) GCTScriptUpload(_ context.Context, r *gctrpc.GCTScriptUpload
|
||||
if err != nil {
|
||||
log.Errorf(log.GCTScriptMgr, "Failed to remove file %v (%v), manual deletion required", filepath.Base(fPath), err)
|
||||
}
|
||||
return &gctrpc.GenericResponse{Status: ErrScriptFailedValidation, Data: strings.Join(failedFiles, ", ")}, nil
|
||||
return &gctrpc.GenericResponse{Status: gctscript.ErrScriptFailedValidation, Data: strings.Join(failedFiles, ", ")}, nil
|
||||
}
|
||||
} else {
|
||||
err = gctscript.Validate(fPath)
|
||||
err = s.GctScriptManager.Validate(fPath)
|
||||
if err != nil {
|
||||
errRemove := os.Remove(fPath)
|
||||
if errRemove != nil {
|
||||
log.Errorf(log.GCTScriptMgr, "Failed to remove file %v, manual deletion required: %v", filepath.Base(fPath), errRemove)
|
||||
}
|
||||
return &gctrpc.GenericResponse{Status: ErrScriptFailedValidation, Data: err.Error()}, nil
|
||||
return &gctrpc.GenericResponse{Status: gctscript.ErrScriptFailedValidation, Data: err.Error()}, nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2048,7 +2048,7 @@ func (s *RPCServer) GCTScriptUpload(_ context.Context, r *gctrpc.GCTScriptUpload
|
||||
|
||||
// GCTScriptReadScript read a script and return contents
|
||||
func (s *RPCServer) GCTScriptReadScript(_ context.Context, r *gctrpc.GCTScriptReadScriptRequest) (*gctrpc.GCTScriptQueryResponse, error) {
|
||||
if !gctscript.GCTScriptConfig.Enabled {
|
||||
if !s.GctScriptManager.Started() {
|
||||
return &gctrpc.GCTScriptQueryResponse{Status: gctscript.ErrScriptingDisabled.Error()}, nil
|
||||
}
|
||||
|
||||
@@ -2073,7 +2073,7 @@ func (s *RPCServer) GCTScriptReadScript(_ context.Context, r *gctrpc.GCTScriptRe
|
||||
|
||||
// GCTScriptListAll lists all scripts inside the default script path
|
||||
func (s *RPCServer) GCTScriptListAll(context.Context, *gctrpc.GCTScriptListAllRequest) (*gctrpc.GCTScriptStatusResponse, error) {
|
||||
if !gctscript.GCTScriptConfig.Enabled {
|
||||
if !s.GctScriptManager.Started() {
|
||||
return &gctrpc.GCTScriptStatusResponse{Status: gctscript.ErrScriptingDisabled.Error()}, nil
|
||||
}
|
||||
|
||||
@@ -2083,7 +2083,7 @@ func (s *RPCServer) GCTScriptListAll(context.Context, *gctrpc.GCTScriptListAllRe
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if filepath.Ext(path) == ".gct" {
|
||||
if filepath.Ext(path) == common.GctExt {
|
||||
resp.Scripts = append(resp.Scripts, &gctrpc.GCTScript{
|
||||
Name: path,
|
||||
})
|
||||
@@ -2099,11 +2099,11 @@ func (s *RPCServer) GCTScriptListAll(context.Context, *gctrpc.GCTScriptListAllRe
|
||||
|
||||
// GCTScriptStopAll stops all running scripts
|
||||
func (s *RPCServer) GCTScriptStopAll(context.Context, *gctrpc.GCTScriptStopAllRequest) (*gctrpc.GenericResponse, error) {
|
||||
if !gctscript.GCTScriptConfig.Enabled {
|
||||
if !s.GctScriptManager.Started() {
|
||||
return &gctrpc.GenericResponse{Status: gctscript.ErrScriptingDisabled.Error()}, nil
|
||||
}
|
||||
|
||||
err := gctscript.ShutdownAll()
|
||||
err := s.GctScriptManager.ShutdownAll()
|
||||
if err != nil {
|
||||
return &gctrpc.GenericResponse{Status: "error", Data: err.Error()}, nil
|
||||
}
|
||||
@@ -2116,19 +2116,19 @@ func (s *RPCServer) GCTScriptStopAll(context.Context, *gctrpc.GCTScriptStopAllRe
|
||||
|
||||
// GCTScriptAutoLoadToggle adds or removes an entry to the autoload list
|
||||
func (s *RPCServer) GCTScriptAutoLoadToggle(_ context.Context, r *gctrpc.GCTScriptAutoLoadRequest) (*gctrpc.GenericResponse, error) {
|
||||
if !gctscript.GCTScriptConfig.Enabled {
|
||||
if !s.GctScriptManager.Started() {
|
||||
return &gctrpc.GenericResponse{Status: gctscript.ErrScriptingDisabled.Error()}, nil
|
||||
}
|
||||
|
||||
if r.Status {
|
||||
err := gctscript.Autoload(r.Script, true)
|
||||
err := s.GctScriptManager.Autoload(r.Script, true)
|
||||
if err != nil {
|
||||
return &gctrpc.GenericResponse{Status: "error", Data: err.Error()}, nil
|
||||
}
|
||||
return &gctrpc.GenericResponse{Status: "success", Data: "script " + r.Script + " removed from autoload list"}, nil
|
||||
}
|
||||
|
||||
err := gctscript.Autoload(r.Script, false)
|
||||
err := s.GctScriptManager.Autoload(r.Script, false)
|
||||
if err != nil {
|
||||
return &gctrpc.GenericResponse{Status: "error", Data: err.Error()}, nil
|
||||
}
|
||||
|
||||
20
engine/subsystem/subsystem.go
Normal file
20
engine/subsystem/subsystem.go
Normal file
@@ -0,0 +1,20 @@
|
||||
package subsystem
|
||||
|
||||
const (
|
||||
// ErrSubSystemAlreadyStarted message to return when a subsystem is already started
|
||||
ErrSubSystemAlreadyStarted = "manager already started"
|
||||
// ErrSubSystemAlreadyStopped message to return when a subsystem is already stopped
|
||||
ErrSubSystemAlreadyStopped = "already stopped"
|
||||
// ErrSubSystemNotStarted message to return when subsystem not started
|
||||
ErrSubSystemNotStarted = "not started"
|
||||
|
||||
// MsgSubSystemStarting message to return when subsystem is starting up
|
||||
MsgSubSystemStarting = "manager starting..."
|
||||
// MsgSubSystemStarted message to return when subsystem has started
|
||||
MsgSubSystemStarted = "started."
|
||||
|
||||
// MsgSubSystemShuttingDown message to return when a subsystem is shutting down
|
||||
MsgSubSystemShuttingDown = "shutting down..."
|
||||
// MsgSubSystemShutdown message to return when a subsystem has shutdown
|
||||
MsgSubSystemShutdown = "manager shutdown."
|
||||
)
|
||||
Reference in New Issue
Block a user