Files
gocryptotrader/engine/portfolio_manager.go
Ryan O'Hara-Reid 11da520dc8 Currency: Add additional functionality, refactor and improvements (#881)
* currency: Add method to derive pair

* currency: Add method to lower the entire charset; it operates on the slice copy and returns that. This will change the original, so we need to see whether this is an issue, but the slice usually goes out of scope anyway.

* currency/pairs: add filter method

* currency: add function to derive select currencies from currency pairs

* currency/engine: slight adjustments

* currency: fix linter issue also shift burden of proof to caller instead of repair, more performant.

* currency: more linter

* pairs: optimize; reduce allocs/op and B/op

* currency: Add in function 'NewPairsFromString' for testing purposes

* currency: don't suppress error

* currency: stop panic on empty currency code

* currency: Add helper method to match currencies between exchanges

* currency: fixed my bad spelling

* currency: Implement stable coin checks, refactored base code methods, optimized upper and lower case strings for currency code/pairs

* currency: add pairs method to derive stable coins from internal list.

* Currency: Cleanup, fix tests.

* engine/exchanges/currency: fix whoops

* Currency: force govet no copy on Item datatype

* Currency: fix naughty linter issues

* exchange: revert change

* currency/config: fix config upgrade mistake

* currency: re-implement currency sub-systems

* *RetrieveConfigCurrencyPairs removed
*CheckCurrencyConfigValues now only provides warnings, adds additional support when available, disables it when support is lost or not available, and sets default values.
*Drop Cryptocurrencies from configuration as this is not needed.
*Drop REST Poll delay field as this was unused.
*Update default values for currencyFileUpdateDuration & foreignExchangeUpdateDuration.
*Allow Role to be marshalled for file type.
*Refactor RunUpdater to verify and check config values and set default running foreign exchange provider.

* currency: cleanup

* currency: change match -> equal for comparison which is more of a standard and little easier to find

* currency: address nits

* currency: fix whoops

* currency: Add some more pairs methods

* currency: linter issues

* currency: RM unused field

* currency: rm verbose

* currency: fix word

* currency: gocritic

* currency: fix another whoopsie

* example_config: default to show log system name

* Currency: Force all support packages to use the Equal method for comparison, as there is a small comparison bug when checking upper and lower casing; this has a more pronounced impact between exchanges and client instances of currency generation

* currency: fix log name

* ordermanager: fix potential panic

* currency: small optim.

* engine: display correct bool and force shutdown

* currency: add function and fix regression
* Change ConvertCurrency -> ConvertFiat to be more precise
* ADD GetForeignExchangeRate to get specific exchange rate for fiat pair
* Fix currency display and formatting regression and tied in with config.Currency fields

* engine: fix tests

* currency: return the amount when no conversion needs to take place

* currency: reduce method name

* currency: Address nits glorious nits

* currency: fix linter

* currency: addr nits

* currency: check underlying role in test

* gct: change to EMPTYCODE and EMPTYPAIR across codebase

* currency: fix nits

* currency: this fixes test race but this issue has not been resolved. Please see: https://trello.com/c/54eizOIo/143-currency-package-upgrades

* currency: Add temp dir for testing

* Update engine/engine.go

Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>

* documentation: update and regen

* currency: Address niterinos

* currency: Add test case for config upgrade when falling over to exchange rate host as default from exchangeRates provider

* currency: addr nits

* currency: fix whoops

Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>
2022-02-17 16:24:57 +11:00

340 lines
9.5 KiB
Go

