Files
gocryptotrader/engine/currency_state_manager.go
Ryan O'Hara-Reid 5dfbbf84de engine/exchanges: Add exchange currency state subsystem (#774)
* state: Add management system (init)

* linter: fix

* engine: gofmt

* gct: after merge fixup

* documentation: add

* rpc: implement services for testing

* gctcli: gofmt state_management.go

* documentation: reinstate lost information

* state: Add pair check to determine trading operation

* exchanges: add interface for specific state scoped subsystem functionality

* engine/order_man: reduce code footprint using new method

* RPC: implement pair trading request and change exported name to something specific to state

* engine: add tests

* engine: Add to withdraw manager

* documentation: reinstate soxipy in contrib. list

* engine: const fake name

* Glorious: NITERINOS

* merge: fix issues

* engine: csm incorporate service name into log output

* engine: fix linter issues

* gct: fix tests

* currencystate: remove management type

* rpc: fix tests

* backtester: fix tests

* Update engine/currency_state_manager.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update engine/currency_state_manager.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update exchanges/currencystate/currency_state.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update exchanges/alert/alert.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update exchanges/alert/alert.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* glorious: nits

* config: integrate with config and remove flag delay adjustment

* gctcli: fix issues after name changes

* engine: gofmt manager file

* Update engine/rpcserver.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* engine: Add enable/disable manager functions, add default popoulation for potential assets

* linter: fix

* engine/test: bump subsystem count

* Update engine/currency_state_manager.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update exchanges/bithumb/bithumb.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* glorious: nits addressed

* alert: fix commenting for its generalized purpose

* glorious: nits

* engine: use standard string in log output

* bitfinex: apply patch, thanks @thrasher-

* bitfinex: fix spelling

* engine/currencystate: Add logs/fix logs

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>
2021-09-27 13:33:49 +10:00

277 lines
7.8 KiB
Go

