Daily engine improvements

Link up various subsystems to be managed atomically with the ability to start/stop them
New subsystem APIs
Comms changes
This commit is contained in:
Adrian Gallagher
2019-06-13 17:30:50 +10:00
parent 33085318c4
commit 6b2cfe7905
20 changed files with 1731 additions and 390 deletions

View File

@@ -37,6 +37,152 @@ func getInfo(_ *cli.Context) error {
return nil
}
var getSubsystemsCommand = cli.Command{
Name: "getsubsystems",
Usage: "gets GoCryptoTrader subsystems and their status",
Action: getSubsystems,
}
func getSubsystems(_ *cli.Context) error {
conn, err := setupClient()
if err != nil {
return err
}
defer conn.Close()
client := gctrpc.NewGoCryptoTraderClient(conn)
result, err := client.GetSubsystems(context.Background(),
&gctrpc.GetSubsystemsRequest{},
)
if err != nil {
return err
}
jsonOutput(result)
return nil
}
var enableSubsystemCommand = cli.Command{
Name: "enablesubsystem",
Usage: "enables an engine subsystem",
ArgsUsage: "<subsystem>",
Action: enableSubsystem,
Flags: []cli.Flag{
cli.StringFlag{
Name: "subsystem",
Usage: "the subsystem to enable",
},
},
}
func enableSubsystem(c *cli.Context) error {
if c.NArg() == 0 && c.NumFlags() == 0 {
cli.ShowCommandHelp(c, "enablesubsystem")
return nil
}
conn, err := setupClient()
if err != nil {
return err
}
defer conn.Close()
var subsystemName string
if c.IsSet("subsystem") {
subsystemName = c.String("subsystem")
} else {
subsystemName = c.Args().First()
}
client := gctrpc.NewGoCryptoTraderClient(conn)
result, err := client.EnableSubsystem(context.Background(),
&gctrpc.GenericSubsystemRequest{
Subsystem: subsystemName,
},
)
if err != nil {
return err
}
jsonOutput(result)
return nil
}
var disableSubsystemCommand = cli.Command{
Name: "disablesubsystem",
Usage: "disables an engine subsystem",
ArgsUsage: "<subsystem>",
Action: disableSubsystem,
Flags: []cli.Flag{
cli.StringFlag{
Name: "subsystem",
Usage: "the subsystem to disable",
},
},
}
func disableSubsystem(c *cli.Context) error {
if c.NArg() == 0 && c.NumFlags() == 0 {
cli.ShowCommandHelp(c, "disablesubsystem")
return nil
}
conn, err := setupClient()
if err != nil {
return err
}
defer conn.Close()
var subsystemName string
if c.IsSet("subsystem") {
subsystemName = c.String("subsystem")
} else {
subsystemName = c.Args().First()
}
client := gctrpc.NewGoCryptoTraderClient(conn)
result, err := client.DisableSubsystem(context.Background(),
&gctrpc.GenericSubsystemRequest{
Subsystem: subsystemName,
},
)
if err != nil {
return err
}
jsonOutput(result)
return nil
}
var getRPCEndpointsCommand = cli.Command{
Name: "getrpcendpoints",
Usage: "gets GoCryptoTrader endpoints info",
Action: getRPCEndpoints,
}
func getRPCEndpoints(_ *cli.Context) error {
conn, err := setupClient()
if err != nil {
return err
}
defer conn.Close()
client := gctrpc.NewGoCryptoTraderClient(conn)
result, err := client.GetRPCEndpoints(context.Background(),
&gctrpc.GetRPCEndpointsRequest{},
)
if err != nil {
return err
}
jsonOutput(result)
return nil
}
var getExchangesCommand = cli.Command{
Name: "getexchanges",
Usage: "gets a list of enabled or available exchanges",

View File

@@ -85,6 +85,10 @@ func main() {
}
app.Commands = []cli.Command{
getInfoCommand,
getSubsystemsCommand,
enableSubsystemCommand,
disableSubsystemCommand,
getRPCEndpointsCommand,
getExchangesCommand,
enableExchangeCommand,
disableExchangeCommand,

View File

@@ -1,6 +1,7 @@
package base
import (
"errors"
"time"
"github.com/thrasher-/gocryptotrader/config"
@@ -50,7 +51,7 @@ func (c IComm) PushEvent(event Event) {
// GetEnabledCommunicationMediums prints out enabled and connected communication
// packages
// (#debug output only)
func (c IComm) GetEnabledCommunicationMediums() {
func (c IComm) GetEnabledCommunicationMediums() error {
var count int
for i := range c {
if c[i].IsEnabled() && c[i].IsConnected() {
@@ -59,6 +60,7 @@ func (c IComm) GetEnabledCommunicationMediums() {
}
}
if count == 0 {
log.Warnf("Communications: No communication mediums are enabled.")
return errors.New("no communication mediums are enabled")
}
return nil
}

View File

@@ -1,6 +1,8 @@
package communications
import (
"errors"
"github.com/thrasher-/gocryptotrader/communications/base"
"github.com/thrasher-/gocryptotrader/communications/slack"
"github.com/thrasher-/gocryptotrader/communications/smsglobal"
@@ -15,9 +17,12 @@ type Communications struct {
}
// NewComm sets up and returns a pointer to a Communications object
func NewComm(cfg *config.CommunicationsConfig) *Communications {
var comm Communications
func NewComm(cfg *config.CommunicationsConfig) (*Communications, error) {
if !cfg.IsAnyEnabled() {
return nil, errors.New("no communication relayers enabled")
}
var comm Communications
if cfg.TelegramConfig.Enabled {
Telegram := new(telegram.Telegram)
Telegram.Setup(cfg)
@@ -43,5 +48,5 @@ func NewComm(cfg *config.CommunicationsConfig) *Communications {
}
comm.Setup()
return &comm
return &comm, nil
}

View File

@@ -8,18 +8,19 @@ import (
func TestNewComm(t *testing.T) {
var cfg config.CommunicationsConfig
communications := NewComm(&cfg)
if len(communications.IComm) != 0 {
t.Errorf("Test failed, communications NewComm, expected len 0, got len %d",
len(communications.IComm))
_, err := NewComm(&cfg)
if err == nil {
t.Error("NewComm should failed on no enabled communication mediums")
}
cfg.TelegramConfig.Enabled = true
cfg.SMSGlobalConfig.Enabled = true
cfg.SMTPConfig.Enabled = true
cfg.SlackConfig.Enabled = true
communications = NewComm(&cfg)
communications, err := NewComm(&cfg)
if err != nil {
t.Error("Unexpected result")
}
if len(communications.IComm) != 4 {
t.Errorf("Test failed, communications NewComm, expected len 4, got len %d",

View File

@@ -202,6 +202,18 @@ type CommunicationsConfig struct {
TelegramConfig TelegramConfig `json:"telegram"`
}
// IsAnyEnabled returns whether or any any comms relayers
// are enabled
func (c *CommunicationsConfig) IsAnyEnabled() bool {
if c.SMSGlobalConfig.Enabled ||
c.SMTPConfig.Enabled ||
c.SlackConfig.Enabled ||
c.TelegramConfig.Enabled {
return true
}
return false
}
// SlackConfig holds all variables to start and run the Slack package
type SlackConfig struct {
Name string `json:"name"`

87
engine/comms_relayer.go Normal file
View File

@@ -0,0 +1,87 @@
package engine
import (
"errors"
"sync/atomic"
"github.com/thrasher-/gocryptotrader/communications"
"github.com/thrasher-/gocryptotrader/communications/base"
log "github.com/thrasher-/gocryptotrader/logger"
)
// commsManager starts the NTP manager
type commsManager struct {
started int32
stopped int32
shutdown chan struct{}
relayMsg chan base.Event
comms *communications.Communications
}
func (c *commsManager) Started() bool {
return atomic.LoadInt32(&c.started) == 1
}
func (c *commsManager) Start() (err error) {
if atomic.AddInt32(&c.started, 1) != 1 {
return errors.New("communications manager already started")
}
defer func() {
if err != nil {
atomic.CompareAndSwapInt32(&c.started, 1, 0)
}
}()
log.Debugln("Communications manager starting...")
commsCfg := Bot.Config.GetCommunicationsConfig()
c.comms, err = communications.NewComm(&commsCfg)
if err != nil {
return err
}
c.shutdown = make(chan struct{})
c.relayMsg = make(chan base.Event)
go c.run()
log.Debugln("Communications manager started.")
return nil
}
func (c *commsManager) Stop() error {
if atomic.LoadInt32(&c.started) == 0 {
return errors.New("communications manager not started")
}
if atomic.AddInt32(&c.stopped, 1) != 1 {
return errors.New("communications manager is already stopped")
}
close(c.shutdown)
log.Debugln("Communications manager shutting down...")
return nil
}
func (c *commsManager) PushEvent(evt base.Event) {
if !c.Started() {
return
}
c.relayMsg <- evt
}
func (c *commsManager) run() {
defer func() {
// TO-DO shutdown comms connections for connected services (Slack etc)
atomic.CompareAndSwapInt32(&c.stopped, 1, 0)
atomic.CompareAndSwapInt32(&c.started, 1, 0)
log.Debugln("Communications manager shutdown.")
}()
for {
select {
case msg := <-c.relayMsg:
c.comms.PushEvent(msg)
case <-c.shutdown:
return
}
}
}

View File

@@ -49,6 +49,8 @@ func (c *connectionManager) Stop() error {
log.Debugln("Connection manager shutting down...")
c.conn.Shutdown()
atomic.CompareAndSwapInt32(&c.stopped, 1, 0)
atomic.CompareAndSwapInt32(&c.started, 1, 0)
log.Debugln("Connection manager stopped.")
return nil
}

View File

@@ -12,15 +12,12 @@ import (
"time"
"github.com/thrasher-/gocryptotrader/common"
"github.com/thrasher-/gocryptotrader/communications"
"github.com/thrasher-/gocryptotrader/config"
"github.com/thrasher-/gocryptotrader/currency"
"github.com/thrasher-/gocryptotrader/currency/coinmarketcap"
"github.com/thrasher-/gocryptotrader/engine/events"
exchange "github.com/thrasher-/gocryptotrader/exchanges"
"github.com/thrasher-/gocryptotrader/exchanges/request"
log "github.com/thrasher-/gocryptotrader/logger"
"github.com/thrasher-/gocryptotrader/ntpclient"
"github.com/thrasher-/gocryptotrader/portfolio"
"github.com/thrasher-/gocryptotrader/utils"
)
@@ -32,10 +29,11 @@ type Engine struct {
Portfolio *portfolio.Base
Exchanges []exchange.IBotExchange
ExchangeCurrencyPairManager *ExchangeCurrencyPairSyncer
NTPManager ntpManager
ConnectionManager connectionManager
OrderManager orderManager
PortfolioManager portfolioManager
CommsRelayer *communications.Communications
CommsManager commsManager
Shutdown chan struct{}
Settings Settings
CryptocurrencyDepositAddresses map[string]map[string]string
@@ -148,11 +146,10 @@ func ValidateSettings(b *Engine, s *Settings) {
b.Settings.EnableEventManager = s.EnableEventManager
if b.Settings.EnableEventManager {
events.Verbose = b.Settings.Verbose
if b.Settings.EventManagerDelay != time.Duration(0) && s.EventManagerDelay > 0 {
b.Settings.EventManagerDelay = s.EventManagerDelay
} else {
b.Settings.EventManagerDelay = events.SleepDelay
b.Settings.EventManagerDelay = EventSleepDelay
}
}
@@ -273,27 +270,8 @@ func (e *Engine) Start() {
}
if e.Settings.EnableNTPClient {
if e.Config.NTPClient.Level != -1 {
NTPTime, errNTP := ntpclient.NTPClient(e.Config.NTPClient.Pool)
currentTime := time.Now()
if errNTP != nil {
log.Warnf("NTPClient failed to create: %v", errNTP)
} else {
NTPcurrentTimeDifference := NTPTime.Sub(currentTime)
configNTPTime := *e.Config.NTPClient.AllowedDifference
configNTPNegativeTime := (*e.Config.NTPClient.AllowedNegativeDifference - (*e.Config.NTPClient.AllowedNegativeDifference * 2))
if NTPcurrentTimeDifference > configNTPTime || NTPcurrentTimeDifference < configNTPNegativeTime {
log.Warnf("Time out of sync (NTP): %v | (time.Now()): %v | (Difference): %v | (Allowed): +%v / %v", NTPTime, currentTime, NTPcurrentTimeDifference, configNTPTime, configNTPNegativeTime)
if e.Config.NTPClient.Level == 0 {
disable, errNTP := e.Config.DisableNTPCheck(os.Stdin)
if errNTP != nil {
log.Errorf("failed to disable ntp time check reason: %v", errNTP)
} else {
log.Info(disable)
}
}
}
}
if err := e.NTPManager.Start(); err != nil {
log.Errorf("NTP manager unable to start: %v", err)
}
}
@@ -322,10 +300,9 @@ func (e *Engine) Start() {
}
if e.Settings.EnableCommsRelayer {
log.Debugln("Starting communication mediums..")
commsCfg := e.Config.GetCommunicationsConfig()
e.CommsRelayer = communications.NewComm(&commsCfg)
e.CommsRelayer.GetEnabledCommunicationMediums()
if err := e.CommsManager.Start(); err != nil {
log.Errorf("Communications manager unable to start: %v", err)
}
}
var newFxSettings []currency.FXSettings
@@ -398,7 +375,7 @@ func (e *Engine) Start() {
}
if e.Settings.EnableEventManager {
go events.EventManger()
go EventManger()
}
<-e.Shutdown
@@ -419,6 +396,18 @@ func (e *Engine) Stop() {
}
}
if e.NTPManager.Started() {
if err := e.NTPManager.Stop(); err != nil {
log.Errorf("NTP manager unable to stop. Error: %v", err)
}
}
if e.CommsManager.Started() {
if err := e.CommsManager.Stop(); err != nil {
log.Errorf("Communication manager unable to stop. Error: %v", err)
}
}
if e.PortfolioManager.Started() {
if err := e.PortfolioManager.Stop(); err != nil {
log.Errorf("Fund manager unable to stop. Error: %v", err)

View File

@@ -1,4 +1,4 @@
package events
package engine
import (
"errors"
@@ -6,7 +6,6 @@ import (
"strings"
"time"
"github.com/thrasher-/gocryptotrader/communications"
"github.com/thrasher-/gocryptotrader/communications/base"
"github.com/thrasher-/gocryptotrader/config"
"github.com/thrasher-/gocryptotrader/currency"
@@ -16,6 +15,8 @@ import (
log "github.com/thrasher-/gocryptotrader/logger"
)
// TO-DO MAKE THIS A SERVICE SUBSYSTEM
// Event const vars
const (
ItemPrice = "PRICE"
@@ -32,7 +33,6 @@ const (
ActionTest = "ACTION_TEST"
defaultSleepDelay = time.Millisecond * 500
defaultVerbose = true
)
// vars related to events package
@@ -41,16 +41,11 @@ var (
errInvalidCondition = errors.New("invalid conditional option")
errInvalidAction = errors.New("invalid action")
errExchangeDisabled = errors.New("desired exchange is disabled")
SleepDelay = defaultSleepDelay
Verbose = defaultVerbose
// NOTE comms is an interim implementation
comms *communications.Communications
EventSleepDelay = defaultSleepDelay
)
// ConditionParams holds the event condition variables
type ConditionParams struct {
// EventConditionParams holds the event condition variables
type EventConditionParams struct {
Condition string
Price float64
@@ -64,7 +59,7 @@ type Event struct {
ID int64
Exchange string
Item string
Condition ConditionParams
Condition EventConditionParams
Pair currency.Pair
Asset assets.AssetType
Action string
@@ -75,15 +70,9 @@ type Event struct {
// appended
var Events []*Event
// SetComms is an interim function that will support a median integration. This
// sets the current comms package.
func SetComms(commsP *communications.Communications) {
comms = commsP
}
// Add adds an event to the Events chain and returns an index/eventID
// and an error
func Add(exchange, item string, condition ConditionParams, currencyPair currency.Pair, asset assets.AssetType, action string) (int64, error) {
func Add(exchange, item string, condition EventConditionParams, currencyPair currency.Pair, asset assets.AssetType, action string) (int64, error) {
err := IsValidEvent(exchange, item, condition, action)
if err != nil {
return 0, err
@@ -139,7 +128,7 @@ func (e *Event) ExecuteAction() bool {
if action[0] == ActionSMSNotify {
message := fmt.Sprintf("Event triggered: %s", e.String())
if action[1] == "ALL" {
comms.PushEvent(base.Event{
Bot.CommsManager.PushEvent(base.Event{
Type: "event",
Message: message,
})
@@ -164,7 +153,7 @@ func (e *Event) processTicker() bool {
t, err := ticker.GetTicker(e.Exchange, e.Pair, e.Asset)
if err != nil {
if Verbose {
if Bot.Settings.Verbose {
log.Debugf("Events: failed to get ticker. Err: %s", err)
}
return false
@@ -173,7 +162,7 @@ func (e *Event) processTicker() bool {
lastPrice := t.Last
if lastPrice == 0 {
if Verbose {
if Bot.Settings.Verbose {
log.Debugln("Events: ticker last price is 0")
}
return false
@@ -211,7 +200,7 @@ func (e *Event) processCondition(actual, threshold float64) bool {
func (e *Event) processOrderbook() bool {
ob, err := orderbook.Get(e.Exchange, e.Pair, e.Asset)
if err != nil {
if Verbose {
if Bot.Settings.Verbose {
log.Debugf("Events: Failed to get orderbook. Err: %s", err)
}
return false
@@ -242,18 +231,17 @@ func (e *Event) processOrderbook() bool {
return success
}
// CheckCondition will check the event structure to see if there is a condition
// CheckEventCondition will check the event structure to see if there is a condition
// met
func (e *Event) CheckCondition() bool {
func (e *Event) CheckEventCondition() bool {
if e.Item == ItemPrice {
return e.processTicker()
}
return e.processOrderbook()
}
// IsValidEvent checks the actions to be taken and returns an error if incorrect
func IsValidEvent(exchange, item string, condition ConditionParams, action string) error {
func IsValidEvent(exchange, item string, condition EventConditionParams, action string) error {
exchange = strings.ToUpper(exchange)
item = strings.ToUpper(item)
action = strings.ToUpper(action)
@@ -290,7 +278,7 @@ func IsValidEvent(exchange, item string, condition ConditionParams, action strin
}
if a[1] != "ALL" {
comms.PushEvent(base.Event{Type: a[1]})
Bot.CommsManager.PushEvent(base.Event{Type: a[1]})
}
} else if action != ActionConsolePrint && action != ActionTest {
return errInvalidAction
@@ -302,17 +290,17 @@ func IsValidEvent(exchange, item string, condition ConditionParams, action strin
// EventManger is the overarching routine that will iterate through the Events
// chain
func EventManger() {
log.Debugf("EventManager started. SleepDelay: %v", SleepDelay.String())
log.Debugf("EventManager started. SleepDelay: %v", EventSleepDelay.String())
for {
total, executed := GetEventCounter()
if total > 0 && executed != total {
for _, event := range Events {
if !event.Executed {
if Verbose {
if Bot.Settings.Verbose {
log.Debugf("Events: Processing event %s.", event.String())
}
success := event.CheckCondition()
success := event.CheckEventCondition()
if success {
log.Debugf(
"Events: ID: %d triggered on %s successfully.\n", event.ID,
@@ -323,7 +311,7 @@ func EventManger() {
}
}
}
time.Sleep(SleepDelay)
time.Sleep(EventSleepDelay)
}
}

View File

@@ -1,4 +1,4 @@
package events
package engine
//
// import (

View File

@@ -29,6 +29,87 @@ import (
"github.com/thrasher-/gocryptotrader/utils"
)
// GetSubsystemsStatus returns the status of various subsystems
func GetSubsystemsStatus() map[string]bool {
systems := make(map[string]bool)
systems["communications"] = Bot.CommsManager.Started()
systems["internet_monitor"] = Bot.ConnectionManager.Started()
systems["orders"] = Bot.OrderManager.Started()
systems["portfolio"] = Bot.PortfolioManager.Started()
systems["ntp_timekeeper"] = Bot.NTPManager.Started()
systems["exchange_syncer"] = Bot.Settings.EnableExchangeSyncManager
systems["grpc"] = Bot.Settings.EnableGRPC
systems["grpc_proxy"] = Bot.Settings.EnableGRPCProxy
systems["deprecated_rpc"] = Bot.Settings.EnableDeprecatedRPC
systems["websocket_rpc"] = Bot.Settings.EnableWebsocketRPC
return systems
}
// RPCEndpoint stores an RPC endpoint status and addr
type RPCEndpoint struct {
Started bool
ListenAddr string
}
// GetRPCEndpoints returns a list of RPC endpoints and their listen addrs
func GetRPCEndpoints() map[string]RPCEndpoint {
endpoints := make(map[string]RPCEndpoint)
endpoints["grpc"] = RPCEndpoint{
Started: Bot.Settings.EnableGRPC,
ListenAddr: "grpc://" + Bot.Config.RemoteControl.GRPC.ListenAddress,
}
endpoints["grpc_proxy"] = RPCEndpoint{
Started: Bot.Settings.EnableGRPCProxy,
ListenAddr: "http://" + Bot.Config.RemoteControl.GRPC.GRPCProxyListenAddress,
}
endpoints["deprecated_rpc"] = RPCEndpoint{
Started: Bot.Settings.EnableDeprecatedRPC,
ListenAddr: "http://" + Bot.Config.RemoteControl.DeprecatedRPC.ListenAddress,
}
endpoints["websocket_rpc"] = RPCEndpoint{
Started: Bot.Settings.EnableWebsocketRPC,
ListenAddr: "ws://" + Bot.Config.RemoteControl.WebsocketRPC.ListenAddress,
}
return endpoints
}
// SetSubsystem enables or disables an engine subsystem
func SetSubsystem(subsys string, enable bool) error {
switch strings.ToLower(subsys) {
case "communications":
if enable {
return Bot.CommsManager.Start()
}
return Bot.CommsManager.Stop()
case "internet_monitor":
if enable {
return Bot.ConnectionManager.Start()
}
return Bot.CommsManager.Stop()
case "orders":
if enable {
return Bot.OrderManager.Start()
}
return Bot.OrderManager.Stop()
case "portfolio":
if enable {
return Bot.PortfolioManager.Start()
}
return Bot.OrderManager.Stop()
case "ntp_timekeeper":
if enable {
return Bot.NTPManager.Start()
}
return Bot.NTPManager.Stop()
case "exchange_syncer":
if enable {
Bot.ExchangeCurrencyPairManager.Start()
}
Bot.ExchangeCurrencyPairManager.Stop()
}
return errors.New("subsystem not found")
}
// GetExchangeOTPs returns OTP codes for all exchanges which have a otpsecret
// stored
func GetExchangeOTPs() (map[string]string, error) {

View File

@@ -72,9 +72,17 @@ func (o *orderManager) Start() error {
return nil
}
func (o *orderManager) Stop() error {
if atomic.LoadInt32(&o.started) == 0 {
return errors.New("order manager not started")
}
if atomic.AddInt32(&o.stopped, 1) != 1 {
return errors.New("order manager is already stopped")
}
defer func() {
atomic.CompareAndSwapInt32(&o.stopped, 1, 0)
atomic.CompareAndSwapInt32(&o.started, 1, 0)
}()
log.Debugln("Order manager shutting down...")
close(o.shutdown)
@@ -101,7 +109,7 @@ func (o *orderManager) gracefulShutdown() {
msg := fmt.Sprintf("Order manager: Exchange %s unable to cancel order ID=%v. Err: %s",
k, v[y].ID, err)
log.Debugln(msg)
Bot.CommsRelayer.PushEvent(base.Event{
Bot.CommsManager.PushEvent(base.Event{
Type: "order",
Message: msg,
})
@@ -111,7 +119,7 @@ func (o *orderManager) gracefulShutdown() {
msg := fmt.Sprintf("Order manager: Exchange %s order ID=%v cancelled.",
k, v[y].ID)
log.Debugln(msg)
Bot.CommsRelayer.PushEvent(base.Event{
Bot.CommsManager.PushEvent(base.Event{
Type: "order",
Message: msg,
})
@@ -222,7 +230,7 @@ func (o *orderManager) Submit(exchName string, order *exchange.OrderSubmission)
msg := fmt.Sprintf("Order manager: Exchange %s submitted order ID=%v [Ours: %v] pair=%v price=%v amount=%v side=%v type=%v.",
exchName, result.OrderID, id.String(), order.Pair, order.Price, order.Amount, order.OrderSide, order.OrderType)
log.Debugln(msg)
Bot.CommsRelayer.PushEvent(base.Event{
Bot.CommsManager.PushEvent(base.Event{
Type: "order",
Message: msg,
})
@@ -257,7 +265,7 @@ func (o *orderManager) processOrders() {
msg := fmt.Sprintf("Order manager: Exchange %s added order ID=%v pair=%v price=%v amount=%v side=%v type=%v.",
order.Exchange, order.ID, order.CurrencyPair, order.Price, order.Amount, order.OrderSide, order.OrderType)
log.Debug(msg)
Bot.CommsRelayer.PushEvent(base.Event{
Bot.CommsManager.PushEvent(base.Event{
Type: "order",
Message: msg,
})

View File

@@ -51,9 +51,11 @@ func (p *portfolioManager) run() {
Bot.ServicesWG.Add(1)
tick := time.NewTicker(PortfolioSleepDelay)
defer func() {
log.Debugf("Portfolio manager shutdown.")
atomic.CompareAndSwapInt32(&p.stopped, 1, 0)
atomic.CompareAndSwapInt32(&p.started, 1, 0)
tick.Stop()
Bot.ServicesWG.Done()
log.Debugf("Portfolio manager shutdown.")
}()
for {

View File

@@ -15,7 +15,6 @@ import (
"github.com/thrasher-/gocryptotrader/common"
"github.com/thrasher-/gocryptotrader/common/crypto"
"github.com/thrasher-/gocryptotrader/currency"
"github.com/thrasher-/gocryptotrader/engine/events"
exchange "github.com/thrasher-/gocryptotrader/exchanges"
"github.com/thrasher-/gocryptotrader/exchanges/assets"
"github.com/thrasher-/gocryptotrader/gctrpc"
@@ -152,8 +151,47 @@ func (s *RPCServer) GetInfo(ctx context.Context, r *gctrpc.GetInfoRequest) (*gct
AvailableExchanges: int64(len(Bot.Config.Exchanges)),
DefaultFiatCurrency: Bot.Config.Currency.FiatDisplayCurrency.String(),
DefaultForexProvider: Bot.Config.GetPrimaryForexProvider(),
SubsystemStatus: GetSubsystemsStatus(),
}
endpoints := GetRPCEndpoints()
resp.RpcEndpoints = make(map[string]*gctrpc.RPCEndpoint)
for k, v := range endpoints {
resp.RpcEndpoints[k] = &gctrpc.RPCEndpoint{
Started: v.Started,
ListenAddress: v.ListenAddr,
}
}
return &resp, nil
}
// GetSubsystems returns a list of subsystems and their status
func (s *RPCServer) GetSubsystems(ctx context.Context, r *gctrpc.GetSubsystemsRequest) (*gctrpc.GetSusbsytemsResponse, error) {
return &gctrpc.GetSusbsytemsResponse{SubsystemsStatus: GetSubsystemsStatus()}, nil
}
// EnableSubsystem enables a engine subsytem
func (s *RPCServer) EnableSubsystem(ctx context.Context, r *gctrpc.GenericSubsystemRequest) (*gctrpc.GenericSubsystemResponse, error) {
err := SetSubsystem(r.Subsystem, true)
return &gctrpc.GenericSubsystemResponse{}, err
}
// DisableSubsystem disables a engine subsytem
func (s *RPCServer) DisableSubsystem(ctx context.Context, r *gctrpc.GenericSubsystemRequest) (*gctrpc.GenericSubsystemResponse, error) {
err := SetSubsystem(r.Subsystem, false)
return &gctrpc.GenericSubsystemResponse{}, err
}
// GetRPCEndpoints returns a list of API endpoints
func (s *RPCServer) GetRPCEndpoints(ctx context.Context, r *gctrpc.GetRPCEndpointsRequest) (*gctrpc.GetRPCEndpointsResponse, error) {
endpoints := GetRPCEndpoints()
var resp gctrpc.GetRPCEndpointsResponse
resp.Endpoints = make(map[string]*gctrpc.RPCEndpoint)
for k, v := range endpoints {
resp.Endpoints[k] = &gctrpc.RPCEndpoint{
Started: v.Started,
ListenAddress: v.ListenAddr,
}
}
return &resp, nil
}
@@ -638,7 +676,7 @@ func (s *RPCServer) GetEvents(ctx context.Context, r *gctrpc.GetEventsRequest) (
// AddEvent adds an event
func (s *RPCServer) AddEvent(ctx context.Context, r *gctrpc.AddEventRequest) (*gctrpc.AddEventResponse, error) {
evtCondition := events.ConditionParams{
evtCondition := EventConditionParams{
CheckBids: r.ConditionParams.CheckBids,
CheckBidsAndAsks: r.ConditionParams.CheckBidsAndAsks,
Condition: r.ConditionParams.Condition,
@@ -649,7 +687,7 @@ func (s *RPCServer) AddEvent(ctx context.Context, r *gctrpc.AddEventRequest) (*g
p := currency.NewPairWithDelimiter(r.Pair.Base,
r.Pair.Quote, r.Pair.Delimiter)
id, err := events.Add(r.Exchange, r.Item, evtCondition, p, assets.AssetType(r.AssetType), r.Action)
id, err := Add(r.Exchange, r.Item, evtCondition, p, assets.AssetType(r.AssetType), r.Action)
if err != nil {
return nil, err
}
@@ -659,7 +697,7 @@ func (s *RPCServer) AddEvent(ctx context.Context, r *gctrpc.AddEventRequest) (*g
// RemoveEvent removes an event, specified by an event ID
func (s *RPCServer) RemoveEvent(ctx context.Context, r *gctrpc.RemoveEventRequest) (*gctrpc.RemoveEventResponse, error) {
events.Remove(r.Id)
Remove(r.Id)
return &gctrpc.RemoveEventResponse{}, nil
}

140
engine/timekeeper.go Normal file
View File

@@ -0,0 +1,140 @@
package engine
import (
"errors"
"fmt"
"os"
"sync/atomic"
"time"
log "github.com/thrasher-/gocryptotrader/logger"
ntpclient "github.com/thrasher-/gocryptotrader/ntpclient"
)
// vars related to the NTP manager
var (
NTPCheckInterval = time.Second * 30
NTPRetryLimit = 3
errNTPDisabled = errors.New("ntp client disabled")
)
// ntpManager starts the NTP manager
type ntpManager struct {
started int32
stopped int32
inititalCheck bool
shutdown chan struct{}
}
func (n *ntpManager) Started() bool {
return atomic.LoadInt32(&n.started) == 1
}
func (n *ntpManager) Start() (err error) {
if atomic.AddInt32(&n.started, 1) != 1 {
return errors.New("NTP manager already started")
}
var disable bool
defer func() {
if err != nil || disable {
atomic.CompareAndSwapInt32(&n.started, 1, 0)
}
}()
log.Debugln("NTP manager starting...")
if Bot.Config.NTPClient.Level == 0 {
// Initial NTP check (prompts user on how we should proceed)
n.inititalCheck = true
// Sometimes the NTP client can have transient issues due to UDP, try
// the default retry limits before giving up
for i := 0; i < NTPRetryLimit; i++ {
err = n.processTime()
switch err {
case nil:
break
case errNTPDisabled:
log.Debugf("NTP manager: User disabled NTP prompts. Exiting.")
disable = true
err = nil
return
default:
if i == NTPRetryLimit-1 {
return err
}
}
}
}
n.shutdown = make(chan struct{})
go n.run()
log.Debugln("NTP manager started.")
return nil
}
func (n *ntpManager) Stop() error {
if atomic.LoadInt32(&n.started) == 0 {
return errors.New("NTP manager not started")
}
if atomic.AddInt32(&n.stopped, 1) != 1 {
return errors.New("NTP manager is already stopped")
}
close(n.shutdown)
log.Debugln("NTP manager shutting down...")
return nil
}
func (n *ntpManager) run() {
t := time.NewTicker(NTPCheckInterval)
defer func() {
t.Stop()
atomic.CompareAndSwapInt32(&n.stopped, 1, 0)
atomic.CompareAndSwapInt32(&n.started, 1, 0)
log.Debugln("NTP manager shutdown.")
}()
for {
select {
case <-n.shutdown:
return
case <-t.C:
n.processTime()
if Bot.Config.NTPClient.Level == 0 {
close(n.shutdown)
}
}
}
}
func (n *ntpManager) FetchNTPTime() (time.Time, error) {
return ntpclient.NTPClient(Bot.Config.NTPClient.Pool)
}
func (n *ntpManager) processTime() error {
NTPTime, err := n.FetchNTPTime()
if err != nil {
return err
}
currentTime := time.Now()
NTPcurrentTimeDifference := NTPTime.Sub(currentTime)
configNTPTime := *Bot.Config.NTPClient.AllowedDifference
configNTPNegativeTime := (*Bot.Config.NTPClient.AllowedNegativeDifference - (*Bot.Config.NTPClient.AllowedNegativeDifference * 2))
if NTPcurrentTimeDifference > configNTPTime || NTPcurrentTimeDifference < configNTPNegativeTime {
log.Warnf("NTP manager: Time out of sync (NTP): %v | (time.Now()): %v | (Difference): %v | (Allowed): +%v / %v", NTPTime, currentTime, NTPcurrentTimeDifference, configNTPTime, configNTPNegativeTime)
if n.inititalCheck {
n.inititalCheck = false
disable, err := Bot.Config.DisableNTPCheck(os.Stdin)
if err != nil {
return fmt.Errorf("unable to disable NTP check: %s", err)
}
log.Info(disable)
if Bot.Config.NTPClient.Level == -1 {
return errNTPDisabled
}
}
}
return nil
}

File diff suppressed because it is too large Load Diff

View File

@@ -37,6 +37,58 @@ func request_GoCryptoTrader_GetInfo_0(ctx context.Context, marshaler runtime.Mar
}
func request_GoCryptoTrader_GetSubsystems_0(ctx context.Context, marshaler runtime.Marshaler, client GoCryptoTraderClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq GetSubsystemsRequest
var metadata runtime.ServerMetadata
msg, err := client.GetSubsystems(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
var (
filter_GoCryptoTrader_EnableSubsystem_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)}
)
func request_GoCryptoTrader_EnableSubsystem_0(ctx context.Context, marshaler runtime.Marshaler, client GoCryptoTraderClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq GenericSubsystemRequest
var metadata runtime.ServerMetadata
if err := runtime.PopulateQueryParameters(&protoReq, req.URL.Query(), filter_GoCryptoTrader_EnableSubsystem_0); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.EnableSubsystem(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
var (
filter_GoCryptoTrader_DisableSubsystem_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)}
)
func request_GoCryptoTrader_DisableSubsystem_0(ctx context.Context, marshaler runtime.Marshaler, client GoCryptoTraderClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq GenericSubsystemRequest
var metadata runtime.ServerMetadata
if err := runtime.PopulateQueryParameters(&protoReq, req.URL.Query(), filter_GoCryptoTrader_DisableSubsystem_0); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.DisableSubsystem(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func request_GoCryptoTrader_GetRPCEndpoints_0(ctx context.Context, marshaler runtime.Marshaler, client GoCryptoTraderClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq GetRPCEndpointsRequest
var metadata runtime.ServerMetadata
msg, err := client.GetRPCEndpoints(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
var (
filter_GoCryptoTrader_GetExchanges_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)}
)
@@ -533,6 +585,86 @@ func RegisterGoCryptoTraderHandlerClient(ctx context.Context, mux *runtime.Serve
})
mux.Handle("GET", pattern_GoCryptoTrader_GetSubsystems_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_GoCryptoTrader_GetSubsystems_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_GoCryptoTrader_GetSubsystems_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_GoCryptoTrader_EnableSubsystem_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_GoCryptoTrader_EnableSubsystem_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_GoCryptoTrader_EnableSubsystem_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_GoCryptoTrader_DisableSubsystem_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_GoCryptoTrader_DisableSubsystem_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_GoCryptoTrader_DisableSubsystem_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_GoCryptoTrader_GetRPCEndpoints_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_GoCryptoTrader_GetRPCEndpoints_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_GoCryptoTrader_GetRPCEndpoints_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_GoCryptoTrader_GetExchanges_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
@@ -1139,6 +1271,14 @@ func RegisterGoCryptoTraderHandlerClient(ctx context.Context, mux *runtime.Serve
var (
pattern_GoCryptoTrader_GetInfo_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "getinfo"}, ""))
pattern_GoCryptoTrader_GetSubsystems_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "getsusbsystems"}, ""))
pattern_GoCryptoTrader_EnableSubsystem_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "enablesubsystem"}, ""))
pattern_GoCryptoTrader_DisableSubsystem_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "disablesubsystem"}, ""))
pattern_GoCryptoTrader_GetRPCEndpoints_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "getrpcendpoints"}, ""))
pattern_GoCryptoTrader_GetExchanges_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "getexchanges"}, ""))
pattern_GoCryptoTrader_DisableExchange_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "disableexchange"}, ""))
@@ -1203,6 +1343,14 @@ var (
var (
forward_GoCryptoTrader_GetInfo_0 = runtime.ForwardResponseMessage
forward_GoCryptoTrader_GetSubsystems_0 = runtime.ForwardResponseMessage
forward_GoCryptoTrader_EnableSubsystem_0 = runtime.ForwardResponseMessage
forward_GoCryptoTrader_DisableSubsystem_0 = runtime.ForwardResponseMessage
forward_GoCryptoTrader_GetRPCEndpoints_0 = runtime.ForwardResponseMessage
forward_GoCryptoTrader_GetExchanges_0 = runtime.ForwardResponseMessage
forward_GoCryptoTrader_DisableExchange_0 = runtime.ForwardResponseMessage

View File

@@ -12,6 +12,35 @@ message GetInfoResponse {
int64 enabled_exchanges = 3;
string default_forex_provider = 4;
string default_fiat_currency = 5;
map<string, bool> subsystem_status = 6;
map<string, RPCEndpoint> rpc_endpoints = 7;
}
// TO-DO comms APIs
message GetCommunicationRelayersRequest {}
message GetCommunicationRelayersResponse {}
message GenericSubsystemRequest {
string subsystem = 1;
}
message GenericSubsystemResponse {}
message GetSubsystemsRequest {}
message GetSusbsytemsResponse {
map<string, bool> subsystems_status = 1;
}
message GetRPCEndpointsRequest{}
message RPCEndpoint {
bool started = 1;
string listen_address = 2;
}
message GetRPCEndpointsResponse {
map<string, RPCEndpoint> endpoints = 1;
}
message GenericExchangeNameRequest {
@@ -403,6 +432,30 @@ service GoCryptoTrader {
};
}
rpc GetSubsystems (GetSubsystemsRequest) returns (GetSusbsytemsResponse) {
option (google.api.http) = {
get: "/v1/getsusbsystems"
};
}
rpc EnableSubsystem (GenericSubsystemRequest) returns (GenericSubsystemResponse) {
option (google.api.http) = {
get: "/v1/enablesubsystem"
};
}
rpc DisableSubsystem (GenericSubsystemRequest) returns (GenericSubsystemResponse) {
option (google.api.http) = {
get: "/v1/disablesubsystem"
};
}
rpc GetRPCEndpoints (GetRPCEndpointsRequest) returns (GetRPCEndpointsResponse) {
option (google.api.http) = {
get: "/v1/getrpcendpoints"
};
}
rpc GetExchanges (GetExchangesRequest) returns (GetExchangesResponse) {
option (google.api.http) = {
get: "/v1/getexchanges"

View File

@@ -145,6 +145,30 @@
]
}
},
"/v1/disablesubsystem": {
"get": {
"operationId": "DisableSubsystem",
"responses": {
"200": {
"description": "A successful response.",
"schema": {
"$ref": "#/definitions/gctrpcGenericSubsystemResponse"
}
}
},
"parameters": [
{
"name": "subsystem",
"in": "query",
"required": false,
"type": "string"
}
],
"tags": [
"GoCryptoTrader"
]
}
},
"/v1/enableexchange": {
"post": {
"operationId": "EnableExchange",
@@ -171,6 +195,30 @@
]
}
},
"/v1/enablesubsystem": {
"get": {
"operationId": "EnableSubsystem",
"responses": {
"200": {
"description": "A successful response.",
"schema": {
"$ref": "#/definitions/gctrpcGenericSubsystemResponse"
}
}
},
"parameters": [
{
"name": "subsystem",
"in": "query",
"required": false,
"type": "string"
}
],
"tags": [
"GoCryptoTrader"
]
}
},
"/v1/getaccountinfo": {
"get": {
"operationId": "GetAccountInfo",
@@ -542,6 +590,38 @@
]
}
},
"/v1/getrpcendpoints": {
"get": {
"operationId": "GetRPCEndpoints",
"responses": {
"200": {
"description": "A successful response.",
"schema": {
"$ref": "#/definitions/gctrpcGetRPCEndpointsResponse"
}
}
},
"tags": [
"GoCryptoTrader"
]
}
},
"/v1/getsusbsystems": {
"get": {
"operationId": "GetSubsystems",
"responses": {
"200": {
"description": "A successful response.",
"schema": {
"$ref": "#/definitions/gctrpcGetSusbsytemsResponse"
}
}
},
"tags": [
"GoCryptoTrader"
]
}
},
"/v1/getticker": {
"post": {
"operationId": "GetTicker",
@@ -978,6 +1058,9 @@
"gctrpcGenericExchangeNameResponse": {
"type": "object"
},
"gctrpcGenericSubsystemResponse": {
"type": "object"
},
"gctrpcGetAccountInfoResponse": {
"type": "object",
"properties": {
@@ -1180,6 +1263,19 @@
},
"default_fiat_currency": {
"type": "string"
},
"subsystem_status": {
"type": "object",
"additionalProperties": {
"type": "boolean",
"format": "boolean"
}
},
"rpc_endpoints": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/gctrpcRPCEndpoint"
}
}
}
},
@@ -1290,6 +1386,29 @@
}
}
},
"gctrpcGetRPCEndpointsResponse": {
"type": "object",
"properties": {
"endpoints": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/gctrpcRPCEndpoint"
}
}
}
},
"gctrpcGetSusbsytemsResponse": {
"type": "object",
"properties": {
"subsystems_status": {
"type": "object",
"additionalProperties": {
"type": "boolean",
"format": "boolean"
}
}
}
},
"gctrpcGetTickerRequest": {
"type": "object",
"properties": {
@@ -1490,6 +1609,18 @@
}
}
},
"gctrpcRPCEndpoint": {
"type": "object",
"properties": {
"started": {
"type": "boolean",
"format": "boolean"
},
"listen_address": {
"type": "string"
}
}
},
"gctrpcRemoveEventRequest": {
"type": "object",
"properties": {