package engine
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/log"
"github.com/thrasher-corp/gocryptotrader/portfolio"
)
// PortfolioManagerName is the exported name of the portfolio manager subsystem.
const PortfolioManagerName = "portfolio"
var (
// PortfolioSleepDelay is the default interval between portfolio manager runs.
// setupPortfolioManager falls back to this value when it is given a
// non-positive delay.
PortfolioSleepDelay = time.Minute
)
// portfolioManager routinely retrieves a user's holdings through exchange APIs as well
// as through addresses provided in the config
type portfolioManager struct {
// started is 1 while the subsystem is running and 0 otherwise; it is only
// accessed through sync/atomic (see Start, Stop and IsRunning).
started int32
// processing is 1 while processPortfolio is executing, which stops a second
// update from running at the same time; accessed through sync/atomic.
processing int32
// portfolioManagerDelay is the ticker interval between updates in run.
portfolioManagerDelay time.Duration
// exchangeManager supplies the exchanges whose account info is fetched.
exchangeManager *ExchangeManager
// shutdown is closed by Stop to make the run loop return.
shutdown chan struct{}
// base holds the tracked portfolio addresses and balances.
base *portfolio.Base
// m is held by processPortfolio, AddAddress and RemoveAddress while they
// work on base.
m sync.Mutex
}
// setupPortfolioManager builds a portfolio manager that is ready to be started.
//
// A nil exchange manager is rejected with errNilExchangeManager. A non-positive
// portfolioManagerDelay is replaced by PortfolioSleepDelay, and a nil cfg is
// replaced by a fresh portfolio.Base with an empty (non-nil) address list.
func setupPortfolioManager(e *ExchangeManager, portfolioManagerDelay time.Duration, cfg *portfolio.Base) (*portfolioManager, error) {
	if e == nil {
		return nil, errNilExchangeManager
	}

	delay := portfolioManagerDelay
	if delay <= 0 {
		delay = PortfolioSleepDelay
	}

	base := cfg
	if base == nil {
		base = &portfolio.Base{Addresses: []portfolio.Address{}}
	}

	return &portfolioManager{
		portfolioManagerDelay: delay,
		exchangeManager:       e,
		shutdown:              make(chan struct{}),
		base:                  base,
	}, nil
}
// IsRunning reports whether the subsystem has been started and not yet
// stopped. It is safe to call on a nil receiver, in which case it returns false.
func (m *portfolioManager) IsRunning() bool {
	return m != nil && atomic.LoadInt32(&m.started) == 1
}
// Start runs the subsystem. It registers the background worker with wg and
// launches it in its own goroutine.
//
// It returns ErrNilSubsystem (wrapped) for a nil receiver, errNilWaitGroup for
// a nil wg, and ErrSubSystemAlreadyStarted (wrapped) when the manager is
// already running.
func (m *portfolioManager) Start(wg *sync.WaitGroup) error {
	if m == nil {
		return fmt.Errorf("portfolio manager %w", ErrNilSubsystem)
	}
	if wg == nil {
		return errNilWaitGroup
	}
	if !atomic.CompareAndSwapInt32(&m.started, 0, 1) {
		return fmt.Errorf("portfolio manager %w", ErrSubSystemAlreadyStarted)
	}

	log.Debugf(log.PortfolioMgr, "Portfolio manager %s", MsgSubSystemStarting)
	m.shutdown = make(chan struct{})
	// Register with the wait group before the goroutine is spawned. Calling
	// Add from inside the goroutine races with a concurrent wg.Wait, which
	// could return before the worker has been counted.
	wg.Add(1)
	go m.run(wg)
	return nil
}

// Stop attempts to shutdown the subsystem by closing the shutdown channel,
// which makes the run loop return.
//
// It returns ErrNilSubsystem (wrapped) for a nil receiver and
// ErrSubSystemNotStarted (wrapped) when the manager is not running.
func (m *portfolioManager) Stop() error {
	if m == nil {
		return fmt.Errorf("portfolio manager %w", ErrNilSubsystem)
	}
	// This swap is the single point that marks the manager as stopped; only
	// the caller that wins it goes on to close the channel, so the channel is
	// closed once per successful Start.
	if !atomic.CompareAndSwapInt32(&m.started, 1, 0) {
		return fmt.Errorf("portfolio manager %w", ErrSubSystemNotStarted)
	}

	log.Debugf(log.PortfolioMgr, "Portfolio manager %s", MsgSubSystemShuttingDown)
	close(m.shutdown)
	return nil
}

