trade: remove exchangeName for AddTradesToBuffer (#1820)

Signed-off-by: Ye Sijun <junnplus@gmail.com>
This commit is contained in:
Jun
2025-03-06 10:50:03 +09:00
committed by GitHub
parent 9fcaa9130b
commit 7a6d6cc002
23 changed files with 69 additions and 72 deletions

View File

@@ -810,7 +810,7 @@ func (b *Binance) GetRecentTrades(ctx context.Context, p currency.Pair, a asset.
}
if b.IsSaveTradeDataEnabled() {
err := trade.AddTradesToBuffer(b.Name, resp...)
err := trade.AddTradesToBuffer(resp...)
if err != nil {
return nil, err
}

View File

@@ -326,7 +326,8 @@ func (bi *Binanceus) UpdateOrderbook(ctx context.Context, pair currency.Pair, as
orderbookNew, err := bi.GetOrderBookDepth(ctx, &OrderBookDataRequestParams{
Symbol: pair,
Limit: 1000})
Limit: 1000,
})
if err != nil {
return book, err
}
@@ -450,7 +451,7 @@ func (bi *Binanceus) GetRecentTrades(ctx context.Context, p currency.Pair, asset
}
if bi.IsSaveTradeDataEnabled() {
err := trade.AddTradesToBuffer(bi.Name, resp...)
err := trade.AddTradesToBuffer(resp...)
if err != nil {
return nil, err
}

View File

@@ -34,9 +34,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/log"
)
var (
errParsingWSField = errors.New("error parsing WS field")
)
var errParsingWSField = errors.New("error parsing WS field")
const (
authenticatedBitfinexWebsocketEndpoint = "wss://api.bitfinex.com/ws/2"
@@ -112,8 +110,10 @@ type checksum struct {
}
// checksumStore quick global for now
var checksumStore = make(map[int]*checksum)
var cMtx sync.Mutex
var (
checksumStore = make(map[int]*checksum)
cMtx sync.Mutex
)
var subscriptionNames = map[string]string{
subscription.TickerChannel: wsTickerChannel,
@@ -724,12 +724,14 @@ func (b *Bitfinex) handleWSBookUpdate(c *subscription.Subscription, d []interfac
ID: int64(id),
Period: int64(pricePeriod),
Price: rateAmount,
Amount: amount})
Amount: amount,
})
} else {
newOrderbook = append(newOrderbook, WebsocketBook{
ID: int64(id),
Price: pricePeriod,
Amount: rateAmount})
Amount: rateAmount,
})
}
}
if err := b.WsInsertSnapshot(c.Pairs[0], c.Asset, newOrderbook, fundingRate); err != nil {
@@ -756,12 +758,14 @@ func (b *Bitfinex) handleWSBookUpdate(c *subscription.Subscription, d []interfac
ID: int64(id),
Period: int64(pricePeriod),
Price: amountRate,
Amount: amount})
Amount: amount,
})
} else {
newOrderbook = append(newOrderbook, WebsocketBook{
ID: int64(id),
Price: pricePeriod,
Amount: amountRate})
Amount: amountRate,
})
}
if err := b.WsUpdateOrderbook(c, c.Pairs[0], c.Asset, newOrderbook, int64(sequenceNo), fundingRate); err != nil {
@@ -986,7 +990,7 @@ func (b *Bitfinex) handleWSAllTrades(s *subscription.Subscription, respRaw []byt
}
}
if b.IsSaveTradeDataEnabled() {
err = trade.AddTradesToBuffer(b.GetName(), trades...)
err = trade.AddTradesToBuffer(trades...)
}
return err
}

View File

