Engine improvements

This commit is contained in:
Adrian Gallagher
2019-06-10 20:02:09 +10:00
parent 04c7c4895f
commit f777e68716
88 changed files with 2037 additions and 1413 deletions

View File

@@ -7,6 +7,7 @@ import (
"os"
"os/signal"
"path"
"sync"
"syscall"
"time"
@@ -40,6 +41,7 @@ type Engine struct {
Settings Settings
CryptocurrencyDepositAddresses map[string]map[string]string
Uptime time.Time
ServicesWG sync.WaitGroup
}
// Vars for engine
@@ -436,6 +438,8 @@ func (e *Engine) Stop() {
log.Debugln("Config file saved successfully.")
}
}
// Wait for services to gracefully shutdown
e.ServicesWG.Wait()
log.Debugln("Exiting.")
log.CloseLogFile()
os.Exit(0)

View File

@@ -139,7 +139,10 @@ func (e *Event) ExecuteAction() bool {
if action[0] == ActionSMSNotify {
message := fmt.Sprintf("Event triggered: %s", e.String())
if action[1] == "ALL" {
comms.PushEvent(base.Event{TradeDetails: message})
comms.PushEvent(base.Event{
Type: "event",
Message: message,
})
}
}
} else {

View File

@@ -7,13 +7,16 @@ import (
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"errors"
"fmt"
"math/big"
"net"
"os"
"path/filepath"
"strings"
"time"
"github.com/pquerna/otp/totp"
"github.com/thrasher-/gocryptotrader/common"
"github.com/thrasher-/gocryptotrader/currency"
exchange "github.com/thrasher-/gocryptotrader/exchanges"
@@ -26,6 +29,21 @@ import (
"github.com/thrasher-/gocryptotrader/utils"
)
// GetOTPByExchange returns a OTP code for the desired exchange
// if it exists
func GetOTPByExchange(exchName string) (string, error) {
for x := range Bot.Config.Exchanges {
if !strings.EqualFold(Bot.Config.Exchanges[x].Name, exchName) {
continue
}
if otpSecret := Bot.Config.Exchanges[x].API.Credentials.OTPSecret; otpSecret != "" {
return totp.GenerateCode(otpSecret, time.Now())
}
}
return "", errors.New("exchange does not have a otpsecret stored")
}
// GetAuthAPISupportedExchanges returns a list of auth api enabled exchanges
func GetAuthAPISupportedExchanges() []string {
var exchanges []string

View File

@@ -2,10 +2,12 @@ package engine
import (
"errors"
"sync"
"fmt"
"sync/atomic"
"time"
"github.com/thrasher-/gocryptotrader/common"
"github.com/thrasher-/gocryptotrader/communications/base"
exchange "github.com/thrasher-/gocryptotrader/exchanges"
log "github.com/thrasher-/gocryptotrader/logger"
)
@@ -16,9 +18,10 @@ var (
ErrOrdersAlreadyExists = errors.New("order already exists")
)
type orderStore struct {
m sync.Mutex
Orders map[string][]exchange.OrderDetail
func (o *orderStore) Get() map[string][]exchange.OrderDetail {
o.m.Lock()
defer o.m.Unlock()
return o.Orders
}
func (o *orderStore) exists(order *exchange.OrderDetail) bool {
@@ -50,13 +53,6 @@ func (o *orderStore) Add(order *exchange.OrderDetail) error {
return nil
}
type orderManager struct {
started int32
stopped int32
shutdown chan struct{}
orderStore orderStore
}
func (o *orderManager) Started() bool {
return atomic.LoadInt32(&o.started) == 1
}
@@ -67,6 +63,9 @@ func (o *orderManager) Start() error {
}
log.Debugln("Order manager starting...")
// test param
o.cfg.CancelOrdersOnShutdown = true
o.shutdown = make(chan struct{})
o.orderStore.Orders = make(map[string][]exchange.OrderDetail)
go o.run()
@@ -82,17 +81,59 @@ func (o *orderManager) Stop() error {
return nil
}
func (o *orderManager) gracefulShutdown() {
if o.cfg.CancelOrdersOnShutdown {
log.Debug("Order manager: Cancelling any open orders...")
orders := o.orderStore.Get()
if orders == nil {
return
}
for k, v := range orders {
log.Debugf("Order manager: Cancelling order(s) for exchange %s.", k)
for y := range v {
log.Debugf("order manager: Cancelling order ID %v [%v]",
v[y].ID, v[y])
err := o.Cancel(k, &exchange.OrderCancellation{
OrderID: v[y].ID,
})
if err != nil {
msg := fmt.Sprintf("Order manager: Exchange %s unable to cancel order ID=%v. Err: %s",
k, v[y].ID, err)
log.Debugln(msg)
Bot.CommsRelayer.PushEvent(base.Event{
Type: "order",
Message: msg,
})
continue
}
msg := fmt.Sprintf("Order manager: Exchange %s order ID=%v cancelled.",
k, v[y].ID)
log.Debugln(msg)
Bot.CommsRelayer.PushEvent(base.Event{
Type: "order",
Message: msg,
})
}
}
}
}
func (o *orderManager) run() {
log.Debugln("Order manager started.")
tick := time.NewTicker(OrderManagerDelay)
Bot.ServicesWG.Add(1)
defer func() {
log.Debugf("Order manager shutdown.")
tick.Stop()
Bot.ServicesWG.Done()
}()
for {
select {
case <-o.shutdown:
o.gracefulShutdown()
return
case <-tick.C:
o.processOrders()
@@ -100,9 +141,99 @@ func (o *orderManager) run() {
}
}
func (o *orderManager) Cancel() {}
func (o *orderManager) CancelAllOrders() {}
func (o *orderManager) Place() {}
func (o *orderManager) Cancel(exchName string, order *exchange.OrderCancellation) error {
if exchName == "" {
return errors.New("order exchange name is empty")
}
if order == nil {
return errors.New("order cancel param is nil")
}
if order.OrderID == "" {
return errors.New("order id is empty")
}
exch := GetExchangeByName(exchName)
if exch == nil {
return errors.New("unable to get exchange by name")
}
if order.AssetType.String() != "" && !exch.GetAssetTypes().Contains(order.AssetType) {
return errors.New("order asset type not supported by exchange")
}
return exch.CancelOrder(order)
}
func (o *orderManager) Submit(exchName string, order *exchange.OrderSubmission) (*orderSubmitResponse, error) {
if exchName == "" {
return nil, errors.New("order exchange name must be specified")
}
if order == nil {
return nil, exchange.ErrOrderSubmissionIsNil
}
if err := order.Validate(); err != nil {
return nil, err
}
if o.cfg.EnforceLimitConfig {
if !o.cfg.AllowMarketOrders && order.OrderType == exchange.MarketOrderType {
return nil, errors.New("order market type is not allowed")
}
if o.cfg.LimitAmount > 0 && order.Amount > o.cfg.LimitAmount {
return nil, errors.New("order limit exceeds allowed limit")
}
if len(o.cfg.AllowedExchanges) > 0 &&
!common.StringDataCompareInsensitive(o.cfg.AllowedExchanges, exchName) {
return nil, errors.New("order exchange not found in allowed list")
}
if len(o.cfg.AllowedPairs) > 0 && !o.cfg.AllowedPairs.Contains(order.Pair, true) {
return nil, errors.New("order pair not found in allowed list")
}
}
exch := GetExchangeByName(exchName)
if exch == nil {
return nil, errors.New("unable to get exchange by name")
}
id, err := common.GetV4UUID()
if err != nil {
log.Warnf("Order manager: Unable to generate UUID. Err: %s", err)
}
result, err := exch.SubmitOrder(order)
if err != nil {
return nil, err
}
if result.IsOrderPlaced {
return nil, errors.New("order unable to be placed")
}
msg := fmt.Sprintf("Order manager: Exchange %s submitted order ID=%v [Ours: %v] pair=%v price=%v amount=%v side=%v type=%v.",
exchName, result.OrderID, id.String(), order.Pair, order.Price, order.Amount, order.OrderSide, order.OrderType)
log.Debugln(msg)
Bot.CommsRelayer.PushEvent(base.Event{
Type: "order",
Message: msg,
})
return &orderSubmitResponse{
SubmitOrderResponse: exchange.SubmitOrderResponse{
OrderID: result.OrderID,
},
OurOrderID: id.String(),
}, nil
}
func (o *orderManager) processOrders() {
authExchanges := GetAuthAPISupportedExchanges()
@@ -123,8 +254,14 @@ func (o *orderManager) processOrders() {
order := &result[x]
result := o.orderStore.Add(order)
if result != ErrOrdersAlreadyExists {
log.Debugf("Order manager: Exchange %s added order ID=%v pair=%v price=%v amount=%v side=%v type=%v.",
msg := fmt.Sprintf("Order manager: Exchange %s added order ID=%v pair=%v price=%v amount=%v side=%v type=%v.",
order.Exchange, order.ID, order.CurrencyPair, order.Price, order.Amount, order.OrderSide, order.OrderType)
log.Debug(msg)
Bot.CommsRelayer.PushEvent(base.Event{
Type: "order",
Message: msg,
})
continue
}
}
}

36
engine/orders_types.go Normal file
View File

@@ -0,0 +1,36 @@
package engine
import (
"sync"
"github.com/thrasher-/gocryptotrader/currency"
exchange "github.com/thrasher-/gocryptotrader/exchanges"
)
type orderManagerConfig struct {
EnforceLimitConfig bool
AllowMarketOrders bool
CancelOrdersOnShutdown bool
LimitAmount float64
AllowedPairs currency.Pairs
AllowedExchanges []string
OrderSubmissionRetries int64
}
type orderStore struct {
m sync.Mutex
Orders map[string][]exchange.OrderDetail
}
type orderManager struct {
started int32
stopped int32
shutdown chan struct{}
orderStore orderStore
cfg orderManagerConfig
}
type orderSubmitResponse struct {
exchange.SubmitOrderResponse
OurOrderID string
}

View File

@@ -48,10 +48,12 @@ func (p *portfolioManager) Stop() error {
func (p *portfolioManager) run() {
log.Debugln("Portfolio manager started.")
Bot.ServicesWG.Add(1)
tick := time.NewTicker(PortfolioSleepDelay)
defer func() {
log.Debugf("Portfolio manager shutdown.")
tick.Stop()
Bot.ServicesWG.Done()
}()
for {

View File

@@ -214,7 +214,6 @@ func TickerUpdaterRoutine() {
}
printTickerSummary(&result, c, assetType, exchangeName, err)
if err == nil {
Bot.CommsRelayer.StageTickerData(exchangeName, assetType, &result)
if Bot.Config.RemoteControl.WebsocketRPC.Enabled {
relayWebsocketEvent(result, "ticker_update", assetType.String(), exchangeName)
}
@@ -261,7 +260,6 @@ func OrderbookUpdaterRoutine() {
result, err := exch.UpdateOrderbook(c, assetType)
printOrderbookSummary(&result, c, assetType, exchangeName, err)
if err == nil {
Bot.CommsRelayer.StageOrderbookData(exchangeName, assetType, &result)
if Bot.Config.RemoteControl.WebsocketRPC.Enabled {
relayWebsocketEvent(result, "orderbook_update", assetType.String(), exchangeName)
}
@@ -324,7 +322,7 @@ var wg sync.WaitGroup
func Websocketshutdown(ws *exchange.Websocket) error {
err := ws.Shutdown() // shutdown routines on the exchange
if err != nil {
log.Errorf("routines.go error - failed to shutodwn %s", err)
log.Errorf("routines.go error - failed to shutdown %s", err)
}
timer := time.NewTimer(5 * time.Second)
@@ -426,7 +424,10 @@ func WebsocketDataHandler(ws *exchange.Websocket) {
Low: d.LowPrice,
Volume: d.Quantity,
}
Bot.ExchangeCurrencyPairManager.update(ws.GetName(), d.Pair, d.AssetType, SyncItemTicker, nil)
if Bot.Settings.EnableExchangeSyncManager && Bot.ExchangeCurrencyPairManager != nil {
Bot.ExchangeCurrencyPairManager.update(ws.GetName(),
d.Pair, d.AssetType, SyncItemTicker, nil)
}
ticker.ProcessTicker(ws.GetName(), &tickerNew, d.AssetType)
printTickerSummary(&tickerNew, tickerNew.Pair, d.AssetType, ws.GetName(), nil)
case exchange.KlineData:
@@ -437,7 +438,11 @@ func WebsocketDataHandler(ws *exchange.Websocket) {
case exchange.WebsocketOrderbookUpdate:
// Orderbook data
result := data.(exchange.WebsocketOrderbookUpdate)
Bot.ExchangeCurrencyPairManager.update(ws.GetName(), result.Pair, result.Asset, SyncItemOrderbook, nil)
if Bot.Settings.EnableExchangeSyncManager && Bot.ExchangeCurrencyPairManager != nil {
Bot.ExchangeCurrencyPairManager.update(ws.GetName(),
result.Pair, result.Asset, SyncItemOrderbook, nil)
}
// TO-DO: printOrderbookSummary
//nolint:gocritic log.Infof("Websocket %s %s orderbook updated", ws.GetName(), result.Pair.Pair().String())
default:
if Bot.Settings.Verbose {

View File

@@ -176,6 +176,12 @@ func (s *RPCServer) EnableExchange(ctx context.Context, r *gctrpc.GenericExchang
return &gctrpc.GenericExchangeNameResponse{}, err
}
// GetExchangeOTPCode retrieves an exchanges OTP code
func (s *RPCServer) GetExchangeOTPCode(ctx context.Context, r *gctrpc.GenericExchangeNameRequest) (*gctrpc.GetExchangeOTPReponse, error) {
result, err := GetOTPByExchange(r.Exchange)
return &gctrpc.GetExchangeOTPReponse{OtpCode: result}, err
}
// GetExchangeInfo gets info for a specific exchange
func (s *RPCServer) GetExchangeInfo(ctx context.Context, r *gctrpc.GenericExchangeNameRequest) (*gctrpc.GetExchangeInfoResponse, error) {
exchCfg, err := Bot.Config.GetExchangeConfig(r.Exchange)
@@ -580,9 +586,15 @@ func (s *RPCServer) SubmitOrder(ctx context.Context, r *gctrpc.SubmitOrderReques
}
p := currency.NewPairFromStrings(r.Pair.Base, r.Pair.Quote)
result, err := exch.SubmitOrder(p, exchange.OrderSide(r.Side),
exchange.OrderType(r.OrderType), r.Amount, r.Price, r.ClientId)
submission := &exchange.OrderSubmission{
Pair: p,
OrderSide: exchange.OrderSide(r.Side),
OrderType: exchange.OrderType(r.OrderType),
Amount: r.Amount,
Price: r.Price,
ClientID: r.ClientId,
}
result, err := exch.SubmitOrder(submission)
return &gctrpc.SubmitOrderResponse{
OrderId: result.OrderID,
OrderPlaced: result.IsOrderPlaced,

View File

@@ -197,7 +197,7 @@ func (e *ExchangeCurrencyPairSyncer) update(exchangeName string, p currency.Pair
return
}
default:
log.Warnf("ExchangeCurrencyPairSyncer: unkown sync item %v", syncType)
log.Warnf("ExchangeCurrencyPairSyncer: unknown sync item %v", syncType)
return
}
@@ -267,10 +267,6 @@ func (e *ExchangeCurrencyPairSyncer) worker() {
continue
}
if Bot.Exchanges[x].GetName() == "BTCC" {
continue
}
exchangeName := Bot.Exchanges[x].GetName()
assetTypes := Bot.Exchanges[x].GetAssetTypes()
supportsREST := Bot.Exchanges[x].SupportsREST()
@@ -456,17 +452,13 @@ func (e *ExchangeCurrencyPairSyncer) Start() {
continue
}
if Bot.Exchanges[x].GetName() == "BTCC" {
continue
}
exchangeName := Bot.Exchanges[x].GetName()
supportsWebsocket := Bot.Exchanges[x].SupportsWebsocket()
assetTypes := Bot.Exchanges[x].GetAssetTypes()
supportsREST := Bot.Exchanges[x].SupportsREST()
if !supportsREST && !supportsWebsocket {
log.Warnf("Loaded exchange %s does not support REST or Websocket", exchangeName)
log.Warnf("Loaded exchange %s does not support REST or Websocket.", exchangeName)
continue
}