// run periodically will check and update portfolio holdings. It triggers one
// update immediately and then one per tick of portfolioManagerDelay until the
// shutdown channel is closed.
//
// The caller must have called wg.Add(1) before starting run; run calls
// wg.Done when it returns.
func (m *portfolioManager) run(wg *sync.WaitGroup) {
	log.Debugln(log.PortfolioMgr, "Portfolio manager started.")
	tick := time.NewTicker(m.portfolioManagerDelay)
	defer func() {
		tick.Stop()
		wg.Done()
		log.Debugf(log.PortfolioMgr, "Portfolio manager shutdown.")
	}()

	// Updates run in their own goroutines so a slow update never delays
	// shutdown; processPortfolio itself skips overlapping runs.
	go m.processPortfolio()
	for {
		select {
		case <-m.shutdown:
			return
		case <-tick.C:
			go m.processPortfolio()
		}
	}
}
// processPortfolio updates portfolio holdings. It first refreshes the balances
// of the addresses grouped by coin, then fetches account info from the
// exchanges and seeds it into the portfolio base.
//
// Only one update runs at a time: if another call is already in progress this
// call returns immediately. The manager mutex is held for the whole update.
func (m *portfolioManager) processPortfolio() {
	if !atomic.CompareAndSwapInt32(&m.processing, 0, 1) {
		return
	}
	// Deferred so that every exit path clears the flag; otherwise an early
	// return would block all future updates.
	defer atomic.StoreInt32(&m.processing, 0)

	m.m.Lock()
	defer m.m.Unlock()

	data := m.base.GetPortfolioGroupedCoin()
	for key, value := range data {
		err := m.base.UpdatePortfolio(value, key)
		if err != nil {
			log.Errorf(log.PortfolioMgr,
				"PortfolioWatcher error %s for currency %s\n",
				err,
				key)
			continue
		}

		log.Debugf(log.PortfolioMgr,
			"Portfolio manager: Successfully updated address balance for %s address(es) %s\n",
			key,
			value)
	}

	exchanges, err := m.exchangeManager.GetExchanges()
	if err != nil {
		log.Errorf(log.PortfolioMgr, "Portfolio manager cannot get exchanges: %v", err)
		// The returned slice is not meaningful alongside an error, so there
		// is nothing to seed.
		return
	}
	d := m.getExchangeAccountInfo(exchanges)
	m.seedExchangeAccountInfo(d)
}
// seedExchangeAccountInfo folds exchange account holdings into the portfolio
// base. For every exchange it sums Hold and TotalValue per currency across all
// sub accounts, then reconciles each total with the base:
//   - not tracked and total > 0: a new exchange address entry is appended
//   - tracked and total <= 0: the entry is removed
//   - tracked and total differs from the stored balance: the balance is updated
func (m *portfolioManager) seedExchangeAccountInfo(accounts []account.Holdings) {
	for a := range accounts {
		holdings := &accounts[a]
		exchangeName := holdings.Exchange

		// Aggregate balances per currency across every sub account.
		var totals []account.Balance
		for s := range holdings.Accounts {
			sub := &holdings.Accounts[s]
			for c := range sub.Currencies {
				bal := &sub.Currencies[c]
				merged := false
				for t := range totals {
					if bal.CurrencyName.Equal(totals[t].CurrencyName) {
						totals[t].Hold += bal.Hold
						totals[t].TotalValue += bal.TotalValue
						merged = true
					}
				}
				if !merged {
					totals = append(totals, account.Balance{
						CurrencyName: bal.CurrencyName,
						TotalValue:   bal.TotalValue,
						Hold:         bal.Hold,
					})
				}
			}
		}

		// Reconcile each aggregated total with what the base currently tracks.
		for t := range totals {
			currencyName, total := totals[t].CurrencyName, totals[t].TotalValue
			tracked := m.base.ExchangeAddressExists(exchangeName, currencyName)
			switch {
			case !tracked && total <= 0:
				// Nothing held and nothing tracked: leave the base untouched.
			case !tracked:
				log.Debugf(log.PortfolioMgr, "Portfolio: Adding new exchange address: %s, %s, %f, %s\n",
					exchangeName,
					currencyName,
					total,
					portfolio.ExchangeAddress)
				m.base.Addresses = append(m.base.Addresses, portfolio.Address{
					Address:     exchangeName,
					CoinType:    currencyName,
					Balance:     total,
					Description: portfolio.ExchangeAddress,
				})
			case total <= 0:
				log.Debugf(log.PortfolioMgr, "Portfolio: Removing %s %s entry.\n",
					exchangeName,
					currencyName)
				m.base.RemoveExchangeAddress(exchangeName, currencyName)
			default:
				balance, ok := m.base.GetAddressBalance(exchangeName,
					portfolio.ExchangeAddress,
					currencyName)
				if ok && balance != total {
					log.Debugf(log.PortfolioMgr, "Portfolio: Updating %s %s entry with balance %f.\n",
						exchangeName,
						currencyName,
						total)
					m.base.UpdateExchangeAddressBalance(exchangeName,
						currencyName,
						total)
				}
			}
		}
	}
}
// getExchangeAccountInfo collects account holdings from the supplied exchanges.
//
// Nil or disabled exchanges are skipped, as are exchanges without REST
// authenticated API support. For each remaining exchange, account info is
// fetched for every asset type returned by GetAssetTypes(false); a failed fetch
// is logged and that asset type is skipped. Every returned sub account is
// stamped with the asset type it was fetched for. One Holdings entry is
// appended per processed exchange, even if every fetch for it failed.
func (m *portfolioManager) getExchangeAccountInfo(exchanges []exchange.IBotExchange) []account.Holdings {
	var collected []account.Holdings
	for _, exch := range exchanges {
		if exch == nil || !exch.IsEnabled() {
			continue
		}
		if !exch.GetAuthenticatedAPISupport(exchange.RestAuthentication) {
			if m.base.Verbose {
				log.Debugf(log.PortfolioMgr,
					"skipping %s due to disabled authenticated API support.\n",
					exch.GetName())
			}
			continue
		}

		// left as available for now, to sync the full spectrum
		var combined account.Holdings
		for _, assetType := range exch.GetAssetTypes(false) {
			info, err := exch.FetchAccountInfo(context.TODO(), assetType)
			if err != nil {
				log.Errorf(log.PortfolioMgr,
					"Error encountered retrieving exchange account info for %s. Error %s\n",
					exch.GetName(),
					err)
				continue
			}
			for i := range info.Accounts {
				info.Accounts[i].AssetType = assetType
			}
			combined.Exchange = exch.GetName()
			combined.Accounts = append(combined.Accounts, info.Accounts...)
		}
		collected = append(collected, combined)
	}
	return collected
}
// AddAddress registers a new portfolio address with the portfolio base while
// holding the manager mutex.
//
// It returns ErrNilSubsystem (wrapped) for a nil receiver,
// ErrSubSystemNotStarted (wrapped) when the manager is not running, and
// otherwise whatever the underlying base.AddAddress call returns.
func (m *portfolioManager) AddAddress(address, description string, coinType currency.Code, balance float64) error {
	switch {
	case m == nil:
		return fmt.Errorf("portfolio manager %w", ErrNilSubsystem)
	case !m.IsRunning():
		return fmt.Errorf("portfolio manager %w", ErrSubSystemNotStarted)
	}

	m.m.Lock()
	defer m.m.Unlock()
	return m.base.AddAddress(address, description, coinType, balance)
}
// RemoveAddress deletes a portfolio address from the portfolio base while
// holding the manager mutex.
//
// It returns ErrNilSubsystem (wrapped) for a nil receiver,
// ErrSubSystemNotStarted (wrapped) when the manager is not running, and
// otherwise whatever the underlying base.RemoveAddress call returns.
func (m *portfolioManager) RemoveAddress(address, description string, coinType currency.Code) error {
	switch {
	case m == nil:
		return fmt.Errorf("portfolio manager %w", ErrNilSubsystem)
	case !m.IsRunning():
		return fmt.Errorf("portfolio manager %w", ErrSubSystemNotStarted)
	}

	m.m.Lock()
	defer m.m.Unlock()
	return m.base.RemoveAddress(address, description, coinType)
}
// GetPortfolioSummary returns the summary produced by the portfolio base. A
// nil or stopped manager yields an empty portfolio.Summary.
//
// NOTE(review): the base is read here without holding the manager mutex,
// unlike AddAddress and RemoveAddress; confirm portfolio.Base synchronises
// its own reads.
func (m *portfolioManager) GetPortfolioSummary() portfolio.Summary {
	if m != nil && m.IsRunning() {
		return m.base.GetPortfolioSummary()
	}
	return portfolio.Summary{}
}
// GetAddresses returns a snapshot of all tracked portfolio addresses. A nil or
// stopped manager yields nil.
//
// The manager mutex is held while the snapshot is taken and a copy is
// returned, because the internal slice is appended to and modified by the
// periodic portfolio update under that same mutex. Callers may therefore wait
// for an in-flight update to finish, and changes made to the returned slice do
// not affect the manager.
func (m *portfolioManager) GetAddresses() []portfolio.Address {
	if m == nil || !m.IsRunning() {
		return nil
	}
	m.m.Lock()
	defer m.m.Unlock()
	// Preserve the nil/empty distinction of the internal slice so encoded
	// output (for example JSON null versus []) is unchanged.
	if m.base.Addresses == nil {
		return nil
	}
	addrs := make([]portfolio.Address, len(m.base.Addresses))
	copy(addrs, m.base.Addresses)
	return addrs
}
// GetPortfolio returns the manager's internal portfolio base, used for saving
// addresses to the config. The returned pointer is the same one the manager
// uses, not a copy, so modifications through it are visible to the manager.
// A nil or stopped manager yields nil.
func (m *portfolioManager) GetPortfolio() *portfolio.Base {
	if m != nil && m.IsRunning() {
		return m.base
	}
	return nil
}
// IsWhiteListed asks the portfolio base whether address is whitelisted to
// withdraw to. A nil or stopped manager always reports false.
//
// NOTE(review): the base is consulted without holding the manager mutex;
// confirm portfolio.Base synchronises its own reads.
func (m *portfolioManager) IsWhiteListed(address string) bool {
	if m != nil && m.IsRunning() {
		return m.base.IsWhiteListed(address)
	}
	return false
}
// IsExchangeSupported asks the portfolio base whether the given exchange is
// supported for address. A nil or stopped manager always reports false.
//
// NOTE(review): the base is consulted without holding the manager mutex;
// confirm portfolio.Base synchronises its own reads.
func (m *portfolioManager) IsExchangeSupported(exchange, address string) bool {
	if m != nil && m.IsRunning() {
		return m.base.IsExchangeSupported(exchange, address)
	}
	return false
}