account: add ProcessChange for update balance (#1875)

Signed-off-by: Ye Sijun <junnplus@gmail.com>
This commit is contained in:
Jun
2025-05-09 13:52:11 +09:00
committed by GitHub
parent c9bead8ff3
commit 05880d62e6
14 changed files with 424 additions and 119 deletions

View File

@@ -337,12 +337,12 @@ func (m *WebsocketRoutineManager) websocketDataHandler(exchName string, data any
log.Warnln(log.WebsocketMgr, d.Message)
case account.Change:
if m.verbose {
m.printAccountHoldingsChangeSummary(d)
m.printAccountHoldingsChangeSummary(exchName, d)
}
case []account.Change:
if m.verbose {
for x := range d {
m.printAccountHoldingsChangeSummary(d[x])
m.printAccountHoldingsChangeSummary(exchName, d[x])
}
}
case []trade.Data, trade.Data:
@@ -404,16 +404,16 @@ func (m *WebsocketRoutineManager) printOrderSummary(o *order.Detail, isUpdate bo
// printAccountHoldingsChangeSummary this function will be deprecated when a
// account holdings update is done.
func (m *WebsocketRoutineManager) printAccountHoldingsChangeSummary(o account.Change) {
if m == nil || atomic.LoadInt32(&m.state) == stoppedState {
func (m *WebsocketRoutineManager) printAccountHoldingsChangeSummary(exch string, o account.Change) {
if m == nil || atomic.LoadInt32(&m.state) == stoppedState || o.Balance == nil {
return
}
log.Debugf(log.WebsocketMgr,
"Account Holdings Balance Changed: %s %s %s has changed balance by %f for account: %s",
o.Exchange,
o.Asset,
o.Currency,
o.Amount,
exch,
o.AssetType,
o.Balance.Currency,
o.Balance.Total,
o.Account)
}

View File

@@ -26,7 +26,6 @@ var (
var (
errHoldingsIsNil = errors.New("holdings cannot be nil")
errExchangeNameUnset = errors.New("exchange name unset")
errExchangeAccountsNotFound = errors.New("exchange accounts not found")
errNoExchangeSubAccountBalances = errors.New("no exchange sub account balances")
errBalanceIsNil = errors.New("balance is nil")
errNoCredentialBalances = errors.New("no balances associated with credentials")
@@ -34,8 +33,29 @@ var (
errOutOfSequence = errors.New("out of sequence")
errUpdatedAtIsZero = errors.New("updatedAt may not be zero")
errLoadingBalance = errors.New("error loading balance")
errExchangeAlreadyExists = errors.New("exchange already exists")
errCannotUpdateBalance = errors.New("cannot update balance")
)
// initAccounts adds a new empty shared account accounts entry for an exchange
// must be called with s.mu locked
func (s *Service) initAccounts(exch string) (*Accounts, error) {
id, err := s.mux.GetID()
if err != nil {
return nil, err
}
_, ok := s.exchangeAccounts[exch]
if ok {
return nil, errExchangeAlreadyExists
}
accounts := &Accounts{
ID: id,
subAccounts: make(map[Credentials]map[key.SubAccountAsset]currencyBalances),
}
s.exchangeAccounts[exch] = accounts
return accounts, nil
}
// CollectBalances converts a map of sub-account balances into a slice
func CollectBalances(accountBalances map[string][]Balance, assetType asset.Item) (accounts []SubAccount, err error) {
if accountBalances == nil {
@@ -64,9 +84,10 @@ func SubscribeToExchangeAccount(exchange string) (dispatch.Pipe, error) {
defer service.mu.Unlock()
accounts, ok := service.exchangeAccounts[exchange]
if !ok {
return dispatch.Pipe{}, fmt.Errorf("cannot subscribe %s %w",
exchange,
errExchangeAccountsNotFound)
var err error
if accounts, err = service.initAccounts(exchange); err != nil {
return dispatch.Pipe{}, fmt.Errorf("cannot subscribe to exchange account %w", err)
}
}
return service.mux.Subscribe(accounts.ID)
}
@@ -76,6 +97,11 @@ func Process(h *Holdings, c *Credentials) error {
return service.Save(h, c)
}
// ProcessChange updates the changes to the exchange account
func ProcessChange(exch string, changes []Change, c *Credentials) error {
return service.Update(exch, changes, c)
}
// GetHoldings returns full holdings for an exchange.
// NOTE: Due to credentials these amounts could be N*APIKEY actual holdings.
// TODO: Add jurisdiction and differentiation between APIKEY holdings.
@@ -192,36 +218,31 @@ func GetBalance(exch, subAccount string, creds *Credentials, a asset.Item, c cur
// incoming should be a full update, and any missing currencies will be zeroed
func (s *Service) Save(incoming *Holdings, creds *Credentials) error {
if incoming == nil {
return fmt.Errorf("cannot update holdings: %w", errHoldingsIsNil)
return fmt.Errorf("cannot save holdings: %w", errHoldingsIsNil)
}
if incoming.Exchange == "" {
return fmt.Errorf("cannot update holdings: %w", errExchangeNameUnset)
return fmt.Errorf("cannot save holdings: %w", errExchangeNameUnset)
}
if creds.IsEmpty() {
return fmt.Errorf("cannot update holdings: %w", errCredentialsAreNil)
return fmt.Errorf("cannot save holdings: %w", errCredentialsAreNil)
}
exch := strings.ToLower(incoming.Exchange)
s.mu.Lock()
defer s.mu.Unlock()
accounts, exist := s.exchangeAccounts[exch]
if !exist {
id, err := s.mux.GetID()
if err != nil {
return err
accounts, ok := s.exchangeAccounts[exch]
if !ok {
var err error
if accounts, err = s.initAccounts(exch); err != nil {
return fmt.Errorf("cannot save holdings for %s %w", exch, err)
}
accounts = &Accounts{
ID: id,
subAccounts: make(map[Credentials]map[key.SubAccountAsset]currencyBalances),
}
s.exchangeAccounts[exch] = accounts
}
subAccounts, exist := accounts.subAccounts[*creds]
if !exist {
subAccounts, ok := accounts.subAccounts[*creds]
if !ok {
subAccounts = make(map[key.SubAccountAsset]currencyBalances)
accounts.subAccounts[*creds] = subAccounts
}
@@ -250,8 +271,8 @@ func (s *Service) Save(incoming *Holdings, creds *Credentials) error {
SubAccount: incoming.Accounts[x].ID,
Asset: incoming.Accounts[x].AssetType,
}
assets, exist := subAccounts[accAsset]
if !exist {
assets, ok := subAccounts[accAsset]
if !ok {
assets = make(map[*currency.Item]*ProtectedBalance)
accounts.subAccounts[*creds][accAsset] = assets
}
@@ -292,6 +313,78 @@ func (s *Service) Save(incoming *Holdings, creds *Credentials) error {
return errs
}
// Update updates the balance for a specific exchange and credentials
func (s *Service) Update(exch string, changes []Change, creds *Credentials) error {
if exch == "" {
return fmt.Errorf("%w: %w", errCannotUpdateBalance, errExchangeNameUnset)
}
if creds.IsEmpty() {
return fmt.Errorf("%w: %w", errCannotUpdateBalance, errCredentialsAreNil)
}
exch = strings.ToLower(exch)
s.mu.Lock()
defer s.mu.Unlock()
accounts, ok := s.exchangeAccounts[exch]
if !ok {
var err error
if accounts, err = s.initAccounts(exch); err != nil {
return fmt.Errorf("%w for %s %w", errCannotUpdateBalance, exch, err)
}
}
subAccounts, ok := accounts.subAccounts[*creds]
if !ok {
subAccounts = make(map[key.SubAccountAsset]currencyBalances)
accounts.subAccounts[*creds] = subAccounts
}
var errs error
for _, change := range changes {
if !change.AssetType.IsValid() {
errs = common.AppendError(errs, fmt.Errorf("%w for %s.%s %w",
errCannotUpdateBalance, change.Account, change.AssetType, asset.ErrNotSupported))
continue
}
if change.Balance == nil {
errs = common.AppendError(errs, fmt.Errorf("%w for %s.%s %w",
errCannotUpdateBalance, change.Account, change.AssetType, errBalanceIsNil))
continue
}
accAsset := key.SubAccountAsset{
SubAccount: change.Account,
Asset: change.AssetType,
}
assets, ok := subAccounts[accAsset]
if !ok {
assets = make(map[*currency.Item]*ProtectedBalance)
accounts.subAccounts[*creds][accAsset] = assets
}
bal, ok := assets[change.Balance.Currency.Item]
if !ok || bal == nil {
bal = &ProtectedBalance{}
assets[change.Balance.Currency.Item] = bal
}
if err := bal.load(change.Balance); err != nil {
errs = common.AppendError(errs, fmt.Errorf("%w for %s.%s.%s %w",
errCannotUpdateBalance,
change.Account,
change.AssetType,
change.Balance.Currency,
err))
continue
}
if err := s.mux.Publish(change, accounts.ID); err != nil {
errs = common.AppendError(errs, fmt.Errorf("cannot publish update balance for %s: %w", exch, err))
}
}
return errs
}
// load checks to see if there is a change from incoming balance, if there is a
// change it will change then alert external routines.
func (b *ProtectedBalance) load(change *Balance) error {

View File

@@ -144,7 +144,7 @@ func TestGetHoldings(t *testing.T) {
assert.Equal(t, 20.0, u.Accounts[0].Currencies[0].Hold)
_, err = SubscribeToExchangeAccount("nonsense")
assert.ErrorIs(t, err, errExchangeAccountsNotFound)
require.NoError(t, err)
p, err := SubscribeToExchangeAccount("Test")
require.NoError(t, err)
@@ -410,3 +410,105 @@ func TestSave(t *testing.T) {
assert.Equal(t, 80.0, e.total)
assert.Equal(t, 20.0, e.hold)
}
func TestUpdate(t *testing.T) {
t.Parallel()
s := &Service{exchangeAccounts: make(map[string]*Accounts), mux: dispatch.GetNewMux(nil)}
err := s.Update("", nil, nil)
assert.ErrorIs(t, err, errExchangeNameUnset)
err = s.Update("test", nil, nil)
assert.ErrorIs(t, err, errCredentialsAreNil)
err = s.Update("test", []Change{
{
AssetType: 6969,
Balance: &Balance{
Currency: currency.BTC,
Free: 100,
},
},
}, happyCredentials)
assert.ErrorIs(t, err, asset.ErrNotSupported)
now := time.Now()
err = s.Update("test", []Change{
{
AssetType: asset.Spot,
Account: "1337",
Balance: &Balance{
Currency: currency.BTC,
Total: 100,
Free: 80,
UpdatedAt: now,
},
},
}, happyCredentials)
require.NoError(t, err)
acc, ok := s.exchangeAccounts["test"]
require.True(t, ok, "Update must add the exchange")
assets, ok := acc.subAccounts[*happyCredentials][key.SubAccountAsset{
SubAccount: "1337",
Asset: asset.Spot,
}]
require.True(t, ok, "Update must add subAccount for the credentials")
b, ok := assets[currency.BTC.Item]
require.True(t, ok, "Update must add currency to the subAccount")
assert.Equal(t, 100.0, b.total, "Update should set total correctly")
assert.Equal(t, 80.0, b.free, "Update should set free correctly")
assert.Equal(t, now, b.updatedAt, "Update should set updatedAt correctly")
err = s.Update("test", []Change{
{
AssetType: asset.Spot,
Account: "1337",
Balance: &Balance{
Currency: currency.BTC,
Total: 100,
Free: 100,
UpdatedAt: now.Add(-1 * time.Second),
},
},
}, happyCredentials)
assert.ErrorIs(t, err, errOutOfSequence)
err = s.Update("test", []Change{
{
AssetType: asset.Spot,
Account: "1337",
Balance: &Balance{
Currency: currency.BTC,
Total: 100,
Free: 100,
UpdatedAt: now.Add(1 * time.Second),
},
},
}, happyCredentials)
require.NoError(t, err)
assert.Equal(t, 100.0, b.total)
assert.Equal(t, 100.0, b.free)
assert.Equal(t, now.Add(1*time.Second), b.updatedAt)
}
func TestTrackNewAccounts(t *testing.T) {
t.Parallel()
s := &Service{
exchangeAccounts: make(map[string]*Accounts),
mux: dispatch.GetNewMux(nil),
}
s.mu.Lock()
_, err := s.initAccounts("binance")
s.mu.Unlock()
require.NoError(t, err)
s.mu.Lock()
_, err = s.initAccounts("binance")
s.mu.Unlock()
assert.ErrorIs(t, err, errExchangeAlreadyExists)
}

View File

@@ -67,11 +67,9 @@ type Balance struct {
// Change defines incoming balance change on currency holdings
type Change struct {
Exchange string
Currency currency.Code
Asset asset.Item
Amount float64
Account string
Account string
AssetType asset.Item
Balance *Balance
}
// ProtectedBalance stores the full balance information for that specific asset

View File

@@ -345,19 +345,26 @@ func (by *Bybit) wsProcessWalletPushData(assetType asset.Item, resp []byte) erro
if err != nil {
return err
}
accounts := []account.Change{}
creds, err := by.GetCredentials(context.TODO())
if err != nil {
return err
}
var changes []account.Change
for x := range result.Data {
for y := range result.Data[x].Coin {
accounts = append(accounts, account.Change{
Exchange: by.Name,
Currency: currency.NewCode(result.Data[x].Coin[y].Coin),
Asset: assetType,
Amount: result.Data[x].Coin[y].WalletBalance.Float64(),
changes = append(changes, account.Change{
AssetType: assetType,
Balance: &account.Balance{
Currency: currency.NewCode(result.Data[x].Coin[y].Coin),
Total: result.Data[x].Coin[y].WalletBalance.Float64(),
Free: result.Data[x].Coin[y].WalletBalance.Float64(),
UpdatedAt: result.CreationTime.Time(),
},
})
}
}
by.Websocket.DataHandler <- accounts
return nil
by.Websocket.DataHandler <- changes
return account.ProcessChange(by.Name, changes, creds)
}
// wsProcessOrder the order stream to see changes to your orders in real-time.

View File

@@ -8,6 +8,7 @@ import (
"log"
"os"
"slices"
"strings"
"sync"
"testing"
"time"
@@ -20,6 +21,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/encoding/json"
"github.com/thrasher-corp/gocryptotrader/exchange/websocket"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/fundingrate"
"github.com/thrasher-corp/gocryptotrader/exchanges/futures"
@@ -2026,7 +2028,8 @@ const wsBalancesPushDataJSON = `{"time": 1605248616, "channel": "spot.balances",
func TestBalancesPushData(t *testing.T) {
t.Parallel()
if err := g.WsHandleSpotData(t.Context(), []byte(wsBalancesPushDataJSON)); err != nil {
ctx := account.DeployCredentialsToContext(t.Context(), &account.Credentials{Key: "test", Secret: "test"})
if err := g.WsHandleSpotData(ctx, []byte(wsBalancesPushDataJSON)); err != nil {
t.Errorf("%s websocket balances push data error: %v", g.Name, err)
}
}
@@ -2044,7 +2047,8 @@ const wsCrossMarginBalancePushDataJSON = `{"time": 1605248616,"channel": "spot.c
func TestCrossMarginBalancePushData(t *testing.T) {
t.Parallel()
if err := g.WsHandleSpotData(t.Context(), []byte(wsCrossMarginBalancePushDataJSON)); err != nil {
ctx := account.DeployCredentialsToContext(t.Context(), &account.Credentials{Key: "test", Secret: "test"})
if err := g.WsHandleSpotData(ctx, []byte(wsCrossMarginBalancePushDataJSON)); err != nil {
t.Errorf("%s websocket cross margin balance push data error: %v", g.Name, err)
}
}
@@ -2063,7 +2067,13 @@ func TestFuturesDataHandler(t *testing.T) {
t.Parallel()
g := new(Gateio) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.Setup(g), "Test instance Setup must not error")
testexch.FixtureToDataHandler(t, "testdata/wsFutures.json", func(m []byte) error { return g.WsHandleFuturesData(t.Context(), m, asset.CoinMarginedFutures) })
testexch.FixtureToDataHandler(t, "testdata/wsFutures.json", func(m []byte) error {
ctx := t.Context()
if strings.Contains(string(m), "futures.balances") {
ctx = account.DeployCredentialsToContext(ctx, &account.Credentials{Key: "test", Secret: "test"})
}
return g.WsHandleFuturesData(ctx, m, asset.CoinMarginedFutures)
})
close(g.Websocket.DataHandler)
assert.Len(t, g.Websocket.DataHandler, 14, "Should see the correct number of messages")
for resp := range g.Websocket.DataHandler {
@@ -2227,7 +2237,8 @@ const optionsBalancePushDataJSON = `{ "channel": "options.balances", "event": "u
func TestOptionsBalancePushData(t *testing.T) {
t.Parallel()
if err := g.WsHandleOptionsData(t.Context(), []byte(optionsBalancePushDataJSON)); err != nil {
ctx := account.DeployCredentialsToContext(t.Context(), &account.Credentials{Key: "test", Secret: "test"})
if err := g.WsHandleOptionsData(ctx, []byte(optionsBalancePushDataJSON)); err != nil {
t.Errorf("%s websocket options balance push data error: %v", g.Name, err)
}
}

View File

@@ -163,7 +163,7 @@ func (g *Gateio) generateWsSignature(secret, event, channel string, t int64) (st
}
// WsHandleSpotData handles spot data
func (g *Gateio) WsHandleSpotData(_ context.Context, respRaw []byte) error {
func (g *Gateio) WsHandleSpotData(ctx context.Context, respRaw []byte) error {
push, err := parseWSHeader(respRaw)
if err != nil {
return err
@@ -195,13 +195,13 @@ func (g *Gateio) WsHandleSpotData(_ context.Context, respRaw []byte) error {
case spotUserTradesChannel:
return g.processUserPersonalTrades(respRaw)
case spotBalancesChannel:
return g.processSpotBalances(respRaw)
return g.processSpotBalances(ctx, respRaw)
case marginBalancesChannel:
return g.processMarginBalances(respRaw)
return g.processMarginBalances(ctx, respRaw)
case spotFundingBalanceChannel:
return g.processFundingBalances(respRaw)
case crossMarginBalanceChannel:
return g.processCrossMarginBalance(respRaw)
return g.processCrossMarginBalance(ctx, respRaw)
case crossMarginLoanChannel:
return g.processCrossMarginLoans(respRaw)
case spotPongChannel:
@@ -543,7 +543,7 @@ func (g *Gateio) processUserPersonalTrades(data []byte) error {
return g.Websocket.Fills.Update(fills...)
}
func (g *Gateio) processSpotBalances(data []byte) error {
func (g *Gateio) processSpotBalances(ctx context.Context, data []byte) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
@@ -554,21 +554,29 @@ func (g *Gateio) processSpotBalances(data []byte) error {
if err != nil {
return err
}
accountChanges := make([]account.Change, len(resp.Result))
for x := range resp.Result {
code := currency.NewCode(resp.Result[x].Currency)
accountChanges[x] = account.Change{
Exchange: g.Name,
Currency: code,
Asset: asset.Spot,
Amount: resp.Result[x].Available.Float64(),
creds, err := g.GetCredentials(ctx)
if err != nil {
return err
}
changes := make([]account.Change, len(resp.Result))
for i := range resp.Result {
changes[i] = account.Change{
Account: resp.Result[i].User,
AssetType: asset.Spot,
Balance: &account.Balance{
Currency: currency.NewCode(resp.Result[i].Currency),
Total: resp.Result[i].Total.Float64(),
Free: resp.Result[i].Available.Float64(),
Hold: resp.Result[i].Total.Float64() - resp.Result[i].Available.Float64(),
UpdatedAt: resp.Result[i].Timestamp.Time(),
},
}
}
g.Websocket.DataHandler <- accountChanges
return nil
g.Websocket.DataHandler <- changes
return account.ProcessChange(g.Name, changes, creds)
}
func (g *Gateio) processMarginBalances(data []byte) error {
func (g *Gateio) processMarginBalances(ctx context.Context, data []byte) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
@@ -579,18 +587,25 @@ func (g *Gateio) processMarginBalances(data []byte) error {
if err != nil {
return err
}
accountChange := make([]account.Change, len(resp.Result))
creds, err := g.GetCredentials(ctx)
if err != nil {
return err
}
changes := make([]account.Change, len(resp.Result))
for x := range resp.Result {
code := currency.NewCode(resp.Result[x].Currency)
accountChange[x] = account.Change{
Exchange: g.Name,
Currency: code,
Asset: asset.Margin,
Amount: resp.Result[x].Available.Float64(),
changes[x] = account.Change{
AssetType: asset.Margin,
Balance: &account.Balance{
Currency: currency.NewCode(resp.Result[x].Currency),
Total: resp.Result[x].Available.Float64() + resp.Result[x].Freeze.Float64(),
Free: resp.Result[x].Available.Float64(),
Hold: resp.Result[x].Freeze.Float64(),
UpdatedAt: resp.Result[x].Timestamp.Time(),
},
}
}
g.Websocket.DataHandler <- accountChange
return nil
g.Websocket.DataHandler <- changes
return account.ProcessChange(g.Name, changes, creds)
}
func (g *Gateio) processFundingBalances(data []byte) error {
@@ -608,7 +623,7 @@ func (g *Gateio) processFundingBalances(data []byte) error {
return nil
}
func (g *Gateio) processCrossMarginBalance(data []byte) error {
func (g *Gateio) processCrossMarginBalance(ctx context.Context, data []byte) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
@@ -619,19 +634,25 @@ func (g *Gateio) processCrossMarginBalance(data []byte) error {
if err != nil {
return err
}
accountChanges := make([]account.Change, len(resp.Result))
creds, err := g.GetCredentials(ctx)
if err != nil {
return err
}
changes := make([]account.Change, len(resp.Result))
for x := range resp.Result {
code := currency.NewCode(resp.Result[x].Currency)
accountChanges[x] = account.Change{
Exchange: g.Name,
Currency: code,
Asset: asset.Margin,
Amount: resp.Result[x].Available.Float64(),
Account: resp.Result[x].User,
changes[x] = account.Change{
Account: resp.Result[x].User,
AssetType: asset.Margin,
Balance: &account.Balance{
Currency: currency.NewCode(resp.Result[x].Currency),
Total: resp.Result[x].Total.Float64(),
Free: resp.Result[x].Available.Float64(),
UpdatedAt: resp.Result[x].Timestamp.Time(),
},
}
}
g.Websocket.DataHandler <- accountChanges
return nil
g.Websocket.DataHandler <- changes
return account.ProcessChange(g.Name, changes, creds)
}
func (g *Gateio) processCrossMarginLoans(data []byte) error {

View File

@@ -139,7 +139,7 @@ func (g *Gateio) FuturesUnsubscribe(ctx context.Context, conn websocket.Connecti
}
// WsHandleFuturesData handles futures websocket data
func (g *Gateio) WsHandleFuturesData(_ context.Context, respRaw []byte, a asset.Item) error {
func (g *Gateio) WsHandleFuturesData(ctx context.Context, respRaw []byte, a asset.Item) error {
push, err := parseWSHeader(respRaw)
if err != nil {
return err
@@ -182,7 +182,7 @@ func (g *Gateio) WsHandleFuturesData(_ context.Context, respRaw []byte, a asset.
case futuresAutoPositionCloseChannel:
return g.processPositionCloseData(respRaw)
case futuresBalancesChannel:
return g.processBalancePushData(respRaw, a)
return g.processBalancePushData(ctx, respRaw, a)
case futuresReduceRiskLimitsChannel:
return g.processFuturesReduceRiskLimitNotification(respRaw)
case futuresPositionsChannel:
@@ -652,7 +652,7 @@ func (g *Gateio) processPositionCloseData(data []byte) error {
return nil
}
func (g *Gateio) processBalancePushData(data []byte, assetType asset.Item) error {
func (g *Gateio) processBalancePushData(ctx context.Context, data []byte, assetType asset.Item) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
@@ -663,23 +663,29 @@ func (g *Gateio) processBalancePushData(data []byte, assetType asset.Item) error
if err != nil {
return err
}
accountChange := make([]account.Change, len(resp.Result))
for x := range resp.Result {
info := strings.Split(resp.Result[x].Text, currency.UnderscoreDelimiter)
creds, err := g.GetCredentials(ctx)
if err != nil {
return err
}
changes := make([]account.Change, len(resp.Result))
for x, bal := range resp.Result {
info := strings.Split(bal.Text, currency.UnderscoreDelimiter)
if len(info) != 2 {
return errors.New("malformed text")
}
code := currency.NewCode(info[0])
accountChange[x] = account.Change{
Exchange: g.Name,
Currency: code,
Asset: assetType,
Amount: resp.Result[x].Balance,
Account: resp.Result[x].User,
changes[x] = account.Change{
AssetType: assetType,
Account: bal.User,
Balance: &account.Balance{
Currency: currency.NewCode(info[0]),
Total: bal.Balance,
Free: bal.Balance,
UpdatedAt: bal.Time.Time(),
},
}
}
g.Websocket.DataHandler <- accountChange
return nil
g.Websocket.DataHandler <- changes
return account.ProcessChange(g.Name, changes, creds)
}
func (g *Gateio) processFuturesReduceRiskLimitNotification(data []byte) error {

View File

@@ -293,7 +293,7 @@ func (g *Gateio) OptionsUnsubscribe(ctx context.Context, conn websocket.Connecti
}
// WsHandleOptionsData handles options websocket data
func (g *Gateio) WsHandleOptionsData(_ context.Context, respRaw []byte) error {
func (g *Gateio) WsHandleOptionsData(ctx context.Context, respRaw []byte) error {
push, err := parseWSHeader(respRaw)
if err != nil {
return err
@@ -339,7 +339,7 @@ func (g *Gateio) WsHandleOptionsData(_ context.Context, respRaw []byte) error {
case optionsPositionCloseChannel:
return g.processPositionCloseData(respRaw)
case optionsBalancesChannel:
return g.processBalancePushData(respRaw, asset.Options)
return g.processBalancePushData(ctx, respRaw, asset.Options)
case optionsPositionsChannel:
return g.processOptionsPositionPushData(respRaw)
default:

View File

@@ -2302,6 +2302,9 @@ func TestGetAuthenticatedServersInstances(t *testing.T) {
func TestPushData(t *testing.T) {
t.Parallel()
ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
ku.SetCredentials("mock", "test", "test", "", "", "")
ku.API.AuthenticatedSupport = true
ku.API.AuthenticatedWebsocketSupport = true
testexch.FixtureToDataHandler(t, "testdata/wsHandleData.json", ku.wsHandleData)
}

View File

@@ -358,13 +358,24 @@ func (ku *Kucoin) processFuturesAccountBalanceEvent(respData []byte) error {
if err := json.Unmarshal(respData, &resp); err != nil {
return err
}
ku.Websocket.DataHandler <- account.Change{
Exchange: ku.Name,
Currency: currency.NewCode(resp.Currency),
Asset: asset.Futures,
Amount: resp.AvailableBalance,
creds, err := ku.GetCredentials(context.TODO())
if err != nil {
return err
}
return nil
changes := []account.Change{
{
AssetType: asset.Futures,
Balance: &account.Balance{
Currency: currency.NewCode(resp.Currency),
Total: resp.AvailableBalance + resp.HoldBalance,
Hold: resp.HoldBalance,
Free: resp.AvailableBalance,
UpdatedAt: resp.Timestamp.Time(),
},
},
}
ku.Websocket.DataHandler <- changes
return account.ProcessChange(ku.Name, changes, creds)
}
// processFuturesStopOrderLifecycleEvent processes futures stop orders lifecycle events.
@@ -681,13 +692,24 @@ func (ku *Kucoin) processAccountBalanceChange(respData []byte) error {
if err != nil {
return err
}
ku.Websocket.DataHandler <- account.Change{
Exchange: ku.Name,
Currency: currency.NewCode(response.Currency),
Asset: asset.Futures,
Amount: response.Available,
creds, err := ku.GetCredentials(context.TODO())
if err != nil {
return err
}
return nil
changes := []account.Change{
{
AssetType: asset.Futures,
Balance: &account.Balance{
Currency: currency.NewCode(response.Currency),
Total: response.Total,
Hold: response.Hold,
Free: response.Available,
UpdatedAt: response.Time.Time(),
},
},
}
ku.Websocket.DataHandler <- changes
return account.ProcessChange(ku.Name, changes, creds)
}
// processOrderChangeEvent processes order update events.

View File

@@ -4024,7 +4024,18 @@ var pushDataMap = map[string]string{
func TestPushData(t *testing.T) {
t.Parallel()
var err error
ok := new(Okx) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.Setup(ok), "Setup must not error")
for x := range pushDataMap {
if x == "Balance And Position" {
ok.API.AuthenticatedSupport = true
ok.API.AuthenticatedWebsocketSupport = true
ok.SetCredentials("test", "test", "test", "", "", "")
} else {
ok.API.AuthenticatedSupport = false
ok.API.AuthenticatedWebsocketSupport = false
}
err = ok.WsHandleData([]byte(pushDataMap[x]))
require.NoErrorf(t, err, "Okx %s error %v", x, err)
}

View File

@@ -18,6 +18,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/encoding/json"
"github.com/thrasher-corp/gocryptotrader/exchange/websocket"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
@@ -543,8 +544,7 @@ func (ok *Okx) WsHandleData(respRaw []byte) error {
var response WsPositionResponse
return ok.wsProcessPushData(respRaw, &response)
case channelBalanceAndPosition:
var response WsBalanceAndPosition
return ok.wsProcessPushData(respRaw, &response)
return ok.wsProcessBalanceAndPosition(respRaw)
case channelOrders:
return ok.wsProcessOrders(respRaw)
case channelAlgoOrders:
@@ -1518,6 +1518,35 @@ func (ok *Okx) wsProcessBlockPublicTrades(data []byte) error {
return trade.AddTradesToBuffer(trades...)
}
func (ok *Okx) wsProcessBalanceAndPosition(data []byte) error {
var resp WsBalanceAndPosition
if err := json.Unmarshal(data, &resp); err != nil {
return err
}
creds, err := ok.GetCredentials(context.TODO())
if err != nil {
return err
}
var changes []account.Change
for i := range resp.Data {
for j := range resp.Data[i].BalanceData {
changes = append(changes, account.Change{
AssetType: asset.Spot,
Account: resp.Argument.UID,
Balance: &account.Balance{
Currency: currency.NewCode(resp.Data[i].BalanceData[j].Currency),
Total: resp.Data[i].BalanceData[j].CashBalance.Float64(),
Free: resp.Data[i].BalanceData[j].CashBalance.Float64(),
UpdatedAt: resp.Data[i].BalanceData[j].UpdateTime.Time(),
},
})
}
// TODO: Handle position data
}
ok.Websocket.DataHandler <- changes
return account.ProcessChange(ok.Name, changes, creds)
}
// wsProcessPushData processes push data coming through the websocket channel
func (ok *Okx) wsProcessPushData(data []byte, resp any) error {
if err := json.Unmarshal(data, resp); err != nil {

View File

@@ -948,11 +948,13 @@ func (p *Poloniex) processAccountBalanceUpdate(notification []any) error {
// NOTES: This will affect free amount, a rest call might be needed to get
// locked and total amounts periodically.
p.Websocket.DataHandler <- account.Change{
Exchange: p.Name,
Currency: code,
Asset: asset.Spot,
Account: deriveWalletType(walletType),
Amount: amount,
Account: deriveWalletType(walletType),
AssetType: asset.Spot,
Balance: &account.Balance{
Currency: code,
Total: amount,
Free: amount,
},
}
return nil
}