account: refactor for save holdings (#1846)

* account: refactor for save holdings

Signed-off-by: Ye Sijun <junnplus@gmail.com>

* Update exchanges/account/account.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/account/account.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* account: minor changes

Signed-off-by: Ye Sijun <junnplus@gmail.com>

---------

Signed-off-by: Ye Sijun <junnplus@gmail.com>
Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>
Co-authored-by: Scott <gloriousCode@users.noreply.github.com>
This commit is contained in:
Jun
2025-04-02 10:32:23 +09:00
committed by GitHub
parent 563ae7883b
commit 1b7fa2259a
5 changed files with 143 additions and 150 deletions

View File

@@ -28,10 +28,9 @@ type PairAsset struct {
Asset asset.Item
}
// SubAccountCurrencyAsset is a unique map key signature for subaccount, currency code and asset
type SubAccountCurrencyAsset struct {
// SubAccountAsset is a unique map key signature for subaccount and asset
type SubAccountAsset struct {
SubAccount string
Currency *currency.Item
Asset asset.Item
}

View File

@@ -653,36 +653,6 @@ func (s *RPCServer) GetAccountInfoStream(r *gctrpc.GetAccountInfoRequest, stream
return err
}
initAcc, err := exch.GetCachedAccountInfo(stream.Context(), assetType)
if err != nil {
return err
}
accounts := make([]*gctrpc.Account, len(initAcc.Accounts))
for x := range initAcc.Accounts {
subAccounts := make([]*gctrpc.AccountCurrencyInfo, len(initAcc.Accounts[x].Currencies))
for y := range initAcc.Accounts[x].Currencies {
subAccounts[y] = &gctrpc.AccountCurrencyInfo{
Currency: initAcc.Accounts[x].Currencies[y].Currency.String(),
TotalValue: initAcc.Accounts[x].Currencies[y].Total,
Hold: initAcc.Accounts[x].Currencies[y].Hold,
UpdatedAt: timestamppb.New(initAcc.Accounts[x].Currencies[y].UpdatedAt),
}
}
accounts[x] = &gctrpc.Account{
Id: initAcc.Accounts[x].ID,
Currencies: subAccounts,
}
}
err = stream.Send(&gctrpc.GetAccountInfoResponse{
Exchange: initAcc.Exchange,
Accounts: accounts,
})
if err != nil {
return err
}
pipe, err := account.SubscribeToExchangeAccount(r.Exchange)
if err != nil {
return err
@@ -694,16 +664,23 @@ func (s *RPCServer) GetAccountInfoStream(r *gctrpc.GetAccountInfoRequest, stream
log.Errorln(log.DispatchMgr, pipeErr)
}
}()
init := make(chan struct{}, 1)
init <- struct{}{}
for {
data, ok := <-pipe.Channel()
if !ok {
return errDispatchSystem
select {
case <-stream.Context().Done():
return stream.Context().Err()
case _, ok := <-pipe.Channel():
if !ok {
return errDispatchSystem
}
case <-init:
}
holdings, ok := data.(*account.Holdings)
if !ok {
return common.GetTypeAssertError("*account.Holdings", data)
holdings, err := exch.GetCachedAccountInfo(stream.Context(), assetType)
if err != nil {
return err
}
accounts := make([]*gctrpc.Account, len(holdings.Accounts))

View File

@@ -73,7 +73,7 @@ func SubscribeToExchangeAccount(exchange string) (dispatch.Pipe, error) {
// Process processes new account holdings updates
func Process(h *Holdings, c *Credentials) error {
return service.Update(h, c)
return service.Save(h, c)
}
// GetHoldings returns full holdings for an exchange.
@@ -101,28 +101,30 @@ func GetHoldings(exch string, creds *Credentials, assetType asset.Item) (Holding
return Holdings{}, fmt.Errorf("%s %w: `%s`", exch, ErrExchangeHoldingsNotFound, assetType)
}
subAccountHoldings, ok := accounts.SubAccounts[*creds]
subAccountHoldings, ok := accounts.subAccounts[*creds]
if !ok {
return Holdings{}, fmt.Errorf("%s %s %s %w %w", exch, creds, assetType, errNoCredentialBalances, ErrExchangeHoldingsNotFound)
}
currencyBalances := make([]Balance, 0, len(subAccountHoldings))
cpy := *creds
for mapKey, assetHoldings := range subAccountHoldings {
for mapKey, assets := range subAccountHoldings {
if mapKey.Asset != assetType {
continue
}
assetHoldings.m.Lock()
currencyBalances = append(currencyBalances, Balance{
Currency: mapKey.Currency.Currency().Upper(),
Total: assetHoldings.total,
Hold: assetHoldings.hold,
Free: assetHoldings.free,
AvailableWithoutBorrow: assetHoldings.availableWithoutBorrow,
Borrowed: assetHoldings.borrowed,
UpdatedAt: assetHoldings.updatedAt,
})
assetHoldings.m.Unlock()
for currItem, bal := range assets {
bal.m.Lock()
currencyBalances = append(currencyBalances, Balance{
Currency: currItem.Currency().Upper(),
Total: bal.total,
Hold: bal.hold,
Free: bal.free,
AvailableWithoutBorrow: bal.availableWithoutBorrow,
Borrowed: bal.borrowed,
UpdatedAt: bal.updatedAt,
})
bal.m.Unlock()
}
if cpy.SubAccount == "" && mapKey.SubAccount != "" {
// TODO: fix this backwards population
// the subAccount here may not be associated with the balance across all subAccountHoldings
@@ -141,13 +143,13 @@ func GetHoldings(exch string, creds *Credentials, assetType asset.Item) (Holding
}
// GetBalance returns the internal balance for that asset item.
func GetBalance(exch, subAccount string, creds *Credentials, ai asset.Item, c currency.Code) (*ProtectedBalance, error) {
func GetBalance(exch, subAccount string, creds *Credentials, a asset.Item, c currency.Code) (*ProtectedBalance, error) {
if exch == "" {
return nil, fmt.Errorf("cannot get balance: %w", errExchangeNameUnset)
}
if !ai.IsValid() {
return nil, fmt.Errorf("cannot get balance: %s %w", ai, asset.ErrNotSupported)
if !a.IsValid() {
return nil, fmt.Errorf("cannot get balance: %s %w", a, asset.ErrNotSupported)
}
if creds.IsEmpty() {
@@ -164,29 +166,31 @@ func GetBalance(exch, subAccount string, creds *Credentials, ai asset.Item, c cu
accounts, ok := service.exchangeAccounts[exch]
if !ok {
return nil, fmt.Errorf("%s %w", exch, ErrExchangeHoldingsNotFound)
return nil, fmt.Errorf("%w for %s", ErrExchangeHoldingsNotFound, exch)
}
subAccounts, ok := accounts.SubAccounts[*creds]
subAccounts, ok := accounts.subAccounts[*creds]
if !ok {
return nil, fmt.Errorf("%s %s %w",
exch, creds, errNoCredentialBalances)
return nil, fmt.Errorf("%w for %s %s", errNoCredentialBalances, exch, creds)
}
bal, ok := subAccounts[key.SubAccountCurrencyAsset{
assets, ok := subAccounts[key.SubAccountAsset{
SubAccount: subAccount,
Currency: c.Item,
Asset: ai,
Asset: a,
}]
if !ok {
return nil, fmt.Errorf("%s %s %s %s %w",
exch, subAccount, ai, c, errNoExchangeSubAccountBalances)
return nil, fmt.Errorf("%w for %s SubAccount %q %s %s", errNoExchangeSubAccountBalances, exch, subAccount, a, c)
}
bal, ok := assets[c.Item]
if !ok {
return nil, fmt.Errorf("%w for %s SubAccount %q %s %s", errNoExchangeSubAccountBalances, exch, subAccount, a, c)
}
return bal, nil
}
// Update updates holdings with new account info
func (s *Service) Update(incoming *Holdings, creds *Credentials) error {
// Save saves the holdings with new account info
// incoming should be a full update, and any missing currencies will be zeroed
func (s *Service) Save(incoming *Holdings, creds *Credentials) error {
if incoming == nil {
return fmt.Errorf("cannot update holdings: %w", errHoldingsIsNil)
}
@@ -202,19 +206,26 @@ func (s *Service) Update(incoming *Holdings, creds *Credentials) error {
exch := strings.ToLower(incoming.Exchange)
s.mu.Lock()
defer s.mu.Unlock()
accounts, ok := s.exchangeAccounts[exch]
if !ok {
accounts, exist := s.exchangeAccounts[exch]
if !exist {
id, err := s.mux.GetID()
if err != nil {
return err
}
accounts = &Accounts{
ID: id,
SubAccounts: make(map[Credentials]map[key.SubAccountCurrencyAsset]*ProtectedBalance),
subAccounts: make(map[Credentials]map[key.SubAccountAsset]currencyBalances),
}
s.exchangeAccounts[exch] = accounts
}
subAccounts, exist := accounts.subAccounts[*creds]
if !exist {
subAccounts = make(map[key.SubAccountAsset]currencyBalances)
accounts.subAccounts[*creds] = subAccounts
}
var errs error
for x := range incoming.Accounts {
if !incoming.Accounts[x].AssetType.IsValid() {
@@ -235,46 +246,47 @@ func (s *Service) Update(incoming *Holdings, creds *Credentials) error {
}
incoming.Accounts[x].Credentials.creds = cpy
var subAccounts map[key.SubAccountCurrencyAsset]*ProtectedBalance
subAccounts, ok = accounts.SubAccounts[*creds]
if !ok {
subAccounts = make(map[key.SubAccountCurrencyAsset]*ProtectedBalance)
accounts.SubAccounts[*creds] = subAccounts
accAsset := key.SubAccountAsset{
SubAccount: incoming.Accounts[x].ID,
Asset: incoming.Accounts[x].AssetType,
}
assets, exist := subAccounts[accAsset]
if !exist {
assets = make(map[*currency.Item]*ProtectedBalance)
accounts.subAccounts[*creds][accAsset] = assets
}
updated := make(map[*currency.Item]bool)
for y := range incoming.Accounts[x].Currencies {
if incoming.Accounts[x].Currencies[y].UpdatedAt.IsZero() {
incoming.Accounts[x].Currencies[y].UpdatedAt = time.Now()
accBal := &incoming.Accounts[x].Currencies[y]
if accBal.UpdatedAt.IsZero() {
accBal.UpdatedAt = time.Now()
}
// Note: Sub accounts are case sensitive and an account "name" is
// different to account "naMe".
bal, ok := subAccounts[key.SubAccountCurrencyAsset{
SubAccount: incoming.Accounts[x].ID,
Currency: incoming.Accounts[x].Currencies[y].Currency.Item,
Asset: incoming.Accounts[x].AssetType,
}]
bal, ok := assets[accBal.Currency.Item]
if !ok || bal == nil {
bal = &ProtectedBalance{}
subAccounts[key.SubAccountCurrencyAsset{
SubAccount: incoming.Accounts[x].ID,
Currency: incoming.Accounts[x].Currencies[y].Currency.Item,
Asset: incoming.Accounts[x].AssetType,
}] = bal
}
if err := bal.load(&incoming.Accounts[x].Currencies[y]); err != nil {
if err := bal.load(accBal); err != nil {
errs = common.AppendError(errs, fmt.Errorf("%w for account ID `%s` [%s %s]: %w",
errLoadingBalance,
incoming.Accounts[x].ID,
incoming.Accounts[x].AssetType,
incoming.Accounts[x].Currencies[y].Currency,
err))
continue
}
assets[accBal.Currency.Item] = bal
updated[accBal.Currency.Item] = true
}
for cur, bal := range assets {
if !updated[cur] {
bal.reset()
}
}
}
err := s.mux.Publish(incoming, accounts.ID)
if err != nil {
return err
if err := s.mux.Publish(incoming.Accounts[x], accounts.ID); err != nil {
errs = common.AppendError(errs, fmt.Errorf("cannot publish load for %s %w", exch, err))
}
}
return errs
@@ -342,3 +354,16 @@ func (b *ProtectedBalance) GetFree() float64 {
defer b.m.Unlock()
return b.free
}
func (b *ProtectedBalance) reset() {
b.m.Lock()
defer b.m.Unlock()
b.total = 0
b.hold = 0
b.free = 0
b.availableWithoutBorrow = 0
b.borrowed = 0
b.updatedAt = time.Now()
b.notice.Alert()
}

View File

@@ -113,25 +113,6 @@ func TestGetHoldings(t *testing.T) {
}, happyCredentials)
assert.NoError(t, err)
// process again with no changes
err = Process(&Holdings{
Exchange: "Test",
Accounts: []SubAccount{
{
AssetType: asset.Spot,
ID: "1337",
Currencies: []Balance{
{
Currency: currency.BTC,
Total: 100,
Hold: 20,
},
},
},
},
}, happyCredentials)
assert.NoError(t, err)
_, err = GetHoldings("", nil, asset.Spot)
assert.ErrorIs(t, err, errExchangeNameUnset)
@@ -337,20 +318,16 @@ func TestGetFree(t *testing.T) {
}
}
func TestUpdate(t *testing.T) {
func TestSave(t *testing.T) {
t.Parallel()
s := &Service{exchangeAccounts: make(map[string]*Accounts), mux: dispatch.GetNewMux(nil)}
err := s.Update(nil, nil)
if !errors.Is(err, errHoldingsIsNil) {
t.Fatalf("received: '%v' but expected: '%v'", err, errHoldingsIsNil)
}
err := s.Save(nil, nil)
assert.ErrorIs(t, err, errHoldingsIsNil)
err = s.Update(&Holdings{}, nil)
if !errors.Is(err, errExchangeNameUnset) {
t.Fatalf("received: '%v' but expected: '%v'", err, errExchangeNameUnset)
}
err = s.Save(&Holdings{}, nil)
assert.ErrorIs(t, err, errExchangeNameUnset)
err = s.Update(&Holdings{
err = s.Save(&Holdings{
Exchange: "TeSt",
Accounts: []SubAccount{
{
@@ -365,24 +342,11 @@ func TestUpdate(t *testing.T) {
},
},
{AssetType: asset.UpsideProfitContract, ID: "1337"},
{
AssetType: asset.Spot,
ID: "1337",
Currencies: []Balance{
{
Currency: currency.BTC,
Total: 100,
Hold: 20,
},
},
},
},
}, happyCredentials)
if !errors.Is(err, asset.ErrNotSupported) {
t.Fatalf("received: '%v' but expected: '%v'", err, asset.ErrNotSupported)
}
assert.ErrorIs(t, err, asset.ErrNotSupported)
err = s.Update(&Holdings{ // No change
err = s.Save(&Holdings{ // No change
Exchange: "tEsT",
Accounts: []SubAccount{
{
@@ -398,25 +362,51 @@ func TestUpdate(t *testing.T) {
},
},
}, happyCredentials)
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}
require.NoError(t, err)
acc, ok := s.exchangeAccounts["test"]
if !ok {
t.Fatal("account should be loaded")
}
require.True(t, ok)
b, ok := acc.SubAccounts[Credentials{Key: "AAAAA"}][key.SubAccountCurrencyAsset{
assets, ok := acc.subAccounts[*happyCredentials][key.SubAccountAsset{
SubAccount: "1337",
Currency: currency.BTC.Item,
Asset: asset.Spot,
}]
if !ok {
t.Fatal("account should be loaded")
}
require.True(t, ok)
b, ok := assets[currency.BTC.Item]
require.True(t, ok)
assert.NotEmpty(t, b.updatedAt)
assert.Equal(t, 100.0, b.total)
assert.Equal(t, 20.0, b.hold)
err = s.Save(&Holdings{
Exchange: "tEsT",
Accounts: []SubAccount{
{
AssetType: asset.Spot,
ID: "1337",
Currencies: []Balance{
{
Currency: currency.ETH,
Total: 80,
Hold: 20,
},
},
},
},
}, happyCredentials)
require.NoError(t, err)
b, ok = assets[currency.BTC.Item]
require.True(t, ok)
assert.NotEmpty(t, b.updatedAt)
assert.Zero(t, b.total)
assert.Zero(t, b.hold)
e, ok := assets[currency.ETH.Item]
require.True(t, ok)
assert.NotEmpty(t, e.updatedAt)
assert.Equal(t, 80.0, e.total)
assert.Equal(t, 20.0, e.hold)
}

View File

@@ -34,9 +34,11 @@ type Accounts struct {
// TODO: Credential tracker to match to keys that are managed and return
// pointer.
// TODO: Have different cred struct for centralized verse DEFI exchanges.
SubAccounts map[Credentials]map[key.SubAccountCurrencyAsset]*ProtectedBalance
subAccounts map[Credentials]map[key.SubAccountAsset]currencyBalances
}
type currencyBalances = map[*currency.Item]*ProtectedBalance
// Holdings is a generic type to hold each exchange's holdings for all enabled
// currencies
type Holdings struct {