diff --git a/common/key/key.go b/common/key/key.go index cfd6c2d6..5db6bb4f 100644 --- a/common/key/key.go +++ b/common/key/key.go @@ -28,10 +28,9 @@ type PairAsset struct { Asset asset.Item } -// SubAccountCurrencyAsset is a unique map key signature for subaccount, currency code and asset -type SubAccountCurrencyAsset struct { +// SubAccountAsset is a unique map key signature for subaccount and asset +type SubAccountAsset struct { SubAccount string - Currency *currency.Item Asset asset.Item } diff --git a/engine/rpcserver.go b/engine/rpcserver.go index 4b2699b6..1e8719f9 100644 --- a/engine/rpcserver.go +++ b/engine/rpcserver.go @@ -653,36 +653,6 @@ func (s *RPCServer) GetAccountInfoStream(r *gctrpc.GetAccountInfoRequest, stream return err } - initAcc, err := exch.GetCachedAccountInfo(stream.Context(), assetType) - if err != nil { - return err - } - - accounts := make([]*gctrpc.Account, len(initAcc.Accounts)) - for x := range initAcc.Accounts { - subAccounts := make([]*gctrpc.AccountCurrencyInfo, len(initAcc.Accounts[x].Currencies)) - for y := range initAcc.Accounts[x].Currencies { - subAccounts[y] = &gctrpc.AccountCurrencyInfo{ - Currency: initAcc.Accounts[x].Currencies[y].Currency.String(), - TotalValue: initAcc.Accounts[x].Currencies[y].Total, - Hold: initAcc.Accounts[x].Currencies[y].Hold, - UpdatedAt: timestamppb.New(initAcc.Accounts[x].Currencies[y].UpdatedAt), - } - } - accounts[x] = &gctrpc.Account{ - Id: initAcc.Accounts[x].ID, - Currencies: subAccounts, - } - } - - err = stream.Send(&gctrpc.GetAccountInfoResponse{ - Exchange: initAcc.Exchange, - Accounts: accounts, - }) - if err != nil { - return err - } - pipe, err := account.SubscribeToExchangeAccount(r.Exchange) if err != nil { return err @@ -694,16 +664,23 @@ func (s *RPCServer) GetAccountInfoStream(r *gctrpc.GetAccountInfoRequest, stream log.Errorln(log.DispatchMgr, pipeErr) } }() + init := make(chan struct{}, 1) + init <- struct{}{} for { - data, ok := <-pipe.Channel() - if !ok { - return errDispatchSystem + select { + case <-stream.Context().Done(): + return stream.Context().Err() + case _, ok := <-pipe.Channel(): + if !ok { + return errDispatchSystem + } + case <-init: } - holdings, ok := data.(*account.Holdings) - if !ok { - return common.GetTypeAssertError("*account.Holdings", data) + holdings, err := exch.GetCachedAccountInfo(stream.Context(), assetType) + if err != nil { + return err } accounts := make([]*gctrpc.Account, len(holdings.Accounts)) diff --git a/exchanges/account/account.go b/exchanges/account/account.go index 2f1a0be9..2f55d7be 100644 --- a/exchanges/account/account.go +++ b/exchanges/account/account.go @@ -73,7 +73,7 @@ func SubscribeToExchangeAccount(exchange string) (dispatch.Pipe, error) { // Process processes new account holdings updates func Process(h *Holdings, c *Credentials) error { - return service.Update(h, c) + return service.Save(h, c) } // GetHoldings returns full holdings for an exchange. @@ -101,28 +101,30 @@ func GetHoldings(exch string, creds *Credentials, assetType asset.Item) (Holding return Holdings{}, fmt.Errorf("%s %w: `%s`", exch, ErrExchangeHoldingsNotFound, assetType) } - subAccountHoldings, ok := accounts.SubAccounts[*creds] + subAccountHoldings, ok := accounts.subAccounts[*creds] if !ok { return Holdings{}, fmt.Errorf("%s %s %s %w %w", exch, creds, assetType, errNoCredentialBalances, ErrExchangeHoldingsNotFound) } currencyBalances := make([]Balance, 0, len(subAccountHoldings)) cpy := *creds - for mapKey, assetHoldings := range subAccountHoldings { + for mapKey, assets := range subAccountHoldings { if mapKey.Asset != assetType { continue } - assetHoldings.m.Lock() - currencyBalances = append(currencyBalances, Balance{ - Currency: mapKey.Currency.Currency().Upper(), - Total: assetHoldings.total, - Hold: assetHoldings.hold, - Free: assetHoldings.free, - AvailableWithoutBorrow: assetHoldings.availableWithoutBorrow, - Borrowed: assetHoldings.borrowed, - UpdatedAt: assetHoldings.updatedAt, - }) - assetHoldings.m.Unlock() + for currItem, bal := range assets { + bal.m.Lock() + currencyBalances = append(currencyBalances, Balance{ + Currency: currItem.Currency().Upper(), + Total: bal.total, + Hold: bal.hold, + Free: bal.free, + AvailableWithoutBorrow: bal.availableWithoutBorrow, + Borrowed: bal.borrowed, + UpdatedAt: bal.updatedAt, + }) + bal.m.Unlock() + } if cpy.SubAccount == "" && mapKey.SubAccount != "" { // TODO: fix this backwards population // the subAccount here may not be associated with the balance across all subAccountHoldings @@ -141,13 +143,13 @@ func GetHoldings(exch string, creds *Credentials, assetType asset.Item) (Holding } // GetBalance returns the internal balance for that asset item. -func GetBalance(exch, subAccount string, creds *Credentials, ai asset.Item, c currency.Code) (*ProtectedBalance, error) { +func GetBalance(exch, subAccount string, creds *Credentials, a asset.Item, c currency.Code) (*ProtectedBalance, error) { if exch == "" { return nil, fmt.Errorf("cannot get balance: %w", errExchangeNameUnset) } - if !ai.IsValid() { - return nil, fmt.Errorf("cannot get balance: %s %w", ai, asset.ErrNotSupported) + if !a.IsValid() { + return nil, fmt.Errorf("cannot get balance: %s %w", a, asset.ErrNotSupported) } if creds.IsEmpty() { @@ -164,29 +166,31 @@ func GetBalance(exch, subAccount string, creds *Credentials, ai asset.Item, c cu accounts, ok := service.exchangeAccounts[exch] if !ok { - return nil, fmt.Errorf("%s %w", exch, ErrExchangeHoldingsNotFound) + return nil, fmt.Errorf("%w for %s", ErrExchangeHoldingsNotFound, exch) } - subAccounts, ok := accounts.SubAccounts[*creds] + subAccounts, ok := accounts.subAccounts[*creds] if !ok { - return nil, fmt.Errorf("%s %s %w", - exch, creds, errNoCredentialBalances) + return nil, fmt.Errorf("%w for %s %s", errNoCredentialBalances, exch, creds) } - bal, ok := subAccounts[key.SubAccountCurrencyAsset{ + assets, ok := subAccounts[key.SubAccountAsset{ SubAccount: subAccount, - Currency: c.Item, - Asset: ai, + Asset: a, }] if !ok { - return nil, fmt.Errorf("%s %s %s %s %w", - exch, subAccount, ai, c, errNoExchangeSubAccountBalances) + return nil, fmt.Errorf("%w for %s SubAccount %q %s %s", errNoExchangeSubAccountBalances, exch, subAccount, a, c) + } + bal, ok := assets[c.Item] + if !ok { + return nil, fmt.Errorf("%w for %s SubAccount %q %s %s", errNoExchangeSubAccountBalances, exch, subAccount, a, c) } return bal, nil } -// Update updates holdings with new account info -func (s *Service) Update(incoming *Holdings, creds *Credentials) error { +// Save saves the holdings with new account info +// incoming should be a full update, and any missing currencies will be zeroed +func (s *Service) Save(incoming *Holdings, creds *Credentials) error { if incoming == nil { return fmt.Errorf("cannot update holdings: %w", errHoldingsIsNil) } @@ -202,19 +206,26 @@ func (s *Service) Update(incoming *Holdings, creds *Credentials) error { exch := strings.ToLower(incoming.Exchange) s.mu.Lock() defer s.mu.Unlock() - accounts, ok := s.exchangeAccounts[exch] - if !ok { + + accounts, exist := s.exchangeAccounts[exch] + if !exist { id, err := s.mux.GetID() if err != nil { return err } accounts = &Accounts{ ID: id, - SubAccounts: make(map[Credentials]map[key.SubAccountCurrencyAsset]*ProtectedBalance), + subAccounts: make(map[Credentials]map[key.SubAccountAsset]currencyBalances), } s.exchangeAccounts[exch] = accounts } + subAccounts, exist := accounts.subAccounts[*creds] + if !exist { + subAccounts = make(map[key.SubAccountAsset]currencyBalances) + accounts.subAccounts[*creds] = subAccounts + } + var errs error for x := range incoming.Accounts { if !incoming.Accounts[x].AssetType.IsValid() { @@ -235,46 +246,47 @@ func (s *Service) Update(incoming *Holdings, creds *Credentials) error { } incoming.Accounts[x].Credentials.creds = cpy - var subAccounts map[key.SubAccountCurrencyAsset]*ProtectedBalance - subAccounts, ok = accounts.SubAccounts[*creds] - if !ok { - subAccounts = make(map[key.SubAccountCurrencyAsset]*ProtectedBalance) - accounts.SubAccounts[*creds] = subAccounts + accAsset := key.SubAccountAsset{ + SubAccount: incoming.Accounts[x].ID, + Asset: incoming.Accounts[x].AssetType, + } + assets, exist := subAccounts[accAsset] + if !exist { + assets = make(map[*currency.Item]*ProtectedBalance) + accounts.subAccounts[*creds][accAsset] = assets } + updated := make(map[*currency.Item]bool) for y := range incoming.Accounts[x].Currencies { - if incoming.Accounts[x].Currencies[y].UpdatedAt.IsZero() { - incoming.Accounts[x].Currencies[y].UpdatedAt = time.Now() + accBal := &incoming.Accounts[x].Currencies[y] + if accBal.UpdatedAt.IsZero() { + accBal.UpdatedAt = time.Now() } - // Note: Sub accounts are case sensitive and an account "name" is - // different to account "naMe". - bal, ok := subAccounts[key.SubAccountCurrencyAsset{ - SubAccount: incoming.Accounts[x].ID, - Currency: incoming.Accounts[x].Currencies[y].Currency.Item, - Asset: incoming.Accounts[x].AssetType, - }] + bal, ok := assets[accBal.Currency.Item] if !ok || bal == nil { bal = &ProtectedBalance{} - subAccounts[key.SubAccountCurrencyAsset{ - SubAccount: incoming.Accounts[x].ID, - Currency: incoming.Accounts[x].Currencies[y].Currency.Item, - Asset: incoming.Accounts[x].AssetType, - }] = bal } - if err := bal.load(&incoming.Accounts[x].Currencies[y]); err != nil { + if err := bal.load(accBal); err != nil { errs = common.AppendError(errs, fmt.Errorf("%w for account ID `%s` [%s %s]: %w", errLoadingBalance, incoming.Accounts[x].ID, incoming.Accounts[x].AssetType, incoming.Accounts[x].Currencies[y].Currency, err)) + continue + } + assets[accBal.Currency.Item] = bal + updated[accBal.Currency.Item] = true + } + for cur, bal := range assets { + if !updated[cur] { + bal.reset() } } - } - err := s.mux.Publish(incoming, accounts.ID) - if err != nil { - return err + if err := s.mux.Publish(incoming.Accounts[x], accounts.ID); err != nil { + errs = common.AppendError(errs, fmt.Errorf("cannot publish load for %s %w", exch, err)) + } } return errs @@ -342,3 +354,16 @@ func (b *ProtectedBalance) GetFree() float64 { defer b.m.Unlock() return b.free } + +func (b *ProtectedBalance) reset() { + b.m.Lock() + defer b.m.Unlock() + + b.total = 0 + b.hold = 0 + b.free = 0 + b.availableWithoutBorrow = 0 + b.borrowed = 0 + b.updatedAt = time.Now() + b.notice.Alert() +} diff --git a/exchanges/account/account_test.go b/exchanges/account/account_test.go index 960b6ba6..022f403c 100644 --- a/exchanges/account/account_test.go +++ b/exchanges/account/account_test.go @@ -113,25 +113,6 @@ func TestGetHoldings(t *testing.T) { }, happyCredentials) assert.NoError(t, err) - // process again with no changes - err = Process(&Holdings{ - Exchange: "Test", - Accounts: []SubAccount{ - { - AssetType: asset.Spot, - ID: "1337", - Currencies: []Balance{ - { - Currency: currency.BTC, - Total: 100, - Hold: 20, - }, - }, - }, - }, - }, happyCredentials) - assert.NoError(t, err) - _, err = GetHoldings("", nil, asset.Spot) assert.ErrorIs(t, err, errExchangeNameUnset) @@ -337,20 +318,16 @@ func TestGetFree(t *testing.T) { } } -func TestUpdate(t *testing.T) { +func TestSave(t *testing.T) { t.Parallel() s := &Service{exchangeAccounts: make(map[string]*Accounts), mux: dispatch.GetNewMux(nil)} - err := s.Update(nil, nil) - if !errors.Is(err, errHoldingsIsNil) { - t.Fatalf("received: '%v' but expected: '%v'", err, errHoldingsIsNil) - } + err := s.Save(nil, nil) + assert.ErrorIs(t, err, errHoldingsIsNil) - err = s.Update(&Holdings{}, nil) - if !errors.Is(err, errExchangeNameUnset) { - t.Fatalf("received: '%v' but expected: '%v'", err, errExchangeNameUnset) - } + err = s.Save(&Holdings{}, nil) + assert.ErrorIs(t, err, errExchangeNameUnset) - err = s.Update(&Holdings{ + err = s.Save(&Holdings{ Exchange: "TeSt", Accounts: []SubAccount{ { @@ -365,24 +342,11 @@ func TestUpdate(t *testing.T) { }, }, {AssetType: asset.UpsideProfitContract, ID: "1337"}, - { - AssetType: asset.Spot, - ID: "1337", - Currencies: []Balance{ - { - Currency: currency.BTC, - Total: 100, - Hold: 20, - }, - }, - }, }, }, happyCredentials) - if !errors.Is(err, asset.ErrNotSupported) { - t.Fatalf("received: '%v' but expected: '%v'", err, asset.ErrNotSupported) - } + assert.ErrorIs(t, err, asset.ErrNotSupported) - err = s.Update(&Holdings{ // No change + err = s.Save(&Holdings{ // No change Exchange: "tEsT", Accounts: []SubAccount{ { @@ -398,25 +362,51 @@ func TestUpdate(t *testing.T) { }, }, }, happyCredentials) - if !errors.Is(err, nil) { - t.Fatalf("received: '%v' but expected: '%v'", err, nil) - } + require.NoError(t, err) acc, ok := s.exchangeAccounts["test"] - if !ok { - t.Fatal("account should be loaded") - } + require.True(t, ok) - b, ok := acc.SubAccounts[Credentials{Key: "AAAAA"}][key.SubAccountCurrencyAsset{ + assets, ok := acc.subAccounts[*happyCredentials][key.SubAccountAsset{ SubAccount: "1337", - Currency: currency.BTC.Item, Asset: asset.Spot, }] - if !ok { - t.Fatal("account should be loaded") - } + require.True(t, ok) + b, ok := assets[currency.BTC.Item] + require.True(t, ok) + + assert.NotEmpty(t, b.updatedAt) assert.Equal(t, 100.0, b.total) assert.Equal(t, 20.0, b.hold) + + err = s.Save(&Holdings{ + Exchange: "tEsT", + Accounts: []SubAccount{ + { + AssetType: asset.Spot, + ID: "1337", + Currencies: []Balance{ + { + Currency: currency.ETH, + Total: 80, + Hold: 20, + }, + }, + }, + }, + }, happyCredentials) + require.NoError(t, err) + + b, ok = assets[currency.BTC.Item] + require.True(t, ok) assert.NotEmpty(t, b.updatedAt) + assert.Zero(t, b.total) + assert.Zero(t, b.hold) + + e, ok := assets[currency.ETH.Item] + require.True(t, ok) + assert.NotEmpty(t, e.updatedAt) + assert.Equal(t, 80.0, e.total) + assert.Equal(t, 20.0, e.hold) } diff --git a/exchanges/account/account_types.go b/exchanges/account/account_types.go index 145dee0a..e4f46874 100644 --- a/exchanges/account/account_types.go +++ b/exchanges/account/account_types.go @@ -34,9 +34,11 @@ type Accounts struct { // TODO: Credential tracker to match to keys that are managed and return // pointer. // TODO: Have different cred struct for centralized verse DEFI exchanges. - SubAccounts map[Credentials]map[key.SubAccountCurrencyAsset]*ProtectedBalance + subAccounts map[Credentials]map[key.SubAccountAsset]currencyBalances } +type currencyBalances = map[*currency.Item]*ProtectedBalance + // Holdings is a generic type to hold each exchange's holdings for all enabled // currencies type Holdings struct {