Files
gocryptotrader/engine/order_manager.go
cranktakular fd9aaf00a2 Coinbase: Update exchange implementation (#1480)
* Slight enhance of Coinbase tests

Continual enhance of Coinbase tests

The revamp continues

Oh jeez the Orderbook part's unfinished don't look

Coinbase revamp, Orderbook still unfinished

* Coinbase revamp; CreateReport is still WIP

* More coinbase improvements; onto sandbox testing

* Coinbase revamp continues

* Coinbase revamp continues

* Coinbasepro revamp is ceaseless

* Coinbase revamp, starting on advanced trade API

* Coinbase Advanced Trade Starts in Ernest

V3 done, onto V2

Coinbase revamp nears completion

Coinbase revamp nears completion

Test commit should fail

Coinbase revamp nears completion

* Coinbase revamp stage wrapper

* Coinbase wrapper coherence continues

* Coinbase wrapper continues writhing

* Coinbase wrapper & codebase cleanup

* Coinbase updates & wrap progress

* More Coinbase wrapper progress

* Wrapper is wrapped, kinda

* Test & type checking

* Coinbase REST revamp finished

* Post-merge fix

* WS revamp begins

* WS Main Revamp Done?

* CB websocket tidying up

* Coinbase WS wrapperupperer

* Coinbase revamp done??

* Linter progress

* Continued lint cleanup

* Further lint cleanup

* Increased lint coverage

* Does this fix all sloppy reassigns & shadowing?

* Undoing retry policy change

* Documentation regeneration

* Coinbase code improvements

* Providing warning about known issue

* Updating an error to new format

* Making gocritic happy

* Review adherence

* Endpoints moved to V3 & nil pointer fixes

* Removing seemingly superfluous constant

* Glorious improvements

* Removing unused error

* Partial public endpoint addition

* Slight improvements

* Wrapper improvements; still a few errors left in other packages

* A lil Coinbase progress

* Json cleaning

* Lint appeasement

* Config repair

* Config fix (real)

* Little fix

* New public endpoint incorporation

* Additional fixes

* Improvements & Appeasements

* LineSaver

* Additional fixes

* Another fix

* Fixing picked nits

* Quick fixies

* Lil fixes

* Subscriptions: Add List.Enabled

* CoinbasePro: Add subscription templating

* fixup! CoinbasePro: Add subscription templating

* fixup! CoinbasePro: Add subscription templating

* Comment fix

* Subsequent fixes

* Issues hopefully fixed

* Lint fix

* Glorious fixes

* Json formatting

* ShazNits

* (L/N)i(n/)t

* Adding a test

* Tiny test improvement

* Template patch testing

* Fixes

* Further shaznits

* Lint nit

* JWT move and other fixes

* Small nits

* Shaznit, singular

* Post-merge fix

* Post-merge fixes

* Typo fix

* Some glorious nits

* Required changes

* Stop going

* Alias attempt

* Alias fix & test cleanup

* Test fix

* GetDepositAddress logic improvement

* Status update: Fixed

* Lint fix

* Happy birthday to PR 1480

* Cleanups

* Necessary nit corrections

* Fixing sillybug

* As per request

* Programming progress

* Order fixes

* Further fixies

* Test fix

* Pre-merge fixes

* More shaznits

* Context

* Sonic error handling

* Import fix

* Better Sonic error handling

* Perfect Sonic error handling?

* F purge

* Coinbase improvements

* API Update Conformity

* Coinbase continuation

* Coinbase order improvements

* Coinbase order improvements

* CreateOrderConfig improvements

* Managing API updates

* Coinbase API update progression

* jwt rename

* Comment link fix

* Coinbase v2 cleanup

* Post-merge fixes

* Review fixes

* GK's suggestions

* Linter fix

* Minor gbjk fixes

* Nit fixes

* Merge fix

* Lint fixes

* Coinbase rename stage 1

* Coinbase rename stage 2

* Coinbase rename stage 3

* Coinbase rename stage 4

* Coinbase rename final fix

* Coinbase: PoC on converting to request structs

* Applying requested changes

* Many review fixes, handled

* Thrashed by nits

* More minor modifications

* The last nit!?

---------

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>
2025-09-16 13:37:00 +10:00

1184 lines
35 KiB
Go

package engine
import (
"context"
"errors"
"fmt"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/gofrs/uuid"
"github.com/shopspring/decimal"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/communications/base"
"github.com/thrasher-corp/gocryptotrader/config"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/currencystate"
"github.com/thrasher-corp/gocryptotrader/exchanges/fundingrate"
"github.com/thrasher-corp/gocryptotrader/exchanges/futures"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/log"
)
// SetupOrderManager will boot up the OrderManager
func SetupOrderManager(exchangeManager iExchangeManager, communicationsManager iCommsManager, wg *sync.WaitGroup, cfg *config.OrderManager) (*OrderManager, error) {
if exchangeManager == nil {
return nil, errNilExchangeManager
}
if communicationsManager == nil {
return nil, errNilCommunicationsManager
}
if wg == nil {
return nil, errNilWaitGroup
}
if cfg == nil {
return nil, fmt.Errorf("%w OrderManager", errNilConfig)
}
if cfg.ActivelyTrackFuturesPositions && cfg.FuturesTrackingSeekDuration <= 0 {
return nil, errInvalidFuturesTrackingSeekDuration
}
om := &OrderManager{
shutdown: make(chan struct{}),
activelyTrackFuturesPositions: cfg.ActivelyTrackFuturesPositions,
futuresPositionSeekDuration: cfg.FuturesTrackingSeekDuration,
respectOrderHistoryLimits: cfg.RespectOrderHistoryLimits,
orderStore: store{
Orders: make(map[string][]*order.Detail),
exchangeManager: exchangeManager,
commsManager: communicationsManager,
wg: wg,
futuresPositionController: futures.SetupPositionController(),
},
verbose: cfg.Verbose,
cfg: orderManagerConfig{
CancelOrdersOnShutdown: cfg.CancelOrdersOnShutdown,
},
}
return om, nil
}
// IsRunning safely checks whether the subsystem is running
func (m *OrderManager) IsRunning() bool {
return m != nil && atomic.LoadInt32(&m.started) == 1
}
// Start runs the subsystem
func (m *OrderManager) Start() error {
if m == nil {
return fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if !atomic.CompareAndSwapInt32(&m.started, 0, 1) {
return fmt.Errorf("order manager %w", ErrSubSystemAlreadyStarted)
}
log.Debugln(log.OrderMgr, "Order manager starting...")
m.shutdown = make(chan struct{})
m.orderStore.wg.Add(1)
go m.run()
return nil
}
// Stop attempts to shutdown the subsystem
func (m *OrderManager) Stop() error {
if m == nil {
return fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
log.Debugln(log.OrderMgr, "Order manager shutting down...")
close(m.shutdown)
atomic.CompareAndSwapInt32(&m.started, 1, 0)
return nil
}
// gracefulShutdown cancels all orders (if enabled) before shutting down
func (m *OrderManager) gracefulShutdown() {
if !m.cfg.CancelOrdersOnShutdown {
return
}
log.Debugln(log.OrderMgr, "Cancelling any open orders...")
exchanges, err := m.orderStore.exchangeManager.GetExchanges()
if err != nil {
log.Errorf(log.OrderMgr, "Order manager cannot get exchanges: %v", err)
return
}
m.CancelAllOrders(context.TODO(), exchanges)
}
// run will periodically process orders
func (m *OrderManager) run() {
log.Debugln(log.OrderMgr, "Order manager started.")
m.processOrders()
for {
select {
case <-m.shutdown:
m.gracefulShutdown()
m.orderStore.wg.Done()
log.Debugln(log.OrderMgr, "Order manager shutdown.")
return
case <-time.After(orderManagerInterval):
// Process orders go routine allows shutdown procedures to continue
go m.processOrders()
}
}
}
// CancelAllOrders iterates and cancels all orders for each exchange provided
func (m *OrderManager) CancelAllOrders(ctx context.Context, exchanges []exchange.IBotExchange) {
if m == nil || atomic.LoadInt32(&m.started) == 0 {
return
}
allOrders := m.orderStore.get()
if len(allOrders) == 0 {
return
}
for i := range exchanges {
orders, ok := allOrders[strings.ToLower(exchanges[i].GetName())]
if !ok {
continue
}
for j := range orders {
log.Debugf(log.OrderMgr, "Cancelling order(s) for exchange %s.", exchanges[i].GetName())
cancel, err := orders[j].DeriveCancel()
if err != nil {
log.Errorln(log.OrderMgr, err)
continue
}
err = m.Cancel(ctx, cancel)
if err != nil {
log.Errorln(log.OrderMgr, err)
}
}
}
}
// Cancel will find the order in the OrderManager, send a cancel request
// to the exchange and if successful, update the status of the order
func (m *OrderManager) Cancel(ctx context.Context, cancel *order.Cancel) error {
if m == nil {
return fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
var err error
defer func() {
if err != nil {
m.orderStore.commsManager.PushEvent(base.Event{
Type: "order",
Message: err.Error(),
})
}
}()
if cancel == nil {
err = errors.New("order cancel param is nil")
return err
}
if cancel.Exchange == "" {
err = errors.New("order exchange name is empty")
return err
}
if cancel.OrderID == "" {
err = errors.New("order id is empty")
return err
}
exch, err := m.orderStore.exchangeManager.GetExchangeByName(cancel.Exchange)
if err != nil {
return err
}
if cancel.AssetType.String() != "" && !exch.GetAssetTypes(false).Contains(cancel.AssetType) {
return fmt.Errorf("%w %v", asset.ErrNotSupported, cancel.AssetType)
}
log.Debugf(log.OrderMgr, "Cancelling order ID %v [%+v]",
cancel.OrderID, cancel)
err = exch.CancelOrder(ctx, cancel)
if err != nil {
err = fmt.Errorf("%v - Failed to cancel order: %w", cancel.Exchange, err)
return err
}
od, err := m.orderStore.getByExchangeAndID(cancel.Exchange, cancel.OrderID)
if err != nil {
err = fmt.Errorf("%v - Failed to retrieve order %v to update cancelled status: %w",
cancel.Exchange, cancel.OrderID, err)
return err
}
od.Status = order.Cancelled
err = m.orderStore.updateExisting(od)
if err != nil {
err = fmt.Errorf("%v - Failed to update existing order when cancelled: %w", cancel.Exchange, err)
return err
}
msg := fmt.Sprintf("Exchange %s order ID=%v cancelled.",
od.Exchange, od.OrderID)
log.Debugln(log.OrderMgr, msg)
m.orderStore.commsManager.PushEvent(base.Event{Type: "order", Message: msg})
return nil
}
// GetFuturesPositionsForExchange returns futures positions stored within
// the order manager's futures position tracker that match the provided params
func (m *OrderManager) GetFuturesPositionsForExchange(exch string, item asset.Item, pair currency.Pair) ([]futures.Position, error) {
if m == nil {
return nil, fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return nil, fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
if !item.IsFutures() {
return nil, fmt.Errorf("%v %w", item, futures.ErrNotFuturesAsset)
}
return m.orderStore.futuresPositionController.GetPositionsForExchange(exch, item, pair)
}
// GetOpenFuturesPosition returns an open futures position stored within
// the order manager's futures position tracker that match the provided params
func (m *OrderManager) GetOpenFuturesPosition(exch string, item asset.Item, pair currency.Pair) (*futures.Position, error) {
if m == nil {
return nil, fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return nil, fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
if !item.IsFutures() {
return nil, fmt.Errorf("%v %w", item, futures.ErrNotFuturesAsset)
}
if !m.activelyTrackFuturesPositions {
return nil, errFuturesTrackingDisabled
}
return m.orderStore.futuresPositionController.GetOpenPosition(exch, item, pair)
}
// GetAllOpenFuturesPositions returns all open futures positions stored within
// the order manager's futures position tracker that match the provided params
func (m *OrderManager) GetAllOpenFuturesPositions() ([]futures.Position, error) {
if m == nil {
return nil, fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return nil, fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
if !m.activelyTrackFuturesPositions {
return nil, errFuturesTrackingDisabled
}
return m.orderStore.futuresPositionController.GetAllOpenPositions()
}
// ClearFuturesTracking will clear existing futures positions for a given exchange,
// asset, pair for the event that positions have not been tracked accurately
func (m *OrderManager) ClearFuturesTracking(exch string, item asset.Item, pair currency.Pair) error {
if m == nil {
return fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
if !item.IsFutures() {
return fmt.Errorf("%v %w", item, futures.ErrNotFuturesAsset)
}
return m.orderStore.futuresPositionController.ClearPositionsForExchange(exch, item, pair)
}
// UpdateOpenPositionUnrealisedPNL finds an open position from
// an exchange asset pair, then calculates the unrealisedPNL
// using the latest ticker data
func (m *OrderManager) UpdateOpenPositionUnrealisedPNL(e string, item asset.Item, pair currency.Pair, last float64, updated time.Time) (decimal.Decimal, error) {
if m == nil {
return decimal.Zero, fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return decimal.Zero, fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
if !item.IsFutures() {
return decimal.Zero, fmt.Errorf("%v %w", item, futures.ErrNotFuturesAsset)
}
return m.orderStore.futuresPositionController.UpdateOpenPositionUnrealisedPNL(e, item, pair, last, updated)
}
// GetOrderInfo calls the exchange's wrapper GetOrderInfo function
// and stores the result in the order manager
func (m *OrderManager) GetOrderInfo(ctx context.Context, exchangeName, orderID string, cp currency.Pair, a asset.Item) (order.Detail, error) {
if m == nil {
return order.Detail{}, fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return order.Detail{}, fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
if orderID == "" {
return order.Detail{}, ErrOrderIDCannotBeEmpty
}
exch, err := m.orderStore.exchangeManager.GetExchangeByName(exchangeName)
if err != nil {
return order.Detail{}, err
}
result, err := exch.GetOrderInfo(ctx, orderID, cp, a)
if err != nil {
return order.Detail{}, err
}
upsertResponse, err := m.orderStore.upsert(result)
if err != nil {
return order.Detail{}, err
}
return upsertResponse.OrderDetails, nil
}
// validate ensures a submitted order is valid before adding to the manager
func (m *OrderManager) validate(exch exchange.IBotExchange, newOrder *order.Submit) error {
if newOrder == nil {
return errNilOrder
}
if newOrder.Exchange == "" {
return common.ErrExchangeNameNotSet
}
if err := newOrder.Validate(exch.GetTradingRequirements()); err != nil {
return fmt.Errorf("order manager: %w", err)
}
if m.cfg.EnforceLimitConfig {
if !m.cfg.AllowMarketOrders && newOrder.Type == order.Market {
return errors.New("order market type is not allowed")
}
if m.cfg.LimitAmount > 0 && newOrder.Amount > m.cfg.LimitAmount {
return errors.New("order limit exceeds allowed limit")
}
if len(m.cfg.AllowedExchanges) > 0 &&
!common.StringSliceCompareInsensitive(m.cfg.AllowedExchanges, newOrder.Exchange) {
return errors.New("order exchange not found in allowed list")
}
if len(m.cfg.AllowedPairs) > 0 && !m.cfg.AllowedPairs.Contains(newOrder.Pair, true) {
return errors.New("order pair not found in allowed list")
}
}
return nil
}
// Modify depends on the order.Modify.ID and order.Modify.Exchange fields to uniquely
// identify an order to modify.
func (m *OrderManager) Modify(ctx context.Context, mod *order.Modify) (*order.ModifyResponse, error) {
if m == nil {
return nil, fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return nil, fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
// Fetch details from locally managed order store.
det, err := m.orderStore.getByExchangeAndID(mod.Exchange, mod.OrderID)
if det == nil || err != nil {
return nil, fmt.Errorf("order does not exist: %w", err)
}
// Populate additional Modify fields as some of them are required by various
// exchange implementations.
mod.Pair = det.Pair
mod.Side = det.Side
mod.TimeInForce = det.TimeInForce
// Following is just a precaution to not modify orders by mistake if exchange
// implementations do not check fields of the Modify struct for zero values.
if mod.Amount == 0 {
mod.Amount = det.Amount
}
if mod.Price == 0 {
mod.Price = det.Price
}
// Get exchange instance and submit order modification request.
exch, err := m.orderStore.exchangeManager.GetExchangeByName(mod.Exchange)
if err != nil {
return nil, err
}
res, err := exch.ModifyOrder(ctx, mod)
if err != nil {
message := fmt.Sprintf(
"Exchange %s order ID=%v: failed to modify",
mod.Exchange,
mod.OrderID,
)
m.orderStore.commsManager.PushEvent(base.Event{
Type: "order",
Message: message,
})
return nil, err
}
// If modification is successful, apply changes to local order store.
//
// XXX: This comes with a race condition, because [request -> changes] are not
// atomic.
err = m.orderStore.modifyExisting(mod.OrderID, res)
// Notify observers.
var message string
if err != nil {
message = "Exchange %s order ID=%v: modified on exchange, but failed to modify locally"
} else {
message = "Exchange %s order ID=%v: modified successfully"
}
m.orderStore.commsManager.PushEvent(base.Event{
Type: "order",
Message: fmt.Sprintf(message, mod.Exchange, res.OrderID),
})
return &order.ModifyResponse{OrderID: res.OrderID}, err
}
// Submit will take in an order struct, send it to the exchange and
// populate it in the OrderManager if successful
func (m *OrderManager) Submit(ctx context.Context, newOrder *order.Submit) (*OrderSubmitResponse, error) {
if m == nil {
return nil, fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return nil, fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
if newOrder == nil {
return nil, errNilOrder
}
exch, err := m.orderStore.exchangeManager.GetExchangeByName(newOrder.Exchange)
if err != nil {
return nil, err
}
err = m.validate(exch, newOrder)
if err != nil {
return nil, err
}
// Checks for exchange min max limits for order amounts before order
// execution can occur
err = exch.CheckOrderExecutionLimits(newOrder.AssetType,
newOrder.Pair,
newOrder.Price,
newOrder.Amount,
newOrder.Type)
if err != nil && errors.Is(err, currencystate.ErrCurrencyStateNotFound) {
return nil, fmt.Errorf("order manager: exchange %s unable to place order: %w",
newOrder.Exchange,
err)
}
// Determines if current trading activity is turned off by the exchange for
// the currency pair
err = exch.CanTradePair(newOrder.Pair, newOrder.AssetType)
if err != nil {
return nil, fmt.Errorf("order manager: exchange %s cannot trade pair %s %s: %w",
newOrder.Exchange,
newOrder.Pair,
newOrder.AssetType,
err)
}
result, err := exch.SubmitOrder(ctx, newOrder)
if err != nil {
return nil, err
}
return m.processSubmittedOrder(result)
}
// SubmitFakeOrder runs through the same process as order submission
// but does not touch live endpoints
func (m *OrderManager) SubmitFakeOrder(newOrder *order.Submit, resultingOrder *order.SubmitResponse, checkExchangeLimits bool) (*OrderSubmitResponse, error) {
if m == nil {
return nil, fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return nil, fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
if newOrder == nil {
return nil, errNilOrder
}
exch, err := m.orderStore.exchangeManager.GetExchangeByName(newOrder.Exchange)
if err != nil {
return nil, err
}
err = m.validate(exch, newOrder)
if err != nil {
return nil, err
}
if checkExchangeLimits {
// Checks for exchange min max limits for order amounts before order
// execution can occur
err = exch.CheckOrderExecutionLimits(newOrder.AssetType,
newOrder.Pair,
newOrder.Price,
newOrder.Amount,
newOrder.Type)
if err != nil {
return nil, fmt.Errorf("order manager: exchange %s unable to place order: %w",
newOrder.Exchange,
err)
}
}
return m.processSubmittedOrder(resultingOrder)
}
// GetOrdersSnapshot returns a snapshot of all orders in the orderstore. It optionally filters any orders that do not match the status
// but a status of "" or ANY will include all
// the time adds contexts for when the snapshot is relevant for
func (m *OrderManager) GetOrdersSnapshot(s order.Status) []order.Detail {
if m == nil || atomic.LoadInt32(&m.started) == 0 {
return nil
}
var os []order.Detail
for _, v := range m.orderStore.Orders {
for i := range v {
if s != v[i].Status && s != order.AnyStatus && s != order.UnknownStatus {
continue
}
os = append(os, *v[i])
}
}
return os
}
// GetOrdersFiltered returns a snapshot of all orders in the order store.
// Filtering is applied based on the order.Filter unless entries are empty
func (m *OrderManager) GetOrdersFiltered(f *order.Filter) ([]order.Detail, error) {
if m == nil {
return nil, fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return nil, fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
return m.orderStore.getFilteredOrders(f)
}
// GetOrdersActive returns a snapshot of all orders in the order store
// that have a status that indicates it's currently tradable
func (m *OrderManager) GetOrdersActive(f *order.Filter) ([]order.Detail, error) {
if m == nil {
return nil, fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return nil, fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
return m.orderStore.getActiveOrders(f), nil
}
// processSubmittedOrder adds a new order to the manager
func (m *OrderManager) processSubmittedOrder(newOrderResp *order.SubmitResponse) (*OrderSubmitResponse, error) {
if newOrderResp == nil {
return nil, order.ErrOrderDetailIsNil
}
id, err := uuid.NewV4()
if err != nil {
log.Warnf(log.OrderMgr, "Unable to generate UUID. Err: %s", err)
}
detail, err := newOrderResp.DeriveDetail(id)
if err != nil {
return nil, err
}
if err := m.orderStore.add(detail.CopyToPointer()); errors.Is(err, ErrOrdersAlreadyExists) {
// Streamed by ws before we got here. Details from ws supersede since they are more recent.
detail = m.orderStore.getByDetail(detail)
} else if err != nil {
// Non-fatal error: Unable to store order, but error does not need to be returned to caller
log.Errorf(log.OrderMgr, "Unable to add %v order %v to orderStore: %s", detail.Exchange, detail.OrderID, err)
}
msg := fmt.Sprintf("Exchange %s submitted order ID=%v [Ours: %v] pair=%v price=%v amount=%v quoteAmount=%v side=%v type=%v for time %v.",
detail.Exchange,
detail.OrderID,
detail.InternalOrderID.String(),
detail.Pair,
detail.Price,
detail.Amount,
detail.QuoteAmount,
detail.Side,
detail.Type,
detail.Date)
log.Debugln(log.OrderMgr, msg)
if m.orderStore.commsManager != nil {
m.orderStore.commsManager.PushEvent(base.Event{Type: "order", Message: msg})
}
return &OrderSubmitResponse{Detail: detail, InternalOrderID: detail.InternalOrderID.String()}, nil
}
// processOrders iterates over all exchange orders via API
// and adds them to the internal order store
func (m *OrderManager) processOrders() {
if !atomic.CompareAndSwapInt32(&m.processingOrders, 0, 1) {
return
}
defer atomic.StoreInt32(&m.processingOrders, 0)
exchanges, err := m.orderStore.exchangeManager.GetExchanges()
if err != nil {
log.Errorf(log.OrderMgr, "order manager cannot get exchanges: %v", err)
return
}
var wg sync.WaitGroup
for x := range exchanges {
if !exchanges[x].IsRESTAuthenticationSupported() {
continue
}
if m.verbose {
log.Debugf(log.OrderMgr,
"Processing orders for exchange %v",
exchanges[x].GetName())
}
enabledAssets := exchanges[x].GetAssetTypes(true)
for y := range enabledAssets {
var pairs currency.Pairs
pairs, err = exchanges[x].GetEnabledPairs(enabledAssets[y])
if err != nil {
log.Errorf(log.OrderMgr,
"Unable to get enabled pairs for %s and asset type %s: %s",
exchanges[x].GetName(),
enabledAssets[y],
err)
continue
}
if len(pairs) == 0 {
if m.verbose {
log.Debugf(log.OrderMgr,
"No pairs enabled for %s and asset type %s, skipping...",
exchanges[x].GetName(),
enabledAssets[y])
}
continue
}
filter := &order.Filter{Exchange: exchanges[x].GetName()}
orders := m.orderStore.getActiveOrders(filter)
order.FilterOrdersByPairs(&orders, pairs)
var result []order.Detail
result, err = exchanges[x].GetActiveOrders(context.TODO(), &order.MultiOrderRequest{
Side: order.AnySide,
Type: order.AnyType,
Pairs: pairs,
AssetType: enabledAssets[y],
})
if err != nil {
log.Errorf(log.OrderMgr,
"Unable to get active orders for %s and asset type %s: %s",
exchanges[x].GetName(),
enabledAssets[y],
err)
continue
}
for z := range result {
var upsertResponse *OrderUpsertResponse
upsertResponse, err = m.UpsertOrder(&result[z])
if err != nil {
log.Errorln(log.OrderMgr, err)
continue
}
for i := range orders {
if orders[i].InternalOrderID != upsertResponse.OrderDetails.InternalOrderID {
continue
}
orders[i] = orders[len(orders)-1]
orders = orders[:len(orders)-1]
break
}
}
if exchanges[x].GetBase().GetSupportedFeatures().RESTCapabilities.GetOrder {
wg.Add(1)
go m.processMatchingOrders(exchanges[x], orders, &wg)
}
supportedFeatures := exchanges[x].GetSupportedFeatures()
if m.activelyTrackFuturesPositions && enabledAssets[y].IsFutures() && supportedFeatures.FuturesCapabilities.OrderManagerPositionTracking {
var positions []futures.PositionResponse
var sd time.Time
sd, err = m.orderStore.futuresPositionController.LastUpdated()
if err != nil {
log.Errorln(log.OrderMgr, err)
return
}
if sd.IsZero() {
sd = time.Now().Add(-m.futuresPositionSeekDuration)
}
positions, err = exchanges[x].GetFuturesPositionOrders(context.TODO(), &futures.PositionsRequest{
Asset: enabledAssets[y],
Pairs: pairs,
StartDate: sd,
RespectOrderHistoryLimits: m.respectOrderHistoryLimits,
})
if err != nil {
if !errors.Is(err, common.ErrNotYetImplemented) {
log.Errorln(log.OrderMgr, err)
}
return
}
for z := range positions {
if len(positions[z].Orders) == 0 {
continue
}
err = m.processFuturesPositions(exchanges[x], &positions[z])
if err != nil {
log.Errorf(log.OrderMgr, "unable to process future positions for %v %v %v. err: %v", exchanges[x].GetName(), positions[z].Asset, positions[z].Pair, err)
}
}
}
}
}
wg.Wait()
if m.verbose {
log.Debugf(log.OrderMgr, "Finished processing orders")
}
}
// processFuturesPositions ensures any open position found is kept up to date in the order manager
func (m *OrderManager) processFuturesPositions(exch exchange.IBotExchange, position *futures.PositionResponse) error {
if !m.activelyTrackFuturesPositions {
return errFuturesTrackingDisabled
}
if exch == nil {
return fmt.Errorf("%w IBotExchange", common.ErrNilPointer)
}
if position == nil {
return fmt.Errorf("%w PositionResponse", common.ErrNilPointer)
}
if len(position.Orders) == 0 {
return fmt.Errorf("%w position for '%v' '%v' '%v' has no orders", errNilOrder, exch.GetName(), position.Asset, position.Pair)
}
sort.Slice(position.Orders, func(i, j int) bool {
return position.Orders[i].Date.Before(position.Orders[j].Date)
})
feat := exch.GetSupportedFeatures()
var err error
for i := range position.Orders {
err = m.orderStore.futuresPositionController.TrackNewOrder(&position.Orders[i])
if err != nil {
return err
}
}
_, err = m.orderStore.futuresPositionController.GetOpenPosition(exch.GetName(), position.Asset, position.Pair)
if err != nil {
if errors.Is(err, futures.ErrPositionNotFound) {
return nil
}
return err
}
tick, err := exch.GetCachedTicker(position.Pair, position.Asset)
if err != nil {
return fmt.Errorf("%w when fetching ticker data for %v %v %v", err, exch.GetName(), position.Asset, position.Pair)
}
_, err = m.UpdateOpenPositionUnrealisedPNL(exch.GetName(), position.Asset, position.Pair, tick.Last, tick.LastUpdated)
if err != nil {
return fmt.Errorf("%w when updating unrealised PNL for %v %v %v", err, exch.GetName(), position.Asset, position.Pair)
}
if !feat.FuturesCapabilities.FundingRates {
return nil
}
isPerp, err := exch.IsPerpetualFutureCurrency(position.Asset, position.Pair)
if err != nil {
return err
}
if !isPerp {
return nil
}
frp, err := exch.GetHistoricalFundingRates(context.TODO(), &fundingrate.HistoricalRatesRequest{
Asset: position.Asset,
Pair: position.Pair,
StartDate: position.Orders[0].Date,
EndDate: time.Now(),
IncludePayments: true,
IncludePredictedRate: true,
})
if err != nil {
return err
}
return m.orderStore.futuresPositionController.TrackFundingDetails(frp)
}
func (m *OrderManager) processMatchingOrders(exch exchange.IBotExchange, orders []order.Detail, wg *sync.WaitGroup) {
for x := range orders {
if time.Since(orders[x].LastUpdated) < time.Minute {
continue
}
err := m.FetchAndUpdateExchangeOrder(exch, &orders[x], orders[x].AssetType)
if err != nil {
log.Errorln(log.OrderMgr, err)
}
}
if wg != nil {
wg.Done()
}
}
// FetchAndUpdateExchangeOrder calls the exchange to upsert an order to the order store
func (m *OrderManager) FetchAndUpdateExchangeOrder(exch exchange.IBotExchange, ord *order.Detail, assetType asset.Item) error {
if ord == nil {
return errors.New("order manager: Order is nil")
}
fetchedOrder, err := exch.GetOrderInfo(context.TODO(), ord.OrderID, ord.Pair, assetType)
if err != nil {
ord.Status = order.UnknownStatus
return err
}
fetchedOrder.LastUpdated = time.Now()
_, err = m.UpsertOrder(fetchedOrder)
return err
}
// Exists checks whether an order exists in the order store
func (m *OrderManager) Exists(o *order.Detail) bool {
return m != nil && atomic.LoadInt32(&m.started) != 0 && m.orderStore.exists(o)
}
// Add adds an order to the orderstore
func (m *OrderManager) Add(o *order.Detail) error {
if m == nil {
return fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
return m.orderStore.add(o)
}
// GetByExchangeAndID returns a copy of an order from an exchange if it matches the ID
func (m *OrderManager) GetByExchangeAndID(exchangeName, id string) (*order.Detail, error) {
if m == nil {
return nil, fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return nil, fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
return m.orderStore.getByExchangeAndID(exchangeName, id)
}
// UpdateExistingOrder will update an existing order in the orderstore
func (m *OrderManager) UpdateExistingOrder(od *order.Detail) error {
if m == nil {
return fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
return m.orderStore.updateExisting(od)
}
// UpsertOrder updates an existing order or adds a new one to the orderstore
func (m *OrderManager) UpsertOrder(od *order.Detail) (resp *OrderUpsertResponse, err error) {
if m == nil {
return nil, fmt.Errorf("order manager %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return nil, fmt.Errorf("order manager %w", ErrSubSystemNotStarted)
}
if od == nil {
return nil, errNilOrder
}
var msg string
defer func(message *string) {
if message == nil {
log.Errorf(log.OrderMgr, "UpsertOrder: produced nil order event message\n")
return
}
m.orderStore.commsManager.PushEvent(base.Event{
Type: "order",
Message: *message,
})
}(&msg)
upsertResponse, err := m.orderStore.upsert(od)
if err != nil {
msg = fmt.Sprintf(
"Exchange %s unable to upsert order ID=%v internal ID=%v pair=%v price=%.8f amount=%.8f side=%v type=%v status=%v: %s",
od.Exchange, od.OrderID, od.InternalOrderID, od.Pair, od.Price, od.Amount, od.Side, od.Type, od.Status, err)
return nil, err
}
status := "updated"
if upsertResponse.IsNewOrder {
status = "added"
}
msg = fmt.Sprintf("Exchange %s %s order ID=%v internal ID=%v pair=%v price=%.8f amount=%.8f side=%v type=%v status=%v.",
upsertResponse.OrderDetails.Exchange, status, upsertResponse.OrderDetails.OrderID, upsertResponse.OrderDetails.InternalOrderID,
upsertResponse.OrderDetails.Pair, upsertResponse.OrderDetails.Price, upsertResponse.OrderDetails.Amount,
upsertResponse.OrderDetails.Side, upsertResponse.OrderDetails.Type, upsertResponse.OrderDetails.Status)
if upsertResponse.IsNewOrder {
log.Infoln(log.OrderMgr, msg)
return upsertResponse, nil
}
log.Debugln(log.OrderMgr, msg)
return upsertResponse, nil
}
// get returns a copy of all orders for all exchanges.
func (s *store) get() map[string][]*order.Detail {
orders := make(map[string][]*order.Detail)
s.m.Lock()
for k, val := range s.Orders {
orders[k] = order.CopyPointerOrderSlice(val)
}
s.m.Unlock()
return orders
}
// getByExchangeAndID returns a specific order by exchange and id
func (s *store) getByExchangeAndID(exch, id string) (*order.Detail, error) {
s.m.Lock()
defer s.m.Unlock()
r, ok := s.Orders[strings.ToLower(exch)]
if !ok {
return nil, ErrExchangeNotFound
}
for x := range r {
if r[x].OrderID == id {
return r[x].CopyToPointer(), nil
}
}
return nil, ErrOrderNotFound
}
// updateExisting checks if an order exists in the orderstore
// and then updates it
func (s *store) updateExisting(od *order.Detail) error {
if od == nil {
return errNilOrder
}
s.m.Lock()
defer s.m.Unlock()
r, ok := s.Orders[strings.ToLower(od.Exchange)]
if !ok {
return ErrExchangeNotFound
}
for x := range r {
if r[x].OrderID != od.OrderID {
continue
}
err := r[x].UpdateOrderFromDetail(od)
if err != nil {
return err
}
if !r[x].AssetType.IsFutures() {
return nil
}
err = s.futuresPositionController.TrackNewOrder(r[x])
if err != nil && !errors.Is(err, futures.ErrPositionClosed) {
return err
}
return nil
}
return ErrOrderNotFound
}
// modifyExisting depends on mod.Exchange and given ID to uniquely identify an order and
// modify it.
func (s *store) modifyExisting(id string, mod *order.ModifyResponse) error {
s.m.Lock()
defer s.m.Unlock()
r, ok := s.Orders[strings.ToLower(mod.Exchange)]
if !ok {
return ErrExchangeNotFound
}
for x := range r {
if r[x].OrderID != id {
continue
}
r[x].UpdateOrderFromModifyResponse(mod)
if !r[x].AssetType.IsFutures() {
return nil
}
err := s.futuresPositionController.TrackNewOrder(r[x])
if err != nil && !errors.Is(err, futures.ErrPositionClosed) {
return err
}
return nil
}
return ErrOrderNotFound
}
// upsert (1) checks if such an exchange exists in the exchangeManager, (2) checks if
// order exists and updates/creates it.
func (s *store) upsert(od *order.Detail) (*OrderUpsertResponse, error) {
if od == nil {
return nil, errNilOrder
}
lName := strings.ToLower(od.Exchange)
_, err := s.exchangeManager.GetExchangeByName(lName)
if err != nil {
return nil, err
}
s.m.Lock()
defer s.m.Unlock()
if od.AssetType.IsFutures() {
err = s.futuresPositionController.TrackNewOrder(od)
if err != nil && !errors.Is(err, futures.ErrPositionClosed) {
return nil, err
}
}
// TODO: Return pointer to slice because new orders we are accessing map
// twice for lookup.
exchangeOrders := s.Orders[lName]
for x := range exchangeOrders {
if exchangeOrders[x].OrderID != od.OrderID {
continue
}
err := exchangeOrders[x].UpdateOrderFromDetail(od)
if err != nil {
return nil, err
}
return &OrderUpsertResponse{
OrderDetails: exchangeOrders[x].Copy(),
IsNewOrder: false,
}, nil
}
// Untracked websocket orders will not have internalIDs yet
od.GenerateInternalOrderID()
s.Orders[lName] = append(s.Orders[lName], od)
return &OrderUpsertResponse{OrderDetails: od.Copy(), IsNewOrder: true}, nil
}
// exists verifies if the orderstore contains the provided order
func (s *store) exists(det *order.Detail) bool {
return s.getByDetail(det) != nil
}
// getByDetail fetches an order from the store and returns it
// returns nil if not found
func (s *store) getByDetail(det *order.Detail) *order.Detail {
if det == nil {
return nil
}
s.m.RLock()
defer s.m.RUnlock()
exchangeOrders := s.Orders[strings.ToLower(det.Exchange)]
for _, o := range exchangeOrders {
if o.OrderID == det.OrderID {
return o.CopyToPointer()
}
}
return nil
}
// Add Adds an order to the orderStore for tracking the lifecycle
func (s *store) add(det *order.Detail) error {
if det == nil {
return errNilOrder
}
name := strings.ToLower(det.Exchange)
if _, err := s.exchangeManager.GetExchangeByName(name); err != nil {
return err
}
s.m.Lock()
defer s.m.Unlock()
// Inline copy of getByDetail to avoid possible lock races
for _, o := range s.Orders[name] {
if o.OrderID == det.OrderID {
return ErrOrdersAlreadyExists
}
}
// Untracked websocket orders will not have internalIDs yet
det.GenerateInternalOrderID()
s.Orders[name] = append(s.Orders[name], det)
if !det.AssetType.IsFutures() {
return nil
}
return s.futuresPositionController.TrackNewOrder(det)
}
// getFilteredOrders returns a filtered copy of the orders
func (s *store) getFilteredOrders(f *order.Filter) ([]order.Detail, error) {
if f == nil {
return nil, errors.New("filter is nil")
}
s.m.RLock()
defer s.m.RUnlock()
var os []order.Detail
// optimization if Exchange is filtered
if f.Exchange != "" {
if e, ok := s.Orders[strings.ToLower(f.Exchange)]; ok {
for i := range e {
if !e[i].MatchFilter(f) {
continue
}
os = append(os, e[i].Copy())
}
}
} else {
for _, e := range s.Orders {
for i := range e {
if !e[i].MatchFilter(f) {
continue
}
os = append(os, e[i].Copy())
}
}
}
return os, nil
}
// getActiveOrders returns copy of the orders that are active
func (s *store) getActiveOrders(f *order.Filter) []order.Detail {
s.m.RLock()
defer s.m.RUnlock()
var orders []order.Detail
switch {
case f == nil:
for _, e := range s.Orders {
for i := range e {
if e[i].Status != order.UnknownStatus && !e[i].IsActive() {
continue
}
orders = append(orders, e[i].Copy())
}
}
case f.Exchange != "":
// optimization if Exchange is filtered
if e, ok := s.Orders[strings.ToLower(f.Exchange)]; ok {
for i := range e {
if e[i].Status != order.UnknownStatus && (!e[i].IsActive() || !e[i].MatchFilter(f)) {
continue
}
orders = append(orders, e[i].Copy())
}
}
default:
for _, e := range s.Orders {
for i := range e {
if e[i].Status != order.UnknownStatus && (!e[i].IsActive() || !e[i].MatchFilter(f)) {
continue
}
orders = append(orders, e[i].Copy())
}
}
}
return orders
}