diff --git a/exchanges/binance/binance_test.go b/exchanges/binance/binance_test.go index 629ed3a9..df8b2458 100644 --- a/exchanges/binance/binance_test.go +++ b/exchanges/binance/binance_test.go @@ -1,10 +1,12 @@ package binance import ( + "bytes" "context" "encoding/json" "errors" "fmt" + "os" "reflect" "sync" "testing" @@ -1955,6 +1957,30 @@ func TestGetDepositAddress(t *testing.T) { } } +func BenchmarkWsHandleData(bb *testing.B) { + bb.ReportAllocs() + ap, err := b.CurrencyPairs.GetPairs(asset.Spot, false) + require.NoError(bb, err) + err = b.CurrencyPairs.StorePairs(asset.Spot, ap, true) + require.NoError(bb, err) + + data, err := os.ReadFile("testdata/wsHandleData.json") + require.NoError(bb, err) + lines := bytes.Split(data, []byte("\n")) + require.Len(bb, lines, 8) + go func() { + for { + <-b.Websocket.DataHandler + } + }() + bb.ResetTimer() + for i := 0; i < bb.N; i++ { + for x := range lines { + assert.NoError(bb, b.wsHandleData(lines[x])) + } + } +} + func TestSubscribe(t *testing.T) { t.Parallel() b := b @@ -2012,11 +2038,11 @@ func TestWsKlineUpdate(t *testing.T) { pressXToJSON := []byte(`{"stream":"btcusdt@kline_1m","data":{ "e": "kline", "E": 123456789, - "s": "BNBBTC", + "s": "BTCUSDT", "k": { "t": 123400000, "T": 123460000, - "s": "BNBBTC", + "s": "BTCUSDT", "i": "1m", "f": 100, "L": 200, @@ -2041,10 +2067,11 @@ func TestWsKlineUpdate(t *testing.T) { func TestWsTradeUpdate(t *testing.T) { t.Parallel() + b.SetSaveTradeDataStatus(true) pressXToJSON := []byte(`{"stream":"btcusdt@trade","data":{ "e": "trade", "E": 123456789, - "s": "BNBBTC", + "s": "BTCUSDT", "t": 12345, "p": "0.001", "q": "100", diff --git a/exchanges/binance/binance_types.go b/exchanges/binance/binance_types.go index 9d6cf8ef..de28e296 100644 --- a/exchanges/binance/binance_types.go +++ b/exchanges/binance/binance_types.go @@ -163,13 +163,13 @@ type DepthUpdateParams []struct { // WebsocketDepthStream is the difference for the update depth stream type WebsocketDepthStream struct { - Event string `json:"e"` - Timestamp time.Time `json:"E"` - Pair string `json:"s"` - FirstUpdateID int64 `json:"U"` - LastUpdateID int64 `json:"u"` - UpdateBids [][2]interface{} `json:"b"` - UpdateAsks [][2]interface{} `json:"a"` + Event string `json:"e"` + Timestamp time.Time `json:"E"` + Pair string `json:"s"` + FirstUpdateID int64 `json:"U"` + LastUpdateID int64 `json:"u"` + UpdateBids [][2]types.Number `json:"b"` + UpdateAsks [][2]types.Number `json:"a"` } // RecentTradeRequestParams represents Klines request data. @@ -190,17 +190,17 @@ type RecentTrade struct { // TradeStream holds the trade stream data type TradeStream struct { - EventType string `json:"e"` - EventTime time.Time `json:"E"` - Symbol string `json:"s"` - TradeID int64 `json:"t"` - Price string `json:"p"` - Quantity string `json:"q"` - BuyerOrderID int64 `json:"b"` - SellerOrderID int64 `json:"a"` - TimeStamp time.Time `json:"T"` - Maker bool `json:"m"` - BestMatchPrice bool `json:"M"` + EventType string `json:"e"` + EventTime time.Time `json:"E"` + Symbol string `json:"s"` + TradeID int64 `json:"t"` + Price types.Number `json:"p"` + Quantity types.Number `json:"q"` + BuyerOrderID int64 `json:"b"` + SellerOrderID int64 `json:"a"` + TimeStamp time.Time `json:"T"` + Maker bool `json:"m"` + BestMatchPrice bool `json:"M"` } // KlineStream holds the kline stream data @@ -213,49 +213,49 @@ type KlineStream struct { // KlineStreamData defines kline streaming data type KlineStreamData struct { - StartTime time.Time `json:"t"` - CloseTime time.Time `json:"T"` - Symbol string `json:"s"` - Interval string `json:"i"` - FirstTradeID int64 `json:"f"` - LastTradeID int64 `json:"L"` - OpenPrice float64 `json:"o,string"` - ClosePrice float64 `json:"c,string"` - HighPrice float64 `json:"h,string"` - LowPrice float64 `json:"l,string"` - Volume float64 `json:"v,string"` - NumberOfTrades int64 `json:"n"` - KlineClosed bool `json:"x"` - Quote float64 `json:"q,string"` - TakerBuyBaseAssetVolume float64 `json:"V,string"` - TakerBuyQuoteAssetVolume float64 `json:"Q,string"` + StartTime time.Time `json:"t"` + CloseTime time.Time `json:"T"` + Symbol string `json:"s"` + Interval string `json:"i"` + FirstTradeID int64 `json:"f"` + LastTradeID int64 `json:"L"` + OpenPrice types.Number `json:"o"` + ClosePrice types.Number `json:"c"` + HighPrice types.Number `json:"h"` + LowPrice types.Number `json:"l"` + Volume types.Number `json:"v"` + NumberOfTrades int64 `json:"n"` + KlineClosed bool `json:"x"` + Quote types.Number `json:"q"` + TakerBuyBaseAssetVolume types.Number `json:"V"` + TakerBuyQuoteAssetVolume types.Number `json:"Q"` } // TickerStream holds the ticker stream data type TickerStream struct { - EventType string `json:"e"` - EventTime time.Time `json:"E"` - Symbol string `json:"s"` - PriceChange float64 `json:"p,string"` - PriceChangePercent float64 `json:"P,string"` - WeightedAvgPrice float64 `json:"w,string"` - ClosePrice float64 `json:"x,string"` - LastPrice float64 `json:"c,string"` - LastPriceQuantity float64 `json:"Q,string"` - BestBidPrice float64 `json:"b,string"` - BestBidQuantity float64 `json:"B,string"` - BestAskPrice float64 `json:"a,string"` - BestAskQuantity float64 `json:"A,string"` - OpenPrice float64 `json:"o,string"` - HighPrice float64 `json:"h,string"` - LowPrice float64 `json:"l,string"` - TotalTradedVolume float64 `json:"v,string"` - TotalTradedQuoteVolume float64 `json:"q,string"` - OpenTime time.Time `json:"O"` - CloseTime time.Time `json:"C"` - FirstTradeID int64 `json:"F"` - LastTradeID int64 `json:"L"` - NumberOfTrades int64 `json:"n"` + EventType string `json:"e"` + EventTime time.Time `json:"E"` + Symbol string `json:"s"` + PriceChange types.Number `json:"p"` + PriceChangePercent types.Number `json:"P"` + WeightedAvgPrice types.Number `json:"w"` + ClosePrice types.Number `json:"x"` + LastPrice types.Number `json:"c"` + LastPriceQuantity types.Number `json:"Q"` + BestBidPrice types.Number `json:"b"` + BestBidQuantity types.Number `json:"B"` + BestAskPrice types.Number `json:"a"` + BestAskQuantity types.Number `json:"A"` + OpenPrice types.Number `json:"o"` + HighPrice types.Number `json:"h"` + LowPrice types.Number `json:"l"` + TotalTradedVolume types.Number `json:"v"` + TotalTradedQuoteVolume types.Number `json:"q"` + OpenTime time.Time `json:"O"` + CloseTime time.Time `json:"C"` + FirstTradeID int64 `json:"F"` + LastTradeID int64 `json:"L"` + NumberOfTrades int64 `json:"n"` } // HistoricalTrade holds recent trade data diff --git a/exchanges/binance/binance_websocket.go b/exchanges/binance/binance_websocket.go index 0a3fb38e..52cdd0cc 100644 --- a/exchanges/binance/binance_websocket.go +++ b/exchanges/binance/binance_websocket.go @@ -164,294 +164,258 @@ func (b *Binance) wsReadData() { } func (b *Binance) wsHandleData(respRaw []byte) error { - var multiStreamData map[string]interface{} - err := json.Unmarshal(respRaw, &multiStreamData) - if err != nil { - return err - } - if id, err := jsonparser.GetInt(respRaw, "id"); err == nil { if b.Websocket.Match.IncomingWithData(id, respRaw) { return nil } } - if r, ok := multiStreamData["result"]; ok { - if r == nil { + if resultString, err := jsonparser.GetUnsafeString(respRaw, "result"); err == nil { + if resultString == "null" { + return nil + } + } + jsonData, _, _, err := jsonparser.Get(respRaw, "data") + if err != nil { + return fmt.Errorf("%s %s %s", b.Name, stream.UnhandledMessage, string(respRaw)) + } + var event string + event, err = jsonparser.GetUnsafeString(jsonData, "e") + if err == nil { + switch event { + case "outboundAccountPosition": + var data wsAccountPosition + err = json.Unmarshal(respRaw, &data) + if err != nil { + return fmt.Errorf("%v - Could not convert to outboundAccountPosition structure %s", + b.Name, + err) + } + b.Websocket.DataHandler <- data + return nil + case "balanceUpdate": + var data wsBalanceUpdate + err = json.Unmarshal(respRaw, &data) + if err != nil { + return fmt.Errorf("%v - Could not convert to balanceUpdate structure %s", + b.Name, + err) + } + b.Websocket.DataHandler <- data + return nil + case "executionReport": + var data wsOrderUpdate + err = json.Unmarshal(respRaw, &data) + if err != nil { + return fmt.Errorf("%v - Could not convert to executionReport structure %s", + b.Name, + err) + } + avgPrice := 0.0 + if data.Data.CumulativeFilledQuantity != 0 { + avgPrice = data.Data.CumulativeQuoteTransactedQuantity / data.Data.CumulativeFilledQuantity + } + remainingAmount := data.Data.Quantity - data.Data.CumulativeFilledQuantity + var pair currency.Pair + var assetType asset.Item + pair, assetType, err = b.GetRequestFormattedPairAndAssetType(data.Data.Symbol) + if err != nil { + return err + } + var feeAsset currency.Code + if data.Data.CommissionAsset != "" { + feeAsset = currency.NewCode(data.Data.CommissionAsset) + } + orderID := strconv.FormatInt(data.Data.OrderID, 10) + var orderStatus order.Status + orderStatus, err = stringToOrderStatus(data.Data.OrderStatus) + if err != nil { + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + OrderID: orderID, + Err: err, + } + } + clientOrderID := data.Data.ClientOrderID + if orderStatus == order.Cancelled { + clientOrderID = data.Data.CancelledClientOrderID + } + var orderType order.Type + orderType, err = order.StringToOrderType(data.Data.OrderType) + if err != nil { + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + OrderID: orderID, + Err: err, + } + } + var orderSide order.Side + orderSide, err = order.StringToOrderSide(data.Data.Side) + if err != nil { + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + OrderID: orderID, + Err: err, + } + } + b.Websocket.DataHandler <- &order.Detail{ + Price: data.Data.Price, + Amount: data.Data.Quantity, + AverageExecutedPrice: avgPrice, + ExecutedAmount: data.Data.CumulativeFilledQuantity, + RemainingAmount: remainingAmount, + Cost: data.Data.CumulativeQuoteTransactedQuantity, + CostAsset: pair.Quote, + Fee: data.Data.Commission, + FeeAsset: feeAsset, + Exchange: b.Name, + OrderID: orderID, + ClientOrderID: clientOrderID, + Type: orderType, + Side: orderSide, + Status: orderStatus, + AssetType: assetType, + Date: data.Data.OrderCreationTime, + LastUpdated: data.Data.TransactionTime, + Pair: pair, + } + return nil + case "listStatus": + var data wsListStatus + err = json.Unmarshal(respRaw, &data) + if err != nil { + return fmt.Errorf("%v - Could not convert to listStatus structure %s", + b.Name, + err) + } + b.Websocket.DataHandler <- data return nil } } - if newdata, ok := multiStreamData["data"].(map[string]interface{}); ok { - if e, ok := newdata["e"].(string); ok { - switch e { - case "outboundAccountInfo": - var data wsAccountInfo - err := json.Unmarshal(respRaw, &data) - if err != nil { - return fmt.Errorf("%v - Could not convert to outboundAccountInfo structure %s", - b.Name, - err) - } - b.Websocket.DataHandler <- data - return nil - case "outboundAccountPosition": - var data wsAccountPosition - err := json.Unmarshal(respRaw, &data) - if err != nil { - return fmt.Errorf("%v - Could not convert to outboundAccountPosition structure %s", - b.Name, - err) - } - b.Websocket.DataHandler <- data - return nil - case "balanceUpdate": - var data wsBalanceUpdate - err := json.Unmarshal(respRaw, &data) - if err != nil { - return fmt.Errorf("%v - Could not convert to balanceUpdate structure %s", - b.Name, - err) - } - b.Websocket.DataHandler <- data - return nil - case "executionReport": - var data wsOrderUpdate - err := json.Unmarshal(respRaw, &data) - if err != nil { - return fmt.Errorf("%v - Could not convert to executionReport structure %s", - b.Name, - err) - } - averagePrice := 0.0 - if data.Data.CumulativeFilledQuantity != 0 { - averagePrice = data.Data.CumulativeQuoteTransactedQuantity / data.Data.CumulativeFilledQuantity - } - remainingAmount := data.Data.Quantity - data.Data.CumulativeFilledQuantity - pair, assetType, err := b.GetRequestFormattedPairAndAssetType(data.Data.Symbol) - if err != nil { - return err - } - var feeAsset currency.Code - if data.Data.CommissionAsset != "" { - feeAsset = currency.NewCode(data.Data.CommissionAsset) - } - orderID := strconv.FormatInt(data.Data.OrderID, 10) - orderStatus, err := stringToOrderStatus(data.Data.OrderStatus) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: orderID, - Err: err, - } - } - clientOrderID := data.Data.ClientOrderID - if orderStatus == order.Cancelled { - clientOrderID = data.Data.CancelledClientOrderID - } - orderType, err := order.StringToOrderType(data.Data.OrderType) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: orderID, - Err: err, - } - } - orderSide, err := order.StringToOrderSide(data.Data.Side) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: orderID, - Err: err, - } - } - b.Websocket.DataHandler <- &order.Detail{ - Price: data.Data.Price, - Amount: data.Data.Quantity, - AverageExecutedPrice: averagePrice, - ExecutedAmount: data.Data.CumulativeFilledQuantity, - RemainingAmount: remainingAmount, - Cost: data.Data.CumulativeQuoteTransactedQuantity, - CostAsset: pair.Quote, - Fee: data.Data.Commission, - FeeAsset: feeAsset, - Exchange: b.Name, - OrderID: orderID, - ClientOrderID: clientOrderID, - Type: orderType, - Side: orderSide, - Status: orderStatus, - AssetType: assetType, - Date: data.Data.OrderCreationTime, - LastUpdated: data.Data.TransactionTime, - Pair: pair, - } - return nil - case "listStatus": - var data wsListStatus - err := json.Unmarshal(respRaw, &data) - if err != nil { - return fmt.Errorf("%v - Could not convert to listStatus structure %s", - b.Name, - err) - } - b.Websocket.DataHandler <- data + streamStr, err := jsonparser.GetUnsafeString(respRaw, "stream") + if err != nil { + if errors.Is(err, jsonparser.KeyPathNotFoundError) { + return fmt.Errorf("%s %s %s", b.Name, stream.UnhandledMessage, string(respRaw)) + } + return err + } + streamType := strings.Split(streamStr, "@") + if len(streamType) <= 1 { + return fmt.Errorf("%s %s %s", b.Name, stream.UnhandledMessage, string(respRaw)) + } + var ( + pair currency.Pair + isEnabled bool + symbol string + ) + symbol, err = jsonparser.GetUnsafeString(jsonData, "s") + if err != nil { + // there should be a symbol returned for all data types below + return err + } + pair, isEnabled, err = b.MatchSymbolCheckEnabled(symbol, asset.Spot, false) + if err != nil { + return err + } + if !isEnabled { + return nil + } + switch streamType[1] { + case "trade": + saveTradeData := b.IsSaveTradeDataEnabled() + if !saveTradeData && + !b.IsTradeFeedEnabled() { + return nil + } + + var t TradeStream + err = json.Unmarshal(jsonData, &t) + if err != nil { + return fmt.Errorf("%v - Could not unmarshal trade data: %s", + b.Name, + err) + } + return b.Websocket.Trade.Update(saveTradeData, + trade.Data{ + CurrencyPair: pair, + Timestamp: t.TimeStamp, + Price: t.Price.Float64(), + Amount: t.Quantity.Float64(), + Exchange: b.Name, + AssetType: asset.Spot, + TID: strconv.FormatInt(t.TradeID, 10), + }) + case "ticker": + var t TickerStream + err = json.Unmarshal(jsonData, &t) + if err != nil { + return fmt.Errorf("%v - Could not convert to a TickerStream structure %s", + b.Name, + err.Error()) + } + b.Websocket.DataHandler <- &ticker.Price{ + ExchangeName: b.Name, + Open: t.OpenPrice.Float64(), + Close: t.ClosePrice.Float64(), + Volume: t.TotalTradedVolume.Float64(), + QuoteVolume: t.TotalTradedQuoteVolume.Float64(), + High: t.HighPrice.Float64(), + Low: t.LowPrice.Float64(), + Bid: t.BestBidPrice.Float64(), + Ask: t.BestAskPrice.Float64(), + Last: t.LastPrice.Float64(), + LastUpdated: t.EventTime, + AssetType: asset.Spot, + Pair: pair, + } + return nil + case "kline_1m", "kline_3m", "kline_5m", "kline_15m", "kline_30m", "kline_1h", "kline_2h", "kline_4h", + "kline_6h", "kline_8h", "kline_12h", "kline_1d", "kline_3d", "kline_1w", "kline_1M": + var kline KlineStream + err = json.Unmarshal(jsonData, &kline) + if err != nil { + return fmt.Errorf("%v - Could not convert to a KlineStream structure %s", + b.Name, + err) + } + b.Websocket.DataHandler <- stream.KlineData{ + Timestamp: kline.EventTime, + Pair: pair, + AssetType: asset.Spot, + Exchange: b.Name, + StartTime: kline.Kline.StartTime, + CloseTime: kline.Kline.CloseTime, + Interval: kline.Kline.Interval, + OpenPrice: kline.Kline.OpenPrice.Float64(), + ClosePrice: kline.Kline.ClosePrice.Float64(), + HighPrice: kline.Kline.HighPrice.Float64(), + LowPrice: kline.Kline.LowPrice.Float64(), + Volume: kline.Kline.Volume.Float64(), + } + return nil + case "depth": + var depth WebsocketDepthStream + err = json.Unmarshal(jsonData, &depth) + if err != nil { + return fmt.Errorf("%v - Could not convert to depthStream structure %s", + b.Name, + err) + } + var init bool + init, err = b.UpdateLocalBuffer(&depth) + if err != nil { + if init { return nil } + return fmt.Errorf("%v - UpdateLocalCache error: %s", + b.Name, + err) } + return nil + default: + return fmt.Errorf("%s %s %s", b.Name, stream.UnhandledMessage, string(respRaw)) } - if wsStream, ok := multiStreamData["stream"].(string); ok { - streamType := strings.Split(wsStream, "@") - if len(streamType) > 1 { - if data, ok := multiStreamData["data"]; ok { - rawData, err := json.Marshal(data) - if err != nil { - return err - } - - pairs, err := b.GetEnabledPairs(asset.Spot) - if err != nil { - return err - } - - format, err := b.GetPairFormat(asset.Spot, true) - if err != nil { - return err - } - - switch streamType[1] { - case "trade": - saveTradeData := b.IsSaveTradeDataEnabled() - - if !saveTradeData && - !b.IsTradeFeedEnabled() { - return nil - } - - var t TradeStream - err := json.Unmarshal(rawData, &t) - if err != nil { - return fmt.Errorf("%v - Could not unmarshal trade data: %s", - b.Name, - err) - } - - price, err := strconv.ParseFloat(t.Price, 64) - if err != nil { - return fmt.Errorf("%v - price conversion error: %s", - b.Name, - err) - } - - amount, err := strconv.ParseFloat(t.Quantity, 64) - if err != nil { - return fmt.Errorf("%v - amount conversion error: %s", - b.Name, - err) - } - - pair, err := currency.NewPairFromFormattedPairs(t.Symbol, pairs, format) - if err != nil { - return err - } - - return b.Websocket.Trade.Update(saveTradeData, - trade.Data{ - CurrencyPair: pair, - Timestamp: t.TimeStamp, - Price: price, - Amount: amount, - Exchange: b.Name, - AssetType: asset.Spot, - TID: strconv.FormatInt(t.TradeID, 10), - }) - case "ticker": - var t TickerStream - err := json.Unmarshal(rawData, &t) - if err != nil { - return fmt.Errorf("%v - Could not convert to a TickerStream structure %s", - b.Name, - err.Error()) - } - - pair, err := currency.NewPairFromFormattedPairs(t.Symbol, pairs, format) - if err != nil { - return err - } - - b.Websocket.DataHandler <- &ticker.Price{ - ExchangeName: b.Name, - Open: t.OpenPrice, - Close: t.ClosePrice, - Volume: t.TotalTradedVolume, - QuoteVolume: t.TotalTradedQuoteVolume, - High: t.HighPrice, - Low: t.LowPrice, - Bid: t.BestBidPrice, - Ask: t.BestAskPrice, - Last: t.LastPrice, - LastUpdated: t.EventTime, - AssetType: asset.Spot, - Pair: pair, - } - return nil - case "kline_1m", "kline_3m", "kline_5m", "kline_15m", "kline_30m", "kline_1h", "kline_2h", "kline_4h", - "kline_6h", "kline_8h", "kline_12h", "kline_1d", "kline_3d", "kline_1w", "kline_1M": - var kline KlineStream - err := json.Unmarshal(rawData, &kline) - if err != nil { - return fmt.Errorf("%v - Could not convert to a KlineStream structure %s", - b.Name, - err) - } - - pair, err := currency.NewPairFromFormattedPairs(kline.Symbol, pairs, format) - if err != nil { - return err - } - - b.Websocket.DataHandler <- stream.KlineData{ - Timestamp: kline.EventTime, - Pair: pair, - AssetType: asset.Spot, - Exchange: b.Name, - StartTime: kline.Kline.StartTime, - CloseTime: kline.Kline.CloseTime, - Interval: kline.Kline.Interval, - OpenPrice: kline.Kline.OpenPrice, - ClosePrice: kline.Kline.ClosePrice, - HighPrice: kline.Kline.HighPrice, - LowPrice: kline.Kline.LowPrice, - Volume: kline.Kline.Volume, - } - return nil - case "depth": - var depth WebsocketDepthStream - err := json.Unmarshal(rawData, &depth) - if err != nil { - return fmt.Errorf("%v - Could not convert to depthStream structure %s", - b.Name, - err) - } - init, err := b.UpdateLocalBuffer(&depth) - if err != nil { - if init { - return nil - } - return fmt.Errorf("%v - UpdateLocalCache error: %s", - b.Name, - err) - } - return nil - default: - b.Websocket.DataHandler <- stream.UnhandledMessageWarning{ - Message: b.Name + stream.UnhandledMessage + string(respRaw), - } - } - } - } - } - return fmt.Errorf("unhandled stream data %s", string(respRaw)) } func stringToOrderStatus(status string) (order.Status, error) { @@ -517,35 +481,22 @@ func (b *Binance) SeedLocalCacheWithBook(p currency.Pair, orderbookNew *OrderBoo // UpdateLocalBuffer updates and returns the most recent iteration of the orderbook func (b *Binance) UpdateLocalBuffer(wsdp *WebsocketDepthStream) (bool, error) { - enabledPairs, err := b.GetEnabledPairs(asset.Spot) + pair, err := b.MatchSymbolWithAvailablePairs(wsdp.Pair, asset.Spot, false) if err != nil { return false, err } - - format, err := b.GetPairFormat(asset.Spot, true) + err = b.obm.stageWsUpdate(wsdp, pair, asset.Spot) if err != nil { - return false, err - } - - currencyPair, err := currency.NewPairFromFormattedPairs(wsdp.Pair, - enabledPairs, - format) - if err != nil { - return false, err - } - - err = b.obm.stageWsUpdate(wsdp, currencyPair, asset.Spot) - if err != nil { - init, err2 := b.obm.checkIsInitialSync(currencyPair) + init, err2 := b.obm.checkIsInitialSync(pair) if err2 != nil { return false, err2 } return init, err } - err = b.applyBufferUpdate(currencyPair) + err = b.applyBufferUpdate(pair) if err != nil { - b.flushAndCleanup(currencyPair) + b.flushAndCleanup(pair) } return false, err @@ -692,46 +643,18 @@ func (b *Binance) unsubscribeFromChan(chans []subscription.Subscription) error { func (b *Binance) ProcessUpdate(cp currency.Pair, a asset.Item, ws *WebsocketDepthStream) error { updateBid := make([]orderbook.Item, len(ws.UpdateBids)) for i := range ws.UpdateBids { - price, ok := ws.UpdateBids[i][0].(string) - if !ok { - return errors.New("type assertion failed for bid price") + updateBid[i] = orderbook.Item{ + Price: ws.UpdateBids[i][0].Float64(), + Amount: ws.UpdateBids[i][1].Float64(), } - p, err := strconv.ParseFloat(price, 64) - if err != nil { - return err - } - amount, ok := ws.UpdateBids[i][1].(string) - if !ok { - return errors.New("type assertion failed for bid amount") - } - a, err := strconv.ParseFloat(amount, 64) - if err != nil { - return err - } - updateBid[i] = orderbook.Item{Price: p, Amount: a} } - updateAsk := make([]orderbook.Item, len(ws.UpdateAsks)) for i := range ws.UpdateAsks { - price, ok := ws.UpdateAsks[i][0].(string) - if !ok { - return errors.New("type assertion failed for ask price") + updateAsk[i] = orderbook.Item{ + Price: ws.UpdateAsks[i][0].Float64(), + Amount: ws.UpdateAsks[i][1].Float64(), } - p, err := strconv.ParseFloat(price, 64) - if err != nil { - return err - } - amount, ok := ws.UpdateAsks[i][1].(string) - if !ok { - return errors.New("type assertion failed for ask amount") - } - a, err := strconv.ParseFloat(amount, 64) - if err != nil { - return err - } - updateAsk[i] = orderbook.Item{Price: p, Amount: a} } - return b.Websocket.Orderbook.Update(&orderbook.Update{ Bids: updateBid, Asks: updateAsk, diff --git a/exchanges/binance/testdata/wsHandleData.json b/exchanges/binance/testdata/wsHandleData.json new file mode 100644 index 00000000..8582e78f --- /dev/null +++ b/exchanges/binance/testdata/wsHandleData.json @@ -0,0 +1,8 @@ +{"stream":"btcusdt@ticker","data":{"e":"24hrTicker","E":1580254809477,"s":"BTCUSDT","p":"420.97000000","P":"4.720","w":"9058.27981278","x":"8917.98000000","c":"9338.96000000","Q":"0.17246300","b":"9338.03000000","B":"0.18234600","a":"9339.70000000","A":"0.14097600","o":"8917.99000000","h":"9373.19000000","l":"8862.40000000","v":"72229.53692000","q":"654275356.16896672","O":1580168409456,"C":1580254809456,"F":235294268,"L":235894703,"n":600436}} +{"stream":"btcusdt@kline_1m","data":{"e": "kline","E": 123456789,"s": "BTCUSDT","k": {"t": 123400000,"T": 123460000,"s": "BTCUSDT","i": "1m","f": 100,"L": 200,"o": "0.0010","c": "0.0020","h": "0.0025","l": "0.0015","v": "1000","n": 100,"x": false,"q": "1.0000","V": "500","Q": "0.500","B": "123456"}}} +{"stream":"btcusdt@trade","data":{"e": "trade","E": 123456789,"s": "BTCUSDT","t": 12345,"p": "0.001","q": "100","b": 88,"a": 50,"T": 123456785,"m": true,"M": true}} +{"stream":"btcusdt@depth","data":{"e": "depthUpdate","E": 123456788,"s": "BTCUSDT","U": 157,"u": 160,"b": [["6621.45", "0.3"]],"a": [["6622.46", "1.5"]]}} +{"stream":"jTfvpakT2yT0hVIo5gYWVihZhdM2PrBgJUZ5PyfZ4EVpCkx4Uoxk5timcrQc","data":{"e": "balanceUpdate","E": 1573200697110,"a": "BTC","d": "100.00000000","T": 1573200697068}} +{"stream":"jTfvpakT2yT0hVIo5gYWVihZhdM2PrBgJUZ5PyfZ4EVpCkx4Uoxk5timcrQc","data":{"e": "listStatus","E": 1564035303637,"s": "ETHBTC","g": 2,"c": "OCO","l": "EXEC_STARTED","L": "EXECUTING","r": "NONE","C": "F4QN4G8DlFATFlIUQ0cjdD","T": 1564035303625,"O": [{"s": "ETHBTC","i": 17,"c": "AJYsMjErWJesZvqlJCTUgL"},{"s": "ETHBTC","i": 18,"c": "bfYPSQdLoqAJeNrOr9adzq"}]}} +{"stream":"jTfvpakT2yT0hVIo5gYWVihZhdM2PrBgJUZ5PyfZ4EVpCkx4Uoxk5timcrQc","data":{"e":"executionReport","E":1616627567900,"s":"BTCUSDT","c":"c4wyKsIhoAaittTYlIVLqk","S":"BUY","o":"LIMIT","f":"GTC","q":"0.00028400","p":"52789.10000000","P":"0.00000000","F":"0.00000000","g":-1,"C":"","x":"NEW","X":"NEW","r":"NONE","i":5340845958,"l":"0.00000000","z":"0.00000000","L":"0.00000000","n":"0","N":"BTC","T":1616627567900,"t":-1,"I":11388173160,"w":true,"m":false,"M":false,"O":1616627567900,"Z":"0.00000000","Y":"0.00000000","Q":"0.00000000","W":1616627567900}} +{"stream":"jTfvpakT2yT0hVIo5gYWVihZhdM2PrBgJUZ5PyfZ4EVpCkx4Uoxk5timcrQc","data":{"e":"outboundAccountPosition","E":1616628815745,"u":1616628815745,"B":[{"a":"BTC","f":"0.00225109","l":"0.00123000"},{"a":"BNB","f":"0.00000000","l":"0.00000000"},{"a":"USDT","f":"54.43390661","l":"0.00000000"}]}} \ No newline at end of file