account: storage, processing and method on balances update (#916)

* account: update account storage, retrieval and implement alert functionality when a currency change occurs.

* account: Add cancel channel

* account: remove old code

* account: don't embed mutex

* Update exchanges/account/account.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update exchanges/account/account.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* Update exchanges/account/account.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* account: addr nits

* account: Pull out test into indiv.

* account: Add test for update method

* account: add no change to test

* Update exchanges/account/account.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* glorious: nits

* account/portfolio: differentiate between asset type segregation and default to spot holdings.

* glorious: nits

* thrasher: nit

* ticker: fix spelling

* Update engine/portfolio_manager.go

Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>

* thrasher: nits

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>
Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>
This commit is contained in:
Ryan O'Hara-Reid
2022-05-02 14:01:03 +10:00
committed by GitHub
parent 21b3d6a6c9
commit 6a02097431
32 changed files with 724 additions and 174 deletions

View File

@@ -361,6 +361,8 @@ func ({{.Variable}} *{{.CapitalName}}) UpdateOrderbook(ctx context.Context, pair
// UpdateAccountInfo retrieves balances for all enabled currencies
func ({{.Variable}} *{{.CapitalName}}) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (account.Holdings, error) {
// If fetching requires more than one asset type please set
// HasAssetTypeAccountSegregation to true in RESTCapabilities above.
return account.Holdings{}, common.ErrNotYetImplemented
}

View File

@@ -414,6 +414,15 @@ func (e Errors) Error() string {
return r[:len(r)-2]
}
// Unwrap implements interface behaviour for errors.Is() matching NOTE: only
// returns first element.
func (e Errors) Unwrap() error {
if len(e) == 0 {
return nil
}
return e[0]
}
// StartEndTimeCheck provides some basic checks which occur
// frequently in the codebase
func StartEndTimeCheck(start, end time.Time) error {

View File

@@ -627,8 +627,9 @@ func TestErrors(t *testing.T) {
if test.Error() != "" {
t.Fatal("string should be nil")
}
test = append(test, errors.New("test1"))
if test.Error() != "test1" {
errTestOne := errors.New("test1")
test = append(test, errTestOne)
if !errors.Is(test, errTestOne) {
t.Fatal("does not match error")
}
test = append(test, errors.New("test2"))

View File

@@ -10,6 +10,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/log"
"github.com/thrasher-corp/gocryptotrader/portfolio"
)
@@ -76,6 +77,7 @@ func (m *portfolioManager) Start(wg *sync.WaitGroup) error {
log.Debugf(log.PortfolioMgr, "Portfolio manager %s", MsgSubSystemStarting)
m.shutdown = make(chan struct{})
wg.Add(1)
go m.run(wg)
return nil
}
@@ -100,21 +102,21 @@ func (m *portfolioManager) Stop() error {
// run periodically will check and update portfolio holdings
func (m *portfolioManager) run(wg *sync.WaitGroup) {
log.Debugln(log.PortfolioMgr, "Portfolio manager started.")
wg.Add(1)
tick := time.NewTicker(m.portfolioManagerDelay)
defer func() {
tick.Stop()
wg.Done()
log.Debugf(log.PortfolioMgr, "Portfolio manager shutdown.")
}()
go m.processPortfolio()
timer := time.NewTimer(0)
for {
select {
case <-m.shutdown:
if !timer.Stop() {
<-timer.C
}
wg.Done()
log.Debugf(log.PortfolioMgr, "Portfolio manager shutdown.")
return
case <-tick.C:
case <-timer.C:
// This is run in a go-routine to not prevent the application from
// shutting down.
go m.processPortfolio()
timer.Reset(m.portfolioManagerDelay)
}
}
}
@@ -126,6 +128,13 @@ func (m *portfolioManager) processPortfolio() {
}
m.m.Lock()
defer m.m.Unlock()
exchanges, err := m.exchangeManager.GetExchanges()
if err != nil {
log.Errorf(log.PortfolioMgr, "Portfolio manager cannot get exchanges: %v", err)
}
allExchangesHoldings := m.getExchangeAccountInfo(exchanges)
m.seedExchangeAccountInfo(allExchangesHoldings)
data := m.base.GetPortfolioGroupedCoin()
for key, value := range data {
err := m.base.UpdatePortfolio(value, key)
@@ -142,13 +151,6 @@ func (m *portfolioManager) processPortfolio() {
key,
value)
}
exchanges, err := m.exchangeManager.GetExchanges()
if err != nil {
log.Errorf(log.PortfolioMgr, "Portfolio manager cannot get exchanges: %v", err)
}
d := m.getExchangeAccountInfo(exchanges)
m.seedExchangeAccountInfo(d)
atomic.CompareAndSwapInt32(&m.processing, 1, 0)
}
@@ -242,7 +244,7 @@ func (m *portfolioManager) seedExchangeAccountInfo(accounts []account.Holdings)
func (m *portfolioManager) getExchangeAccountInfo(exchanges []exchange.IBotExchange) []account.Holdings {
response := make([]account.Holdings, 0, len(exchanges))
for x := range exchanges {
if exchanges[x] == nil || !exchanges[x].IsEnabled() {
if !exchanges[x].IsEnabled() {
continue
}
if !exchanges[x].GetAuthenticatedAPISupport(exchange.RestAuthentication) {
@@ -253,10 +255,23 @@ func (m *portfolioManager) getExchangeAccountInfo(exchanges []exchange.IBotExcha
}
continue
}
assetTypes := exchanges[x].GetAssetTypes(false) // left as available for now, to sync the full spectrum
var exchangeHoldings account.Holdings
assetTypes := asset.Items{asset.Spot}
if exchanges[x].HasAssetTypeAccountSegregation() {
// Get enabled exchange asset types to sync account information.
// TODO: Update with further api key asset segration e.g. Kraken has
// individual keys associated with different asset types.
assetTypes = exchanges[x].GetAssetTypes(true)
}
exchangeHoldings := account.Holdings{
Exchange: exchanges[x].GetName(),
Accounts: make([]account.SubAccount, 0, len(assetTypes)),
}
for y := range assetTypes {
accountHoldings, err := exchanges[x].FetchAccountInfo(context.TODO(), assetTypes[y])
// Update account info to process account updates in memory on
// every fetch.
accountHoldings, err := exchanges[x].UpdateAccountInfo(context.TODO(), assetTypes[y])
if err != nil {
log.Errorf(log.PortfolioMgr,
"Error encountered retrieving exchange account info for %s. Error %s\n",
@@ -264,13 +279,11 @@ func (m *portfolioManager) getExchangeAccountInfo(exchanges []exchange.IBotExcha
err)
continue
}
for z := range accountHoldings.Accounts {
accountHoldings.Accounts[z].AssetType = assetTypes[y]
}
exchangeHoldings.Exchange = exchanges[x].GetName()
exchangeHoldings.Accounts = append(exchangeHoldings.Accounts, accountHoldings.Accounts...)
}
response = append(response, exchangeHoldings)
if len(exchangeHoldings.Accounts) > 0 {
response = append(response, exchangeHoldings)
}
}
return response
}

View File

@@ -4,18 +4,31 @@ import (
"errors"
"fmt"
"strings"
"time"
"github.com/gofrs/uuid"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/dispatch"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
)
func init() {
service = new(Service)
service.accounts = make(map[string]*Account)
service.exchangeAccounts = make(map[string]*Accounts)
service.mux = dispatch.GetNewMux()
}
var (
errHoldingsIsNil = errors.New("holdings cannot be nil")
errExchangeNameUnset = errors.New("exchange name unset")
errExchangeHoldingsNotFound = errors.New("exchange holdings not found")
errAssetHoldingsNotFound = errors.New("asset holdings not found")
errExchangeAccountsNotFound = errors.New("exchange accounts not found")
errNoExchangeSubAccountBalances = errors.New("no exchange sub account balances")
errNoBalanceFound = errors.New("no balance found")
errBalanceIsNil = errors.New("balance is nil")
)
// CollectBalances converts a map of sub-account balances into a slice
func CollectBalances(accountBalances map[string][]Balance, assetType asset.Item) (accounts []SubAccount, err error) {
if accountBalances == nil {
@@ -37,80 +50,255 @@ func CollectBalances(accountBalances map[string][]Balance, assetType asset.Item)
return
}
// SubscribeToExchangeAccount subcribes to your exchange account
// SubscribeToExchangeAccount subscribes to your exchange account
func SubscribeToExchangeAccount(exchange string) (dispatch.Pipe, error) {
exchange = strings.ToLower(exchange)
service.mu.Lock()
acc, ok := service.accounts[exchange]
if !ok {
service.mu.Unlock()
return dispatch.Pipe{},
fmt.Errorf("%s exchange account holdings not found", exchange)
}
defer service.mu.Unlock()
return service.mux.Subscribe(acc.ID)
accounts, ok := service.exchangeAccounts[exchange]
if !ok {
return dispatch.Pipe{}, fmt.Errorf("cannot subscribe %s %w",
exchange,
errExchangeAccountsNotFound)
}
return service.mux.Subscribe(accounts.ID)
}
// Process processes new account holdings updates
func Process(h *Holdings) error {
if h == nil {
return errors.New("cannot be nil")
}
if h.Exchange == "" {
return errors.New("exchange name unset")
}
return service.Update(h)
}
// GetHoldings returns full holdings for an exchange
func GetHoldings(exch string, assetType asset.Item) (Holdings, error) {
if exch == "" {
return Holdings{}, errors.New("exchange name unset")
return Holdings{}, errExchangeNameUnset
}
if !assetType.IsValid() {
return Holdings{}, fmt.Errorf("%s %s %w", exch, assetType, asset.ErrNotSupported)
}
exch = strings.ToLower(exch)
if !assetType.IsValid() {
return Holdings{}, fmt.Errorf("assetType %v is invalid", assetType)
service.mu.Lock()
defer service.mu.Unlock()
accounts, ok := service.exchangeAccounts[exch]
if !ok {
return Holdings{}, fmt.Errorf("%s %s %w", exch, assetType, errExchangeHoldingsNotFound)
}
var accountsHoldings []SubAccount
for subAccount, assetHoldings := range accounts.SubAccounts {
for ai, currencyHoldings := range assetHoldings {
if ai != assetType {
continue
}
var currencyBalances = make([]Balance, len(currencyHoldings))
target := 0
for item, balance := range currencyHoldings {
balance.m.Lock()
currencyBalances[target] = Balance{
CurrencyName: currency.Code{Item: item, UpperCase: true},
Total: balance.total,
Hold: balance.hold,
Free: balance.free,
AvailableWithoutBorrow: balance.availableWithoutBorrow,
Borrowed: balance.borrowed,
}
balance.m.Unlock()
target++
}
if len(currencyBalances) == 0 {
continue
}
accountsHoldings = append(accountsHoldings, SubAccount{
ID: subAccount,
AssetType: ai,
Currencies: currencyBalances,
})
break
}
}
if len(accountsHoldings) == 0 {
return Holdings{}, fmt.Errorf("%s %s %w",
exch,
assetType,
errAssetHoldingsNotFound)
}
return Holdings{Exchange: exch, Accounts: accountsHoldings}, nil
}
// GetBalance returns the internal balance for that asset item.
func GetBalance(exch, subAccount string, ai asset.Item, c currency.Code) (*ProtectedBalance, error) {
if exch == "" {
return nil, errExchangeNameUnset
}
if !ai.IsValid() {
return nil, fmt.Errorf("%s %w", ai, asset.ErrNotSupported)
}
if c.IsEmpty() {
return nil, currency.ErrCurrencyCodeEmpty
}
exch = strings.ToLower(exch)
subAccount = strings.ToLower(subAccount)
service.mu.Lock()
defer service.mu.Unlock()
h, ok := service.accounts[exch]
accounts, ok := service.exchangeAccounts[exch]
if !ok {
return Holdings{}, errors.New("exchange account holdings not found")
return nil, fmt.Errorf("%s %w", exch, errExchangeHoldingsNotFound)
}
for y := range h.h.Accounts {
if h.h.Accounts[y].AssetType == assetType {
return *h.h, nil
}
assetBalances, ok := accounts.SubAccounts[subAccount]
if !ok {
return nil, fmt.Errorf("%s %s %w",
exch, subAccount, errNoExchangeSubAccountBalances)
}
return Holdings{}, fmt.Errorf("%v holdings data not found for %s", assetType, exch)
currencyBalances, ok := assetBalances[ai]
if !ok {
return nil, fmt.Errorf("%s %s %s %w",
exch, subAccount, ai, errAssetHoldingsNotFound)
}
bal, ok := currencyBalances[c.Item]
if !ok {
return nil, fmt.Errorf("%s %s %s %s %w",
exch, subAccount, ai, c, errNoBalanceFound)
}
return bal, nil
}
// Update updates holdings with new account info
func (s *Service) Update(a *Holdings) error {
if a == nil {
return errHoldingsIsNil
}
if a.Exchange == "" {
return errExchangeNameUnset
}
exch := strings.ToLower(a.Exchange)
s.mu.Lock()
acc, ok := s.accounts[exch]
defer s.mu.Unlock()
accounts, ok := s.exchangeAccounts[exch]
if !ok {
id, err := s.mux.GetID()
if err != nil {
s.mu.Unlock()
return err
}
s.accounts[exch] = &Account{h: a, ID: id}
s.mu.Unlock()
return nil
accounts = &Accounts{
ID: id,
SubAccounts: make(map[string]map[asset.Item]map[*currency.Item]*ProtectedBalance),
}
s.exchangeAccounts[exch] = accounts
}
acc.h.Accounts = a.Accounts
defer s.mu.Unlock()
var errs common.Errors
for x := range a.Accounts {
if !a.Accounts[x].AssetType.IsValid() {
errs = append(errs, fmt.Errorf("cannot load sub account holdings for %s [%s] %w",
a.Accounts[x].ID,
a.Accounts[x].AssetType,
asset.ErrNotSupported))
continue
}
return s.mux.Publish([]uuid.UUID{acc.ID}, acc.h)
lowerSA := strings.ToLower(a.Accounts[x].ID)
var accountAssets map[asset.Item]map[*currency.Item]*ProtectedBalance
accountAssets, ok = accounts.SubAccounts[lowerSA]
if !ok {
accountAssets = make(map[asset.Item]map[*currency.Item]*ProtectedBalance)
accounts.SubAccounts[lowerSA] = accountAssets
}
var currencyBalances map[*currency.Item]*ProtectedBalance
currencyBalances, ok = accountAssets[a.Accounts[x].AssetType]
if !ok {
currencyBalances = make(map[*currency.Item]*ProtectedBalance)
accountAssets[a.Accounts[x].AssetType] = currencyBalances
}
for y := range a.Accounts[x].Currencies {
bal := currencyBalances[a.Accounts[x].Currencies[y].CurrencyName.Item]
if bal == nil {
bal = &ProtectedBalance{}
currencyBalances[a.Accounts[x].Currencies[y].CurrencyName.Item] = bal
}
bal.load(a.Accounts[x].Currencies[y])
}
}
err := s.mux.Publish([]uuid.UUID{accounts.ID}, a)
if err != nil {
return err
}
if errs != nil {
return errs
}
return nil
}
// load checks to see if there is a change from incoming balance, if there is a
// change it will change then alert external routines.
func (b *ProtectedBalance) load(change Balance) {
b.m.Lock()
defer b.m.Unlock()
if b.total == change.Total &&
b.hold == change.Hold &&
b.free == change.Free &&
b.availableWithoutBorrow == change.AvailableWithoutBorrow &&
b.borrowed == change.Borrowed {
return
}
b.total = change.Total
b.hold = change.Hold
b.free = change.Free
b.availableWithoutBorrow = change.AvailableWithoutBorrow
b.borrowed = change.Borrowed
b.notice.Alert()
}
// Wait waits for a change in amounts for an asset type. This will pause
// indefinitely if no change ever occurs. Max wait will return true if it failed
// to achieve a state change in the time specified. If Max wait is not specified
// it will default to a minute wait time.
func (b *ProtectedBalance) Wait(maxWait time.Duration) (wait <-chan bool, cancel chan<- struct{}, err error) {
if b == nil {
return nil, nil, errBalanceIsNil
}
if maxWait <= 0 {
maxWait = time.Minute
}
ch := make(chan struct{})
go func(ch chan<- struct{}, until time.Duration) {
time.Sleep(until)
select {
case ch <- struct{}{}:
default:
}
}(ch, maxWait)
return b.notice.Wait(ch), ch, nil
}
// GetFree returns the current free balance for the exchange
func (b *ProtectedBalance) GetFree() float64 {
if b == nil {
return 0
}
b.m.Lock()
defer b.m.Unlock()
return b.free
}

View File

@@ -1,6 +1,7 @@
package account
import (
"errors"
"sync"
"testing"
"time"
@@ -11,6 +12,7 @@ import (
)
func TestCollectBalances(t *testing.T) {
t.Parallel()
accounts, err := CollectBalances(
map[string][]Balance{
"someAccountID": {
@@ -49,21 +51,26 @@ func TestCollectBalances(t *testing.T) {
if err == nil {
t.Errorf("expecting err %s", errAccountBalancesIsNil.Error())
}
_, err = CollectBalances(map[string][]Balance{}, asset.Empty)
if !errors.Is(err, asset.ErrNotSupported) {
t.Fatalf("received: '%v' but expected: '%v'", err, asset.ErrNotSupported)
}
}
func TestHoldings(t *testing.T) {
func TestGetHoldings(t *testing.T) {
err := dispatch.Start(dispatch.DefaultMaxWorkers, dispatch.DefaultJobsLimit)
if err != nil {
t.Fatal(err)
}
err = Process(nil)
if err == nil {
t.Error("error cannot be nil")
if !errors.Is(err, errHoldingsIsNil) {
t.Fatalf("received: '%v' but expected: '%v'", err, errHoldingsIsNil)
}
err = Process(&Holdings{})
if err == nil {
t.Error("error cannot be nil")
if !errors.Is(err, errExchangeNameUnset) {
t.Fatalf("received: '%v' but expected: '%v'", err, errExchangeNameUnset)
}
holdings := Holdings{
@@ -77,35 +84,76 @@ func TestHoldings(t *testing.T) {
err = Process(&Holdings{
Exchange: "Test",
Accounts: []SubAccount{{
AssetType: asset.Spot,
ID: "1337",
Currencies: []Balance{
{
CurrencyName: currency.BTC,
Total: 100,
Hold: 20,
},
Accounts: []SubAccount{
{
ID: "1337",
}},
})
if !errors.Is(err, asset.ErrNotSupported) {
t.Fatalf("received: '%v' but expected: '%v'", err, asset.ErrNotSupported)
}
err = Process(&Holdings{
Exchange: "Test",
Accounts: []SubAccount{
{
AssetType: asset.UpsideProfitContract,
ID: "1337",
},
}},
{
AssetType: asset.Spot,
ID: "1337",
Currencies: []Balance{
{
CurrencyName: currency.BTC,
Total: 100,
Hold: 20,
},
},
}},
})
if err != nil {
t.Error(err)
}
// process again with no changes
err = Process(&Holdings{
Exchange: "Test",
Accounts: []SubAccount{
{
AssetType: asset.Spot,
ID: "1337",
Currencies: []Balance{
{
CurrencyName: currency.BTC,
Total: 100,
Hold: 20,
},
},
}},
})
if err != nil {
t.Error(err)
}
_, err = GetHoldings("", asset.Spot)
if err == nil {
t.Error("error cannot be nil")
if !errors.Is(err, errExchangeNameUnset) {
t.Fatalf("received: '%v' but expected: '%v'", err, errExchangeNameUnset)
}
_, err = GetHoldings("bla", asset.Spot)
if err == nil {
t.Error("error cannot be nil")
if !errors.Is(err, errExchangeHoldingsNotFound) {
t.Fatalf("received: '%v' but expected: '%v'", err, errExchangeHoldingsNotFound)
}
_, err = GetHoldings("bla", asset.Empty)
if err == nil {
t.Error("error cannot be nil since an invalid asset type is provided")
if !errors.Is(err, asset.ErrNotSupported) {
t.Fatalf("received: '%v' but expected: '%v'", err, asset.ErrNotSupported)
}
_, err = GetHoldings("Test", asset.UpsideProfitContract)
if !errors.Is(err, errAssetHoldingsNotFound) {
t.Fatalf("received: '%v' but expected: '%v'", err, errAssetHoldingsNotFound)
}
u, err := GetHoldings("Test", asset.Spot)
@@ -117,7 +165,7 @@ func TestHoldings(t *testing.T) {
t.Errorf("expecting 1337 but received %s", u.Accounts[0].ID)
}
if u.Accounts[0].Currencies[0].CurrencyName != currency.BTC {
if !u.Accounts[0].Currencies[0].CurrencyName.Equal(currency.BTC) {
t.Errorf("expecting BTC but received %s",
u.Accounts[0].Currencies[0].CurrencyName)
}
@@ -133,8 +181,8 @@ func TestHoldings(t *testing.T) {
}
_, err = SubscribeToExchangeAccount("nonsense")
if err == nil {
t.Fatal("error cannot be nil")
if !errors.Is(err, errExchangeAccountsNotFound) {
t.Fatalf("received: '%v' but expected: '%v'", err, errExchangeAccountsNotFound)
}
p, err := SubscribeToExchangeAccount("Test")
@@ -159,7 +207,8 @@ func TestHoldings(t *testing.T) {
err = Process(&Holdings{
Exchange: "Test",
Accounts: []SubAccount{{
ID: "1337",
ID: "1337",
AssetType: asset.MarginFunding,
Currencies: []Balance{
{
CurrencyName: currency.BTC,
@@ -175,3 +224,238 @@ func TestHoldings(t *testing.T) {
wg.Wait()
}
func TestGetBalance(t *testing.T) {
_, err := GetBalance("", "", asset.Empty, currency.Code{})
if !errors.Is(err, errExchangeNameUnset) {
t.Fatalf("received: '%v' but expected: '%v'", err, errExchangeNameUnset)
}
_, err = GetBalance("bruh", "", asset.Empty, currency.Code{})
if !errors.Is(err, asset.ErrNotSupported) {
t.Fatalf("received: '%v' but expected: '%v'", err, asset.ErrNotSupported)
}
_, err = GetBalance("bruh", "", asset.Spot, currency.Code{})
if !errors.Is(err, currency.ErrCurrencyCodeEmpty) {
t.Fatalf("received: '%v' but expected: '%v'", err, currency.ErrCurrencyCodeEmpty)
}
_, err = GetBalance("bruh", "", asset.Spot, currency.BTC)
if !errors.Is(err, errExchangeHoldingsNotFound) {
t.Fatalf("received: '%v' but expected: '%v'", err, errExchangeHoldingsNotFound)
}
err = Process(&Holdings{
Exchange: "bruh",
Accounts: []SubAccount{
{
AssetType: asset.Spot,
ID: "1337",
},
},
})
if err != nil {
t.Error(err)
}
_, err = GetBalance("bruh", "1336", asset.Spot, currency.BTC)
if !errors.Is(err, errNoExchangeSubAccountBalances) {
t.Fatalf("received: '%v' but expected: '%v'", err, errNoExchangeSubAccountBalances)
}
_, err = GetBalance("bruh", "1337", asset.Futures, currency.BTC)
if !errors.Is(err, errAssetHoldingsNotFound) {
t.Fatalf("received: '%v' but expected: '%v'", err, errAssetHoldingsNotFound)
}
_, err = GetBalance("bruh", "1337", asset.Spot, currency.BTC)
if !errors.Is(err, errNoBalanceFound) {
t.Fatalf("received: '%v' but expected: '%v'", err, errNoBalanceFound)
}
err = Process(&Holdings{
Exchange: "bruh",
Accounts: []SubAccount{
{
AssetType: asset.Spot,
ID: "1337",
Currencies: []Balance{
{
CurrencyName: currency.BTC,
Total: 2,
Hold: 1,
},
},
},
},
})
if err != nil {
t.Error(err)
}
bal, err := GetBalance("bruh", "1337", asset.Spot, currency.BTC)
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}
bal.m.Lock()
if bal.total != 2 {
t.Fatal("unexpected value")
}
if bal.hold != 1 {
t.Fatal("unexpected value")
}
}
func TestBalanceInternalWait(t *testing.T) {
t.Parallel()
var bi *ProtectedBalance
_, _, err := bi.Wait(0)
if !errors.Is(err, errBalanceIsNil) {
t.Fatalf("received: '%v' but expected: '%v'", err, errBalanceIsNil)
}
bi = &ProtectedBalance{}
waiter, _, err := bi.Wait(time.Nanosecond)
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}
if !<-waiter {
t.Fatal("should been alerted by timeout")
}
waiter, _, err = bi.Wait(0)
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}
bi.notice.Alert()
if <-waiter {
t.Fatal("should have been alerted by change notice")
}
}
func TestBalanceInternalLoad(t *testing.T) {
t.Parallel()
bi := &ProtectedBalance{}
bi.load(Balance{Total: 1, Hold: 2, Free: 3, AvailableWithoutBorrow: 4, Borrowed: 5})
bi.m.Lock()
if bi.total != 1 {
t.Fatal("unexpected value")
}
if bi.hold != 2 {
t.Fatal("unexpected value")
}
if bi.free != 3 {
t.Fatal("unexpected value")
}
if bi.availableWithoutBorrow != 4 {
t.Fatal("unexpected value")
}
if bi.borrowed != 5 {
t.Fatal("unexpected value")
}
bi.m.Unlock()
if bi.GetFree() != 3 {
t.Fatal("unexpected value")
}
}
func TestGetFree(t *testing.T) {
t.Parallel()
var bi *ProtectedBalance
if bi.GetFree() != 0 {
t.Fatal("unexpected value")
}
bi = &ProtectedBalance{}
bi.free = 1
if bi.GetFree() != 1 {
t.Fatal("unexpected value")
}
}
func TestUpdate(t *testing.T) {
t.Parallel()
s := &Service{exchangeAccounts: make(map[string]*Accounts), mux: dispatch.GetNewMux()}
err := s.Update(nil)
if !errors.Is(err, errHoldingsIsNil) {
t.Fatalf("received: '%v' but expected: '%v'", err, errHoldingsIsNil)
}
err = s.Update(&Holdings{})
if !errors.Is(err, errExchangeNameUnset) {
t.Fatalf("received: '%v' but expected: '%v'", err, errExchangeNameUnset)
}
err = s.Update(&Holdings{
Exchange: "TeSt",
Accounts: []SubAccount{
{
AssetType: 6969,
ID: "1337",
Currencies: []Balance{
{
CurrencyName: currency.BTC,
Total: 100,
Hold: 20,
},
},
},
{AssetType: asset.UpsideProfitContract, ID: "1337"},
{
AssetType: asset.Spot,
ID: "1337",
Currencies: []Balance{
{
CurrencyName: currency.BTC,
Total: 100,
Hold: 20,
},
},
},
},
})
if !errors.Is(err, asset.ErrNotSupported) {
t.Fatalf("received: '%v' but expected: '%v'", err, asset.ErrNotSupported)
}
err = s.Update(&Holdings{ // No change
Exchange: "tEsT",
Accounts: []SubAccount{
{
AssetType: asset.Spot,
ID: "1337",
Currencies: []Balance{
{
CurrencyName: currency.BTC,
Total: 100,
Hold: 20,
},
},
},
},
})
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}
acc, ok := s.exchangeAccounts["test"]
if !ok {
t.Fatal("account should be loaded")
}
b, ok := acc.SubAccounts["1337"][asset.Spot][currency.BTC.Item]
if !ok {
t.Fatal("account should be loaded")
}
if b.total != 100 {
t.Errorf("expecting 100 but received %f", b.total)
}
if b.hold != 20 {
t.Errorf("expecting 20 but received %f", b.hold)
}
}

View File

@@ -7,26 +7,27 @@ import (
"github.com/gofrs/uuid"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/dispatch"
"github.com/thrasher-corp/gocryptotrader/exchanges/alert"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
)
// Vars for the ticker package
var (
service *Service
service Service
errAccountBalancesIsNil = errors.New("account balances is nil")
)
// Service holds ticker information for each individual exchange
type Service struct {
accounts map[string]*Account
mux *dispatch.Mux
mu sync.Mutex
exchangeAccounts map[string]*Accounts
mux *dispatch.Mux
mu sync.Mutex
}
// Account holds a stream ID and a pointer to the exchange holdings
type Account struct {
h *Holdings
ID uuid.UUID
// Accounts holds a stream ID and a map to the exchange holdings
type Accounts struct {
ID uuid.UUID
SubAccounts map[string]map[asset.Item]map[*currency.Item]*ProtectedBalance
}
// Holdings is a generic type to hold each exchange's holdings for all enabled
@@ -61,3 +62,17 @@ type Change struct {
Amount float64
Account string
}
// ProtectedBalance stores the full balance information for that specific asset
type ProtectedBalance struct {
total float64
hold float64
free float64
availableWithoutBorrow float64
borrowed float64
m sync.Mutex
// notice alerts for when the balance changes for strategy inspection and
// usage.
notice alert.Notice
}

View File

@@ -113,6 +113,7 @@ func (a *Alphapoint) UpdateAccountInfo(ctx context.Context, assetType asset.Item
response.Accounts = append(response.Accounts, account.SubAccount{
Currencies: balances,
AssetType: assetType,
})
err = account.Process(&response)

View File

@@ -121,27 +121,28 @@ func (b *Binance) SetDefaults() {
REST: true,
Websocket: true,
RESTCapabilities: protocol.Features{
TickerBatching: true,
TickerFetching: true,
KlineFetching: true,
OrderbookFetching: true,
AutoPairUpdates: true,
AccountInfo: true,
CryptoDeposit: true,
CryptoWithdrawal: true,
GetOrder: true,
GetOrders: true,
CancelOrders: true,
CancelOrder: true,
SubmitOrder: true,
DepositHistory: true,
WithdrawalHistory: true,
TradeFetching: true,
UserTradeHistory: true,
TradeFee: true,
CryptoWithdrawalFee: true,
MultiChainDeposits: true,
MultiChainWithdrawals: true,
TickerBatching: true,
TickerFetching: true,
KlineFetching: true,
OrderbookFetching: true,
AutoPairUpdates: true,
AccountInfo: true,
CryptoDeposit: true,
CryptoWithdrawal: true,
GetOrder: true,
GetOrders: true,
CancelOrders: true,
CancelOrder: true,
SubmitOrder: true,
DepositHistory: true,
WithdrawalHistory: true,
TradeFetching: true,
UserTradeHistory: true,
TradeFee: true,
CryptoWithdrawalFee: true,
MultiChainDeposits: true,
MultiChainWithdrawals: true,
HasAssetTypeAccountSegregation: true,
},
WebsocketCapabilities: protocol.Features{
TradeFetching: true,
@@ -701,6 +702,7 @@ func (b *Binance) UpdateOrderbook(ctx context.Context, p currency.Pair, assetTyp
func (b *Binance) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (account.Holdings, error) {
var info account.Holdings
var acc account.SubAccount
acc.AssetType = assetType
info.Exchange = b.Name
switch assetType {
case asset.Spot:

View File

@@ -497,11 +497,11 @@ func (b *Bitfinex) UpdateAccountInfo(ctx context.Context, assetType asset.Item)
}
var Accounts = []account.SubAccount{
{ID: "deposit"},
{ID: "exchange"},
{ID: "trading"},
{ID: "margin"},
{ID: "funding "},
{ID: "deposit", AssetType: assetType},
{ID: "exchange", AssetType: assetType},
{ID: "trading", AssetType: assetType},
{ID: "margin", AssetType: assetType},
{ID: "funding", AssetType: assetType},
}
for x := range accountBalance {

View File

@@ -448,6 +448,7 @@ func (b *Bitstamp) UpdateAccountInfo(ctx context.Context, assetType asset.Item)
})
}
response.Accounts = append(response.Accounts, account.SubAccount{
AssetType: assetType,
Currencies: currencies,
})

View File

@@ -421,6 +421,7 @@ func (b *Bittrex) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (
}
resp.Accounts = append(resp.Accounts, account.SubAccount{
AssetType: assetType,
Currencies: currencies,
})
resp.Exchange = b.Name

View File

@@ -411,6 +411,7 @@ func (b *BTSE) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (acc
a.Exchange = b.Name
a.Accounts = []account.SubAccount{
{
AssetType: assetType,
Currencies: currencies,
},
}

View File

@@ -391,6 +391,7 @@ func (c *COINUT) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (a
}
info.Exchange = c.Name
info.Accounts = append(info.Accounts, account.SubAccount{
AssetType: assetType,
Currencies: balances,
})

View File

@@ -451,7 +451,7 @@ func (b *Base) SetEnabled(enabled bool) {
// IsEnabled is a method that returns if the current exchange is enabled
func (b *Base) IsEnabled() bool {
return b.Enabled
return b != nil && b.Enabled
}
// SetupDefaults sets the exchange settings based on the supplied config
@@ -1300,3 +1300,9 @@ func (b *Base) CalculateTotalCollateral(ctx context.Context, calculator *order.T
func (b *Base) GetFuturesPositions(context.Context, asset.Item, currency.Pair, time.Time, time.Time) ([]order.Detail, error) {
return nil, common.ErrNotYetImplemented
}
// HasAssetTypeAccountSegregation returns if the accounts are divided into asset
// types instead of just being denoted as spot holdings.
func (b *Base) HasAssetTypeAccountSegregation() bool {
return b.Features.Supports.RESTCapabilities.HasAssetTypeAccountSegregation
}

View File

@@ -379,6 +379,7 @@ func (e *EXMO) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (acc
}
response.Accounts = append(response.Accounts, account.SubAccount{
AssetType: assetType,
Currencies: currencies,
})

View File

@@ -363,6 +363,7 @@ func (g *Gateio) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (a
}
info.Accounts = append(info.Accounts, account.SubAccount{
Currencies: currData,
AssetType: assetType,
})
} else {
balance, err := g.GetBalances(ctx)
@@ -420,6 +421,7 @@ func (g *Gateio) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (a
}
info.Accounts = append(info.Accounts, account.SubAccount{
AssetType: assetType,
Currencies: balances,
})
}

View File

@@ -330,6 +330,7 @@ func (g *Gemini) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (a
}
response.Accounts = append(response.Accounts, account.SubAccount{
AssetType: assetType,
Currencies: currencies,
})

View File

@@ -449,6 +449,7 @@ func (h *HitBTC) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (a
}
response.Accounts = append(response.Accounts, account.SubAccount{
AssetType: assetType,
Currencies: currencies,
})

View File

@@ -102,22 +102,23 @@ func (h *HUOBI) SetDefaults() {
REST: true,
Websocket: true,
RESTCapabilities: protocol.Features{
TickerFetching: true,
KlineFetching: true,
TradeFetching: true,
OrderbookFetching: true,
AutoPairUpdates: true,
AccountInfo: true,
GetOrder: true,
GetOrders: true,
CancelOrders: true,
CancelOrder: true,
SubmitOrder: true,
CryptoDeposit: true,
CryptoWithdrawal: true,
TradeFee: true,
MultiChainDeposits: true,
MultiChainWithdrawals: true,
TickerFetching: true,
KlineFetching: true,
TradeFetching: true,
OrderbookFetching: true,
AutoPairUpdates: true,
AccountInfo: true,
GetOrder: true,
GetOrders: true,
CancelOrders: true,
CancelOrder: true,
SubmitOrder: true,
CryptoDeposit: true,
CryptoWithdrawal: true,
TradeFee: true,
MultiChainDeposits: true,
MultiChainWithdrawals: true,
HasAssetTypeAccountSegregation: true,
},
WebsocketCapabilities: protocol.Features{
KlineFetching: true,

View File

@@ -39,8 +39,6 @@ type IBotExchange interface {
UpdateTradablePairs(ctx context.Context, forceUpdate bool) error
GetEnabledPairs(a asset.Item) (currency.Pairs, error)
GetAvailablePairs(a asset.Item) (currency.Pairs, error)
FetchAccountInfo(ctx context.Context, a asset.Item) (account.Holdings, error)
UpdateAccountInfo(ctx context.Context, a asset.Item) (account.Holdings, error)
GetAuthenticatedAPISupport(endpoint uint8) bool
SetPairs(pairs currency.Pairs, a asset.Item, enabled bool) error
GetAssetTypes(enabled bool) asset.Items
@@ -98,6 +96,8 @@ type IBotExchange interface {
GetOrderExecutionLimits(a asset.Item, cp currency.Pair) (*order.Limits, error)
CheckOrderExecutionLimits(a asset.Item, cp currency.Pair, price, amount float64, orderType order.Type) error
UpdateOrderExecutionLimits(ctx context.Context, a asset.Item) error
AccountManagement
}
// CurrencyStateManagement defines functionality for currency state management
@@ -109,3 +109,10 @@ type CurrencyStateManagement interface {
CanWithdraw(c currency.Code, a asset.Item) error
CanDeposit(c currency.Code, a asset.Item) error
}
// AccountManagement defines functionality for exchange account management
type AccountManagement interface {
UpdateAccountInfo(ctx context.Context, a asset.Item) (account.Holdings, error)
FetchAccountInfo(ctx context.Context, a asset.Item) (account.Holdings, error)
HasAssetTypeAccountSegregation() bool
}

View File

@@ -299,6 +299,7 @@ func (i *ItBit) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (ac
}
info.Accounts = append(info.Accounts, account.SubAccount{
AssetType: assetType,
Currencies: fullBalance,
})

View File

@@ -106,29 +106,30 @@ func (k *Kraken) SetDefaults() {
REST: true,
Websocket: true,
RESTCapabilities: protocol.Features{
TickerBatching: true,
TickerFetching: true,
KlineFetching: true,
TradeFetching: true,
OrderbookFetching: true,
AutoPairUpdates: true,
AccountInfo: true,
GetOrder: true,
GetOrders: true,
CancelOrder: true,
SubmitOrder: true,
UserTradeHistory: true,
CryptoDeposit: true,
CryptoWithdrawal: true,
FiatDeposit: true,
FiatWithdraw: true,
TradeFee: true,
FiatDepositFee: true,
FiatWithdrawalFee: true,
CryptoDepositFee: true,
CryptoWithdrawalFee: true,
MultiChainDeposits: true,
MultiChainWithdrawals: true,
TickerBatching: true,
TickerFetching: true,
KlineFetching: true,
TradeFetching: true,
OrderbookFetching: true,
AutoPairUpdates: true,
AccountInfo: true,
GetOrder: true,
GetOrders: true,
CancelOrder: true,
SubmitOrder: true,
UserTradeHistory: true,
CryptoDeposit: true,
CryptoWithdrawal: true,
FiatDeposit: true,
FiatWithdraw: true,
TradeFee: true,
FiatDepositFee: true,
FiatWithdrawalFee: true,
CryptoDepositFee: true,
CryptoWithdrawalFee: true,
MultiChainDeposits: true,
MultiChainWithdrawals: true,
HasAssetTypeAccountSegregation: true,
},
WebsocketCapabilities: protocol.Features{
TickerFetching: true,
@@ -608,6 +609,7 @@ func (k *Kraken) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (a
}
info.Accounts = append(info.Accounts, account.SubAccount{
Currencies: balances,
AssetType: assetType,
})
case asset.Futures:
bal, err := k.GetFuturesAccountData(ctx)

View File

@@ -327,7 +327,7 @@ func (l *Lbank) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (ac
if err != nil {
return info, err
}
var acc account.SubAccount
acc := account.SubAccount{AssetType: assetType}
for key, val := range data.Info.Asset {
c := currency.NewCode(key)
hold, ok := data.Info.Freeze[key]

View File

@@ -281,7 +281,7 @@ func (l *LocalBitcoins) UpdateOrderbook(ctx context.Context, p currency.Pair, as
// UpdateAccountInfo retrieves balances for all enabled currencies for the
// LocalBitcoins exchange
func (l *LocalBitcoins) UpdateAccountInfo(ctx context.Context, _ asset.Item) (account.Holdings, error) {
func (l *LocalBitcoins) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (account.Holdings, error) {
var response account.Holdings
response.Exchange = l.Name
accountBalance, err := l.GetWalletBalance(ctx)
@@ -290,6 +290,7 @@ func (l *LocalBitcoins) UpdateAccountInfo(ctx context.Context, _ asset.Item) (ac
}
response.Accounts = append(response.Accounts, account.SubAccount{
AssetType: assetType,
Currencies: []account.Balance{
{
CurrencyName: currency.BTC,

View File

@@ -194,7 +194,7 @@ func (o *OKGroup) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (
var resp account.Holdings
resp.Exchange = o.Name
currencyAccount := account.SubAccount{}
currencyAccount := account.SubAccount{AssetType: assetType}
for i := range currencies {
hold, parseErr := strconv.ParseFloat(currencies[i].Hold, 64)

View File

@@ -423,6 +423,7 @@ func (p *Poloniex) UpdateAccountInfo(ctx context.Context, assetType asset.Item)
}
response.Accounts = append(response.Accounts, account.SubAccount{
AssetType: assetType,
Currencies: currencies,
})

View File

@@ -44,4 +44,8 @@ type Features struct {
MultiChainDeposits bool `json:"multiChainDeposits,omitempty"`
MultiChainWithdrawals bool `json:"multiChainWithdrawals,omitempty"`
MultiChainDepositRequiresChainSet bool `json:"multiChainDepositRequiresChainSet,omitempty"`
// HasAssetTypeAccountSegregation is when the assets are divided into asset
// types instead of just being denoted as spot holdings.
HasAssetTypeAccountSegregation bool `json:"hasAssetTypeAccountSegregation,omitempty"`
}

View File

@@ -25,7 +25,7 @@ func init() {
service.mux = dispatch.GetNewMux()
}
// SubscribeTicker subcribes to a ticker and returns a communication channel to
// SubscribeTicker subscribes to a ticker and returns a communication channel to
// stream new ticker updates
func SubscribeTicker(exchange string, p currency.Pair, a asset.Item) (dispatch.Pipe, error) {
exchange = strings.ToLower(exchange)
@@ -42,7 +42,7 @@ func SubscribeTicker(exchange string, p currency.Pair, a asset.Item) (dispatch.P
return service.mux.Subscribe(tick.Main)
}
// SubscribeToExchangeTickers subcribes to all tickers on an exchange
// SubscribeToExchangeTickers subscribes to all tickers on an exchange
func SubscribeToExchangeTickers(exchange string) (dispatch.Pipe, error) {
exchange = strings.ToLower(exchange)
service.mu.Lock()

View File

@@ -324,6 +324,7 @@ func (y *Yobit) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (ac
}
response.Accounts = append(response.Accounts, account.SubAccount{
AssetType: assetType,
Currencies: currencies,
})

View File

@@ -393,6 +393,7 @@ func (z *ZB) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (accou
info.Exchange = z.Name
info.Accounts = append(info.Accounts, account.SubAccount{
AssetType: assetType,
Currencies: balances,
})

View File

@@ -196,7 +196,8 @@ func (w Wrapper) AccountInformation(ctx context.Context, exch string, assetType
Exchange: exch,
Accounts: []account.SubAccount{
{
ID: exch,
ID: exch,
AssetType: assetType,
Currencies: []account.Balance{
{
CurrencyName: currency.Code{