diff --git a/engine/websocketroutine_manager.go b/engine/websocketroutine_manager.go index 8795661b..d255a135 100644 --- a/engine/websocketroutine_manager.go +++ b/engine/websocketroutine_manager.go @@ -337,12 +337,12 @@ func (m *WebsocketRoutineManager) websocketDataHandler(exchName string, data any log.Warnln(log.WebsocketMgr, d.Message) case account.Change: if m.verbose { - m.printAccountHoldingsChangeSummary(d) + m.printAccountHoldingsChangeSummary(exchName, d) } case []account.Change: if m.verbose { for x := range d { - m.printAccountHoldingsChangeSummary(d[x]) + m.printAccountHoldingsChangeSummary(exchName, d[x]) } } case []trade.Data, trade.Data: @@ -404,16 +404,16 @@ func (m *WebsocketRoutineManager) printOrderSummary(o *order.Detail, isUpdate bo // printAccountHoldingsChangeSummary this function will be deprecated when a // account holdings update is done. -func (m *WebsocketRoutineManager) printAccountHoldingsChangeSummary(o account.Change) { - if m == nil || atomic.LoadInt32(&m.state) == stoppedState { +func (m *WebsocketRoutineManager) printAccountHoldingsChangeSummary(exch string, o account.Change) { + if m == nil || atomic.LoadInt32(&m.state) == stoppedState || o.Balance == nil { return } log.Debugf(log.WebsocketMgr, "Account Holdings Balance Changed: %s %s %s has changed balance by %f for account: %s", - o.Exchange, - o.Asset, - o.Currency, - o.Amount, + exch, + o.AssetType, + o.Balance.Currency, + o.Balance.Total, o.Account) } diff --git a/exchanges/account/account.go b/exchanges/account/account.go index 2f55d7be..8f7d20b2 100644 --- a/exchanges/account/account.go +++ b/exchanges/account/account.go @@ -26,7 +26,6 @@ var ( var ( errHoldingsIsNil = errors.New("holdings cannot be nil") errExchangeNameUnset = errors.New("exchange name unset") - errExchangeAccountsNotFound = errors.New("exchange accounts not found") errNoExchangeSubAccountBalances = errors.New("no exchange sub account balances") errBalanceIsNil = errors.New("balance is nil") errNoCredentialBalances = errors.New("no balances associated with credentials") @@ -34,8 +33,29 @@ var ( errOutOfSequence = errors.New("out of sequence") errUpdatedAtIsZero = errors.New("updatedAt may not be zero") errLoadingBalance = errors.New("error loading balance") + errExchangeAlreadyExists = errors.New("exchange already exists") + errCannotUpdateBalance = errors.New("cannot update balance") ) +// initAccounts adds a new empty shared account accounts entry for an exchange +// must be called with s.mu locked +func (s *Service) initAccounts(exch string) (*Accounts, error) { + id, err := s.mux.GetID() + if err != nil { + return nil, err + } + _, ok := s.exchangeAccounts[exch] + if ok { + return nil, errExchangeAlreadyExists + } + accounts := &Accounts{ + ID: id, + subAccounts: make(map[Credentials]map[key.SubAccountAsset]currencyBalances), + } + s.exchangeAccounts[exch] = accounts + return accounts, nil +} + // CollectBalances converts a map of sub-account balances into a slice func CollectBalances(accountBalances map[string][]Balance, assetType asset.Item) (accounts []SubAccount, err error) { if accountBalances == nil { @@ -64,9 +84,10 @@ func SubscribeToExchangeAccount(exchange string) (dispatch.Pipe, error) { defer service.mu.Unlock() accounts, ok := service.exchangeAccounts[exchange] if !ok { - return dispatch.Pipe{}, fmt.Errorf("cannot subscribe %s %w", - exchange, - errExchangeAccountsNotFound) + var err error + if accounts, err = service.initAccounts(exchange); err != nil { + return dispatch.Pipe{}, fmt.Errorf("cannot subscribe to exchange account %w", err) + } } return service.mux.Subscribe(accounts.ID) } @@ -76,6 +97,11 @@ func Process(h *Holdings, c *Credentials) error { return service.Save(h, c) } +// ProcessChange updates the changes to the exchange account +func ProcessChange(exch string, changes []Change, c *Credentials) error { + return service.Update(exch, changes, c) +} + // GetHoldings returns full holdings for an exchange. // NOTE: Due to credentials these amounts could be N*APIKEY actual holdings. // TODO: Add jurisdiction and differentiation between APIKEY holdings. @@ -192,36 +218,31 @@ func GetBalance(exch, subAccount string, creds *Credentials, a asset.Item, c cur // incoming should be a full update, and any missing currencies will be zeroed func (s *Service) Save(incoming *Holdings, creds *Credentials) error { if incoming == nil { - return fmt.Errorf("cannot update holdings: %w", errHoldingsIsNil) + return fmt.Errorf("cannot save holdings: %w", errHoldingsIsNil) } if incoming.Exchange == "" { - return fmt.Errorf("cannot update holdings: %w", errExchangeNameUnset) + return fmt.Errorf("cannot save holdings: %w", errExchangeNameUnset) } if creds.IsEmpty() { - return fmt.Errorf("cannot update holdings: %w", errCredentialsAreNil) + return fmt.Errorf("cannot save holdings: %w", errCredentialsAreNil) } exch := strings.ToLower(incoming.Exchange) s.mu.Lock() defer s.mu.Unlock() - accounts, exist := s.exchangeAccounts[exch] - if !exist { - id, err := s.mux.GetID() - if err != nil { - return err + accounts, ok := s.exchangeAccounts[exch] + if !ok { + var err error + if accounts, err = s.initAccounts(exch); err != nil { + return fmt.Errorf("cannot save holdings for %s %w", exch, err) } - accounts = &Accounts{ - ID: id, - subAccounts: make(map[Credentials]map[key.SubAccountAsset]currencyBalances), - } - s.exchangeAccounts[exch] = accounts } - subAccounts, exist := accounts.subAccounts[*creds] - if !exist { + subAccounts, ok := accounts.subAccounts[*creds] + if !ok { subAccounts = make(map[key.SubAccountAsset]currencyBalances) accounts.subAccounts[*creds] = subAccounts } @@ -250,8 +271,8 @@ func (s *Service) Save(incoming *Holdings, creds *Credentials) error { SubAccount: incoming.Accounts[x].ID, Asset: incoming.Accounts[x].AssetType, } - assets, exist := subAccounts[accAsset] - if !exist { + assets, ok := subAccounts[accAsset] + if !ok { assets = make(map[*currency.Item]*ProtectedBalance) accounts.subAccounts[*creds][accAsset] = assets } @@ -292,6 +313,78 @@ func (s *Service) Save(incoming *Holdings, creds *Credentials) error { return errs } +// Update updates the balance for a specific exchange and credentials +func (s *Service) Update(exch string, changes []Change, creds *Credentials) error { + if exch == "" { + return fmt.Errorf("%w: %w", errCannotUpdateBalance, errExchangeNameUnset) + } + + if creds.IsEmpty() { + return fmt.Errorf("%w: %w", errCannotUpdateBalance, errCredentialsAreNil) + } + + exch = strings.ToLower(exch) + s.mu.Lock() + defer s.mu.Unlock() + + accounts, ok := s.exchangeAccounts[exch] + if !ok { + var err error + if accounts, err = s.initAccounts(exch); err != nil { + return fmt.Errorf("%w for %s %w", errCannotUpdateBalance, exch, err) + } + } + + subAccounts, ok := accounts.subAccounts[*creds] + if !ok { + subAccounts = make(map[key.SubAccountAsset]currencyBalances) + accounts.subAccounts[*creds] = subAccounts + } + + var errs error + for _, change := range changes { + if !change.AssetType.IsValid() { + errs = common.AppendError(errs, fmt.Errorf("%w for %s.%s %w", + errCannotUpdateBalance, change.Account, change.AssetType, asset.ErrNotSupported)) + continue + } + if change.Balance == nil { + errs = common.AppendError(errs, fmt.Errorf("%w for %s.%s %w", + errCannotUpdateBalance, change.Account, change.AssetType, errBalanceIsNil)) + continue + } + + accAsset := key.SubAccountAsset{ + SubAccount: change.Account, + Asset: change.AssetType, + } + assets, ok := subAccounts[accAsset] + if !ok { + assets = make(map[*currency.Item]*ProtectedBalance) + accounts.subAccounts[*creds][accAsset] = assets + } + bal, ok := assets[change.Balance.Currency.Item] + if !ok || bal == nil { + bal = &ProtectedBalance{} + assets[change.Balance.Currency.Item] = bal + } + + if err := bal.load(change.Balance); err != nil { + errs = common.AppendError(errs, fmt.Errorf("%w for %s.%s.%s %w", + errCannotUpdateBalance, + change.Account, + change.AssetType, + change.Balance.Currency, + err)) + continue + } + if err := s.mux.Publish(change, accounts.ID); err != nil { + errs = common.AppendError(errs, fmt.Errorf("cannot publish update balance for %s: %w", exch, err)) + } + } + return errs +} + // load checks to see if there is a change from incoming balance, if there is a // change it will change then alert external routines. func (b *ProtectedBalance) load(change *Balance) error { diff --git a/exchanges/account/account_test.go b/exchanges/account/account_test.go index 022f403c..6c7d458e 100644 --- a/exchanges/account/account_test.go +++ b/exchanges/account/account_test.go @@ -144,7 +144,7 @@ func TestGetHoldings(t *testing.T) { assert.Equal(t, 20.0, u.Accounts[0].Currencies[0].Hold) _, err = SubscribeToExchangeAccount("nonsense") - assert.ErrorIs(t, err, errExchangeAccountsNotFound) + require.NoError(t, err) p, err := SubscribeToExchangeAccount("Test") require.NoError(t, err) @@ -410,3 +410,105 @@ func TestSave(t *testing.T) { assert.Equal(t, 80.0, e.total) assert.Equal(t, 20.0, e.hold) } + +func TestUpdate(t *testing.T) { + t.Parallel() + s := &Service{exchangeAccounts: make(map[string]*Accounts), mux: dispatch.GetNewMux(nil)} + err := s.Update("", nil, nil) + assert.ErrorIs(t, err, errExchangeNameUnset) + + err = s.Update("test", nil, nil) + assert.ErrorIs(t, err, errCredentialsAreNil) + + err = s.Update("test", []Change{ + { + AssetType: 6969, + Balance: &Balance{ + Currency: currency.BTC, + Free: 100, + }, + }, + }, happyCredentials) + assert.ErrorIs(t, err, asset.ErrNotSupported) + + now := time.Now() + err = s.Update("test", []Change{ + { + AssetType: asset.Spot, + Account: "1337", + Balance: &Balance{ + Currency: currency.BTC, + Total: 100, + Free: 80, + UpdatedAt: now, + }, + }, + }, happyCredentials) + require.NoError(t, err) + + acc, ok := s.exchangeAccounts["test"] + require.True(t, ok, "Update must add the exchange") + + assets, ok := acc.subAccounts[*happyCredentials][key.SubAccountAsset{ + SubAccount: "1337", + Asset: asset.Spot, + }] + require.True(t, ok, "Update must add subAccount for the credentials") + + b, ok := assets[currency.BTC.Item] + require.True(t, ok, "Update must add currency to the subAccount") + + assert.Equal(t, 100.0, b.total, "Update should set total correctly") + assert.Equal(t, 80.0, b.free, "Update should set free correctly") + assert.Equal(t, now, b.updatedAt, "Update should set updatedAt correctly") + + err = s.Update("test", []Change{ + { + AssetType: asset.Spot, + Account: "1337", + Balance: &Balance{ + Currency: currency.BTC, + Total: 100, + Free: 100, + UpdatedAt: now.Add(-1 * time.Second), + }, + }, + }, happyCredentials) + assert.ErrorIs(t, err, errOutOfSequence) + + err = s.Update("test", []Change{ + { + AssetType: asset.Spot, + Account: "1337", + Balance: &Balance{ + Currency: currency.BTC, + Total: 100, + Free: 100, + UpdatedAt: now.Add(1 * time.Second), + }, + }, + }, happyCredentials) + require.NoError(t, err) + + assert.Equal(t, 100.0, b.total) + assert.Equal(t, 100.0, b.free) + assert.Equal(t, now.Add(1*time.Second), b.updatedAt) +} + +func TestTrackNewAccounts(t *testing.T) { + t.Parallel() + s := &Service{ + exchangeAccounts: make(map[string]*Accounts), + mux: dispatch.GetNewMux(nil), + } + + s.mu.Lock() + _, err := s.initAccounts("binance") + s.mu.Unlock() + require.NoError(t, err) + + s.mu.Lock() + _, err = s.initAccounts("binance") + s.mu.Unlock() + assert.ErrorIs(t, err, errExchangeAlreadyExists) +} diff --git a/exchanges/account/account_types.go b/exchanges/account/account_types.go index e4f46874..421a5bf5 100644 --- a/exchanges/account/account_types.go +++ b/exchanges/account/account_types.go @@ -67,11 +67,9 @@ type Balance struct { // Change defines incoming balance change on currency holdings type Change struct { - Exchange string - Currency currency.Code - Asset asset.Item - Amount float64 - Account string + Account string + AssetType asset.Item + Balance *Balance } // ProtectedBalance stores the full balance information for that specific asset diff --git a/exchanges/bybit/bybit_websocket.go b/exchanges/bybit/bybit_websocket.go index 85bd6755..05158d2b 100644 --- a/exchanges/bybit/bybit_websocket.go +++ b/exchanges/bybit/bybit_websocket.go @@ -345,19 +345,26 @@ func (by *Bybit) wsProcessWalletPushData(assetType asset.Item, resp []byte) erro if err != nil { return err } - accounts := []account.Change{} + creds, err := by.GetCredentials(context.TODO()) + if err != nil { + return err + } + var changes []account.Change for x := range result.Data { for y := range result.Data[x].Coin { - accounts = append(accounts, account.Change{ - Exchange: by.Name, - Currency: currency.NewCode(result.Data[x].Coin[y].Coin), - Asset: assetType, - Amount: result.Data[x].Coin[y].WalletBalance.Float64(), + changes = append(changes, account.Change{ + AssetType: assetType, + Balance: &account.Balance{ + Currency: currency.NewCode(result.Data[x].Coin[y].Coin), + Total: result.Data[x].Coin[y].WalletBalance.Float64(), + Free: result.Data[x].Coin[y].WalletBalance.Float64(), + UpdatedAt: result.CreationTime.Time(), + }, }) } } - by.Websocket.DataHandler <- accounts - return nil + by.Websocket.DataHandler <- changes + return account.ProcessChange(by.Name, changes, creds) } // wsProcessOrder the order stream to see changes to your orders in real-time. diff --git a/exchanges/gateio/gateio_test.go b/exchanges/gateio/gateio_test.go index 9b96530b..6b3cc7fa 100644 --- a/exchanges/gateio/gateio_test.go +++ b/exchanges/gateio/gateio_test.go @@ -8,6 +8,7 @@ import ( "log" "os" "slices" + "strings" "sync" "testing" "time" @@ -20,6 +21,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/encoding/json" "github.com/thrasher-corp/gocryptotrader/exchange/websocket" + "github.com/thrasher-corp/gocryptotrader/exchanges/account" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/fundingrate" "github.com/thrasher-corp/gocryptotrader/exchanges/futures" @@ -2026,7 +2028,8 @@ const wsBalancesPushDataJSON = `{"time": 1605248616, "channel": "spot.balances", func TestBalancesPushData(t *testing.T) { t.Parallel() - if err := g.WsHandleSpotData(t.Context(), []byte(wsBalancesPushDataJSON)); err != nil { + ctx := account.DeployCredentialsToContext(t.Context(), &account.Credentials{Key: "test", Secret: "test"}) + if err := g.WsHandleSpotData(ctx, []byte(wsBalancesPushDataJSON)); err != nil { t.Errorf("%s websocket balances push data error: %v", g.Name, err) } } @@ -2044,7 +2047,8 @@ const wsCrossMarginBalancePushDataJSON = `{"time": 1605248616,"channel": "spot.c func TestCrossMarginBalancePushData(t *testing.T) { t.Parallel() - if err := g.WsHandleSpotData(t.Context(), []byte(wsCrossMarginBalancePushDataJSON)); err != nil { + ctx := account.DeployCredentialsToContext(t.Context(), &account.Credentials{Key: "test", Secret: "test"}) + if err := g.WsHandleSpotData(ctx, []byte(wsCrossMarginBalancePushDataJSON)); err != nil { t.Errorf("%s websocket cross margin balance push data error: %v", g.Name, err) } } @@ -2063,7 +2067,13 @@ func TestFuturesDataHandler(t *testing.T) { t.Parallel() g := new(Gateio) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes require.NoError(t, testexch.Setup(g), "Test instance Setup must not error") - testexch.FixtureToDataHandler(t, "testdata/wsFutures.json", func(m []byte) error { return g.WsHandleFuturesData(t.Context(), m, asset.CoinMarginedFutures) }) + testexch.FixtureToDataHandler(t, "testdata/wsFutures.json", func(m []byte) error { + ctx := t.Context() + if strings.Contains(string(m), "futures.balances") { + ctx = account.DeployCredentialsToContext(ctx, &account.Credentials{Key: "test", Secret: "test"}) + } + return g.WsHandleFuturesData(ctx, m, asset.CoinMarginedFutures) + }) close(g.Websocket.DataHandler) assert.Len(t, g.Websocket.DataHandler, 14, "Should see the correct number of messages") for resp := range g.Websocket.DataHandler { @@ -2227,7 +2237,8 @@ const optionsBalancePushDataJSON = `{ "channel": "options.balances", "event": "u func TestOptionsBalancePushData(t *testing.T) { t.Parallel() - if err := g.WsHandleOptionsData(t.Context(), []byte(optionsBalancePushDataJSON)); err != nil { + ctx := account.DeployCredentialsToContext(t.Context(), &account.Credentials{Key: "test", Secret: "test"}) + if err := g.WsHandleOptionsData(ctx, []byte(optionsBalancePushDataJSON)); err != nil { t.Errorf("%s websocket options balance push data error: %v", g.Name, err) } } diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index 55cdff82..43e6518f 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -163,7 +163,7 @@ func (g *Gateio) generateWsSignature(secret, event, channel string, t int64) (st } // WsHandleSpotData handles spot data -func (g *Gateio) WsHandleSpotData(_ context.Context, respRaw []byte) error { +func (g *Gateio) WsHandleSpotData(ctx context.Context, respRaw []byte) error { push, err := parseWSHeader(respRaw) if err != nil { return err @@ -195,13 +195,13 @@ func (g *Gateio) WsHandleSpotData(_ context.Context, respRaw []byte) error { case spotUserTradesChannel: return g.processUserPersonalTrades(respRaw) case spotBalancesChannel: - return g.processSpotBalances(respRaw) + return g.processSpotBalances(ctx, respRaw) case marginBalancesChannel: - return g.processMarginBalances(respRaw) + return g.processMarginBalances(ctx, respRaw) case spotFundingBalanceChannel: return g.processFundingBalances(respRaw) case crossMarginBalanceChannel: - return g.processCrossMarginBalance(respRaw) + return g.processCrossMarginBalance(ctx, respRaw) case crossMarginLoanChannel: return g.processCrossMarginLoans(respRaw) case spotPongChannel: @@ -543,7 +543,7 @@ func (g *Gateio) processUserPersonalTrades(data []byte) error { return g.Websocket.Fills.Update(fills...) } -func (g *Gateio) processSpotBalances(data []byte) error { +func (g *Gateio) processSpotBalances(ctx context.Context, data []byte) error { resp := struct { Time int64 `json:"time"` Channel string `json:"channel"` @@ -554,21 +554,29 @@ func (g *Gateio) processSpotBalances(data []byte) error { if err != nil { return err } - accountChanges := make([]account.Change, len(resp.Result)) - for x := range resp.Result { - code := currency.NewCode(resp.Result[x].Currency) - accountChanges[x] = account.Change{ - Exchange: g.Name, - Currency: code, - Asset: asset.Spot, - Amount: resp.Result[x].Available.Float64(), + creds, err := g.GetCredentials(ctx) + if err != nil { + return err + } + changes := make([]account.Change, len(resp.Result)) + for i := range resp.Result { + changes[i] = account.Change{ + Account: resp.Result[i].User, + AssetType: asset.Spot, + Balance: &account.Balance{ + Currency: currency.NewCode(resp.Result[i].Currency), + Total: resp.Result[i].Total.Float64(), + Free: resp.Result[i].Available.Float64(), + Hold: resp.Result[i].Total.Float64() - resp.Result[i].Available.Float64(), + UpdatedAt: resp.Result[i].Timestamp.Time(), + }, } } - g.Websocket.DataHandler <- accountChanges - return nil + g.Websocket.DataHandler <- changes + return account.ProcessChange(g.Name, changes, creds) } -func (g *Gateio) processMarginBalances(data []byte) error { +func (g *Gateio) processMarginBalances(ctx context.Context, data []byte) error { resp := struct { Time int64 `json:"time"` Channel string `json:"channel"` @@ -579,18 +587,25 @@ func (g *Gateio) processMarginBalances(data []byte) error { if err != nil { return err } - accountChange := make([]account.Change, len(resp.Result)) + creds, err := g.GetCredentials(ctx) + if err != nil { + return err + } + changes := make([]account.Change, len(resp.Result)) for x := range resp.Result { - code := currency.NewCode(resp.Result[x].Currency) - accountChange[x] = account.Change{ - Exchange: g.Name, - Currency: code, - Asset: asset.Margin, - Amount: resp.Result[x].Available.Float64(), + changes[x] = account.Change{ + AssetType: asset.Margin, + Balance: &account.Balance{ + Currency: currency.NewCode(resp.Result[x].Currency), + Total: resp.Result[x].Available.Float64() + resp.Result[x].Freeze.Float64(), + Free: resp.Result[x].Available.Float64(), + Hold: resp.Result[x].Freeze.Float64(), + UpdatedAt: resp.Result[x].Timestamp.Time(), + }, } } - g.Websocket.DataHandler <- accountChange - return nil + g.Websocket.DataHandler <- changes + return account.ProcessChange(g.Name, changes, creds) } func (g *Gateio) processFundingBalances(data []byte) error { @@ -608,7 +623,7 @@ func (g *Gateio) processFundingBalances(data []byte) error { return nil } -func (g *Gateio) processCrossMarginBalance(data []byte) error { +func (g *Gateio) processCrossMarginBalance(ctx context.Context, data []byte) error { resp := struct { Time int64 `json:"time"` Channel string `json:"channel"` @@ -619,19 +634,25 @@ func (g *Gateio) processCrossMarginBalance(data []byte) error { if err != nil { return err } - accountChanges := make([]account.Change, len(resp.Result)) + creds, err := g.GetCredentials(ctx) + if err != nil { + return err + } + changes := make([]account.Change, len(resp.Result)) for x := range resp.Result { - code := currency.NewCode(resp.Result[x].Currency) - accountChanges[x] = account.Change{ - Exchange: g.Name, - Currency: code, - Asset: asset.Margin, - Amount: resp.Result[x].Available.Float64(), - Account: resp.Result[x].User, + changes[x] = account.Change{ + Account: resp.Result[x].User, + AssetType: asset.Margin, + Balance: &account.Balance{ + Currency: currency.NewCode(resp.Result[x].Currency), + Total: resp.Result[x].Total.Float64(), + Free: resp.Result[x].Available.Float64(), + UpdatedAt: resp.Result[x].Timestamp.Time(), + }, } } - g.Websocket.DataHandler <- accountChanges - return nil + g.Websocket.DataHandler <- changes + return account.ProcessChange(g.Name, changes, creds) } func (g *Gateio) processCrossMarginLoans(data []byte) error { diff --git a/exchanges/gateio/gateio_websocket_futures.go b/exchanges/gateio/gateio_websocket_futures.go index 5f96808f..a76d1fa9 100644 --- a/exchanges/gateio/gateio_websocket_futures.go +++ b/exchanges/gateio/gateio_websocket_futures.go @@ -139,7 +139,7 @@ func (g *Gateio) FuturesUnsubscribe(ctx context.Context, conn websocket.Connecti } // WsHandleFuturesData handles futures websocket data -func (g *Gateio) WsHandleFuturesData(_ context.Context, respRaw []byte, a asset.Item) error { +func (g *Gateio) WsHandleFuturesData(ctx context.Context, respRaw []byte, a asset.Item) error { push, err := parseWSHeader(respRaw) if err != nil { return err @@ -182,7 +182,7 @@ func (g *Gateio) WsHandleFuturesData(_ context.Context, respRaw []byte, a asset. case futuresAutoPositionCloseChannel: return g.processPositionCloseData(respRaw) case futuresBalancesChannel: - return g.processBalancePushData(respRaw, a) + return g.processBalancePushData(ctx, respRaw, a) case futuresReduceRiskLimitsChannel: return g.processFuturesReduceRiskLimitNotification(respRaw) case futuresPositionsChannel: @@ -652,7 +652,7 @@ func (g *Gateio) processPositionCloseData(data []byte) error { return nil } -func (g *Gateio) processBalancePushData(data []byte, assetType asset.Item) error { +func (g *Gateio) processBalancePushData(ctx context.Context, data []byte, assetType asset.Item) error { resp := struct { Time int64 `json:"time"` Channel string `json:"channel"` @@ -663,23 +663,29 @@ func (g *Gateio) processBalancePushData(data []byte, assetType asset.Item) error if err != nil { return err } - accountChange := make([]account.Change, len(resp.Result)) - for x := range resp.Result { - info := strings.Split(resp.Result[x].Text, currency.UnderscoreDelimiter) + creds, err := g.GetCredentials(ctx) + if err != nil { + return err + } + changes := make([]account.Change, len(resp.Result)) + for x, bal := range resp.Result { + info := strings.Split(bal.Text, currency.UnderscoreDelimiter) if len(info) != 2 { return errors.New("malformed text") } - code := currency.NewCode(info[0]) - accountChange[x] = account.Change{ - Exchange: g.Name, - Currency: code, - Asset: assetType, - Amount: resp.Result[x].Balance, - Account: resp.Result[x].User, + changes[x] = account.Change{ + AssetType: assetType, + Account: bal.User, + Balance: &account.Balance{ + Currency: currency.NewCode(info[0]), + Total: bal.Balance, + Free: bal.Balance, + UpdatedAt: bal.Time.Time(), + }, } } - g.Websocket.DataHandler <- accountChange - return nil + g.Websocket.DataHandler <- changes + return account.ProcessChange(g.Name, changes, creds) } func (g *Gateio) processFuturesReduceRiskLimitNotification(data []byte) error { diff --git a/exchanges/gateio/gateio_websocket_option.go b/exchanges/gateio/gateio_websocket_option.go index 1e0cf62c..3d49a5e5 100644 --- a/exchanges/gateio/gateio_websocket_option.go +++ b/exchanges/gateio/gateio_websocket_option.go @@ -293,7 +293,7 @@ func (g *Gateio) OptionsUnsubscribe(ctx context.Context, conn websocket.Connecti } // WsHandleOptionsData handles options websocket data -func (g *Gateio) WsHandleOptionsData(_ context.Context, respRaw []byte) error { +func (g *Gateio) WsHandleOptionsData(ctx context.Context, respRaw []byte) error { push, err := parseWSHeader(respRaw) if err != nil { return err @@ -339,7 +339,7 @@ func (g *Gateio) WsHandleOptionsData(_ context.Context, respRaw []byte) error { case optionsPositionCloseChannel: return g.processPositionCloseData(respRaw) case optionsBalancesChannel: - return g.processBalancePushData(respRaw, asset.Options) + return g.processBalancePushData(ctx, respRaw, asset.Options) case optionsPositionsChannel: return g.processOptionsPositionPushData(respRaw) default: diff --git a/exchanges/kucoin/kucoin_test.go b/exchanges/kucoin/kucoin_test.go index 83b65165..942ec8b9 100644 --- a/exchanges/kucoin/kucoin_test.go +++ b/exchanges/kucoin/kucoin_test.go @@ -2302,6 +2302,9 @@ func TestGetAuthenticatedServersInstances(t *testing.T) { func TestPushData(t *testing.T) { t.Parallel() ku := testInstance(t) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + ku.SetCredentials("mock", "test", "test", "", "", "") + ku.API.AuthenticatedSupport = true + ku.API.AuthenticatedWebsocketSupport = true testexch.FixtureToDataHandler(t, "testdata/wsHandleData.json", ku.wsHandleData) } diff --git a/exchanges/kucoin/kucoin_websocket.go b/exchanges/kucoin/kucoin_websocket.go index 43dfa962..abcc8941 100644 --- a/exchanges/kucoin/kucoin_websocket.go +++ b/exchanges/kucoin/kucoin_websocket.go @@ -358,13 +358,24 @@ func (ku *Kucoin) processFuturesAccountBalanceEvent(respData []byte) error { if err := json.Unmarshal(respData, &resp); err != nil { return err } - ku.Websocket.DataHandler <- account.Change{ - Exchange: ku.Name, - Currency: currency.NewCode(resp.Currency), - Asset: asset.Futures, - Amount: resp.AvailableBalance, + creds, err := ku.GetCredentials(context.TODO()) + if err != nil { + return err } - return nil + changes := []account.Change{ + { + AssetType: asset.Futures, + Balance: &account.Balance{ + Currency: currency.NewCode(resp.Currency), + Total: resp.AvailableBalance + resp.HoldBalance, + Hold: resp.HoldBalance, + Free: resp.AvailableBalance, + UpdatedAt: resp.Timestamp.Time(), + }, + }, + } + ku.Websocket.DataHandler <- changes + return account.ProcessChange(ku.Name, changes, creds) } // processFuturesStopOrderLifecycleEvent processes futures stop orders lifecycle events. @@ -681,13 +692,24 @@ func (ku *Kucoin) processAccountBalanceChange(respData []byte) error { if err != nil { return err } - ku.Websocket.DataHandler <- account.Change{ - Exchange: ku.Name, - Currency: currency.NewCode(response.Currency), - Asset: asset.Futures, - Amount: response.Available, + creds, err := ku.GetCredentials(context.TODO()) + if err != nil { + return err } - return nil + changes := []account.Change{ + { + AssetType: asset.Futures, + Balance: &account.Balance{ + Currency: currency.NewCode(response.Currency), + Total: response.Total, + Hold: response.Hold, + Free: response.Available, + UpdatedAt: response.Time.Time(), + }, + }, + } + ku.Websocket.DataHandler <- changes + return account.ProcessChange(ku.Name, changes, creds) } // processOrderChangeEvent processes order update events. diff --git a/exchanges/okx/okx_test.go b/exchanges/okx/okx_test.go index 07e12bbe..20fcaab2 100644 --- a/exchanges/okx/okx_test.go +++ b/exchanges/okx/okx_test.go @@ -4024,7 +4024,18 @@ var pushDataMap = map[string]string{ func TestPushData(t *testing.T) { t.Parallel() var err error + ok := new(Okx) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + require.NoError(t, testexch.Setup(ok), "Setup must not error") + for x := range pushDataMap { + if x == "Balance And Position" { + ok.API.AuthenticatedSupport = true + ok.API.AuthenticatedWebsocketSupport = true + ok.SetCredentials("test", "test", "test", "", "", "") + } else { + ok.API.AuthenticatedSupport = false + ok.API.AuthenticatedWebsocketSupport = false + } err = ok.WsHandleData([]byte(pushDataMap[x])) require.NoErrorf(t, err, "Okx %s error %v", x, err) } diff --git a/exchanges/okx/okx_websocket.go b/exchanges/okx/okx_websocket.go index e4a651c8..ce4c3eca 100644 --- a/exchanges/okx/okx_websocket.go +++ b/exchanges/okx/okx_websocket.go @@ -18,6 +18,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/encoding/json" "github.com/thrasher-corp/gocryptotrader/exchange/websocket" + "github.com/thrasher-corp/gocryptotrader/exchanges/account" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/kline" "github.com/thrasher-corp/gocryptotrader/exchanges/order" @@ -543,8 +544,7 @@ func (ok *Okx) WsHandleData(respRaw []byte) error { var response WsPositionResponse return ok.wsProcessPushData(respRaw, &response) case channelBalanceAndPosition: - var response WsBalanceAndPosition - return ok.wsProcessPushData(respRaw, &response) + return ok.wsProcessBalanceAndPosition(respRaw) case channelOrders: return ok.wsProcessOrders(respRaw) case channelAlgoOrders: @@ -1518,6 +1518,35 @@ func (ok *Okx) wsProcessBlockPublicTrades(data []byte) error { return trade.AddTradesToBuffer(trades...) } +func (ok *Okx) wsProcessBalanceAndPosition(data []byte) error { + var resp WsBalanceAndPosition + if err := json.Unmarshal(data, &resp); err != nil { + return err + } + creds, err := ok.GetCredentials(context.TODO()) + if err != nil { + return err + } + var changes []account.Change + for i := range resp.Data { + for j := range resp.Data[i].BalanceData { + changes = append(changes, account.Change{ + AssetType: asset.Spot, + Account: resp.Argument.UID, + Balance: &account.Balance{ + Currency: currency.NewCode(resp.Data[i].BalanceData[j].Currency), + Total: resp.Data[i].BalanceData[j].CashBalance.Float64(), + Free: resp.Data[i].BalanceData[j].CashBalance.Float64(), + UpdatedAt: resp.Data[i].BalanceData[j].UpdateTime.Time(), + }, + }) + } + // TODO: Handle position data + } + ok.Websocket.DataHandler <- changes + return account.ProcessChange(ok.Name, changes, creds) +} + // wsProcessPushData processes push data coming through the websocket channel func (ok *Okx) wsProcessPushData(data []byte, resp any) error { if err := json.Unmarshal(data, resp); err != nil { diff --git a/exchanges/poloniex/poloniex_websocket.go b/exchanges/poloniex/poloniex_websocket.go index dd3b9e51..7efcbf44 100644 --- a/exchanges/poloniex/poloniex_websocket.go +++ b/exchanges/poloniex/poloniex_websocket.go @@ -948,11 +948,13 @@ func (p *Poloniex) processAccountBalanceUpdate(notification []any) error { // NOTES: This will affect free amount, a rest call might be needed to get // locked and total amounts periodically. p.Websocket.DataHandler <- account.Change{ - Exchange: p.Name, - Currency: code, - Asset: asset.Spot, - Account: deriveWalletType(walletType), - Amount: amount, + Account: deriveWalletType(walletType), + AssetType: asset.Spot, + Balance: &account.Balance{ + Currency: code, + Total: amount, + Free: amount, + }, } return nil }