Binance: Improve wsHandleData performance (#1452)

* improve binance handleData performance

* new benchmark

* enable all pairs to scale test

* hmmm

* updated benchmark

* type.Number improvement

* improve benchmark

* Update binance_websocket.go

* minor fixes post-merge
This commit is contained in:
Scott
2024-02-06 16:52:13 +11:00
committed by GitHub
parent 6de314f8d1
commit d88c3b7d3f
4 changed files with 344 additions and 386 deletions

View File

@@ -1,10 +1,12 @@
package binance
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"os"
"reflect"
"sync"
"testing"
@@ -1955,6 +1957,30 @@ func TestGetDepositAddress(t *testing.T) {
}
}
func BenchmarkWsHandleData(bb *testing.B) {
bb.ReportAllocs()
ap, err := b.CurrencyPairs.GetPairs(asset.Spot, false)
require.NoError(bb, err)
err = b.CurrencyPairs.StorePairs(asset.Spot, ap, true)
require.NoError(bb, err)
data, err := os.ReadFile("testdata/wsHandleData.json")
require.NoError(bb, err)
lines := bytes.Split(data, []byte("\n"))
require.Len(bb, lines, 8)
go func() {
for {
<-b.Websocket.DataHandler
}
}()
bb.ResetTimer()
for i := 0; i < bb.N; i++ {
for x := range lines {
assert.NoError(bb, b.wsHandleData(lines[x]))
}
}
}
func TestSubscribe(t *testing.T) {
t.Parallel()
b := b
@@ -2012,11 +2038,11 @@ func TestWsKlineUpdate(t *testing.T) {
pressXToJSON := []byte(`{"stream":"btcusdt@kline_1m","data":{
"e": "kline",
"E": 123456789,
"s": "BNBBTC",
"s": "BTCUSDT",
"k": {
"t": 123400000,
"T": 123460000,
"s": "BNBBTC",
"s": "BTCUSDT",
"i": "1m",
"f": 100,
"L": 200,
@@ -2041,10 +2067,11 @@ func TestWsKlineUpdate(t *testing.T) {
func TestWsTradeUpdate(t *testing.T) {
t.Parallel()
b.SetSaveTradeDataStatus(true)
pressXToJSON := []byte(`{"stream":"btcusdt@trade","data":{
"e": "trade",
"E": 123456789,
"s": "BNBBTC",
"s": "BTCUSDT",
"t": 12345,
"p": "0.001",
"q": "100",

View File

@@ -163,13 +163,13 @@ type DepthUpdateParams []struct {
// WebsocketDepthStream is the difference for the update depth stream
type WebsocketDepthStream struct {
Event string `json:"e"`
Timestamp time.Time `json:"E"`
Pair string `json:"s"`
FirstUpdateID int64 `json:"U"`
LastUpdateID int64 `json:"u"`
UpdateBids [][2]interface{} `json:"b"`
UpdateAsks [][2]interface{} `json:"a"`
Event string `json:"e"`
Timestamp time.Time `json:"E"`
Pair string `json:"s"`
FirstUpdateID int64 `json:"U"`
LastUpdateID int64 `json:"u"`
UpdateBids [][2]types.Number `json:"b"`
UpdateAsks [][2]types.Number `json:"a"`
}
// RecentTradeRequestParams represents Klines request data.
@@ -190,17 +190,17 @@ type RecentTrade struct {
// TradeStream holds the trade stream data
type TradeStream struct {
EventType string `json:"e"`
EventTime time.Time `json:"E"`
Symbol string `json:"s"`
TradeID int64 `json:"t"`
Price string `json:"p"`
Quantity string `json:"q"`
BuyerOrderID int64 `json:"b"`
SellerOrderID int64 `json:"a"`
TimeStamp time.Time `json:"T"`
Maker bool `json:"m"`
BestMatchPrice bool `json:"M"`
EventType string `json:"e"`
EventTime time.Time `json:"E"`
Symbol string `json:"s"`
TradeID int64 `json:"t"`
Price types.Number `json:"p"`
Quantity types.Number `json:"q"`
BuyerOrderID int64 `json:"b"`
SellerOrderID int64 `json:"a"`
TimeStamp time.Time `json:"T"`
Maker bool `json:"m"`
BestMatchPrice bool `json:"M"`
}
// KlineStream holds the kline stream data
@@ -213,49 +213,49 @@ type KlineStream struct {
// KlineStreamData defines kline streaming data
type KlineStreamData struct {
StartTime time.Time `json:"t"`
CloseTime time.Time `json:"T"`
Symbol string `json:"s"`
Interval string `json:"i"`
FirstTradeID int64 `json:"f"`
LastTradeID int64 `json:"L"`
OpenPrice float64 `json:"o,string"`
ClosePrice float64 `json:"c,string"`
HighPrice float64 `json:"h,string"`
LowPrice float64 `json:"l,string"`
Volume float64 `json:"v,string"`
NumberOfTrades int64 `json:"n"`
KlineClosed bool `json:"x"`
Quote float64 `json:"q,string"`
TakerBuyBaseAssetVolume float64 `json:"V,string"`
TakerBuyQuoteAssetVolume float64 `json:"Q,string"`
StartTime time.Time `json:"t"`
CloseTime time.Time `json:"T"`
Symbol string `json:"s"`
Interval string `json:"i"`
FirstTradeID int64 `json:"f"`
LastTradeID int64 `json:"L"`
OpenPrice types.Number `json:"o"`
ClosePrice types.Number `json:"c"`
HighPrice types.Number `json:"h"`
LowPrice types.Number `json:"l"`
Volume types.Number `json:"v"`
NumberOfTrades int64 `json:"n"`
KlineClosed bool `json:"x"`
Quote types.Number `json:"q"`
TakerBuyBaseAssetVolume types.Number `json:"V"`
TakerBuyQuoteAssetVolume types.Number `json:"Q"`
}
// TickerStream holds the ticker stream data
type TickerStream struct {
EventType string `json:"e"`
EventTime time.Time `json:"E"`
Symbol string `json:"s"`
PriceChange float64 `json:"p,string"`
PriceChangePercent float64 `json:"P,string"`
WeightedAvgPrice float64 `json:"w,string"`
ClosePrice float64 `json:"x,string"`
LastPrice float64 `json:"c,string"`
LastPriceQuantity float64 `json:"Q,string"`
BestBidPrice float64 `json:"b,string"`
BestBidQuantity float64 `json:"B,string"`
BestAskPrice float64 `json:"a,string"`
BestAskQuantity float64 `json:"A,string"`
OpenPrice float64 `json:"o,string"`
HighPrice float64 `json:"h,string"`
LowPrice float64 `json:"l,string"`
TotalTradedVolume float64 `json:"v,string"`
TotalTradedQuoteVolume float64 `json:"q,string"`
OpenTime time.Time `json:"O"`
CloseTime time.Time `json:"C"`
FirstTradeID int64 `json:"F"`
LastTradeID int64 `json:"L"`
NumberOfTrades int64 `json:"n"`
EventType string `json:"e"`
EventTime time.Time `json:"E"`
Symbol string `json:"s"`
PriceChange types.Number `json:"p"`
PriceChangePercent types.Number `json:"P"`
WeightedAvgPrice types.Number `json:"w"`
ClosePrice types.Number `json:"x"`
LastPrice types.Number `json:"c"`
LastPriceQuantity types.Number `json:"Q"`
BestBidPrice types.Number `json:"b"`
BestBidQuantity types.Number `json:"B"`
BestAskPrice types.Number `json:"a"`
BestAskQuantity types.Number `json:"A"`
OpenPrice types.Number `json:"o"`
HighPrice types.Number `json:"h"`
LowPrice types.Number `json:"l"`
TotalTradedVolume types.Number `json:"v"`
TotalTradedQuoteVolume types.Number `json:"q"`
OpenTime time.Time `json:"O"`
CloseTime time.Time `json:"C"`
FirstTradeID int64 `json:"F"`
LastTradeID int64 `json:"L"`
NumberOfTrades int64 `json:"n"`
}
// HistoricalTrade holds recent trade data

View File

@@ -164,294 +164,258 @@ func (b *Binance) wsReadData() {
}
func (b *Binance) wsHandleData(respRaw []byte) error {
var multiStreamData map[string]interface{}
err := json.Unmarshal(respRaw, &multiStreamData)
if err != nil {
return err
}
if id, err := jsonparser.GetInt(respRaw, "id"); err == nil {
if b.Websocket.Match.IncomingWithData(id, respRaw) {
return nil
}
}
if r, ok := multiStreamData["result"]; ok {
if r == nil {
if resultString, err := jsonparser.GetUnsafeString(respRaw, "result"); err == nil {
if resultString == "null" {
return nil
}
}
jsonData, _, _, err := jsonparser.Get(respRaw, "data")
if err != nil {
return fmt.Errorf("%s %s %s", b.Name, stream.UnhandledMessage, string(respRaw))
}
var event string
event, err = jsonparser.GetUnsafeString(jsonData, "e")
if err == nil {
switch event {
case "outboundAccountPosition":
var data wsAccountPosition
err = json.Unmarshal(respRaw, &data)
if err != nil {
return fmt.Errorf("%v - Could not convert to outboundAccountPosition structure %s",
b.Name,
err)
}
b.Websocket.DataHandler <- data
return nil
case "balanceUpdate":
var data wsBalanceUpdate
err = json.Unmarshal(respRaw, &data)
if err != nil {
return fmt.Errorf("%v - Could not convert to balanceUpdate structure %s",
b.Name,
err)
}
b.Websocket.DataHandler <- data
return nil
case "executionReport":
var data wsOrderUpdate
err = json.Unmarshal(respRaw, &data)
if err != nil {
return fmt.Errorf("%v - Could not convert to executionReport structure %s",
b.Name,
err)
}
avgPrice := 0.0
if data.Data.CumulativeFilledQuantity != 0 {
avgPrice = data.Data.CumulativeQuoteTransactedQuantity / data.Data.CumulativeFilledQuantity
}
remainingAmount := data.Data.Quantity - data.Data.CumulativeFilledQuantity
var pair currency.Pair
var assetType asset.Item
pair, assetType, err = b.GetRequestFormattedPairAndAssetType(data.Data.Symbol)
if err != nil {
return err
}
var feeAsset currency.Code
if data.Data.CommissionAsset != "" {
feeAsset = currency.NewCode(data.Data.CommissionAsset)
}
orderID := strconv.FormatInt(data.Data.OrderID, 10)
var orderStatus order.Status
orderStatus, err = stringToOrderStatus(data.Data.OrderStatus)
if err != nil {
b.Websocket.DataHandler <- order.ClassificationError{
Exchange: b.Name,
OrderID: orderID,
Err: err,
}
}
clientOrderID := data.Data.ClientOrderID
if orderStatus == order.Cancelled {
clientOrderID = data.Data.CancelledClientOrderID
}
var orderType order.Type
orderType, err = order.StringToOrderType(data.Data.OrderType)
if err != nil {
b.Websocket.DataHandler <- order.ClassificationError{
Exchange: b.Name,
OrderID: orderID,
Err: err,
}
}
var orderSide order.Side
orderSide, err = order.StringToOrderSide(data.Data.Side)
if err != nil {
b.Websocket.DataHandler <- order.ClassificationError{
Exchange: b.Name,
OrderID: orderID,
Err: err,
}
}
b.Websocket.DataHandler <- &order.Detail{
Price: data.Data.Price,
Amount: data.Data.Quantity,
AverageExecutedPrice: avgPrice,
ExecutedAmount: data.Data.CumulativeFilledQuantity,
RemainingAmount: remainingAmount,
Cost: data.Data.CumulativeQuoteTransactedQuantity,
CostAsset: pair.Quote,
Fee: data.Data.Commission,
FeeAsset: feeAsset,
Exchange: b.Name,
OrderID: orderID,
ClientOrderID: clientOrderID,
Type: orderType,
Side: orderSide,
Status: orderStatus,
AssetType: assetType,
Date: data.Data.OrderCreationTime,
LastUpdated: data.Data.TransactionTime,
Pair: pair,
}
return nil
case "listStatus":
var data wsListStatus
err = json.Unmarshal(respRaw, &data)
if err != nil {
return fmt.Errorf("%v - Could not convert to listStatus structure %s",
b.Name,
err)
}
b.Websocket.DataHandler <- data
return nil
}
}
if newdata, ok := multiStreamData["data"].(map[string]interface{}); ok {
if e, ok := newdata["e"].(string); ok {
switch e {
case "outboundAccountInfo":
var data wsAccountInfo
err := json.Unmarshal(respRaw, &data)
if err != nil {
return fmt.Errorf("%v - Could not convert to outboundAccountInfo structure %s",
b.Name,
err)
}
b.Websocket.DataHandler <- data
return nil
case "outboundAccountPosition":
var data wsAccountPosition
err := json.Unmarshal(respRaw, &data)
if err != nil {
return fmt.Errorf("%v - Could not convert to outboundAccountPosition structure %s",
b.Name,
err)
}
b.Websocket.DataHandler <- data
return nil
case "balanceUpdate":
var data wsBalanceUpdate
err := json.Unmarshal(respRaw, &data)
if err != nil {
return fmt.Errorf("%v - Could not convert to balanceUpdate structure %s",
b.Name,
err)
}
b.Websocket.DataHandler <- data
return nil
case "executionReport":
var data wsOrderUpdate
err := json.Unmarshal(respRaw, &data)
if err != nil {
return fmt.Errorf("%v - Could not convert to executionReport structure %s",
b.Name,
err)
}
averagePrice := 0.0
if data.Data.CumulativeFilledQuantity != 0 {
averagePrice = data.Data.CumulativeQuoteTransactedQuantity / data.Data.CumulativeFilledQuantity
}
remainingAmount := data.Data.Quantity - data.Data.CumulativeFilledQuantity
pair, assetType, err := b.GetRequestFormattedPairAndAssetType(data.Data.Symbol)
if err != nil {
return err
}
var feeAsset currency.Code
if data.Data.CommissionAsset != "" {
feeAsset = currency.NewCode(data.Data.CommissionAsset)
}
orderID := strconv.FormatInt(data.Data.OrderID, 10)
orderStatus, err := stringToOrderStatus(data.Data.OrderStatus)
if err != nil {
b.Websocket.DataHandler <- order.ClassificationError{
Exchange: b.Name,
OrderID: orderID,
Err: err,
}
}
clientOrderID := data.Data.ClientOrderID
if orderStatus == order.Cancelled {
clientOrderID = data.Data.CancelledClientOrderID
}
orderType, err := order.StringToOrderType(data.Data.OrderType)
if err != nil {
b.Websocket.DataHandler <- order.ClassificationError{
Exchange: b.Name,
OrderID: orderID,
Err: err,
}
}
orderSide, err := order.StringToOrderSide(data.Data.Side)
if err != nil {
b.Websocket.DataHandler <- order.ClassificationError{
Exchange: b.Name,
OrderID: orderID,
Err: err,
}
}
b.Websocket.DataHandler <- &order.Detail{
Price: data.Data.Price,
Amount: data.Data.Quantity,
AverageExecutedPrice: averagePrice,
ExecutedAmount: data.Data.CumulativeFilledQuantity,
RemainingAmount: remainingAmount,
Cost: data.Data.CumulativeQuoteTransactedQuantity,
CostAsset: pair.Quote,
Fee: data.Data.Commission,
FeeAsset: feeAsset,
Exchange: b.Name,
OrderID: orderID,
ClientOrderID: clientOrderID,
Type: orderType,
Side: orderSide,
Status: orderStatus,
AssetType: assetType,
Date: data.Data.OrderCreationTime,
LastUpdated: data.Data.TransactionTime,
Pair: pair,
}
return nil
case "listStatus":
var data wsListStatus
err := json.Unmarshal(respRaw, &data)
if err != nil {
return fmt.Errorf("%v - Could not convert to listStatus structure %s",
b.Name,
err)
}
b.Websocket.DataHandler <- data
streamStr, err := jsonparser.GetUnsafeString(respRaw, "stream")
if err != nil {
if errors.Is(err, jsonparser.KeyPathNotFoundError) {
return fmt.Errorf("%s %s %s", b.Name, stream.UnhandledMessage, string(respRaw))
}
return err
}
streamType := strings.Split(streamStr, "@")
if len(streamType) <= 1 {
return fmt.Errorf("%s %s %s", b.Name, stream.UnhandledMessage, string(respRaw))
}
var (
pair currency.Pair
isEnabled bool
symbol string
)
symbol, err = jsonparser.GetUnsafeString(jsonData, "s")
if err != nil {
// there should be a symbol returned for all data types below
return err
}
pair, isEnabled, err = b.MatchSymbolCheckEnabled(symbol, asset.Spot, false)
if err != nil {
return err
}
if !isEnabled {
return nil
}
switch streamType[1] {
case "trade":
saveTradeData := b.IsSaveTradeDataEnabled()
if !saveTradeData &&
!b.IsTradeFeedEnabled() {
return nil
}
var t TradeStream
err = json.Unmarshal(jsonData, &t)
if err != nil {
return fmt.Errorf("%v - Could not unmarshal trade data: %s",
b.Name,
err)
}
return b.Websocket.Trade.Update(saveTradeData,
trade.Data{
CurrencyPair: pair,
Timestamp: t.TimeStamp,
Price: t.Price.Float64(),
Amount: t.Quantity.Float64(),
Exchange: b.Name,
AssetType: asset.Spot,
TID: strconv.FormatInt(t.TradeID, 10),
})
case "ticker":
var t TickerStream
err = json.Unmarshal(jsonData, &t)
if err != nil {
return fmt.Errorf("%v - Could not convert to a TickerStream structure %s",
b.Name,
err.Error())
}
b.Websocket.DataHandler <- &ticker.Price{
ExchangeName: b.Name,
Open: t.OpenPrice.Float64(),
Close: t.ClosePrice.Float64(),
Volume: t.TotalTradedVolume.Float64(),
QuoteVolume: t.TotalTradedQuoteVolume.Float64(),
High: t.HighPrice.Float64(),
Low: t.LowPrice.Float64(),
Bid: t.BestBidPrice.Float64(),
Ask: t.BestAskPrice.Float64(),
Last: t.LastPrice.Float64(),
LastUpdated: t.EventTime,
AssetType: asset.Spot,
Pair: pair,
}
return nil
case "kline_1m", "kline_3m", "kline_5m", "kline_15m", "kline_30m", "kline_1h", "kline_2h", "kline_4h",
"kline_6h", "kline_8h", "kline_12h", "kline_1d", "kline_3d", "kline_1w", "kline_1M":
var kline KlineStream
err = json.Unmarshal(jsonData, &kline)
if err != nil {
return fmt.Errorf("%v - Could not convert to a KlineStream structure %s",
b.Name,
err)
}
b.Websocket.DataHandler <- stream.KlineData{
Timestamp: kline.EventTime,
Pair: pair,
AssetType: asset.Spot,
Exchange: b.Name,
StartTime: kline.Kline.StartTime,
CloseTime: kline.Kline.CloseTime,
Interval: kline.Kline.Interval,
OpenPrice: kline.Kline.OpenPrice.Float64(),
ClosePrice: kline.Kline.ClosePrice.Float64(),
HighPrice: kline.Kline.HighPrice.Float64(),
LowPrice: kline.Kline.LowPrice.Float64(),
Volume: kline.Kline.Volume.Float64(),
}
return nil
case "depth":
var depth WebsocketDepthStream
err = json.Unmarshal(jsonData, &depth)
if err != nil {
return fmt.Errorf("%v - Could not convert to depthStream structure %s",
b.Name,
err)
}
var init bool
init, err = b.UpdateLocalBuffer(&depth)
if err != nil {
if init {
return nil
}
return fmt.Errorf("%v - UpdateLocalCache error: %s",
b.Name,
err)
}
return nil
default:
return fmt.Errorf("%s %s %s", b.Name, stream.UnhandledMessage, string(respRaw))
}
if wsStream, ok := multiStreamData["stream"].(string); ok {
streamType := strings.Split(wsStream, "@")
if len(streamType) > 1 {
if data, ok := multiStreamData["data"]; ok {
rawData, err := json.Marshal(data)
if err != nil {
return err
}
pairs, err := b.GetEnabledPairs(asset.Spot)
if err != nil {
return err
}
format, err := b.GetPairFormat(asset.Spot, true)
if err != nil {
return err
}
switch streamType[1] {
case "trade":
saveTradeData := b.IsSaveTradeDataEnabled()
if !saveTradeData &&
!b.IsTradeFeedEnabled() {
return nil
}
var t TradeStream
err := json.Unmarshal(rawData, &t)
if err != nil {
return fmt.Errorf("%v - Could not unmarshal trade data: %s",
b.Name,
err)
}
price, err := strconv.ParseFloat(t.Price, 64)
if err != nil {
return fmt.Errorf("%v - price conversion error: %s",
b.Name,
err)
}
amount, err := strconv.ParseFloat(t.Quantity, 64)
if err != nil {
return fmt.Errorf("%v - amount conversion error: %s",
b.Name,
err)
}
pair, err := currency.NewPairFromFormattedPairs(t.Symbol, pairs, format)
if err != nil {
return err
}
return b.Websocket.Trade.Update(saveTradeData,
trade.Data{
CurrencyPair: pair,
Timestamp: t.TimeStamp,
Price: price,
Amount: amount,
Exchange: b.Name,
AssetType: asset.Spot,
TID: strconv.FormatInt(t.TradeID, 10),
})
case "ticker":
var t TickerStream
err := json.Unmarshal(rawData, &t)
if err != nil {
return fmt.Errorf("%v - Could not convert to a TickerStream structure %s",
b.Name,
err.Error())
}
pair, err := currency.NewPairFromFormattedPairs(t.Symbol, pairs, format)
if err != nil {
return err
}
b.Websocket.DataHandler <- &ticker.Price{
ExchangeName: b.Name,
Open: t.OpenPrice,
Close: t.ClosePrice,
Volume: t.TotalTradedVolume,
QuoteVolume: t.TotalTradedQuoteVolume,
High: t.HighPrice,
Low: t.LowPrice,
Bid: t.BestBidPrice,
Ask: t.BestAskPrice,
Last: t.LastPrice,
LastUpdated: t.EventTime,
AssetType: asset.Spot,
Pair: pair,
}
return nil
case "kline_1m", "kline_3m", "kline_5m", "kline_15m", "kline_30m", "kline_1h", "kline_2h", "kline_4h",
"kline_6h", "kline_8h", "kline_12h", "kline_1d", "kline_3d", "kline_1w", "kline_1M":
var kline KlineStream
err := json.Unmarshal(rawData, &kline)
if err != nil {
return fmt.Errorf("%v - Could not convert to a KlineStream structure %s",
b.Name,
err)
}
pair, err := currency.NewPairFromFormattedPairs(kline.Symbol, pairs, format)
if err != nil {
return err
}
b.Websocket.DataHandler <- stream.KlineData{
Timestamp: kline.EventTime,
Pair: pair,
AssetType: asset.Spot,
Exchange: b.Name,
StartTime: kline.Kline.StartTime,
CloseTime: kline.Kline.CloseTime,
Interval: kline.Kline.Interval,
OpenPrice: kline.Kline.OpenPrice,
ClosePrice: kline.Kline.ClosePrice,
HighPrice: kline.Kline.HighPrice,
LowPrice: kline.Kline.LowPrice,
Volume: kline.Kline.Volume,
}
return nil
case "depth":
var depth WebsocketDepthStream
err := json.Unmarshal(rawData, &depth)
if err != nil {
return fmt.Errorf("%v - Could not convert to depthStream structure %s",
b.Name,
err)
}
init, err := b.UpdateLocalBuffer(&depth)
if err != nil {
if init {
return nil
}
return fmt.Errorf("%v - UpdateLocalCache error: %s",
b.Name,
err)
}
return nil
default:
b.Websocket.DataHandler <- stream.UnhandledMessageWarning{
Message: b.Name + stream.UnhandledMessage + string(respRaw),
}
}
}
}
}
return fmt.Errorf("unhandled stream data %s", string(respRaw))
}
func stringToOrderStatus(status string) (order.Status, error) {
@@ -517,35 +481,22 @@ func (b *Binance) SeedLocalCacheWithBook(p currency.Pair, orderbookNew *OrderBoo
// UpdateLocalBuffer updates and returns the most recent iteration of the orderbook
func (b *Binance) UpdateLocalBuffer(wsdp *WebsocketDepthStream) (bool, error) {
enabledPairs, err := b.GetEnabledPairs(asset.Spot)
pair, err := b.MatchSymbolWithAvailablePairs(wsdp.Pair, asset.Spot, false)
if err != nil {
return false, err
}
format, err := b.GetPairFormat(asset.Spot, true)
err = b.obm.stageWsUpdate(wsdp, pair, asset.Spot)
if err != nil {
return false, err
}
currencyPair, err := currency.NewPairFromFormattedPairs(wsdp.Pair,
enabledPairs,
format)
if err != nil {
return false, err
}
err = b.obm.stageWsUpdate(wsdp, currencyPair, asset.Spot)
if err != nil {
init, err2 := b.obm.checkIsInitialSync(currencyPair)
init, err2 := b.obm.checkIsInitialSync(pair)
if err2 != nil {
return false, err2
}
return init, err
}
err = b.applyBufferUpdate(currencyPair)
err = b.applyBufferUpdate(pair)
if err != nil {
b.flushAndCleanup(currencyPair)
b.flushAndCleanup(pair)
}
return false, err
@@ -692,46 +643,18 @@ func (b *Binance) unsubscribeFromChan(chans []subscription.Subscription) error {
func (b *Binance) ProcessUpdate(cp currency.Pair, a asset.Item, ws *WebsocketDepthStream) error {
updateBid := make([]orderbook.Item, len(ws.UpdateBids))
for i := range ws.UpdateBids {
price, ok := ws.UpdateBids[i][0].(string)
if !ok {
return errors.New("type assertion failed for bid price")
updateBid[i] = orderbook.Item{
Price: ws.UpdateBids[i][0].Float64(),
Amount: ws.UpdateBids[i][1].Float64(),
}
p, err := strconv.ParseFloat(price, 64)
if err != nil {
return err
}
amount, ok := ws.UpdateBids[i][1].(string)
if !ok {
return errors.New("type assertion failed for bid amount")
}
a, err := strconv.ParseFloat(amount, 64)
if err != nil {
return err
}
updateBid[i] = orderbook.Item{Price: p, Amount: a}
}
updateAsk := make([]orderbook.Item, len(ws.UpdateAsks))
for i := range ws.UpdateAsks {
price, ok := ws.UpdateAsks[i][0].(string)
if !ok {
return errors.New("type assertion failed for ask price")
updateAsk[i] = orderbook.Item{
Price: ws.UpdateAsks[i][0].Float64(),
Amount: ws.UpdateAsks[i][1].Float64(),
}
p, err := strconv.ParseFloat(price, 64)
if err != nil {
return err
}
amount, ok := ws.UpdateAsks[i][1].(string)
if !ok {
return errors.New("type assertion failed for ask amount")
}
a, err := strconv.ParseFloat(amount, 64)
if err != nil {
return err
}
updateAsk[i] = orderbook.Item{Price: p, Amount: a}
}
return b.Websocket.Orderbook.Update(&orderbook.Update{
Bids: updateBid,
Asks: updateAsk,

View File

@@ -0,0 +1,8 @@
{"stream":"btcusdt@ticker","data":{"e":"24hrTicker","E":1580254809477,"s":"BTCUSDT","p":"420.97000000","P":"4.720","w":"9058.27981278","x":"8917.98000000","c":"9338.96000000","Q":"0.17246300","b":"9338.03000000","B":"0.18234600","a":"9339.70000000","A":"0.14097600","o":"8917.99000000","h":"9373.19000000","l":"8862.40000000","v":"72229.53692000","q":"654275356.16896672","O":1580168409456,"C":1580254809456,"F":235294268,"L":235894703,"n":600436}}
{"stream":"btcusdt@kline_1m","data":{"e": "kline","E": 123456789,"s": "BTCUSDT","k": {"t": 123400000,"T": 123460000,"s": "BTCUSDT","i": "1m","f": 100,"L": 200,"o": "0.0010","c": "0.0020","h": "0.0025","l": "0.0015","v": "1000","n": 100,"x": false,"q": "1.0000","V": "500","Q": "0.500","B": "123456"}}}
{"stream":"btcusdt@trade","data":{"e": "trade","E": 123456789,"s": "BTCUSDT","t": 12345,"p": "0.001","q": "100","b": 88,"a": 50,"T": 123456785,"m": true,"M": true}}
{"stream":"btcusdt@depth","data":{"e": "depthUpdate","E": 123456788,"s": "BTCUSDT","U": 157,"u": 160,"b": [["6621.45", "0.3"]],"a": [["6622.46", "1.5"]]}}
{"stream":"jTfvpakT2yT0hVIo5gYWVihZhdM2PrBgJUZ5PyfZ4EVpCkx4Uoxk5timcrQc","data":{"e": "balanceUpdate","E": 1573200697110,"a": "BTC","d": "100.00000000","T": 1573200697068}}
{"stream":"jTfvpakT2yT0hVIo5gYWVihZhdM2PrBgJUZ5PyfZ4EVpCkx4Uoxk5timcrQc","data":{"e": "listStatus","E": 1564035303637,"s": "ETHBTC","g": 2,"c": "OCO","l": "EXEC_STARTED","L": "EXECUTING","r": "NONE","C": "F4QN4G8DlFATFlIUQ0cjdD","T": 1564035303625,"O": [{"s": "ETHBTC","i": 17,"c": "AJYsMjErWJesZvqlJCTUgL"},{"s": "ETHBTC","i": 18,"c": "bfYPSQdLoqAJeNrOr9adzq"}]}}
{"stream":"jTfvpakT2yT0hVIo5gYWVihZhdM2PrBgJUZ5PyfZ4EVpCkx4Uoxk5timcrQc","data":{"e":"executionReport","E":1616627567900,"s":"BTCUSDT","c":"c4wyKsIhoAaittTYlIVLqk","S":"BUY","o":"LIMIT","f":"GTC","q":"0.00028400","p":"52789.10000000","P":"0.00000000","F":"0.00000000","g":-1,"C":"","x":"NEW","X":"NEW","r":"NONE","i":5340845958,"l":"0.00000000","z":"0.00000000","L":"0.00000000","n":"0","N":"BTC","T":1616627567900,"t":-1,"I":11388173160,"w":true,"m":false,"M":false,"O":1616627567900,"Z":"0.00000000","Y":"0.00000000","Q":"0.00000000","W":1616627567900}}
{"stream":"jTfvpakT2yT0hVIo5gYWVihZhdM2PrBgJUZ5PyfZ4EVpCkx4Uoxk5timcrQc","data":{"e":"outboundAccountPosition","E":1616628815745,"u":1616628815745,"B":[{"a":"BTC","f":"0.00225109","l":"0.00123000"},{"a":"BNB","f":"0.00000000","l":"0.00000000"},{"a":"USDT","f":"54.43390661","l":"0.00000000"}]}}