exchanges/account: add UpdatedAt field to Balance and ProtectedBalance (#1827)

* add updatedAt for account balance

Signed-off-by: Ye Sijun <junnplus@gmail.com>

* add nil pointer test for load

Signed-off-by: Ye Sijun <junnplus@gmail.com>

* account: set default updatedAt to balance for Update

Signed-off-by: Ye Sijun <junnplus@gmail.com>

* engine: add UpdatedAt for account info

Signed-off-by: Ye Sijun <junnplus@gmail.com>

* account: force updatedAt for load balance

Signed-off-by: Ye Sijun <junnplus@gmail.com>

* minor change for test

Signed-off-by: Ye Sijun <junnplus@gmail.com>

---------

Signed-off-by: Ye Sijun <junnplus@gmail.com>
This commit is contained in:
Jun
2025-03-13 11:57:14 +09:00
committed by GitHub
parent 138419e7a8
commit 4474278053
9 changed files with 6890 additions and 8648 deletions

View File

@@ -183,7 +183,8 @@ func (s *RPCServer) StartRPCRESTProxy() {
}
mux := runtime.NewServeMux()
opts := []grpc.DialOption{grpc.WithTransportCredentials(creds),
opts := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
grpc.WithPerRPCCredentials(auth.BasicAuth{
Username: s.Config.RemoteControl.Username,
Password: s.Config.RemoteControl.Password,
@@ -270,8 +271,10 @@ func (s *RPCServer) EnableSubsystem(_ context.Context, r *gctrpc.GenericSubsyste
if err != nil {
return nil, err
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess,
Data: fmt.Sprintf("subsystem %s enabled", r.Subsystem)}, nil
return &gctrpc.GenericResponse{
Status: MsgStatusSuccess,
Data: fmt.Sprintf("subsystem %s enabled", r.Subsystem),
}, nil
}
// DisableSubsystem disables a engine subsystem
@@ -280,8 +283,10 @@ func (s *RPCServer) DisableSubsystem(_ context.Context, r *gctrpc.GenericSubsyst
if err != nil {
return nil, err
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess,
Data: fmt.Sprintf("subsystem %s disabled", r.Subsystem)}, nil
return &gctrpc.GenericResponse{
Status: MsgStatusSuccess,
Data: fmt.Sprintf("subsystem %s disabled", r.Subsystem),
}, nil
}
// GetRPCEndpoints returns a list of API endpoints
@@ -622,6 +627,7 @@ func createAccountInfoRequest(h account.Holdings) (*gctrpc.GetAccountInfoRespons
Free: y.Free,
FreeWithoutBorrow: y.AvailableWithoutBorrow,
Borrowed: y.Borrowed,
UpdatedAt: timestamppb.New(y.UpdatedAt),
})
}
accounts[x] = &a
@@ -660,6 +666,7 @@ func (s *RPCServer) GetAccountInfoStream(r *gctrpc.GetAccountInfoRequest, stream
Currency: initAcc.Accounts[x].Currencies[y].Currency.String(),
TotalValue: initAcc.Accounts[x].Currencies[y].Total,
Hold: initAcc.Accounts[x].Currencies[y].Hold,
UpdatedAt: timestamppb.New(initAcc.Accounts[x].Currencies[y].UpdatedAt),
}
}
accounts[x] = &gctrpc.Account{
@@ -707,6 +714,7 @@ func (s *RPCServer) GetAccountInfoStream(r *gctrpc.GetAccountInfoRequest, stream
Currency: holdings.Accounts[x].Currencies[y].Currency.String(),
TotalValue: holdings.Accounts[x].Currencies[y].Total,
Hold: holdings.Accounts[x].Currencies[y].Hold,
UpdatedAt: timestamppb.New(holdings.Accounts[x].Currencies[y].UpdatedAt),
}
}
accounts[x] = &gctrpc.Account{
@@ -1268,7 +1276,7 @@ func (s *RPCServer) SimulateOrder(_ context.Context, r *gctrpc.SimulateOrderRequ
return nil, err
}
var buy = true
buy := true
if !strings.EqualFold(r.Side, order.Buy.String()) &&
!strings.EqualFold(r.Side, order.Bid.String()) {
buy = false
@@ -1324,7 +1332,7 @@ func (s *RPCServer) WhaleBomb(_ context.Context, r *gctrpc.WhaleBombRequest) (*g
return nil, err
}
var buy = true
buy := true
if !strings.EqualFold(r.Side, order.Buy.String()) &&
!strings.EqualFold(r.Side, order.Bid.String()) {
buy = false
@@ -1393,8 +1401,10 @@ func (s *RPCServer) CancelOrder(ctx context.Context, r *gctrpc.CancelOrderReques
if err != nil {
return nil, err
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess,
Data: fmt.Sprintf("order %s cancelled", r.OrderId)}, nil
return &gctrpc.GenericResponse{
Status: MsgStatusSuccess,
Data: fmt.Sprintf("order %s cancelled", r.OrderId),
}, nil
}
// CancelBatchOrders cancels an orders specified by exchange, currency pair and asset type
@@ -1550,8 +1560,10 @@ func (s *RPCServer) RemoveEvent(_ context.Context, r *gctrpc.RemoveEventRequest)
if !s.eventManager.Remove(r.Id) {
return nil, fmt.Errorf("event %d not removed", r.Id)
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess,
Data: fmt.Sprintf("event %d removed", r.Id)}, nil
return &gctrpc.GenericResponse{
Status: MsgStatusSuccess,
Data: fmt.Sprintf("event %d removed", r.Id),
}, nil
}
// GetCryptocurrencyDepositAddresses returns a list of cryptocurrency deposit
@@ -2100,14 +2112,16 @@ func (s *RPCServer) GetOrderbookStream(r *gctrpc.GetOrderbookStreamRequest, stre
resp.Bids[i] = &gctrpc.OrderbookItem{
Amount: base.Bids[i].Amount,
Price: base.Bids[i].Price,
Id: base.Bids[i].ID}
Id: base.Bids[i].ID,
}
}
resp.Asks = make([]*gctrpc.OrderbookItem, len(base.Asks))
for i := range base.Asks {
resp.Asks[i] = &gctrpc.OrderbookItem{
Amount: base.Asks[i].Amount,
Price: base.Asks[i].Price,
Id: base.Asks[i].ID}
Id: base.Asks[i].ID,
}
}
}
@@ -2169,14 +2183,16 @@ func (s *RPCServer) GetExchangeOrderbookStream(r *gctrpc.GetExchangeOrderbookStr
resp.Bids[i] = &gctrpc.OrderbookItem{
Amount: ob.Bids[i].Amount,
Price: ob.Bids[i].Price,
Id: ob.Bids[i].ID}
Id: ob.Bids[i].ID,
}
}
resp.Asks = make([]*gctrpc.OrderbookItem, len(ob.Asks))
for i := range ob.Asks {
resp.Asks[i] = &gctrpc.OrderbookItem{
Amount: ob.Asks[i].Amount,
Price: ob.Asks[i].Price,
Id: ob.Asks[i].ID}
Id: ob.Asks[i].ID,
}
}
}
@@ -2242,7 +2258,8 @@ func (s *RPCServer) GetTickerStream(r *gctrpc.GetTickerStreamRequest, stream gct
Pair: &gctrpc.CurrencyPair{
Base: t.Pair.Base.String(),
Quote: t.Pair.Quote.String(),
Delimiter: t.Pair.Delimiter},
Delimiter: t.Pair.Delimiter,
},
LastUpdated: s.unixTimestamp(t.LastUpdated),
Last: t.Last,
High: t.High,
@@ -2295,7 +2312,8 @@ func (s *RPCServer) GetExchangeTickerStream(r *gctrpc.GetExchangeTickerStreamReq
Pair: &gctrpc.CurrencyPair{
Base: t.Pair.Base.String(),
Quote: t.Pair.Quote.String(),
Delimiter: t.Pair.Delimiter},
Delimiter: t.Pair.Delimiter,
},
LastUpdated: s.unixTimestamp(t.LastUpdated),
Last: t.Last,
High: t.High,
@@ -2646,7 +2664,7 @@ func (s *RPCServer) GCTScriptUpload(_ context.Context, r *gctrpc.GCTScriptUpload
}
fPath := filepath.Join(gctscript.ScriptPath, r.ScriptName)
var fPathExits = fPath
fPathExits := fPath
if filepath.Ext(fPath) == ".zip" {
fPathExits = fPathExits[0 : len(fPathExits)-4]
}
@@ -3072,10 +3090,12 @@ func (s *RPCServer) WebsocketSetProxy(_ context.Context, r *gctrpc.WebsocketSetP
if err != nil {
return nil, err
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess,
return &gctrpc.GenericResponse{
Status: MsgStatusSuccess,
Data: fmt.Sprintf("new proxy has been set [%s] for %s websocket connection",
r.Exchange,
r.Proxy)}, nil
r.Proxy),
}, nil
}
// WebsocketSetURL sets exchange websocket client connection URL
@@ -3094,10 +3114,12 @@ func (s *RPCServer) WebsocketSetURL(_ context.Context, r *gctrpc.WebsocketSetURL
if err != nil {
return nil, err
}
return &gctrpc.GenericResponse{Status: MsgStatusSuccess,
return &gctrpc.GenericResponse{
Status: MsgStatusSuccess,
Data: fmt.Sprintf("new URL has been set [%s] for %s websocket connection",
r.Exchange,
r.Url)}, nil
r.Url),
}, nil
}
// GetSavedTrades returns trades from the database
@@ -4864,7 +4886,7 @@ func (s *RPCServer) GetCollateral(ctx context.Context, r *gctrpc.GetCollateralRe
return nil, err
}
var collateralDisplayCurrency = " " + c.CollateralCurrency.String()
collateralDisplayCurrency := " " + c.CollateralCurrency.String()
result := &gctrpc.GetCollateralResponse{
SubAccount: creds.SubAccount,
CollateralCurrency: c.CollateralCurrency.String(),
@@ -4926,7 +4948,7 @@ func (s *RPCServer) GetCollateral(ctx context.Context, r *gctrpc.GetCollateralRe
if c.BreakdownByCurrency[i].TotalFunds.IsZero() && !r.IncludeZeroValues {
continue
}
var originalDisplayCurrency = " " + c.BreakdownByCurrency[i].Currency.String()
originalDisplayCurrency := " " + c.BreakdownByCurrency[i].Currency.String()
cb := &gctrpc.CollateralForCurrency{
Currency: c.BreakdownByCurrency[i].Currency.String(),
ExcludedFromCollateral: c.BreakdownByCurrency[i].SkipContribution,