Files
gocryptotrader/engine/currency_state_manager.go
Scott fcc5ad4551 exchanges/qa: Add exchange wrapper testing suite (#1159)
* initial concept of a nice validation tester for exchanges

* adds some datahandler design

* expand testing

* more tests and fixes

* minor end of day fix for bithumb

* fixes implementation issues

* more test coverage and improvements, but not sure if i should continue

* fix more wrapper implementations

* adds error type, more fixes

* changes signature, fixes implementations

* fixes more wrapper implementations

* one more bit

* more cleanup

* WOW things work?

* lintle 1/1337

* mini bump

* fixes all linting

* neaten

* GetOrderInfo+ asset pair fixes+improvements

* adds new websocket test

* expand ws testing

* fix bug, expand tests, improve implementation

* code coverage of a lot of new codes

* fixes everything

* reverts accidental changes

* minor fixes from reviewing code

* removes Bitfinex cancelBatchOrder implementation

* fixes dumb baby typo for babies

* mini nit fixes

* so many nits to address

* addresses all the nits

* Titlecase

* switcheroo

* removes websocket testing for now

* fix appveyor, minor test fix

* fixes typo, re-kindles killed kode

* skip binance wrapper tests when running CI

* expired context, huobi okx fixes

* kodespull

* fix ordering

* time fix because why not

* fix exmo, others

* hopefully this fixes all of my life's problems

* last thing today

* huobi, more like hypotrophy

* golangci-lint, more like mypooroldknee-splint

* fix huobi times by removing them

* should fix okx currency issues

* blocks the application

* adds last little contingency for pairs

* addresses most nits and new problems

* lovely fixed before seeing why okx sucks

* fixes issues with okx websocket

* the classic receieieivaier

* lintle

* adds test and fixes existing tests

* expands error handling messages during setup

* fixes dumb okx bugs introduced

* quick fix for lint and exmo

* fixes nixes

* fix exmo deposit issue

* lint

* fixes issue with extra asset runs missing

* fix surprise race

* all the lint and merge fixes

* fixes surprise bugs in OKx

* fixes issues with times and chains

* fixing all the merge stuff

* merge fix

* rm logs and a panic potential

* lovely lint lament

* an easy demonstration of scenario, but not of initial purpose

* put it in the bin

* Revert "put it in the bin"

This reverts commit 15c6490f713233d43f10957367fcbf18e3818bdd.

* re-add after immediate error popup

* fix mini poor test design

* okx okay

* merge fixes

* fixes issues discovered in lovely test

* I FORGOT TO COMMIT THIS

* nit fixaroonaboo

* forgoetten test fix

* revert old okx asset intrument work

* fixes

* revert problems I didnt understand. update bybit

* fix merge bugs

* test cleanup

* further improvements

* reshuffle and lint

* rm redundant CI_TEST by rm the CI_TEST field that is redundant

* path fix

* move to its own section, dont run on 32 bit + appveyor

* lint

* fix lbank

* address nits

* let it rip

* fix failing test time range

* niteroo boogaloo

* mod tidy, use common.SimpleTimeFormat
2023-07-03 11:09:43 +10:00

277 lines
7.8 KiB
Go

package engine
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/currencystate"
"github.com/thrasher-corp/gocryptotrader/gctrpc"
"github.com/thrasher-corp/gocryptotrader/log"
)
const (
// CurrencyStateManagementName defines the manager name string
CurrencyStateManagementName = "currency_state_manager"
// DefaultStateManagerDelay defines the default duration when the manager
// fetches and updates each exchange for its currency state
DefaultStateManagerDelay = time.Minute
)
var enabled = &gctrpc.GenericResponse{Status: "enabled"}
// CurrencyStateManager manages currency states
type CurrencyStateManager struct {
started int32
shutdown chan struct{}
wg sync.WaitGroup
iExchangeManager
sleep time.Duration
}
// SetupCurrencyStateManager applies configuration parameters before running
func SetupCurrencyStateManager(interval time.Duration, em iExchangeManager) (*CurrencyStateManager, error) {
if em == nil {
return nil, errNilExchangeManager
}
var c CurrencyStateManager
if interval <= 0 {
log.Warnf(log.ExchangeSys,
"Currency state manager interval is invalid, defaulting to: %s",
DefaultStateManagerDelay)
interval = DefaultStateManagerDelay
}
c.sleep = interval
c.iExchangeManager = em
c.shutdown = make(chan struct{})
return &c, nil
}
// Start runs the subsystem
func (c *CurrencyStateManager) Start() error {
log.Debugln(log.ExchangeSys, "Currency state manager starting...")
if c == nil {
return fmt.Errorf("%s %w", CurrencyStateManagementName, ErrNilSubsystem)
}
if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
return fmt.Errorf("%s %w", CurrencyStateManagementName, ErrSubSystemAlreadyStarted)
}
c.wg.Add(1)
go c.monitor()
log.Debugln(log.ExchangeSys, "Currency state manager started.")
return nil
}
// Stop stops the subsystem
func (c *CurrencyStateManager) Stop() error {
if c == nil {
return fmt.Errorf("%s %w", CurrencyStateManagementName, ErrNilSubsystem)
}
if atomic.LoadInt32(&c.started) == 0 {
return fmt.Errorf("%s %w", CurrencyStateManagementName, ErrSubSystemNotStarted)
}
log.Debugf(log.ExchangeSys, "Currency state manager %s", MsgSubSystemShuttingDown)
close(c.shutdown)
c.wg.Wait()
c.shutdown = make(chan struct{})
log.Debugf(log.ExchangeSys, "Currency state manager %s", MsgSubSystemShutdown)
atomic.StoreInt32(&c.started, 0)
return nil
}
// IsRunning safely checks whether the subsystem is running
func (c *CurrencyStateManager) IsRunning() bool {
if c == nil {
return false
}
return atomic.LoadInt32(&c.started) == 1
}
func (c *CurrencyStateManager) monitor() {
defer c.wg.Done()
timer := time.NewTimer(0) // Prime firing of channel for initial sync.
for {
select {
case <-c.shutdown:
return
case <-timer.C:
var wg sync.WaitGroup
exchs, err := c.GetExchanges()
if err != nil {
log.Errorf(log.Global,
"Currency state manager failed to get exchanges error: %v",
err)
}
for x := range exchs {
wg.Add(1)
go c.update(exchs[x], &wg, exchs[x].GetAssetTypes(true))
}
wg.Wait() // This causes some variability in the timer due to the
// longest length of request time. Can do time.Ticker but don't
// want routines to stack behind, this is more uniform.
timer.Reset(c.sleep)
}
}
}
func (c *CurrencyStateManager) update(exch exchange.IBotExchange, wg *sync.WaitGroup, enabledAssets asset.Items) {
defer wg.Done()
for y := range enabledAssets {
err := exch.UpdateCurrencyStates(context.TODO(), enabledAssets[y])
if err != nil {
if errors.Is(err, common.ErrNotYetImplemented) {
// Deploy default values for outbound gRPC aspects.
var pairs currency.Pairs
pairs, err = exch.GetAvailablePairs(enabledAssets[y])
if err != nil {
log.Errorf(log.ExchangeSys, "Currency state manager %s %s: %v",
exch.GetName(),
enabledAssets[y],
err)
return
}
// Deploys a full spectrum supported list for the currency states
update := map[currency.Code]currencystate.Options{}
for x := range pairs {
update[pairs[x].Base] = currencystate.Options{}
update[pairs[x].Quote] = currencystate.Options{}
}
b := exch.GetBase()
if b == nil {
log.Errorf(log.ExchangeSys, "Currency state manager %s %s: %v",
exch.GetName(),
enabledAssets[y],
"cannot update because base is nil")
return
}
err = b.States.UpdateAll(enabledAssets[y], update)
if err != nil {
log.Errorf(log.ExchangeSys, "Currency state manager %s %s: %v",
exch.GetName(),
enabledAssets[y],
err)
}
return
}
log.Errorf(log.ExchangeSys, "Currency state manager %s %s: %v",
exch.GetName(),
enabledAssets[y],
err)
}
}
}
// GetAllRPC returns a full snapshot of currency states, whether they are able
// to be withdrawn, deposited or traded on an exchange for RPC.
func (c *CurrencyStateManager) GetAllRPC(exchName string) (*gctrpc.CurrencyStateResponse, error) {
if !c.IsRunning() {
return nil, fmt.Errorf("%s %w", CurrencyStateManagementName, ErrSubSystemNotStarted)
}
exch, err := c.GetExchangeByName(exchName)
if err != nil {
return nil, err
}
sh, err := exch.GetCurrencyStateSnapshot()
if err != nil {
return nil, err
}
var resp = &gctrpc.CurrencyStateResponse{}
for x := range sh {
resp.CurrencyStates = append(resp.CurrencyStates, &gctrpc.CurrencyState{
Currency: sh[x].Code.String(),
Asset: sh[x].Asset.String(),
WithdrawEnabled: sh[x].Withdraw == nil || *sh[x].Withdraw,
DepositEnabled: sh[x].Deposit == nil || *sh[x].Deposit,
TradingEnabled: sh[x].Trade == nil || *sh[x].Trade,
})
}
return resp, nil
}
// CanWithdrawRPC determines if the currency code is operational for withdrawal
// from an exchange for RPC
func (c *CurrencyStateManager) CanWithdrawRPC(exchName string, cc currency.Code, a asset.Item) (*gctrpc.GenericResponse, error) {
if !c.IsRunning() {
return nil, fmt.Errorf("%s %w", CurrencyStateManagementName, ErrSubSystemNotStarted)
}
exch, err := c.GetExchangeByName(exchName)
if err != nil {
return nil, err
}
err = exch.CanWithdraw(cc, a)
if err != nil {
return nil, err
}
return enabled, nil
}
// CanDepositRPC determines if the currency code is operational for depositing
// to an exchange for RPC
func (c *CurrencyStateManager) CanDepositRPC(exchName string, cc currency.Code, a asset.Item) (*gctrpc.GenericResponse, error) {
if !c.IsRunning() {
return nil, fmt.Errorf("%s %w", CurrencyStateManagementName, ErrSubSystemNotStarted)
}
exch, err := c.GetExchangeByName(exchName)
if err != nil {
return nil, err
}
err = exch.CanDeposit(cc, a)
if err != nil {
return nil, err
}
return enabled, nil
}
// CanTradeRPC determines if the currency code is operational for trading for
// RPC
func (c *CurrencyStateManager) CanTradeRPC(exchName string, cc currency.Code, a asset.Item) (*gctrpc.GenericResponse, error) {
if !c.IsRunning() {
return nil, fmt.Errorf("%s %w", CurrencyStateManagementName, ErrSubSystemNotStarted)
}
exch, err := c.GetExchangeByName(exchName)
if err != nil {
return nil, err
}
err = exch.CanTrade(cc, a)
if err != nil {
return nil, err
}
return enabled, nil
}
// CanTradePairRPC determines if the pair is operational for trading for RPC
func (c *CurrencyStateManager) CanTradePairRPC(exchName string, pair currency.Pair, a asset.Item) (*gctrpc.GenericResponse, error) {
if !c.IsRunning() {
return nil, fmt.Errorf("%s %w", CurrencyStateManagementName, ErrSubSystemNotStarted)
}
exch, err := c.GetExchangeByName(exchName)
if err != nil {
return nil, err
}
err = exch.CanTradePair(pair, a)
if err != nil {
return nil, err
}
return enabled, nil
}