@@ -156,7 +156,7 @@ func (b *Bitstamp) handleWSTrade(msg []byte) error {
if wsTradeTemp.Data.Type == 1 {
side = order.Sell
}
return trade.AddTradesToBuffer(b.Name, trade.Data{
return trade.AddTradesToBuffer(trade.Data{
Timestamp: time.Unix(wsTradeTemp.Data.Timestamp, 0),
CurrencyPair: p,
AssetType: asset.Spot,

View File

@@ -204,7 +204,7 @@ func (b *BTCMarkets) wsHandleData(respRaw []byte) error {
side = order.Sell
}
return trade.AddTradesToBuffer(b.Name, trade.Data{
return trade.AddTradesToBuffer(trade.Data{
Timestamp: t.Timestamp,
CurrencyPair: p,
AssetType: asset.Spot,
@@ -254,7 +254,7 @@ func (b *BTCMarkets) wsHandleData(respRaw []byte) error {
originalAmount := orderData.OpenVolume
var price float64
var trades []order.TradeHistory
var orderID = strconv.FormatInt(orderData.OrderID, 10)
orderID := strconv.FormatInt(orderData.OrderID, 10)
for x := range orderData.Trades {
var isMaker bool
if orderData.Trades[x].LiquidityType == "Maker" {

View File

@@ -280,7 +280,7 @@ func (b *BTSE) wsHandleData(respRaw []byte) error {
TID: strconv.FormatInt(tradeHistory.Data[x].ID, 10),
})
}
return trade.AddTradesToBuffer(b.Name, trades...)
return trade.AddTradesToBuffer(trades...)
case strings.Contains(topic, "orderBookL2Api"): // TODO: Fix orderbook updates.
var t wsOrderBook
err = json.Unmarshal(respRaw, &t)

View File

@@ -682,7 +682,7 @@ func (by *Bybit) wsProcessPublicTrade(assetType asset.Item, resp *WebsocketRespo
TID: result[x].TradeID,
}
}
return trade.AddTradesToBuffer(by.Name, tradeDatas...)
return trade.AddTradesToBuffer(tradeDatas...)
}
func (by *Bybit) wsProcessOrderbook(assetType asset.Item, resp *WebsocketResponse) error {

View File

@@ -675,7 +675,7 @@ func (by *Bybit) GetRecentTrades(ctx context.Context, p currency.Pair, assetType
}
if by.IsSaveTradeDataEnabled() {
err := trade.AddTradesToBuffer(by.Name, resp...)
err := trade.AddTradesToBuffer(resp...)
if err != nil {
return nil, err
}

View File

@@ -241,7 +241,7 @@ func (c *CoinbasePro) wsHandleData(respRaw []byte) error {
if !c.IsSaveTradeDataEnabled() {
return nil
}
return trade.AddTradesToBuffer(c.Name, trade.Data{
return trade.AddTradesToBuffer(trade.Data{
Timestamp: wsOrder.Time,
Exchange: c.Name,
CurrencyPair: p,

View File

@@ -31,9 +31,7 @@ const (
coinutWebsocketRateLimit = 30
)
var (
channels map[string]chan []byte
)
var channels map[string]chan []byte
// NOTE for speed considerations
// wss://wsapi-as.coinut.com
@@ -310,7 +308,7 @@ func (c *COINUT) wsHandleData(_ context.Context, respRaw []byte) error {
TID: strconv.FormatInt(tradeSnap.Trades[i].TransID, 10),
})
}
return trade.AddTradesToBuffer(c.Name, trades...)
return trade.AddTradesToBuffer(trades...)
case "inst_trade_update":
if !c.IsSaveTradeDataEnabled() {
return nil
@@ -341,7 +339,7 @@ func (c *COINUT) wsHandleData(_ context.Context, respRaw []byte) error {
}
}
return trade.AddTradesToBuffer(c.Name, trade.Data{
return trade.AddTradesToBuffer(trade.Data{
Timestamp: time.Unix(0, tradeUpdate.Timestamp*1000),
CurrencyPair: p,
AssetType: asset.Spot,
@@ -389,7 +387,7 @@ func (c *COINUT) parseOrderContainer(oContainer *wsOrderContainer) (*order.Detai
var oSide order.Side
var oStatus order.Status
var err error
var orderID = strconv.FormatInt(oContainer.OrderID, 10)
orderID := strconv.FormatInt(oContainer.OrderID, 10)
if oContainer.Side != "" {
oSide, err = order.StringToOrderSide(oContainer.Side)
if err != nil {
@@ -582,7 +580,7 @@ func (c *COINUT) WsProcessOrderbookUpdate(update *WsOrderbookUpdate) error {
// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (c *COINUT) GenerateDefaultSubscriptions() (subscription.List, error) {
var channels = []string{"inst_tick", "inst_order_book", "inst_trade"}
channels := []string{"inst_tick", "inst_order_book", "inst_trade"}
var subscriptions subscription.List
enabledPairs, err := c.GetEnabledPairs(asset.Spot)
if err != nil {

View File

@@ -408,7 +408,7 @@ func (d *Deribit) processUserOrderChanges(respRaw []byte, channels []string) err
AssetType: a,
}
}
err = trade.AddTradesToBuffer(d.Name, td...)
err = trade.AddTradesToBuffer(td...)
if err != nil {
return err
}
@@ -513,7 +513,7 @@ func (d *Deribit) processTrades(respRaw []byte, channels []string) error {
AssetType: a,
}
}
return trade.AddTradesToBuffer(d.Name, tradeDatas...)
return trade.AddTradesToBuffer(tradeDatas...)
}
func (d *Deribit) processIncrementalTicker(respRaw []byte, channels []string) error {

View File

@@ -1193,7 +1193,7 @@ func (b *Base) AddTradesToBuffer(trades ...trade.Data) error {
if !b.IsSaveTradeDataEnabled() {
return nil
}
return trade.AddTradesToBuffer(b.Name, trades...)
return trade.AddTradesToBuffer(trades...)
}
// IsSaveTradeDataEnabled checks the state of
@@ -1322,7 +1322,7 @@ func (e *Endpoints) GetURL(key URL) (string, error) {
// GetURLMap gets all urls for either running or default map based on the bool value supplied
func (e *Endpoints) GetURLMap() map[string]string {
e.mu.RLock()
var urlMap = make(map[string]string)
urlMap := make(map[string]string)
for k, v := range e.defaults {
urlMap[k] = v
}

View File

@@ -346,7 +346,7 @@ func (g *Gemini) wsHandleData(respRaw []byte) error {
TID: strconv.FormatInt(result.EventID, 10),
}
return trade.AddTradesToBuffer(g.Name, tradeEvent)
return trade.AddTradesToBuffer(tradeEvent)
case "subscription_ack":
var result WsSubscriptionAcknowledgementResponse
err := json.Unmarshal(respRaw, &result)
@@ -563,7 +563,7 @@ func (g *Gemini) wsProcessUpdate(result *wsL2MarketData) error {
}
}
return trade.AddTradesToBuffer(g.Name, trades...)
return trade.AddTradesToBuffer(trades...)
}
func channelName(s *subscription.Subscription) string {

View File

@@ -250,7 +250,7 @@ func (h *HitBTC) wsHandleData(respRaw []byte) error {
TID: strconv.FormatInt(tradeSnapshot.Params.Data[i].ID, 10),
})
}
return trade.AddTradesToBuffer(h.Name, trades...)
return trade.AddTradesToBuffer(trades...)
case "activeOrders":
var o wsActiveOrdersResponse
err := json.Unmarshal(respRaw, &o)
@@ -292,10 +292,7 @@ func (h *HitBTC) wsHandleData(respRaw []byte) error {
return err
}
}
case
"replaced",
"canceled",
"new":
case "replaced", "canceled", "new":
var o wsOrderResponse
err := json.Unmarshal(respRaw, &o)
if err != nil {

View File

@@ -118,6 +118,7 @@ func (h *HUOBI) wsReadMsgs(s stream.Connection) {
}
}
}
func (h *HUOBI) wsHandleData(respRaw []byte) error {
if id, err := jsonparser.GetString(respRaw, "id"); err == nil {
if h.Websocket.Match.IncomingWithData(id, respRaw) {
@@ -255,7 +256,7 @@ func (h *HUOBI) wsHandleAllTradesMsg(s *subscription.Subscription, respRaw []byt
TID: strconv.FormatFloat(t.Tick.Data[i].TradeID, 'f', -1, 64),
})
}
return trade.AddTradesToBuffer(h.Name, trades...)
return trade.AddTradesToBuffer(trades...)
}
func (h *HUOBI) wsHandleTickerMsg(s *subscription.Subscription, respRaw []byte) error {

View File

@@ -568,7 +568,7 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error {
if err != nil {
return err
}
var tSide = order.Buy
tSide := order.Buy
s, ok := t[3].(string)
if !ok {
return common.GetTypeAssertError("string", t[3], "trade.side")
@@ -593,7 +593,7 @@ func (k *Kraken) wsProcessTrades(response []any, pair currency.Pair) error {
}
}
if saveTradeData {
return trade.AddTradesToBuffer(k.Name, trades...)
return trade.AddTradesToBuffer(trades...)
}
return nil
}
@@ -1094,7 +1094,6 @@ func (k *Kraken) manageSubs(op string, subs subscription.List) error {
// Ignore an overall timeout, because we'll track individual subscriptions in handleSubResps
err = common.ExcludeError(err, stream.ErrSignatureTimeout)
if err != nil {
return fmt.Errorf("%w; Channel: %s Pair: %s", err, s.Channel, s.Pairs)
}

View File

@@ -458,7 +458,8 @@ func (ku *Kucoin) UpdateAccountInfo(ctx context.Context, assetType asset.Item) (
Total: accountH[x].Balance.Float64(),
Hold: accountH[x].Holds.Float64(),
Free: accountH[x].Available.Float64(),
}},
},
},
})
}
default:
@@ -588,7 +589,7 @@ func (ku *Kucoin) GetRecentTrades(ctx context.Context, p currency.Pair, assetTyp
return nil, fmt.Errorf("%w %v", asset.ErrNotSupported, assetType)
}
if ku.IsSaveTradeDataEnabled() {
err := trade.AddTradesToBuffer(ku.Name, resp...)
err := trade.AddTradesToBuffer(resp...)
if err != nil {
return nil, err
}

View File

@@ -580,14 +580,12 @@ func (ok *Okx) WsHandleData(respRaw []byte) error {
case channelOpenInterest:
var response WSOpenInterestResponse
return ok.wsProcessPushData(respRaw, &response)
case channelTrades,
channelAllTrades:
case channelTrades, channelAllTrades:
return ok.wsProcessTrades(respRaw)
case channelEstimatedPrice:
var response WsDeliveryEstimatedPrice
return ok.wsProcessPushData(respRaw, &response)
case channelMarkPrice,
channelPriceLimit:
case channelMarkPrice, channelPriceLimit:
var response WsMarkPrice
return ok.wsProcessPushData(respRaw, &response)
case channelOrderBooks5:
@@ -695,7 +693,7 @@ func (ok *Okx) wsProcessSpreadTrades(respRaw []byte) error {
Price: resp.Data[x].FillPrice.Float64(),
}
}
return trade.AddTradesToBuffer(ok.Name, trades...)
return trade.AddTradesToBuffer(trades...)
}
// wsProcessSpreadOrders retrieve order information from the sprd-order Websocket channel.
@@ -874,7 +872,7 @@ func (ok *Okx) wsProcessPublicSpreadTrades(respRaw []byte) error {
Timestamp: data[x].Timestamp.Time(),
}
}
return trade.AddTradesToBuffer(ok.Name, trades...)
return trade.AddTradesToBuffer(trades...)
}
// wsProcessSpreadOrderbook process spread orderbook data.
@@ -900,7 +898,8 @@ func (ok *Okx) wsProcessSpreadOrderbook(respRaw []byte) error {
LastUpdated: resp.Data[x].Timestamp.Time(),
Pair: pair,
Exchange: ok.Name,
VerifyOrderbook: ok.CanVerifyOrderbook})
VerifyOrderbook: ok.CanVerifyOrderbook,
})
if err != nil {
return err
}
@@ -949,7 +948,8 @@ func (ok *Okx) wsProcessOrderbook5(data []byte) error {
LastUpdated: resp.Data[0].Timestamp.Time(),
Pair: pair,
Exchange: ok.Name,
VerifyOrderbook: ok.CanVerifyOrderbook})
VerifyOrderbook: ok.CanVerifyOrderbook,
})
if err != nil {
return err
}
@@ -986,7 +986,7 @@ func (ok *Okx) wsProcessOptionTrades(data []byte) error {
Price: resp.Data[i].Price.Float64(),
}
}
return trade.AddTradesToBuffer(ok.Name, trades...)
return trade.AddTradesToBuffer(trades...)
}
// wsProcessOrderBooks processes "snapshot" and "update" order book
@@ -1251,7 +1251,7 @@ func (ok *Okx) wsProcessTrades(data []byte) error {
})
}
}
return trade.AddTradesToBuffer(ok.Name, trades...)
return trade.AddTradesToBuffer(trades...)
}
// wsProcessOrders handles websocket order push data responses.
@@ -1500,7 +1500,7 @@ func (ok *Okx) wsProcessBlockPublicTrades(data []byte) error {
Price: resp.Data[i].Price.Float64(),
}
}
return trade.AddTradesToBuffer(ok.Name, trades...)
return trade.AddTradesToBuffer(trades...)
}
// wsProcessPushData processes push data coming through the websocket channel

