Binance: Fix websocket reponse unmarshal bug (#651)

* binance: Fix websocket reponse unmarshal, fix field type when not null and add test replicating what I received from endpoint.

* Binancce: RMLINE AND COMMENT

* return error on any uncaptured data
This commit is contained in:
Ryan O'Hara-Reid
2021-03-25 16:51:06 +11:00
committed by GitHub
parent 881bab2d5a
commit fe3d0e9ed1
4 changed files with 183 additions and 177 deletions

View File

@@ -2012,68 +2012,6 @@ func TestWSUnsubscriptionHandling(t *testing.T) {
}
}
func TestWsOrderUpdateHandling(t *testing.T) {
t.Parallel()
pressXToJSON := []byte(`{
"e": "executionReport",
"E": 1499405658658,
"s": "BTCUSDT",
"c": "mUvoqJxFIILMdfAW5iGSOW",
"S": "BUY",
"o": "LIMIT",
"f": "GTC",
"q": "1.00000000",
"p": "0.10264410",
"P": "0.00000000",
"F": "0.00000000",
"g": -1,
"C": null,
"x": "NEW",
"X": "NEW",
"r": "NONE",
"i": 4293153,
"l": "0.00000000",
"z": "0.00000000",
"L": "0.00000000",
"n": "0",
"N": null,
"T": 1499405658657,
"t": -1,
"I": 8641984,
"w": true,
"m": false,
"M": false,
"O": 1499405658657,
"Z": "0.00000000",
"Y": "0.00000000",
"Q": "0.00000000"
}`)
err := b.wsHandleData(pressXToJSON)
if err != nil {
t.Error(err)
}
}
func TestWsOutboundAccountPosition(t *testing.T) {
t.Parallel()
pressXToJSON := []byte(`{
"e": "outboundAccountPosition",
"E": 1564034571105,
"u": 1564034571073,
"B": [
{
"a": "ETH",
"f": "10000.000000",
"l": "0.000000"
}
]
}`)
err := b.wsHandleData(pressXToJSON)
if err != nil {
t.Error(err)
}
}
func TestWsTickerUpdate(t *testing.T) {
t.Parallel()
pressXToJSON := []byte(`{"stream":"btcusdt@ticker","data":{"e":"24hrTicker","E":1580254809477,"s":"BTCUSDT","p":"420.97000000","P":"4.720","w":"9058.27981278","x":"8917.98000000","c":"9338.96000000","Q":"0.17246300","b":"9338.03000000","B":"0.18234600","a":"9339.70000000","A":"0.14097600","o":"8917.99000000","h":"9373.19000000","l":"8862.40000000","v":"72229.53692000","q":"654275356.16896672","O":1580168409456,"C":1580254809456,"F":235294268,"L":235894703,"n":600436}}`)
@@ -2237,13 +2175,13 @@ func TestWsDepthUpdate(t *testing.T) {
func TestWsBalanceUpdate(t *testing.T) {
t.Parallel()
pressXToJSON := []byte(`{
pressXToJSON := []byte(`{"stream":"jTfvpakT2yT0hVIo5gYWVihZhdM2PrBgJUZ5PyfZ4EVpCkx4Uoxk5timcrQc","data":{
"e": "balanceUpdate",
"E": 1573200697110,
"a": "BTC",
"d": "100.00000000",
"T": 1573200697068
}`)
}}`)
err := b.wsHandleData(pressXToJSON)
if err != nil {
t.Error(err)
@@ -2252,7 +2190,7 @@ func TestWsBalanceUpdate(t *testing.T) {
func TestWsOCO(t *testing.T) {
t.Parallel()
pressXToJSON := []byte(`{
pressXToJSON := []byte(`{"stream":"jTfvpakT2yT0hVIo5gYWVihZhdM2PrBgJUZ5PyfZ4EVpCkx4Uoxk5timcrQc","data":{
"e": "listStatus",
"E": 1564035303637,
"s": "ETHBTC",
@@ -2275,7 +2213,7 @@ func TestWsOCO(t *testing.T) {
"c": "bfYPSQdLoqAJeNrOr9adzq"
}
]
}`)
}}`)
err := b.wsHandleData(pressXToJSON)
if err != nil {
t.Error(err)
@@ -2490,7 +2428,6 @@ func TestSetExchangeOrderExecutionLimits(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = b.UpdateOrderExecutionLimits(asset.CoinMarginedFutures)
if err != nil {
t.Fatal(err)
@@ -2530,3 +2467,27 @@ func TestSetExchangeOrderExecutionLimits(t *testing.T) {
t.Fatalf("expected %v, but receieved %v", order.ErrPriceBelowMin, err)
}
}
func TestWsOrderExecutionReport(t *testing.T) {
t.Parallel()
payload := []byte(`{"stream":"jTfvpakT2yT0hVIo5gYWVihZhdM2PrBgJUZ5PyfZ4EVpCkx4Uoxk5timcrQc","data":{"e":"executionReport","E":1616627567900,"s":"BTCUSDT","c":"c4wyKsIhoAaittTYlIVLqk","S":"BUY","o":"LIMIT","f":"GTC","q":"0.00028400","p":"52789.10000000","P":"0.00000000","F":"0.00000000","g":-1,"C":"","x":"NEW","X":"NEW","r":"NONE","i":5340845958,"l":"0.00000000","z":"0.00000000","L":"0.00000000","n":"0","N":null,"T":1616627567900,"t":-1,"I":11388173160,"w":true,"m":false,"M":false,"O":1616627567900,"Z":"0.00000000","Y":"0.00000000","Q":"0.00000000"}}`)
err := b.wsHandleData(payload)
if err != nil {
t.Fatal(err)
}
payload = []byte(`{"stream":"jTfvpakT2yT0hVIo5gYWVihZhdM2PrBgJUZ5PyfZ4EVpCkx4Uoxk5timcrQc","data":{"e":"executionReport","E":1616633041556,"s":"BTCUSDT","c":"YeULctvPAnHj5HXCQo9Mob","S":"BUY","o":"LIMIT","f":"GTC","q":"0.00028600","p":"52436.85000000","P":"0.00000000","F":"0.00000000","g":-1,"C":"","x":"TRADE","X":"FILLED","r":"NONE","i":5341783271,"l":"0.00028600","z":"0.00028600","L":"52436.85000000","n":"0.00000029","N":"BTC","T":1616633041555,"t":726946523,"I":11390206312,"w":false,"m":false,"M":true,"O":1616633041555,"Z":"14.99693910","Y":"14.99693910","Q":"0.00000000"}}`)
err = b.wsHandleData(payload)
if err != nil {
t.Fatal(err)
}
}
func TestWsOutboundAccountPosition(t *testing.T) {
t.Parallel()
payload := []byte(`{"stream":"jTfvpakT2yT0hVIo5gYWVihZhdM2PrBgJUZ5PyfZ4EVpCkx4Uoxk5timcrQc","data":{"e":"outboundAccountPosition","E":1616628815745,"u":1616628815745,"B":[{"a":"BTC","f":"0.00225109","l":"0.00123000"},{"a":"BNB","f":"0.00000000","l":"0.00000000"},{"a":"USDT","f":"54.43390661","l":"0.00000000"}]}}`)
err := b.wsHandleData(payload)
if err != nil {
t.Fatal(err)
}
}

View File

@@ -148,27 +148,30 @@ type TradeStream struct {
// KlineStream holds the kline stream data
type KlineStream struct {
EventType string `json:"e"`
EventTime time.Time `json:"E"`
Symbol string `json:"s"`
Kline struct {
StartTime time.Time `json:"t"`
CloseTime time.Time `json:"T"`
Symbol string `json:"s"`
Interval string `json:"i"`
FirstTradeID int64 `json:"f"`
LastTradeID int64 `json:"L"`
OpenPrice float64 `json:"o,string"`
ClosePrice float64 `json:"c,string"`
HighPrice float64 `json:"h,string"`
LowPrice float64 `json:"l,string"`
Volume float64 `json:"v,string"`
NumberOfTrades int64 `json:"n"`
KlineClosed bool `json:"x"`
Quote float64 `json:"q,string"`
TakerBuyBaseAssetVolume float64 `json:"V,string"`
TakerBuyQuoteAssetVolume float64 `json:"Q,string"`
} `json:"k"`
EventType string `json:"e"`
EventTime time.Time `json:"E"`
Symbol string `json:"s"`
Kline KlineStreamData `json:"k"`
}
// KlineStreamData defines kline streaming data
type KlineStreamData struct {
StartTime time.Time `json:"t"`
CloseTime time.Time `json:"T"`
Symbol string `json:"s"`
Interval string `json:"i"`
FirstTradeID int64 `json:"f"`
LastTradeID int64 `json:"L"`
OpenPrice float64 `json:"o,string"`
ClosePrice float64 `json:"c,string"`
HighPrice float64 `json:"h,string"`
LowPrice float64 `json:"l,string"`
Volume float64 `json:"v,string"`
NumberOfTrades int64 `json:"n"`
KlineClosed bool `json:"x"`
Quote float64 `json:"q,string"`
TakerBuyBaseAssetVolume float64 `json:"V,string"`
TakerBuyQuoteAssetVolume float64 `json:"Q,string"`
}
// TickerStream holds the ticker stream data
@@ -649,106 +652,121 @@ type UserAccountStream struct {
}
type wsAccountInfo struct {
Stream string `json:"stream"`
Data struct {
CanDeposit bool `json:"D"`
CanTrade bool `json:"T"`
CanWithdraw bool `json:"W"`
EventTime time.Time `json:"E"`
LastUpdated time.Time `json:"u"`
BuyerCommission float64 `json:"b"`
MakerCommission float64 `json:"m"`
SellerCommission float64 `json:"s"`
TakerCommission float64 `json:"t"`
EventType string `json:"e"`
Currencies []struct {
Asset string `json:"a"`
Available float64 `json:"f,string"`
Locked float64 `json:"l,string"`
} `json:"B"`
} `json:"data"`
Stream string `json:"stream"`
Data WsAccountInfoData `json:"data"`
}
// WsAccountInfoData defines websocket account info data
type WsAccountInfoData struct {
CanDeposit bool `json:"D"`
CanTrade bool `json:"T"`
CanWithdraw bool `json:"W"`
EventTime time.Time `json:"E"`
LastUpdated time.Time `json:"u"`
BuyerCommission float64 `json:"b"`
MakerCommission float64 `json:"m"`
SellerCommission float64 `json:"s"`
TakerCommission float64 `json:"t"`
EventType string `json:"e"`
Currencies []struct {
Asset string `json:"a"`
Available float64 `json:"f,string"`
Locked float64 `json:"l,string"`
} `json:"B"`
}
type wsAccountPosition struct {
Stream string `json:"stream"`
Data struct {
Currencies []struct {
Asset string `json:"a"`
Available float64 `json:"f,string"`
Locked float64 `json:"l,string"`
} `json:"B"`
EventTime time.Time `json:"E"`
LastUpdated time.Time `json:"u"`
EventType string `json:"e"`
} `json:"data"`
Stream string `json:"stream"`
Data WsAccountPositionData `json:"data"`
}
// WsAccountPositionData defines websocket account position data
type WsAccountPositionData struct {
Currencies []struct {
Asset string `json:"a"`
Available float64 `json:"f,string"`
Locked float64 `json:"l,string"`
} `json:"B"`
EventTime time.Time `json:"E"`
LastUpdated time.Time `json:"u"`
EventType string `json:"e"`
}
type wsBalanceUpdate struct {
Stream string `json:"stream"`
Data struct {
EventTime time.Time `json:"E"`
ClearTime time.Time `json:"T"`
BalanceDelta float64 `json:"d,string"`
Asset string `json:"a"`
EventType string `json:"e"`
} `json:"data"`
Stream string `json:"stream"`
Data WsBalanceUpdateData `json:"data"`
}
// WsBalanceUpdateData defines websocket account balance data
type WsBalanceUpdateData struct {
EventTime time.Time `json:"E"`
ClearTime time.Time `json:"T"`
BalanceDelta float64 `json:"d,string"`
Asset string `json:"a"`
EventType string `json:"e"`
}
type wsOrderUpdate struct {
Stream string `json:"stream"`
Data struct {
ClientOrderID string `json:"C"`
EventTime time.Time `json:"E"`
IcebergQuantity float64 `json:"F,string"`
LastExecutedPrice float64 `json:"L,string"`
CommissionAsset float64 `json:"N"`
OrderCreationTime time.Time `json:"O"`
StopPrice float64 `json:"P,string"`
QuoteOrderQuantity float64 `json:"Q,string"`
Side string `json:"S"`
TransactionTime time.Time `json:"T"`
OrderStatus string `json:"X"`
LastQuoteAssetTransactedQuantity float64 `json:"Y,string"`
CumulativeQuoteTransactedQuantity float64 `json:"Z,string"`
CancelledClientOrderID string `json:"c"`
EventType string `json:"e"`
TimeInForce string `json:"f"`
OrderListID int64 `json:"g"`
OrderID int64 `json:"i"`
LastExecutedQuantity float64 `json:"l,string"`
IsMaker bool `json:"m"`
Commission float64 `json:"n,string"`
OrderType string `json:"o"`
Price float64 `json:"p,string"`
Quantity float64 `json:"q,string"`
RejectionReason string `json:"r"`
Symbol string `json:"s"`
TradeID int64 `json:"t"`
IsOnOrderBook bool `json:"w"`
CurrentExecutionType string `json:"x"`
CumulativeFilledQuantity float64 `json:"z,string"`
} `json:"data"`
Stream string `json:"stream"`
Data WsOrderUpdateData `json:"data"`
}
// WsOrderUpdateData defines websocket account order update data
type WsOrderUpdateData struct {
ClientOrderID string `json:"C"`
EventTime time.Time `json:"E"`
IcebergQuantity float64 `json:"F,string"`
LastExecutedPrice float64 `json:"L,string"`
CommissionAsset string `json:"N"`
OrderCreationTime time.Time `json:"O"`
StopPrice float64 `json:"P,string"`
QuoteOrderQuantity float64 `json:"Q,string"`
Side string `json:"S"`
TransactionTime time.Time `json:"T"`
OrderStatus string `json:"X"`
LastQuoteAssetTransactedQuantity float64 `json:"Y,string"`
CumulativeQuoteTransactedQuantity float64 `json:"Z,string"`
CancelledClientOrderID string `json:"c"`
EventType string `json:"e"`
TimeInForce string `json:"f"`
OrderListID int64 `json:"g"`
OrderID int64 `json:"i"`
LastExecutedQuantity float64 `json:"l,string"`
IsMaker bool `json:"m"`
Commission float64 `json:"n,string"`
OrderType string `json:"o"`
Price float64 `json:"p,string"`
Quantity float64 `json:"q,string"`
RejectionReason string `json:"r"`
Symbol string `json:"s"`
TradeID int64 `json:"t"`
IsOnOrderBook bool `json:"w"`
CurrentExecutionType string `json:"x"`
CumulativeFilledQuantity float64 `json:"z,string"`
}
type wsListStatus struct {
Stream string `json:"stream"`
Data struct {
ListClientOrderID string `json:"C"`
EventTime time.Time `json:"E"`
ListOrderStatus string `json:"L"`
Orders []struct {
ClientOrderID string `json:"c"`
OrderID int64 `json:"i"`
Symbol string `json:"s"`
} `json:"O"`
TransactionTime time.Time `json:"T"`
ContingencyType string `json:"c"`
EventType string `json:"e"`
OrderListID int64 `json:"g"`
ListStatusType string `json:"l"`
RejectionReason string `json:"r"`
Symbol string `json:"s"`
} `json:"data"`
Stream string `json:"stream"`
Data WsListStatusData `json:"data"`
}
// WsListStatusData defines websocket account listing status data
type WsListStatusData struct {
ListClientOrderID string `json:"C"`
EventTime time.Time `json:"E"`
ListOrderStatus string `json:"L"`
Orders []struct {
ClientOrderID string `json:"c"`
OrderID int64 `json:"i"`
Symbol string `json:"s"`
} `json:"O"`
TransactionTime time.Time `json:"T"`
ContingencyType string `json:"c"`
EventType string `json:"e"`
OrderListID int64 `json:"g"`
ListStatusType string `json:"l"`
RejectionReason string `json:"r"`
Symbol string `json:"s"`
}
// WsPayload defines the payload through the websocket connection

View File

@@ -149,6 +149,13 @@ func (b *Binance) wsHandleData(respRaw []byte) error {
if err != nil {
return err
}
if r, ok := multiStreamData["result"]; ok {
if r == nil {
return nil
}
}
if method, ok := multiStreamData["method"].(string); ok {
// TODO handle subscription handling
if strings.EqualFold(method, "subscribe") {
@@ -170,6 +177,7 @@ func (b *Binance) wsHandleData(respRaw []byte) error {
err)
}
b.Websocket.DataHandler <- data
return nil
case "outboundAccountPosition":
var data wsAccountPosition
err := json.Unmarshal(respRaw, &data)
@@ -179,6 +187,7 @@ func (b *Binance) wsHandleData(respRaw []byte) error {
err)
}
b.Websocket.DataHandler <- data
return nil
case "balanceUpdate":
var data wsBalanceUpdate
err := json.Unmarshal(respRaw, &data)
@@ -188,6 +197,7 @@ func (b *Binance) wsHandleData(respRaw []byte) error {
err)
}
b.Websocket.DataHandler <- data
return nil
case "executionReport":
var data wsOrderUpdate
err := json.Unmarshal(respRaw, &data)
@@ -243,6 +253,7 @@ func (b *Binance) wsHandleData(respRaw []byte) error {
Date: data.Data.OrderCreationTime,
Pair: p,
}
return nil
case "listStatus":
var data wsListStatus
err := json.Unmarshal(respRaw, &data)
@@ -252,6 +263,7 @@ func (b *Binance) wsHandleData(respRaw []byte) error {
err)
}
b.Websocket.DataHandler <- data
return nil
}
}
}
@@ -344,6 +356,7 @@ func (b *Binance) wsHandleData(respRaw []byte) error {
AssetType: asset.Spot,
Pair: pair,
}
return nil
case "kline_1m", "kline_3m", "kline_5m", "kline_15m", "kline_30m", "kline_1h", "kline_2h", "kline_4h",
"kline_6h", "kline_8h", "kline_12h", "kline_1d", "kline_3d", "kline_1w", "kline_1M":
var kline KlineStream
@@ -373,6 +386,7 @@ func (b *Binance) wsHandleData(respRaw []byte) error {
LowPrice: kline.Kline.LowPrice,
Volume: kline.Kline.Volume,
}
return nil
case "depth":
var depth WebsocketDepthStream
err := json.Unmarshal(rawData, &depth)
@@ -391,6 +405,7 @@ func (b *Binance) wsHandleData(respRaw []byte) error {
b.Name,
err)
}
return nil
default:
b.Websocket.DataHandler <- stream.UnhandledMessageWarning{
Message: b.Name + stream.UnhandledMessage + string(respRaw),
@@ -399,7 +414,7 @@ func (b *Binance) wsHandleData(respRaw []byte) error {
}
}
}
return nil
return fmt.Errorf("unhandled stream data %s", string(respRaw))
}
func stringToOrderStatus(status string) (order.Status, error) {

View File

@@ -109,6 +109,7 @@ func (a *KlineStream) UnmarshalJSON(data []byte) error {
Kline struct {
StartTime binanceTime `json:"t"`
CloseTime binanceTime `json:"T"`
*KlineStreamData
} `json:"k"`
*Alias
}{
@@ -117,6 +118,7 @@ func (a *KlineStream) UnmarshalJSON(data []byte) error {
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
a.Kline = *aux.Kline.KlineStreamData
a.EventTime = aux.EventTime.Time()
a.Kline.StartTime = aux.Kline.StartTime.Time()
a.Kline.CloseTime = aux.Kline.CloseTime.Time()
@@ -322,6 +324,7 @@ func (a *wsAccountInfo) UnmarshalJSON(data []byte) error {
Data struct {
EventTime binanceTime `json:"E"`
LastUpdated binanceTime `json:"u"`
*WsAccountInfoData
} `json:"data"`
*Alias
}{
@@ -330,6 +333,7 @@ func (a *wsAccountInfo) UnmarshalJSON(data []byte) error {
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
a.Data = *aux.Data.WsAccountInfoData
a.Data.EventTime = aux.Data.EventTime.Time()
a.Data.LastUpdated = aux.Data.LastUpdated.Time()
return nil
@@ -342,6 +346,7 @@ func (a *wsAccountPosition) UnmarshalJSON(data []byte) error {
Data struct {
EventTime binanceTime `json:"E"`
LastUpdated binanceTime `json:"u"`
*WsAccountPositionData
} `json:"data"`
*Alias
}{
@@ -350,6 +355,7 @@ func (a *wsAccountPosition) UnmarshalJSON(data []byte) error {
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
a.Data = *aux.Data.WsAccountPositionData
a.Data.EventTime = aux.Data.EventTime.Time()
a.Data.LastUpdated = aux.Data.LastUpdated.Time()
return nil
@@ -362,6 +368,7 @@ func (a *wsBalanceUpdate) UnmarshalJSON(data []byte) error {
Data struct {
EventTime binanceTime `json:"E"`
ClearTime binanceTime `json:"T"`
*WsBalanceUpdateData
} `json:"data"`
*Alias
}{
@@ -370,6 +377,7 @@ func (a *wsBalanceUpdate) UnmarshalJSON(data []byte) error {
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
a.Data = *aux.Data.WsBalanceUpdateData
a.Data.EventTime = aux.Data.EventTime.Time()
a.Data.ClearTime = aux.Data.ClearTime.Time()
return nil
@@ -383,6 +391,7 @@ func (a *wsOrderUpdate) UnmarshalJSON(data []byte) error {
EventTime binanceTime `json:"E"`
OrderCreationTime binanceTime `json:"O"`
TransactionTime binanceTime `json:"T"`
*WsOrderUpdateData
} `json:"data"`
*Alias
}{
@@ -391,6 +400,7 @@ func (a *wsOrderUpdate) UnmarshalJSON(data []byte) error {
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
a.Data = *aux.Data.WsOrderUpdateData
a.Data.EventTime = aux.Data.EventTime.Time()
a.Data.OrderCreationTime = aux.Data.OrderCreationTime.Time()
a.Data.TransactionTime = aux.Data.TransactionTime.Time()
@@ -404,6 +414,7 @@ func (a *wsListStatus) UnmarshalJSON(data []byte) error {
Data struct {
EventTime binanceTime `json:"E"`
TransactionTime binanceTime `json:"T"`
*WsListStatusData
} `json:"data"`
*Alias
}{
@@ -412,6 +423,7 @@ func (a *wsListStatus) UnmarshalJSON(data []byte) error {
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
a.Data = *aux.Data.WsListStatusData
a.Data.EventTime = aux.Data.EventTime.Time()
a.Data.TransactionTime = aux.Data.TransactionTime.Time()
return nil