mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-05-13 15:09:42 +00:00
Order manager changes
This commit is contained in:
@@ -32,11 +32,11 @@ type Engine struct {
|
||||
Portfolio *portfolio.Base
|
||||
Exchanges []exchange.IBotExchange
|
||||
ExchangeCurrencyPairManager *ExchangeCurrencyPairSyncer
|
||||
OrderManager *OrderManager
|
||||
OrderManager orderManager
|
||||
PortfolioManager portfolioManager
|
||||
CommsRelayer *communications.Communications
|
||||
Connectivity *connchecker.Checker
|
||||
Shutdown chan bool
|
||||
Shutdown chan struct{}
|
||||
Settings Settings
|
||||
CryptocurrencyDepositAddresses map[string]map[string]string
|
||||
Uptime time.Time
|
||||
@@ -157,6 +157,7 @@ func ValidateSettings(b *Engine, s *Settings) {
|
||||
|
||||
b.Settings.EnableConnectivityMonitor = s.EnableConnectivityMonitor
|
||||
b.Settings.EnableNTPClient = s.EnableNTPClient
|
||||
b.Settings.EnableOrderManager = s.EnableOrderManager
|
||||
b.Settings.EnableExchangeSyncManager = s.EnableExchangeSyncManager
|
||||
b.Settings.EnableTickerSyncing = s.EnableTickerSyncing
|
||||
b.Settings.EnableOrderbookSyncing = s.EnableOrderbookSyncing
|
||||
@@ -219,7 +220,7 @@ func PrintSettings(s *Settings) {
|
||||
log.Debugf("\t Enable all exchanges: %v", s.EnableAllExchanges)
|
||||
log.Debugf("\t Enable all pairs: %v", s.EnableAllPairs)
|
||||
log.Debugf("\t Enable coinmarketcap analaysis: %v", s.EnableCoinmarketcapAnalysis)
|
||||
log.Debugf("\t Enable portfolio watcher: %v", s.EnablePortfolioManager)
|
||||
log.Debugf("\t Enable portfolio manager: %v", s.EnablePortfolioManager)
|
||||
log.Debugf("\t Enable gPRC: %v", s.EnableGRPC)
|
||||
log.Debugf("\t Enable gRPC Proxy: %v", s.EnableGRPCProxy)
|
||||
log.Debugf("\t Enable websocket RPC: %v", s.EnableWebsocketRPC)
|
||||
@@ -377,6 +378,12 @@ func (e *Engine) Start() {
|
||||
}
|
||||
}
|
||||
|
||||
if e.Settings.EnableOrderManager {
|
||||
if err = e.OrderManager.Start(); err != nil {
|
||||
log.Errorf("Order manager unable to start: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if e.Settings.EnableExchangeSyncManager {
|
||||
exchangeSyncCfg := CurrencyPairSyncerConfig{
|
||||
SyncTicker: e.Settings.EnableTickerSyncing,
|
||||
@@ -393,10 +400,6 @@ func (e *Engine) Start() {
|
||||
}
|
||||
}
|
||||
|
||||
if e.Settings.EnableOrderManager {
|
||||
go StartOrderManagerRoutine()
|
||||
}
|
||||
|
||||
if e.Settings.EnableEventManager {
|
||||
go events.EventManger()
|
||||
}
|
||||
@@ -413,6 +416,12 @@ func (e *Engine) Stop() {
|
||||
e.Config.Portfolio = portfolio.Portfolio
|
||||
}
|
||||
|
||||
if e.OrderManager.Started() {
|
||||
if err := e.OrderManager.Stop(); err != nil {
|
||||
log.Errorf("Order manager unable to stop. Error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if e.PortfolioManager.Started() {
|
||||
if err := e.PortfolioManager.Stop(); err != nil {
|
||||
log.Errorf("Fund manager unable to stop. Error: %v", err)
|
||||
@@ -436,11 +445,11 @@ func (e *Engine) Stop() {
|
||||
// shuts down the engine instance
|
||||
func (e *Engine) handleInterrupt() {
|
||||
c := make(chan os.Signal, 1)
|
||||
e.Shutdown = make(chan bool)
|
||||
e.Shutdown = make(chan struct{})
|
||||
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
|
||||
go func() {
|
||||
sig := <-c
|
||||
log.Debugf("Captured %v, shutdown requested.", sig)
|
||||
e.Shutdown <- true
|
||||
close(e.Shutdown)
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -26,6 +26,18 @@ import (
|
||||
"github.com/thrasher-/gocryptotrader/utils"
|
||||
)
|
||||
|
||||
// GetAuthAPISupportedExchanges returns a list of auth api enabled exchanges
|
||||
func GetAuthAPISupportedExchanges() []string {
|
||||
var exchanges []string
|
||||
for x := range Bot.Exchanges {
|
||||
if !Bot.Exchanges[x].GetAuthenticatedAPISupport() {
|
||||
continue
|
||||
}
|
||||
exchanges = append(exchanges, Bot.Exchanges[x].GetName())
|
||||
}
|
||||
return exchanges
|
||||
}
|
||||
|
||||
// IsOnline returns whether or not the engine has Internet connectivity
|
||||
func IsOnline() bool {
|
||||
if Bot.Connectivity == nil {
|
||||
|
||||
130
engine/orders.go
130
engine/orders.go
@@ -1,47 +1,131 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
exchange "github.com/thrasher-/gocryptotrader/exchanges"
|
||||
log "github.com/thrasher-/gocryptotrader/logger"
|
||||
)
|
||||
|
||||
// OrderManager manages orders for all enabled exchanges
|
||||
type OrderManager struct {
|
||||
// vars for the fund manager package
|
||||
var (
|
||||
OrderManagerDelay = time.Second * 10
|
||||
ErrOrdersAlreadyExists = errors.New("order already exists")
|
||||
)
|
||||
|
||||
type orderStore struct {
|
||||
m sync.Mutex
|
||||
Orders map[string][]exchange.OrderDetail
|
||||
}
|
||||
|
||||
func (o *OrderManager) add() {
|
||||
func (o *orderStore) exists(order *exchange.OrderDetail) bool {
|
||||
r, ok := o.Orders[order.Exchange]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
for x := range r {
|
||||
if r[x].ID == order.ID {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (o *orderStore) Add(order *exchange.OrderDetail) error {
|
||||
o.m.Lock()
|
||||
defer o.m.Unlock()
|
||||
|
||||
if o.exists(order) {
|
||||
return ErrOrdersAlreadyExists
|
||||
}
|
||||
|
||||
orders := o.Orders[order.Exchange]
|
||||
orders = append(orders, *order)
|
||||
o.Orders[order.Exchange] = orders
|
||||
return nil
|
||||
}
|
||||
|
||||
// StartOrderManagerRoutine starts the orderbook manage routine
|
||||
func StartOrderManagerRoutine() {
|
||||
log.Debugln("Starting order manager routine")
|
||||
if Bot.OrderManager == nil {
|
||||
Bot.OrderManager = new(OrderManager)
|
||||
type orderManager struct {
|
||||
started int32
|
||||
stopped int32
|
||||
shutdown chan struct{}
|
||||
orderStore orderStore
|
||||
}
|
||||
|
||||
func (o *orderManager) Started() bool {
|
||||
return atomic.LoadInt32(&o.started) == 1
|
||||
}
|
||||
|
||||
func (o *orderManager) Start() error {
|
||||
if atomic.AddInt32(&o.started, 1) != 1 {
|
||||
return errors.New("order manager already started")
|
||||
}
|
||||
|
||||
log.Debugln("Order manager starting...")
|
||||
o.shutdown = make(chan struct{})
|
||||
o.orderStore.Orders = make(map[string][]exchange.OrderDetail)
|
||||
go o.run()
|
||||
return nil
|
||||
}
|
||||
func (o *orderManager) Stop() error {
|
||||
if atomic.AddInt32(&o.stopped, 1) != 1 {
|
||||
return errors.New("order manager is already stopped")
|
||||
}
|
||||
|
||||
log.Debugln("Order manager shutting down...")
|
||||
close(o.shutdown)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *orderManager) run() {
|
||||
log.Debugln("Order manager started.")
|
||||
tick := time.NewTicker(OrderManagerDelay)
|
||||
defer func() {
|
||||
log.Debugf("Order manager shutdown.")
|
||||
tick.Stop()
|
||||
}()
|
||||
|
||||
for {
|
||||
for x := range Bot.Exchanges {
|
||||
if !Bot.Exchanges[x].IsEnabled() || !Bot.Exchanges[x].GetAuthenticatedAPISupport() {
|
||||
continue
|
||||
}
|
||||
exchName := Bot.Exchanges[x].GetName()
|
||||
log.Printf("Getting active orders for %s", exchName)
|
||||
|
||||
orders, err := Bot.Exchanges[x].GetActiveOrders(&exchange.GetOrdersRequest{})
|
||||
if err != nil {
|
||||
log.Printf("Get active orders failed: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("Orders for exchange %s: %v", exchName, orders)
|
||||
select {
|
||||
case <-o.shutdown:
|
||||
return
|
||||
case <-tick.C:
|
||||
o.processOrders()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (o *orderManager) Cancel() {}
|
||||
|
||||
func (o *orderManager) Place() {}
|
||||
|
||||
func (o *orderManager) processOrders() {
|
||||
authExchanges := GetAuthAPISupportedExchanges()
|
||||
for x := range authExchanges {
|
||||
log.Debugf("Order manager: Procesing orders for exchange %v.", authExchanges[x])
|
||||
exch := GetExchangeByName(authExchanges[x])
|
||||
req := exchange.GetOrdersRequest{
|
||||
OrderSide: exchange.AnyOrderSide,
|
||||
OrderType: exchange.AnyOrderType,
|
||||
}
|
||||
result, err := exch.GetActiveOrders(&req)
|
||||
if err != nil {
|
||||
log.Debugf("Order manager: Unable to get active orders: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
for x := range result {
|
||||
order := &result[x]
|
||||
result := o.orderStore.Add(order)
|
||||
if result != ErrOrdersAlreadyExists {
|
||||
log.Debugf("Order manager: Exchange %s added order ID=%v pair=%v price=%v amount=%v side=%v type=%v.",
|
||||
order.Exchange, order.ID, order.CurrencyPair, order.Price, order.Amount, order.OrderSide, order.OrderType)
|
||||
}
|
||||
}
|
||||
time.Sleep(time.Second * 1)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user