View File

@@ -791,7 +791,7 @@ func (ok *Okx) GetRecentTrades(ctx context.Context, p currency.Pair, assetType a
return nil, fmt.Errorf("%w %v", asset.ErrNotSupported, assetType)
}
if ok.IsSaveTradeDataEnabled() {
err = trade.AddTradesToBuffer(ok.Name, resp...)
err = trade.AddTradesToBuffer(resp...)
if err != nil {
return nil, err
}
@@ -847,7 +847,7 @@ allTrades:
tradeIDEnd = trades[len(trades)-1].TradeID
}
if ok.IsSaveTradeDataEnabled() {
err = trade.AddTradesToBuffer(ok.Name, resp...)
err = trade.AddTradesToBuffer(resp...)
if err != nil {
return nil, err
}
@@ -924,7 +924,7 @@ func (ok *Okx) SubmitOrder(ctx context.Context, s *order.Submit) (*order.SubmitR
var result *AlgoOrder
switch orderTypeString {
case orderLimit, orderMarket, orderPostOnly, orderFOK, orderIOC, orderOptimalLimitIOC, "mmp", "mmp_and_post_only":
var orderRequest = &PlaceOrderRequestParam{
orderRequest := &PlaceOrderRequestParam{
InstrumentID: pairString,
TradeMode: tradeMode,
Side: sideType,
@@ -2478,13 +2478,11 @@ func (ok *Okx) GetFuturesPositionSummary(ctx context.Context, req *futures.Posit
if len(acc) != 1 {
return nil, fmt.Errorf("%w, received '%v'", errOnlyOneResponseExpected, len(acc))
}
var (
freeCollateral, totalCollateral, equityOfCurrency, frozenBalance,
var freeCollateral, totalCollateral, equityOfCurrency, frozenBalance,
availableEquity, cashBalance, discountEquity,
equityUSD, totalEquity, isolatedEquity, isolatedLiabilities,
isolatedUnrealisedProfit, notionalLeverage,
strategyEquity decimal.Decimal
)
for i := range acc[0].Details {
if !acc[0].Details[i].Currency.Equal(positionSummary.Currency) {

View File

@@ -200,7 +200,7 @@ func (w *Websocket) Setup(s *WebsocketSetup) error {
return err
}
w.Trade.Setup(w.exchangeName, s.TradeFeed, w.DataHandler)
w.Trade.Setup(s.TradeFeed, w.DataHandler)
w.Fills.Setup(s.FillsFeed, w.DataHandler)
if s.MaxWebsocketSubscriptionsPerConnection < 0 {

View File

@@ -29,8 +29,7 @@ func (p *Processor) setup(wg *sync.WaitGroup) {
// Setup configures necessary fields to the `Trade` structure that govern trade data
// processing.
func (t *Trade) Setup(exchangeName string, tradeFeedEnabled bool, c chan interface{}) {
t.exchangeName = exchangeName
func (t *Trade) Setup(tradeFeedEnabled bool, c chan interface{}) {
t.dataHandler = c
t.tradeFeedEnabled = tradeFeedEnabled
}
@@ -48,7 +47,7 @@ func (t *Trade) Update(save bool, data ...Data) error {
}
if save {
if err := AddTradesToBuffer(t.exchangeName, data...); err != nil {
if err := AddTradesToBuffer(data...); err != nil {
return err
}
}
@@ -57,7 +56,7 @@ func (t *Trade) Update(save bool, data ...Data) error {
}
// AddTradesToBuffer will push trade data onto the buffer
func AddTradesToBuffer(exchangeName string, data ...Data) error {
func AddTradesToBuffer(data ...Data) error {
cfg := database.DB.GetConfig()
if database.DB == nil || cfg == nil || !cfg.Enabled {
return nil
@@ -79,7 +78,7 @@ func AddTradesToBuffer(exchangeName string, data ...Data) error {
data[i].CurrencyPair.IsEmpty() ||
data[i].Exchange == "" ||
data[i].Timestamp.IsZero() {
errs = common.AppendError(errs, fmt.Errorf("%v received invalid trade data: %+v", exchangeName, data[i]))
errs = common.AppendError(errs, fmt.Errorf("%v received invalid trade data: %+v", data[i].Exchange, data[i]))
continue
}
@@ -99,7 +98,7 @@ func AddTradesToBuffer(exchangeName string, data ...Data) error {
}
uu, err := uuid.NewV4()
if err != nil {
errs = common.AppendError(errs, fmt.Errorf("%s uuid failed to generate for trade: %+v", exchangeName, data[i]))
errs = common.AppendError(errs, fmt.Errorf("%s uuid failed to generate for trade: %+v", data[i].Exchange, data[i]))
}
data[i].ID = uu
validDatas = append(validDatas, data[i])

View File

@@ -38,7 +38,7 @@ func TestAddTradesToBuffer(t *testing.T) {
t.Error(err)
}
cp, _ := currency.NewPairFromString("BTC-USD")
err = AddTradesToBuffer("test!", []Data{
err = AddTradesToBuffer([]Data{
{
Timestamp: time.Now(),
Exchange: "test!",
@@ -56,7 +56,7 @@ func TestAddTradesToBuffer(t *testing.T) {
t.Error("expected the processor to have started")
}
err = AddTradesToBuffer("test!", []Data{
err = AddTradesToBuffer([]Data{
{
Timestamp: time.Now(),
Exchange: "test!",
@@ -74,7 +74,7 @@ func TestAddTradesToBuffer(t *testing.T) {
processor.buffer = nil
processor.mutex.Unlock()
err = AddTradesToBuffer("test!", []Data{
err = AddTradesToBuffer([]Data{
{
Timestamp: time.Now(),
Exchange: "test!",

View File

@@ -27,7 +27,6 @@ var (
// Trade used to hold data and methods related to trade dissemination and
// storage
type Trade struct {
exchangeName string
dataHandler chan interface{}
tradeFeedEnabled bool
}