engine/order manager: Initial REST managed order updating (resolves #772) (#775)

* Initial REST managed order updating

* Apply gloriousCode's changes.go patch

* Update internal order ID handling

* Check error

* Replace string with string pointer

* Avoid nil pointers in upsert

* Update test for UpdateOrderFromDetail()

* Add tests for orders.go

* Remove unnecessary newline

* Address comments

* Add missing nil check

* Add tests for new functions in order_manager.go

* Remove empty line

* Change log level for updates from Info to Debug (keep added orders at Info)

* Initialize orders before running the timer

* [TEMP] Add verbosity for debugging

* Nil checking on exchangeManager in GetExchanges()

- exchangeManager.GetExchanges() and iExchangeManager.GetExchanges() return an error on nil
- bot.GetExchanges() wraps exchangeManager.GetExchanges() and returns an empty slice

* Revert b5afe1a46b

* Do not start the order manager runner thread

Instead, mark the order manager as running

* Remove redundant error.Is() and remove print wrapper on msg

* Add atomic blocker and waitgroup on processOrders()

* Disable unnecessary orderManager runner thread for rpcserver_test

* Remove redundant err from orderStore.getActiveOrders()

* [FIX] Populate requiresProcessing using UpsertResponse data instead of REST return data

.. because the data returned by the REST calls do not include the internal user ID's

* [TEST] Verify that processOrders() actually processes queried order data

* Remove leftover warning and add nil check on wg.Done()

* Apply suggestions from code review

Log category changes - as suggested

Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>

* Return when no exchanges available

Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>
This commit is contained in:
TaltaM
2021-09-15 01:28:31 +02:00
committed by GitHub
parent 068b375867
commit fd600972ba
17 changed files with 833 additions and 79 deletions

View File

@@ -303,7 +303,10 @@ func (m *apiServerManager) getIndex(w http.ResponseWriter, _ *http.Request) {
// getAllActiveOrderbooks returns all enabled exchanges orderbooks
func getAllActiveOrderbooks(m iExchangeManager) []EnabledExchangeOrderbooks {
var orderbookData []EnabledExchangeOrderbooks
exchanges := m.GetExchanges()
exchanges, err := m.GetExchanges()
if err != nil {
log.Errorf(log.APIServerMgr, "Cannot get exchanges: %v", err)
}
for x := range exchanges {
assets := exchanges[x].GetAssetTypes(true)
exchName := exchanges[x].GetName()
@@ -340,7 +343,10 @@ func getAllActiveOrderbooks(m iExchangeManager) []EnabledExchangeOrderbooks {
// getAllActiveTickers returns all enabled exchanges tickers
func getAllActiveTickers(m iExchangeManager) []EnabledExchangeCurrencies {
var tickers []EnabledExchangeCurrencies
exchanges := m.GetExchanges()
exchanges, err := m.GetExchanges()
if err != nil {
log.Errorf(log.APIServerMgr, "Cannot get exchanges: %v", err)
}
for x := range exchanges {
assets := exchanges[x].GetAssetTypes(true)
exchName := exchanges[x].GetName()
@@ -377,7 +383,10 @@ func getAllActiveTickers(m iExchangeManager) []EnabledExchangeCurrencies {
// getAllActiveAccounts returns all enabled exchanges accounts
func getAllActiveAccounts(m iExchangeManager) []AllEnabledExchangeAccounts {
var accounts []AllEnabledExchangeAccounts
exchanges := m.GetExchanges()
exchanges, err := m.GetExchanges()
if err != nil {
log.Errorf(log.APIServerMgr, "Cannot get exchanges: %v", err)
}
for x := range exchanges {
assets := exchanges[x].GetAssetTypes(true)
exchName := exchanges[x].GetName()

View File

@@ -716,7 +716,12 @@ func (bot *Engine) UnloadExchange(exchName string) error {
// GetExchanges retrieves the loaded exchanges
func (bot *Engine) GetExchanges() []exchange.IBotExchange {
return bot.ExchangeManager.GetExchanges()
exch, err := bot.ExchangeManager.GetExchanges()
if err != nil {
gctlog.Warnf(gctlog.ExchangeSys, "Cannot get exchanges: %v", err)
return []exchange.IBotExchange{}
}
return exch
}
// LoadExchange loads an exchange by name. Optional wait group can be added for
@@ -917,7 +922,7 @@ func (bot *Engine) SetupExchanges() error {
}(configs[x])
}
wg.Wait()
if len(bot.ExchangeManager.GetExchanges()) == 0 {
if len(bot.GetExchanges()) == 0 {
return ErrNoExchangesLoaded
}
return nil

View File

@@ -77,14 +77,17 @@ func (m *ExchangeManager) Add(exch exchange.IBotExchange) {
}
// GetExchanges returns all stored exchanges
func (m *ExchangeManager) GetExchanges() []exchange.IBotExchange {
func (m *ExchangeManager) GetExchanges() ([]exchange.IBotExchange, error) {
if m == nil {
return nil, fmt.Errorf("exchange manager: %w", ErrNilSubsystem)
}
m.m.Lock()
defer m.m.Unlock()
var exchs []exchange.IBotExchange
for _, x := range m.exchanges {
exchs = append(exchs, x)
}
return exchs
return exchs, nil
}
// RemoveExchange removes an exchange from the manager

View File

@@ -28,7 +28,11 @@ func TestExchangeManagerAdd(t *testing.T) {
b := new(bitfinex.Bitfinex)
b.SetDefaults()
m.Add(b)
if exch := m.GetExchanges(); exch[0].GetName() != "Bitfinex" {
exchanges, err := m.GetExchanges()
if err != nil {
t.Error("no exchange manager found")
}
if exchanges[0].GetName() != "Bitfinex" {
t.Error("unexpected exchange name")
}
}
@@ -36,13 +40,21 @@ func TestExchangeManagerAdd(t *testing.T) {
func TestExchangeManagerGetExchanges(t *testing.T) {
t.Parallel()
m := SetupExchangeManager()
if exchanges := m.GetExchanges(); exchanges != nil {
exchanges, err := m.GetExchanges()
if err != nil {
t.Error("no exchange manager found")
}
if exchanges != nil {
t.Error("unexpected value")
}
b := new(bitfinex.Bitfinex)
b.SetDefaults()
m.Add(b)
if exch := m.GetExchanges(); exch[0].GetName() != "Bitfinex" {
exchanges, err = m.GetExchanges()
if err != nil {
t.Error("no exchange manager found")
}
if exchanges[0].GetName() != "Bitfinex" {
t.Error("unexpected exchange name")
}
}

View File

@@ -719,7 +719,7 @@ func (bot *Engine) GetExchangeCryptocurrencyDepositAddresses() map[string]map[st
// GetExchangeNames returns a list of enabled or disabled exchanges
func (bot *Engine) GetExchangeNames(enabledOnly bool) []string {
exchanges := bot.ExchangeManager.GetExchanges()
exchanges := bot.GetExchanges()
var response []string
for i := range exchanges {
if !enabledOnly || (enabledOnly && exchanges[i].IsEnabled()) {

View File

@@ -87,14 +87,19 @@ func (m *OrderManager) Stop() error {
func (m *OrderManager) gracefulShutdown() {
if m.cfg.CancelOrdersOnShutdown {
log.Debugln(log.OrderMgr, "Order manager: Cancelling any open orders...")
m.CancelAllOrders(context.TODO(),
m.orderStore.exchangeManager.GetExchanges())
exchanges, err := m.orderStore.exchangeManager.GetExchanges()
if err != nil {
log.Errorf(log.OrderMgr, "Order manager cannot get exchanges: %v", err)
return
}
m.CancelAllOrders(context.TODO(), exchanges)
}
}
// run will periodically process orders
func (m *OrderManager) run() {
log.Debugln(log.OrderMgr, "Order manager started.")
m.processOrders()
tick := time.NewTicker(orderManagerDelay)
m.orderStore.wg.Add(1)
defer func() {
@@ -242,12 +247,12 @@ func (m *OrderManager) GetOrderInfo(ctx context.Context, exchangeName, orderID s
return order.Detail{}, err
}
err = m.orderStore.add(&result)
if err != nil && err != ErrOrdersAlreadyExists {
upsertResponse, err := m.orderStore.upsert(&result)
if err != nil {
return order.Detail{}, err
}
return result, nil
return upsertResponse.OrderDetails, nil
}
// validate ensures a submitted order is valid before adding to the manager
@@ -473,6 +478,18 @@ func (m *OrderManager) GetOrdersFiltered(f *order.Filter) ([]order.Detail, error
return m.orderStore.getFilteredOrders(f)
}
// GetOrdersActive returns a snapshot of all orders in the order store
// that have a status that indicates it's currently tradable
func (m *OrderManager) GetOrdersActive(f *order.Filter) ([]order.Detail, error) {
if m == nil {
return nil, fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return nil, fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
return m.orderStore.getActiveOrders(f), nil
}
// processSubmittedOrder adds a new order to the manager
func (m *OrderManager) processSubmittedOrder(newOrder *order.Submit, result order.SubmitResponse) (*OrderSubmitResponse, error) {
if !result.IsOrderPlaced {
@@ -554,7 +571,18 @@ func (m *OrderManager) processSubmittedOrder(newOrder *order.Submit, result orde
// processOrders iterates over all exchange orders via API
// and adds them to the internal order store
func (m *OrderManager) processOrders() {
exchanges := m.orderStore.exchangeManager.GetExchanges()
if !atomic.CompareAndSwapInt32(&m.processingOrders, 0, 1) {
return
}
defer func() {
atomic.StoreInt32(&m.processingOrders, 0)
}()
exchanges, err := m.orderStore.exchangeManager.GetExchanges()
if err != nil {
log.Errorf(log.OrderMgr, "Order manager cannot get exchanges: %v", err)
return
}
var wg sync.WaitGroup
for i := range exchanges {
if !exchanges[i].GetAuthenticatedAPISupport(exchange.RestAuthentication) {
continue
@@ -585,6 +613,16 @@ func (m *OrderManager) processOrders() {
continue
}
filter := &order.Filter{
Exchange: exchanges[i].GetName(),
}
orders := m.orderStore.getActiveOrders(filter)
order.FilterOrdersByCurrencies(&orders, pairs)
requiresProcessing := make(map[string]bool, len(orders))
for x := range orders {
requiresProcessing[orders[x].InternalOrderID] = true
}
req := order.GetOrdersRequest{
Side: order.AnySide,
Type: order.AnyType,
@@ -593,32 +631,68 @@ func (m *OrderManager) processOrders() {
}
result, err := exchanges[i].GetActiveOrders(context.TODO(), &req)
if err != nil {
log.Warnf(log.OrderMgr,
log.Errorf(log.OrderMgr,
"Order manager: Unable to get active orders for %s and asset type %s: %s",
exchanges[i].GetName(),
supportedAssets[y],
err)
continue
}
if len(orders) == 0 && len(result) == 0 {
continue
}
for z := range result {
ord := &result[z]
result := m.orderStore.add(ord)
if result != ErrOrdersAlreadyExists {
msg := fmt.Sprintf("Order manager: Exchange %s added order ID=%v pair=%v price=%v amount=%v side=%v type=%v.",
ord.Exchange, ord.ID, ord.Pair, ord.Price, ord.Amount, ord.Side, ord.Type)
log.Debugf(log.OrderMgr, "%v", msg)
m.orderStore.commsManager.PushEvent(base.Event{
Type: "order",
Message: msg,
})
continue
upsertResponse, err := m.UpsertOrder(&result[z])
if err != nil {
log.Error(log.OrderMgr, err)
}
requiresProcessing[upsertResponse.OrderDetails.InternalOrderID] = false
}
if !exchanges[i].GetBase().GetSupportedFeatures().RESTCapabilities.GetOrder {
continue
}
wg.Add(1)
go m.processMatchingOrders(exchanges[i], orders, requiresProcessing, &wg)
}
}
wg.Wait()
}
func (m *OrderManager) processMatchingOrders(exch exchange.IBotExchange, orders []order.Detail, requiresProcessing map[string]bool, wg *sync.WaitGroup) {
defer func() {
if wg != nil {
wg.Done()
}
}()
for x := range orders {
if time.Since(orders[x].LastUpdated) < time.Minute {
continue
}
if requiresProcessing[orders[x].InternalOrderID] {
err := m.FetchAndUpdateExchangeOrder(exch, &orders[x], orders[x].AssetType)
if err != nil {
log.Error(log.OrderMgr, err)
}
}
}
}
// FetchAndUpdateExchangeOrder calls the exchange to upsert an order to the order store
func (m *OrderManager) FetchAndUpdateExchangeOrder(exch exchange.IBotExchange, ord *order.Detail, assetType asset.Item) error {
if ord == nil {
return errors.New("order manager: Order is nil")
}
fetchedOrder, err := exch.GetOrderInfo(context.TODO(), ord.ID, ord.Pair, assetType)
if err != nil {
ord.Status = order.UnknownStatus
return err
}
fetchedOrder.LastUpdated = time.Now()
_, err = m.UpsertOrder(&fetchedOrder)
return err
}
// Exists checks whether an order exists in the order store
func (m *OrderManager) Exists(o *order.Detail) bool {
if m == nil || atomic.LoadInt32(&m.started) == 0 {
@@ -670,14 +744,50 @@ func (m *OrderManager) UpdateExistingOrder(od *order.Detail) error {
}
// UpsertOrder updates an existing order or adds a new one to the orderstore
func (m *OrderManager) UpsertOrder(od *order.Detail) error {
func (m *OrderManager) UpsertOrder(od *order.Detail) (resp *OrderUpsertResponse, err error) {
if m == nil {
return fmt.Errorf("order manager %w", ErrNilSubsystem)
return nil, fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
return nil, fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
return m.orderStore.upsert(od)
if od == nil {
return nil, errNilOrder
}
var msg string
defer func(message *string) {
if message == nil {
log.Errorf(log.OrderMgr, "UpsertOrder: produced nil order event message\n")
return
}
m.orderStore.commsManager.PushEvent(base.Event{
Type: "order",
Message: *message,
})
}(&msg)
upsertResponse, err := m.orderStore.upsert(od)
if err != nil {
msg = fmt.Sprintf(
"Order manager: Exchange %s unable to upsert order ID=%v internal ID=%v pair=%v price=%.8f amount=%.8f side=%v type=%v status=%v: %s",
od.Exchange, od.ID, od.InternalOrderID, od.Pair, od.Price, od.Amount, od.Side, od.Type, od.Status, err)
return nil, err
}
status := "updated"
if upsertResponse.IsNewOrder {
status = "added"
}
msg = fmt.Sprintf("Order manager: Exchange %s %s order ID=%v internal ID=%v pair=%v price=%.8f amount=%.8f side=%v type=%v status=%v.",
upsertResponse.OrderDetails.Exchange, status, upsertResponse.OrderDetails.ID, upsertResponse.OrderDetails.InternalOrderID,
upsertResponse.OrderDetails.Pair, upsertResponse.OrderDetails.Price, upsertResponse.OrderDetails.Amount,
upsertResponse.OrderDetails.Side, upsertResponse.OrderDetails.Type, upsertResponse.OrderDetails.Status)
if upsertResponse.IsNewOrder {
log.Info(log.OrderMgr, msg)
return upsertResponse, nil
}
log.Debug(log.OrderMgr, msg)
return upsertResponse, nil
}
// get returns all orders for all exchanges
@@ -745,27 +855,45 @@ func (s *store) modifyExisting(id string, mod *order.Modify) error {
// upsert (1) checks if such an exchange exists in the exchangeManager, (2) checks if
// order exists and updates/creates it.
func (s *store) upsert(od *order.Detail) error {
func (s *store) upsert(od *order.Detail) (resp *OrderUpsertResponse, err error) {
if od == nil {
return nil, errNilOrder
}
lName := strings.ToLower(od.Exchange)
_, err := s.exchangeManager.GetExchangeByName(lName)
_, err = s.exchangeManager.GetExchangeByName(lName)
if err != nil {
return err
return nil, err
}
s.m.Lock()
defer s.m.Unlock()
r, ok := s.Orders[lName]
if !ok {
od.GenerateInternalOrderID()
s.Orders[lName] = []*order.Detail{od}
return nil
resp = &OrderUpsertResponse{
OrderDetails: od.Copy(),
IsNewOrder: true,
}
return resp, nil
}
for x := range r {
if r[x].ID == od.ID {
r[x].UpdateOrderFromDetail(od)
return nil
resp = &OrderUpsertResponse{
OrderDetails: r[x].Copy(),
IsNewOrder: false,
}
return resp, nil
}
}
// Untracked websocket orders will not have internalIDs yet
od.GenerateInternalOrderID()
s.Orders[lName] = append(s.Orders[lName], od)
return nil
resp = &OrderUpsertResponse{
OrderDetails: od.Copy(),
IsNewOrder: true,
}
return resp, nil
}
// getByExchange returns orders by exchange
@@ -827,16 +955,7 @@ func (s *store) add(det *order.Detail) error {
return ErrOrdersAlreadyExists
}
// Untracked websocket orders will not have internalIDs yet
if det.InternalOrderID == "" {
id, err := uuid.NewV4()
if err != nil {
log.Warnf(log.OrderMgr,
"Order manager: Unable to generate UUID. Err: %s",
err)
} else {
det.InternalOrderID = id.String()
}
}
det.GenerateInternalOrderID()
s.m.Lock()
defer s.m.Unlock()
orders := s.Orders[strings.ToLower(det.Exchange)]
@@ -877,3 +996,43 @@ func (s *store) getFilteredOrders(f *order.Filter) ([]order.Detail, error) {
}
return os, nil
}
// getActiveOrders returns copy of the orders that are active
func (s *store) getActiveOrders(f *order.Filter) []order.Detail {
s.m.RLock()
defer s.m.RUnlock()
var orders []order.Detail
switch {
case f == nil:
for _, e := range s.Orders {
for i := range e {
if !e[i].IsActive() {
continue
}
orders = append(orders, e[i].Copy())
}
}
case f.Exchange != "":
// optimization if Exchange is filtered
if e, ok := s.Orders[strings.ToLower(f.Exchange)]; ok {
for i := range e {
if !e[i].IsActive() || !e[i].MatchFilter(f) {
continue
}
orders = append(orders, e[i].Copy())
}
}
default:
for _, e := range s.Orders {
for i := range e {
if !e[i].IsActive() || !e[i].MatchFilter(f) {
continue
}
orders = append(orders, e[i].Copy())
}
}
}
return orders
}

View File

@@ -7,10 +7,13 @@ import (
"testing"
"time"
"github.com/thrasher-corp/gocryptotrader/common/convert"
"github.com/thrasher-corp/gocryptotrader/config"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/protocol"
)
// omfExchange aka ordermanager fake exchange overrides exchange functions
@@ -29,8 +32,31 @@ func (f omfExchange) CancelOrder(ctx context.Context, o *order.Cancel) error {
// GetOrderInfo overrides testExchange's get order function
// to do the bare minimum required with no API calls or credentials required
func (f omfExchange) GetOrderInfo(ctx context.Context, orderID string, pair currency.Pair, assetType asset.Item) (order.Detail, error) {
if orderID == "" {
switch orderID {
case "":
return order.Detail{}, errors.New("")
case "Order1-unknown-to-active":
return order.Detail{
Exchange: testExchange,
Pair: currency.Pair{Base: currency.BTC, Quote: currency.USD},
AssetType: asset.Spot,
Amount: 1.0,
Side: order.Buy,
Status: order.Active,
LastUpdated: time.Now().Add(-time.Hour),
ID: "Order1-unknown-to-active",
}, nil
case "Order2-active-to-inactive":
return order.Detail{
Exchange: testExchange,
Pair: currency.Pair{Base: currency.BTC, Quote: currency.USD},
AssetType: asset.Spot,
Amount: 1.0,
Side: order.Sell,
Status: order.Cancelled,
LastUpdated: time.Now().Add(-time.Hour),
ID: "Order2-active-to-inactive",
}, nil
}
return order.Detail{
@@ -38,9 +64,24 @@ func (f omfExchange) GetOrderInfo(ctx context.Context, orderID string, pair curr
ID: orderID,
Pair: pair,
AssetType: assetType,
Status: order.Cancelled,
}, nil
}
// GetActiveOrders overrides the function used by processOrders to return 1 active order
func (f omfExchange) GetActiveOrders(ctx context.Context, req *order.GetOrdersRequest) ([]order.Detail, error) {
return []order.Detail{{
Exchange: testExchange,
Pair: currency.Pair{Base: currency.BTC, Quote: currency.USD},
AssetType: asset.Spot,
Amount: 2.0,
Side: order.Sell,
Status: order.Active,
LastUpdated: time.Now().Add(-time.Hour),
ID: "Order3-unknown-to-active",
}}, nil
}
func (f omfExchange) ModifyOrder(ctx context.Context, action *order.Modify) (order.Modify, error) {
ans := *action
ans.ID = "modified_order_id"
@@ -157,11 +198,7 @@ func OrdersSetup(t *testing.T) *OrderManager {
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
err = m.Start()
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
m.started = 1
return m
}
@@ -710,8 +747,155 @@ func TestOrderManager_Modify(t *testing.T) {
}
func TestProcessOrders(t *testing.T) {
m := OrdersSetup(t)
var wg sync.WaitGroup
em := SetupExchangeManager()
exch, err := em.NewExchangeByName(testExchange)
if err != nil {
t.Fatal(err)
}
exch.SetDefaults()
fakeExchange := omfExchange{
IBotExchange: exch,
}
em.Add(fakeExchange)
m, err := SetupOrderManager(em, &CommunicationManager{}, &wg, false)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
m.started = 1
pairs := currency.Pairs{
currency.Pair{Base: currency.BTC, Quote: currency.USD},
}
// Ensure processOrders() can run the REST calls to GetActiveOrders
// and to GetOrders
exch.GetBase().API = exchange.API{
AuthenticatedSupport: true,
AuthenticatedWebsocketSupport: false,
}
exch.GetBase().Features = exchange.Features{
Supports: exchange.FeaturesSupported{
REST: true,
RESTCapabilities: protocol.Features{
GetOrder: true,
},
},
}
exch.GetBase().CurrencyPairs = currency.PairsManager{
UseGlobalFormat: true,
RequestFormat: &currency.PairFormat{
Delimiter: "-",
Uppercase: true,
},
ConfigFormat: &currency.PairFormat{
Delimiter: "-",
Uppercase: true,
},
Pairs: map[asset.Item]*currency.PairStore{
asset.Spot: {
AssetEnabled: convert.BoolPtr(true),
Enabled: pairs,
Available: pairs,
},
},
}
exch.GetBase().Config = &config.ExchangeConfig{
CurrencyPairs: &currency.PairsManager{
UseGlobalFormat: true,
RequestFormat: &currency.PairFormat{
Delimiter: "-",
Uppercase: true,
},
ConfigFormat: &currency.PairFormat{
Delimiter: "-",
Uppercase: true,
},
Pairs: map[asset.Item]*currency.PairStore{
asset.Spot: {
AssetEnabled: convert.BoolPtr(true),
Enabled: pairs,
Available: pairs,
},
},
},
}
orders := []order.Detail{
{
Exchange: testExchange,
Pair: pairs[0],
AssetType: asset.Spot,
Amount: 1.0,
Side: order.Buy,
Status: order.UnknownStatus,
LastUpdated: time.Now().Add(-time.Hour),
ID: "Order1-unknown-to-active",
},
{
Exchange: testExchange,
Pair: pairs[0],
AssetType: asset.Spot,
Amount: 1.0,
Side: order.Sell,
Status: order.Active,
LastUpdated: time.Now().Add(-time.Hour),
ID: "Order2-active-to-inactive",
},
{
Exchange: testExchange,
Pair: pairs[0],
AssetType: asset.Spot,
Amount: 2.0,
Side: order.Sell,
Status: order.UnknownStatus,
LastUpdated: time.Now().Add(-time.Hour),
ID: "Order3-unknown-to-active",
},
}
for i := range orders {
if err = m.orderStore.add(&orders[i]); err != nil {
t.Error(err)
}
}
m.processOrders()
// Order1 is not returned by exch.GetActiveOrders()
// It will be fetched by exch.GetOrderInfo(), which will say it is active
res, err := m.GetOrdersFiltered(&order.Filter{ID: "Order1-unknown-to-active"})
if err != nil {
t.Error(err)
}
if len(res) != 1 {
t.Errorf("Expected 3 result, got: %d", len(res))
}
if res[0].Status != order.Active {
t.Errorf("Order 1 should be active, but status is %s", string(res[0].Status))
}
// Order2 is not returned by exch.GetActiveOrders()
// It will be fetched by exch.GetOrderInfo(), which will say it is cancelled
res, err = m.GetOrdersFiltered(&order.Filter{ID: "Order2-active-to-inactive"})
if err != nil {
t.Error(err)
}
if len(res) != 1 {
t.Errorf("Expected 1 result, got: %d", len(res))
}
if res[0].Status != order.Cancelled {
t.Errorf("Order 2 should be cancelled, but status is %s", string(res[0].Status))
}
// Order3 is returned by exch.GetActiveOrders(), which will say it is active
res, err = m.GetOrdersFiltered(&order.Filter{ID: "Order3-unknown-to-active"})
if err != nil {
t.Error(err)
}
if len(res) != 1 {
t.Errorf("Expected 1 result, got: %d", len(res))
}
if res[0].Status != order.Active {
t.Errorf("Order 3 should be active, but status is %s", string(res[0].Status))
}
}
func TestGetOrdersFiltered(t *testing.T) {
@@ -775,3 +959,187 @@ func Test_getFilteredOrders(t *testing.T) {
t.Errorf("Expected 1 result, got: %d", len(res))
}
}
func TestGetOrdersActive(t *testing.T) {
m := OrdersSetup(t)
var err error
orders := []order.Detail{
{
Exchange: testExchange,
Amount: 1.0,
Side: order.Buy,
Status: order.Cancelled,
ID: "Test1",
},
{
Exchange: testExchange,
Amount: 1.0,
Side: order.Sell,
Status: order.Active,
ID: "Test2",
},
}
for i := range orders {
if err = m.orderStore.add(&orders[i]); err != nil {
t.Error(err)
}
}
res, err := m.GetOrdersActive(nil)
if err != nil {
t.Error(err)
}
if len(res) != 1 {
t.Errorf("TestGetOrdersActive - Expected 1 result, got: %d", len(res))
}
res, err = m.GetOrdersActive(&order.Filter{Side: order.Sell})
if err != nil {
t.Error(err)
}
if len(res) != 1 {
t.Errorf("TestGetOrdersActive - Expected 1 result, got: %d", len(res))
}
res, err = m.GetOrdersActive(&order.Filter{Side: order.Buy})
if err != nil {
t.Error(err)
}
if len(res) != 0 {
t.Errorf("TestGetOrdersActive - Expected 0 results, got: %d", len(res))
}
}
func Test_processMatchingOrders(t *testing.T) {
m := OrdersSetup(t)
exch, err := m.orderStore.exchangeManager.GetExchangeByName(testExchange)
if err != nil {
t.Fatal(err)
}
orders := []order.Detail{
{
Exchange: testExchange,
ID: "Test1",
LastUpdated: time.Now(),
},
{
Exchange: testExchange,
ID: "Test2",
LastUpdated: time.Now(),
},
{
Exchange: testExchange,
ID: "Test3",
LastUpdated: time.Now().Add(-time.Hour),
},
{
Exchange: testExchange,
ID: "Test4",
LastUpdated: time.Now().Add(-time.Hour),
},
}
requiresProcessing := make(map[string]bool, len(orders))
for i := range orders {
orders[i].GenerateInternalOrderID()
if i%2 == 0 {
requiresProcessing[orders[i].InternalOrderID] = false
} else {
requiresProcessing[orders[i].InternalOrderID] = true
}
}
var wg sync.WaitGroup
wg.Add(1)
m.processMatchingOrders(exch, orders, requiresProcessing, &wg)
wg.Wait()
res, err := m.GetOrdersFiltered(&order.Filter{Exchange: testExchange})
if err != nil {
t.Error(err)
}
if len(res) != 1 {
t.Errorf("Expected 1 result, got: %d", len(res))
}
if res[0].ID != "Test4" {
t.Error("Order Test4 should have been fetched and updated")
}
}
func TestFetchAndUpdateExchangeOrder(t *testing.T) {
m := OrdersSetup(t)
exch, err := m.orderStore.exchangeManager.GetExchangeByName(testExchange)
if err != nil {
t.Fatal(err)
}
err = m.FetchAndUpdateExchangeOrder(exch, nil, asset.Spot)
if err == nil {
t.Error("Error expected when order is nil")
}
o := &order.Detail{
Exchange: testExchange,
Amount: 1.0,
Side: order.Sell,
Status: order.Active,
ID: "Test",
}
err = m.FetchAndUpdateExchangeOrder(exch, o, asset.Spot)
if err != nil {
t.Error(err)
}
if o.Status != order.Active {
t.Error("Order should be active")
}
res, err := m.GetOrdersFiltered(&order.Filter{Exchange: testExchange})
if err != nil {
t.Error(err)
}
if len(res) != 1 {
t.Errorf("Expected 1 result, got: %d", len(res))
}
o.Status = order.PartiallyCancelled
err = m.FetchAndUpdateExchangeOrder(exch, o, asset.Spot)
if err != nil {
t.Error(err)
}
res, err = m.GetOrdersFiltered(&order.Filter{Exchange: testExchange})
if err != nil {
t.Error(err)
}
if len(res) != 1 {
t.Errorf("Expected 1 result, got: %d", len(res))
}
}
func Test_getActiveOrders(t *testing.T) {
m := OrdersSetup(t)
var err error
orders := []order.Detail{
{
Exchange: testExchange,
Amount: 1.0,
Side: order.Buy,
Status: order.Cancelled,
ID: "Test1",
},
{
Exchange: testExchange,
Amount: 1.0,
Side: order.Sell,
Status: order.Active,
ID: "Test2",
},
}
for i := range orders {
if err = m.orderStore.add(&orders[i]); err != nil {
t.Error(err)
}
}
res := m.orderStore.getActiveOrders(nil)
if len(res) != 1 {
t.Errorf("Test_getActiveOrders - Expected 1 result, got: %d", len(res))
}
res = m.orderStore.getActiveOrders(&order.Filter{Side: order.Sell})
if len(res) != 1 {
t.Errorf("Test_getActiveOrders - Expected 1 result, got: %d", len(res))
}
res = m.orderStore.getActiveOrders(&order.Filter{Side: order.Buy})
if len(res) != 0 {
t.Errorf("Test_getActiveOrders - Expected 0 results, got: %d", len(res))
}
}

View File

@@ -22,6 +22,7 @@ var (
errNilCommunicationsManager = errors.New("cannot start with nil communications manager")
// ErrOrderIDCannotBeEmpty occurs when an order does not have an ID
ErrOrderIDCannotBeEmpty = errors.New("orderID cannot be empty")
errNilOrder = errors.New("nil order received")
)
type orderManagerConfig struct {
@@ -45,11 +46,12 @@ type store struct {
// OrderManager processes and stores orders across enabled exchanges
type OrderManager struct {
started int32
shutdown chan struct{}
orderStore store
cfg orderManagerConfig
verbose bool
started int32
processingOrders int32
shutdown chan struct{}
orderStore store
cfg orderManagerConfig
verbose bool
}
// OrderSubmitResponse contains the order response along with an internal order ID
@@ -57,3 +59,10 @@ type OrderSubmitResponse struct {
order.SubmitResponse
InternalOrderID string
}
// OrderUpsertResponse contains a copy of the resulting order details and a bool
// indicating if the order details were inserted (true) or updated (false)
type OrderUpsertResponse struct {
OrderDetails order.Detail
IsNewOrder bool
}

View File

@@ -143,7 +143,11 @@ func (m *portfolioManager) processPortfolio() {
value)
}
d := m.getExchangeAccountInfo(m.exchangeManager.GetExchanges())
exchanges, err := m.exchangeManager.GetExchanges()
if err != nil {
log.Errorf(log.PortfolioMgr, "Portfolio manager cannot get exchanges: %v", err)
}
d := m.getExchangeAccountInfo(exchanges)
m.seedExchangeAccountInfo(d)
atomic.CompareAndSwapInt32(&m.processing, 1, 0)
}

View File

@@ -482,7 +482,10 @@ func (s *RPCServer) GetOrderbook(ctx context.Context, r *gctrpc.GetOrderbookRequ
// GetOrderbooks returns a list of orderbooks for all enabled exchanges and all
// enabled currency pairs
func (s *RPCServer) GetOrderbooks(ctx context.Context, _ *gctrpc.GetOrderbooksRequest) (*gctrpc.GetOrderbooksResponse, error) {
exchanges := s.ExchangeManager.GetExchanges()
exchanges, err := s.ExchangeManager.GetExchanges()
if err != nil {
return nil, err
}
var obResponse []*gctrpc.Orderbooks
var obs []*gctrpc.OrderbookResponse
for x := range exchanges {

View File

@@ -1015,10 +1015,7 @@ func TestGetOrders(t *testing.T) {
if !errors.Is(err, nil) {
t.Errorf("received '%v', expected '%v'", err, nil)
}
err = om.Start()
if !errors.Is(err, nil) {
t.Errorf("received '%v', expected '%v'", err, nil)
}
om.started = 1
s := RPCServer{Engine: &Engine{ExchangeManager: em, OrderManager: om}}
p := &gctrpc.CurrencyPair{
@@ -1126,7 +1123,7 @@ func TestGetOrder(t *testing.T) {
if !errors.Is(err, nil) {
t.Errorf("received '%v', expected '%v'", err, nil)
}
err = om.Start()
om.started = 1
if !errors.Is(err, nil) {
t.Errorf("received '%v', expected '%v'", err, nil)
}
@@ -1656,10 +1653,7 @@ func TestGetManagedOrders(t *testing.T) {
if !errors.Is(err, nil) {
t.Errorf("received '%v', expected '%v'", err, nil)
}
err = om.Start()
if !errors.Is(err, nil) {
t.Errorf("received '%v', expected '%v'", err, nil)
}
om.started = 1
s := RPCServer{Engine: &Engine{ExchangeManager: em, OrderManager: om}}
p := &gctrpc.CurrencyPair{

View File

@@ -42,7 +42,7 @@ var (
// iExchangeManager limits exposure of accessible functions to exchange manager
// so that subsystems can use some functionality
type iExchangeManager interface {
GetExchanges() []exchange.IBotExchange
GetExchanges() ([]exchange.IBotExchange, error)
GetExchangeByName(string) (exchange.IBotExchange, error)
}

View File

@@ -104,7 +104,10 @@ func (m *syncManager) Start() error {
m.initSyncWG.Add(1)
m.inService.Done()
log.Debugln(log.SyncMgr, "Exchange CurrencyPairSyncer started.")
exchanges := m.exchangeManager.GetExchanges()
exchanges, err := m.exchangeManager.GetExchanges()
if err != nil {
return err
}
for x := range exchanges {
exchangeName := exchanges[x].GetName()
supportsWebsocket := exchanges[x].SupportsWebsocket()
@@ -454,7 +457,10 @@ func (m *syncManager) worker() {
defer cleanup()
for atomic.LoadInt32(&m.started) != 0 {
exchanges := m.exchangeManager.GetExchanges()
exchanges, err := m.exchangeManager.GetExchanges()
if err != nil {
log.Errorf(log.SyncMgr, "Sync manager cannot get exchanges: %v", err)
}
for x := range exchanges {
exchangeName := exchanges[x].GetName()
supportsREST := exchanges[x].SupportsREST()

View File

@@ -81,7 +81,10 @@ func (m *websocketRoutineManager) websocketRoutine() {
if m.verbose {
log.Debugln(log.WebsocketMgr, "Connecting exchange websocket services...")
}
exchanges := m.exchangeManager.GetExchanges()
exchanges, err := m.exchangeManager.GetExchanges()
if err != nil {
log.Errorf(log.WebsocketMgr, "websocket routine manager cannot get exchanges: %v", err)
}
for i := range exchanges {
go func(i int) {
if exchanges[i].SupportsWebsocket() {