package engine
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/currencystate"
"github.com/thrasher-corp/gocryptotrader/gctrpc"
"github.com/thrasher-corp/gocryptotrader/log"
)
const (
// CurrencyStateManagementName defines the manager name string
CurrencyStateManagementName = "currency_state_manager"
// DefaultStateManagerDelay defines the default duration when the manager
// fetches and updates each exchange for its currency state
DefaultStateManagerDelay = time.Minute
)
var enabled = &gctrpc.GenericResponse{Status: "enabled"}
// CurrencyStateManager manages currency states
type CurrencyStateManager struct {
started int32
shutdown chan struct{}
wg sync.WaitGroup
iExchangeManager
sleep time.Duration
}
// SetupCurrencyStateManager applies configuration parameters before running
func SetupCurrencyStateManager(interval time.Duration, em iExchangeManager) (*CurrencyStateManager, error) {
if em == nil {
return nil, errNilExchangeManager
}
var c CurrencyStateManager
if interval <= 0 {
log.Warnf(log.ExchangeSys,
"Currency state manager interval is invalid, defaulting to: %s",
DefaultStateManagerDelay)
interval = DefaultStateManagerDelay
}
c.sleep = interval
c.iExchangeManager = em
c.shutdown = make(chan struct{})
return &c, nil
}
// Start runs the subsystem
func (c *CurrencyStateManager) Start() error {
log.Debugln(log.ExchangeSys, "Currency state manager starting...")
if c == nil {
return fmt.Errorf("%s %w", CurrencyStateManagementName, ErrNilSubsystem)
}
if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
return fmt.Errorf("%s %w", CurrencyStateManagementName, ErrSubSystemAlreadyStarted)
}
c.wg.Add(1)
go c.monitor()
log.Debugln(log.ExchangeSys, "Currency state manager started.")
return nil
}
// Stop stops the subsystem
func (c *CurrencyStateManager) Stop() error {
if c == nil {
return fmt.Errorf("%s %w", CurrencyStateManagementName, ErrNilSubsystem)
}
if atomic.LoadInt32(&c.started) == 0 {
return fmt.Errorf("%s %w", CurrencyStateManagementName, ErrSubSystemNotStarted)
}
log.Debugf(log.ExchangeSys, "Currency state manager %s", MsgSubSystemShuttingDown)
close(c.shutdown)
c.wg.Wait()
c.shutdown = make(chan struct{})
log.Debugf(log.ExchangeSys, "Currency state manager %s", MsgSubSystemShutdown)
atomic.StoreInt32(&c.started, 0)
return nil
}
// IsRunning safely checks whether the subsystem is running
func (c *CurrencyStateManager) IsRunning() bool {
if c == nil {
return false
}
return atomic.LoadInt32(&c.started) == 1
}
func (c *CurrencyStateManager) monitor() {
defer c.wg.Done()
timer := time.NewTimer(0) // Prime firing of channel for initial sync.
for {
select {
case <-c.shutdown:
return
case <-timer.C:
var wg sync.WaitGroup
exchs, err := c.GetExchanges()
if err != nil {
log.Errorf(log.Global,
"Currency state manager failed to get exchanges error: %v",
err)
}
for x := range exchs {
wg.Add(1)
go c.update(exchs[x], &wg, exchs[x].GetAssetTypes(true))
}
wg.Wait() // This causes some variability in the timer due to
// longest length of request time. Can do time.Ticker but don't
// want routines to stack behind, this is more uniform.
timer.Reset(c.sleep)
}
}
}
func (c *CurrencyStateManager) update(exch exchange.IBotExchange, wg *sync.WaitGroup, enabledAssets asset.Items) {
defer wg.Done()
for y := range enabledAssets {
err := exch.UpdateCurrencyStates(context.TODO(), enabledAssets[y])
if err != nil {
if errors.Is(err, common.ErrNotYetImplemented) {
// Deploy default values for outbound gRPC aspects.
var pairs currency.Pairs
pairs, err = exch.GetAvailablePairs(enabledAssets[y])
if err != nil {
log.Errorf(log.ExchangeSys, "Currency state manager %s %s: %v",
exch.GetName(),
enabledAssets[y],
err)
return
}
// Deploys a full spectrum supported list for the currency states
update := map[currency.Code]currencystate.Options{}
for x := range pairs {
update[pairs[x].Base] = currencystate.Options{}
update[pairs[x].Quote] = currencystate.Options{}
}
b := exch.GetBase()
if b == nil {
log.Errorf(log.ExchangeSys, "Currency state manager %s %s: %v",
exch.GetName(),
enabledAssets[y],
"cannot update because base is nil")
return
}
err = b.States.UpdateAll(enabledAssets[y], update)
if err != nil {
log.Errorf(log.ExchangeSys, "Currency state manager %s %s: %v",
exch.GetName(),
enabledAssets[y],
err)
}
return
}
log.Errorf(log.ExchangeSys, "Currency state manager %s %s: %v",
exch.GetName(),
enabledAssets[y],
err)
}
}
}
// GetAllRPC returns a full snapshot of currency states, whether they are able
// to be withdrawn, deposited or traded on an exchange for RPC.
func (c *CurrencyStateManager) GetAllRPC(exchName string) (*gctrpc.CurrencyStateResponse, error) {
if !c.IsRunning() {
return nil, fmt.Errorf("%s %w", CurrencyStateManagementName, ErrSubSystemNotStarted)
}
exch, err := c.GetExchangeByName(exchName)
if err != nil {
return nil, err
}
sh, err := exch.GetCurrencyStateSnapshot()
if err != nil {
return nil, err
}
var resp = &gctrpc.CurrencyStateResponse{}
for x := range sh {
resp.CurrencyStates = append(resp.CurrencyStates, &gctrpc.CurrencyState{
Currency: sh[x].Code.String(),
Asset: sh[x].Asset.String(),
WithdrawEnabled: sh[x].Withdraw == nil || *sh[x].Withdraw,
DepositEnabled: sh[x].Deposit == nil || *sh[x].Deposit,
TradingEnabled: sh[x].Trade == nil || *sh[x].Trade,
})
}
return resp, nil
}
// CanWithdrawRPC determines if the currency code is operational for withdrawal
// from an exchange for RPC
func (c *CurrencyStateManager) CanWithdrawRPC(exchName string, cc currency.Code, a asset.Item) (*gctrpc.GenericResponse, error) {
if !c.IsRunning() {
return nil, fmt.Errorf("%s %w", CurrencyStateManagementName, ErrSubSystemNotStarted)
}
exch, err := c.GetExchangeByName(exchName)
if err != nil {
return nil, err
}
err = exch.CanWithdraw(cc, a)
if err != nil {
return nil, err
}
return enabled, nil
}
// CanDepositRPC determines if the currency code is operational for depositing
// to an exchange for RPC
func (c *CurrencyStateManager) CanDepositRPC(exchName string, cc currency.Code, a asset.Item) (*gctrpc.GenericResponse, error) {
if !c.IsRunning() {
return nil, fmt.Errorf("%s %w", CurrencyStateManagementName, ErrSubSystemNotStarted)
}
exch, err := c.GetExchangeByName(exchName)
if err != nil {
return nil, err
}
err = exch.CanDeposit(cc, a)
if err != nil {
return nil, err
}
return enabled, nil
}
// CanTradeRPC determines if the currency code is operational for trading for
// RPC
func (c *CurrencyStateManager) CanTradeRPC(exchName string, cc currency.Code, a asset.Item) (*gctrpc.GenericResponse, error) {
if !c.IsRunning() {
return nil, fmt.Errorf("%s %w", CurrencyStateManagementName, ErrSubSystemNotStarted)
}
exch, err := c.GetExchangeByName(exchName)
if err != nil {
return nil, err
}
err = exch.CanTrade(cc, a)
if err != nil {
return nil, err
}
return enabled, nil
}
// CanTradePairRPC determines if the pair is operational for trading for RPC
func (c *CurrencyStateManager) CanTradePairRPC(exchName string, pair currency.Pair, a asset.Item) (*gctrpc.GenericResponse, error) {
if !c.IsRunning() {
return nil, fmt.Errorf("%s %w", CurrencyStateManagementName, ErrSubSystemNotStarted)
}
exch, err := c.GetExchangeByName(exchName)
if err != nil {
return nil, err
}
err = exch.CanTradePair(pair, a)
if err != nil {
return nil, err
}
return enabled, nil
}