diff --git a/exchanges/bitmex/bitmex_test.go b/exchanges/bitmex/bitmex_test.go index 0471dcd0..27ba9a2b 100644 --- a/exchanges/bitmex/bitmex_test.go +++ b/exchanges/bitmex/bitmex_test.go @@ -25,7 +25,9 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" "github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" + "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" + testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions" "github.com/thrasher-corp/gocryptotrader/portfolio/withdraw" ) @@ -830,7 +832,7 @@ func TestUpdateTradablePairs(t *testing.T) { func TestWsPositionUpdate(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{"table":"position", + pressXToJSON := []byte(`[0, "public", "public", {"table":"position", "action":"update", "data":[{ "account":2,"symbol":"ETHUSD","currency":"XBt", @@ -838,7 +840,7 @@ func TestWsPositionUpdate(t *testing.T) { "riskValue":87960,"homeNotional":0.0008796,"posState":"Liquidation","maintMargin":263, "unrealisedGrossPnl":-677,"unrealisedPnl":-677,"unrealisedPnlPcnt":-0.0078,"unrealisedRoePcnt":-0.7756, "simpleQty":0.001,"liquidationPrice":1140.1, "timestamp":"2017-04-04T22:07:45.442Z" - }]}`) + }]}]`) err := b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) @@ -847,7 +849,7 @@ func TestWsPositionUpdate(t *testing.T) { func TestWsInsertExectuionUpdate(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{"table":"execution", + pressXToJSON := []byte(`[0, "public", "public", {"table":"execution", "action":"insert", "data":[{ "execID":"0193e879-cb6f-2891-d099-2c4eb40fee21", @@ -862,27 +864,7 @@ func TestWsInsertExectuionUpdate(t *testing.T) { "text":"Liquidation","trdMatchID":"7f4ab7f6-0006-3234-76f4-ae1385aad00f","execCost":88155,"execComm":66, "homeNotional":-0.00088155,"foreignNotional":1,"transactTime":"2017-04-04T22:07:46.035Z", "timestamp":"2017-04-04T22:07:46.035Z" - }]}`) - err := b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } -} - -func TestWSConnectionHandling(t *testing.T) { - t.Parallel() - pressXToJSON := []byte(`{"info":"Welcome to the BitMEX Realtime API.","version":"1.1.0", - "timestamp":"2015-01-18T10:14:06.802Z","docs":"https://www.bitmex.com/app/wsAPI","heartbeatEnabled":false}`) - err := b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } -} - -func TestWSSubscriptionHandling(t *testing.T) { - t.Parallel() - pressXToJSON := []byte(`{"success":true,"subscribe":"trade:ETHUSD", - "request":{"op":"subscribe","args":["trade:ETHUSD","instrument:ETHUSD"]}}`) + }]}]`) err := b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) @@ -891,18 +873,18 @@ func TestWSSubscriptionHandling(t *testing.T) { func TestWSPositionUpdateHandling(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{"table":"position", + pressXToJSON := []byte(`[0, "public", "public", {"table":"position", "action":"update", "data":[{ "account":2,"symbol":"ETHUSD","currency":"XBt","currentQty":1, "markPrice":1136.88,"posState":"Liquidated","simpleQty":0.001,"liquidationPrice":1140.1,"bankruptPrice":1134.37, "timestamp":"2017-04-04T22:07:46.019Z" - }]}`) + }]}]`) err := b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) } - pressXToJSON = []byte(`{"table":"position", + pressXToJSON = []byte(`[0, "public", "public", {"table":"position", "action":"update", "data":[{ "account":2,"symbol":"ETHUSD","currency":"XBt", @@ -915,7 +897,7 @@ func TestWSPositionUpdateHandling(t *testing.T) { "unrealisedPnlPcnt":0,"unrealisedRoePcnt":0,"simpleQty":0,"simpleCost":0,"simpleValue":0,"avgCostPrice":null, "avgEntryPrice":null,"breakEvenPrice":null,"marginCallPrice":null,"liquidationPrice":null,"bankruptPrice":null, "timestamp":"2017-04-04T22:07:46.140Z" - }]}`) + }]}]`) err = b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) @@ -924,7 +906,7 @@ func TestWSPositionUpdateHandling(t *testing.T) { func TestWSOrderbookHandling(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{ + pressXToJSON := []byte(`[0, "public", "public", { "table":"orderBookL2_25", "keys":["symbol","id","side"], "types":{"id":"long","price":"float","side":"symbol","size":"long","symbol":"symbol"}, @@ -938,76 +920,60 @@ func TestWSOrderbookHandling(t *testing.T) { {"symbol":"ETHUSD","id":17999995000,"side":"Buy","size":10,"price":50}, {"symbol":"ETHUSD","id":17999996000,"side":"Buy","size":20,"price":40}, {"symbol":"ETHUSD","id":17999997000,"side":"Buy","size":100,"price":30} - ] - }`) + ]}]`) err := b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } + require.NoError(t, err) - pressXToJSON = []byte(`{ + pressXToJSON = []byte(`[0, "public", "public", { "table":"orderBookL2_25", "action":"update", "data":[ {"symbol":"ETHUSD","id":17999995000,"side":"Buy","size":5,"timestamp":"2017-04-04T22:16:38.461Z"} - ] - }`) + ]}]`) err = b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } + require.NoError(t, err) - pressXToJSON = []byte(`{ + pressXToJSON = []byte(`[0, "public", "public", { "table":"orderBookL2_25", "action":"update", - "data":[ - ] - }`) + "data":[]}]`) err = b.wsHandleData(pressXToJSON) - if err == nil { - t.Error("Expected error") - } + require.ErrorContains(t, err, "empty orderbook") - pressXToJSON = []byte(`{ + pressXToJSON = []byte(`[0, "public", "public", { "table":"orderBookL2_25", "action":"delete", "data":[ {"symbol":"ETHUSD","id":17999995000,"side":"Buy","timestamp":"2017-04-04T22:16:38.461Z"} - ] - }`) + ]}]`) err = b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) - } + require.NoError(t, err) - pressXToJSON = []byte(`{ + pressXToJSON = []byte(`[0, "public", "public", { "table":"orderBookL2_25", "action":"delete", "data":[ {"symbol":"ETHUSD","id":17999995000,"side":"Buy","timestamp":"2017-04-04T22:16:38.461Z"} - ] - }`) + ]}]`) err = b.wsHandleData(pressXToJSON) - if !errors.Is(err, orderbook.ErrOrderbookInvalid) { - t.Error(err) - } + assert.ErrorIs(t, err, orderbook.ErrOrderbookInvalid) } func TestWSDeleveragePositionUpdateHandling(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{"table":"position", + pressXToJSON := []byte(`[0, "public", "public", {"table":"position", "action":"update", "data":[{ "account":2,"symbol":"ETHUSD","currency":"XBt","currentQty":2000, "markPrice":1160.72,"posState":"Deleverage","simpleQty":1.746,"liquidationPrice":1140.1, "timestamp":"2017-04-04T22:16:38.460Z" - }]}`) + }]}]`) err := b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) } - pressXToJSON = []byte(`{"table":"position", + pressXToJSON = []byte(`[0, "public", "public", {"table":"position", "action":"update", "data":[{ "account":2,"symbol":"ETHUSD","currency":"XBt", @@ -1021,7 +987,7 @@ func TestWSDeleveragePositionUpdateHandling(t *testing.T) { "simpleQty":0,"simpleCost":0,"simpleValue":0,"simplePnl":0,"simplePnlPcnt":0,"avgCostPrice":null, "avgEntryPrice":null,"breakEvenPrice":null,"marginCallPrice":null,"liquidationPrice":null,"bankruptPrice":null, "timestamp":"2017-04-04T22:16:38.547Z" - }]}`) + }]}]`) err = b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) @@ -1030,7 +996,7 @@ func TestWSDeleveragePositionUpdateHandling(t *testing.T) { func TestWSDeleverageExecutionInsertHandling(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{"table":"execution", + pressXToJSON := []byte(`[0, "public", "public", {"table":"execution", "action":"insert", "data":[{ "execID":"20ad1ff4-c110-a4f2-dd31-f94eaa0701fd", @@ -1045,7 +1011,7 @@ func TestWSDeleverageExecutionInsertHandling(t *testing.T) { "trdMatchID":"1e849b8a-7e88-3c67-a93f-cc654d40e8ba","execCost":172306000,"execComm":-43077, "homeNotional":-1.72306,"foreignNotional":2000,"transactTime":"2017-04-04T22:16:38.472Z", "timestamp":"2017-04-04T22:16:38.472Z" - }]}`) + }]}]`) err := b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) @@ -1054,7 +1020,7 @@ func TestWSDeleverageExecutionInsertHandling(t *testing.T) { func TestWsTrades(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{"table":"trade","action":"insert","data":[{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"MinusTick","trdMatchID":"c427f7a0-6b26-1e10-5c4e-1bd74daf2a73","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"95eb9155-b58c-70e9-44b7-34efe50302e0","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"e607c187-f25c-86bc-cb39-8afff7aaf2d9","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":17,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"0f076814-a57d-9a59-8063-ad6b823a80ac","grossValue":439110,"homeNotional":0.1683835182250396,"foreignNotional":43.49346275752773},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"MinusTick","trdMatchID":"f4ef3dfd-51c4-538f-37c1-e5071ba1c75d","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"81ef136b-8f4a-b1cf-78a8-fffbfa89bf40","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"65a87e8c-7563-34a4-d040-94e8513c5401","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":15,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"1d11a74e-a157-3f33-036d-35a101fba50b","grossValue":387375,"homeNotional":0.14857369255150554,"foreignNotional":38.369156101426306},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":1,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"40d49df1-f018-f66f-4ca5-31d4997641d7","grossValue":25825,"homeNotional":0.009904912836767036,"foreignNotional":2.5579437400950873},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"MinusTick","trdMatchID":"36135b51-73e5-c007-362b-a55be5830c6b","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"6ee19edb-99aa-3030-ba63-933ffb347ade","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"d44be603-cdb8-d676-e3e2-f91fb12b2a70","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":5,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"a14b43b3-50b4-c075-c54d-dfb0165de33d","grossValue":129100,"homeNotional":0.04952456418383518,"foreignNotional":12.787242472266245},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":8,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"3c30e175-5194-320c-8f8c-01636c2f4a32","grossValue":206560,"homeNotional":0.07923930269413629,"foreignNotional":20.45958795562599},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":50,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"5b803378-760b-4919-21fc-bfb275d39ace","grossValue":1291000,"homeNotional":0.49524564183835185,"foreignNotional":127.87242472266244},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":244,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"cf57fec1-c444-b9e5-5e2d-4fb643f4fdb7","grossValue":6300080,"homeNotional":2.416798732171157,"foreignNotional":624.0174326465927}]}`) + pressXToJSON := []byte(`[0, "public", "public", {"table":"trade","action":"insert","data":[{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"MinusTick","trdMatchID":"c427f7a0-6b26-1e10-5c4e-1bd74daf2a73","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"95eb9155-b58c-70e9-44b7-34efe50302e0","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"e607c187-f25c-86bc-cb39-8afff7aaf2d9","grossValue":2583000,"homeNotional":0.9904912836767037,"foreignNotional":255.84389857369254},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":17,"price":258.3,"tickDirection":"ZeroMinusTick","trdMatchID":"0f076814-a57d-9a59-8063-ad6b823a80ac","grossValue":439110,"homeNotional":0.1683835182250396,"foreignNotional":43.49346275752773},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"MinusTick","trdMatchID":"f4ef3dfd-51c4-538f-37c1-e5071ba1c75d","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"81ef136b-8f4a-b1cf-78a8-fffbfa89bf40","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"65a87e8c-7563-34a4-d040-94e8513c5401","grossValue":2582500,"homeNotional":0.9904912836767037,"foreignNotional":255.79437400950872},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":15,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"1d11a74e-a157-3f33-036d-35a101fba50b","grossValue":387375,"homeNotional":0.14857369255150554,"foreignNotional":38.369156101426306},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":1,"price":258.25,"tickDirection":"ZeroMinusTick","trdMatchID":"40d49df1-f018-f66f-4ca5-31d4997641d7","grossValue":25825,"homeNotional":0.009904912836767036,"foreignNotional":2.5579437400950873},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"MinusTick","trdMatchID":"36135b51-73e5-c007-362b-a55be5830c6b","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"6ee19edb-99aa-3030-ba63-933ffb347ade","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":100,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"d44be603-cdb8-d676-e3e2-f91fb12b2a70","grossValue":2582000,"homeNotional":0.9904912836767037,"foreignNotional":255.7448494453249},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":5,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"a14b43b3-50b4-c075-c54d-dfb0165de33d","grossValue":129100,"homeNotional":0.04952456418383518,"foreignNotional":12.787242472266245},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":8,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"3c30e175-5194-320c-8f8c-01636c2f4a32","grossValue":206560,"homeNotional":0.07923930269413629,"foreignNotional":20.45958795562599},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":50,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"5b803378-760b-4919-21fc-bfb275d39ace","grossValue":1291000,"homeNotional":0.49524564183835185,"foreignNotional":127.87242472266244},{"timestamp":"2020-02-17T01:35:36.442Z","symbol":"ETHUSD","side":"Sell","size":244,"price":258.2,"tickDirection":"ZeroMinusTick","trdMatchID":"cf57fec1-c444-b9e5-5e2d-4fb643f4fdb7","grossValue":6300080,"homeNotional":2.416798732171157,"foreignNotional":624.0174326465927}]}]`) err := b.wsHandleData(pressXToJSON) if err != nil { t.Error(err) @@ -1357,3 +1323,53 @@ func TestGetCurrencyTradeURL(t *testing.T) { assert.NotEmpty(t, resp) } } + +func TestGenerateSubscriptions(t *testing.T) { + t.Parallel() + + b := new(Bitmex) + require.NoError(t, testexch.Setup(b), "Test instance Setup must not error") + + p := currency.Pairs{ + currency.NewPair(currency.ETH, currency.USD), + currency.NewPair(currency.BCH, currency.NewCode("Z19")), + } + + exp := subscription.List{ + {QualifiedChannel: bitmexWSOrderbookL2 + ":" + p[1].String(), Channel: bitmexWSOrderbookL2, Asset: asset.Futures, Pairs: p[1:2]}, + {QualifiedChannel: bitmexWSOrderbookL2 + ":" + p[0].String(), Channel: bitmexWSOrderbookL2, Asset: asset.PerpetualContract, Pairs: p[:1]}, + {QualifiedChannel: bitmexWSTrade + ":" + p[1].String(), Channel: bitmexWSTrade, Asset: asset.Futures, Pairs: p[1:2]}, + {QualifiedChannel: bitmexWSTrade + ":" + p[0].String(), Channel: bitmexWSTrade, Asset: asset.PerpetualContract, Pairs: p[:1]}, + {QualifiedChannel: bitmexWSAffiliate, Channel: bitmexWSAffiliate, Authenticated: true}, + {QualifiedChannel: bitmexWSOrder, Channel: bitmexWSOrder, Authenticated: true}, + {QualifiedChannel: bitmexWSMargin, Channel: bitmexWSMargin, Authenticated: true}, + {QualifiedChannel: bitmexWSTransact, Channel: bitmexWSTransact, Authenticated: true}, + {QualifiedChannel: bitmexWSWallet, Channel: bitmexWSWallet, Authenticated: true}, + {QualifiedChannel: bitmexWSExecution + ":" + p[0].String(), Channel: bitmexWSExecution, Authenticated: true, Asset: asset.PerpetualContract, Pairs: p[:1]}, + {QualifiedChannel: bitmexWSPosition + ":" + p[0].String(), Channel: bitmexWSPosition, Authenticated: true, Asset: asset.PerpetualContract, Pairs: p[:1]}, + } + + b.Websocket.SetCanUseAuthenticatedEndpoints(true) + subs, err := b.generateSubscriptions() + require.NoError(t, err, "generateSubscriptions must not error") + testsubs.EqualLists(t, exp, subs) +} + +func TestSubscribe(t *testing.T) { + t.Parallel() + b := new(Bitmex) + require.NoError(t, testexch.Setup(b), "Test instance Setup must not error") + subs, err := b.generateSubscriptions() // Note: We grab this before it's overwritten by SetupWs + require.NoError(t, err, "generateSubscriptions must not error") + testexch.SetupWs(t, b) + err = b.Subscribe(subs) + require.NoError(t, err, "Subscribe should not error") + for _, s := range subs { + assert.Equalf(t, subscription.SubscribedState, s.State(), "%s state should be subscribed", s.QualifiedChannel) + } + err = b.Unsubscribe(subs) + require.NoError(t, err, "Unsubscribe should not error") + for _, s := range subs { + assert.Equalf(t, subscription.UnsubscribedState, s.State(), "%s state should be unsusbscribed", s.QualifiedChannel) + } +} diff --git a/exchanges/bitmex/bitmex_websocket.go b/exchanges/bitmex/bitmex_websocket.go index c2c5c71f..c6297404 100644 --- a/exchanges/bitmex/bitmex_websocket.go +++ b/exchanges/bitmex/bitmex_websocket.go @@ -8,9 +8,12 @@ import ( "net/http" "strconv" "strings" + "text/template" "time" + "github.com/buger/jsonparser" "github.com/gorilla/websocket" + "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/crypto" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" @@ -24,7 +27,7 @@ import ( ) const ( - bitmexWSURL = "wss://www.bitmex.com/realtime" + bitmexWSURL = "wss://www.bitmex.com/realtimemd" // Public Subscription Channels bitmexWSAnnouncement = "announcement" @@ -66,9 +69,16 @@ const ( bitmexActionUpdateData = "update" ) -var subscriptionNames = map[string]string{ - subscription.OrderbookChannel: bitmexWSOrderbookL2, - subscription.AllTradesChannel: bitmexWSTrade, +var defaultSubscriptions = subscription.List{ + {Enabled: true, Channel: bitmexWSOrderbookL2, Asset: asset.All}, + {Enabled: true, Channel: bitmexWSTrade, Asset: asset.All}, + {Enabled: true, Channel: bitmexWSAffiliate, Authenticated: true}, + {Enabled: true, Channel: bitmexWSOrder, Authenticated: true}, + {Enabled: true, Channel: bitmexWSMargin, Authenticated: true}, + {Enabled: true, Channel: bitmexWSTransact, Authenticated: true}, + {Enabled: true, Channel: bitmexWSWallet, Authenticated: true}, + {Enabled: true, Channel: bitmexWSExecution, Authenticated: true, Asset: asset.PerpetualContract}, + {Enabled: true, Channel: bitmexWSPosition, Authenticated: true, Asset: asset.PerpetualContract}, } // WsConnect initiates a new websocket connection @@ -77,35 +87,20 @@ func (b *Bitmex) WsConnect() error { return stream.ErrWebsocketNotEnabled } var dialer websocket.Dialer - err := b.Websocket.Conn.Dial(&dialer, http.Header{}) - if err != nil { + if err := b.Websocket.Conn.Dial(&dialer, http.Header{}); err != nil { return err } - resp := b.Websocket.Conn.ReadMessage() - if resp.Raw == nil { - return errors.New("connection closed") - } - var welcomeResp WebsocketWelcome - err = json.Unmarshal(resp.Raw, &welcomeResp) - if err != nil { - return err - } - - if b.Verbose { - log.Debugf(log.ExchangeSys, - "Successfully connected to Bitmex %s at time: %s Limit: %d", - welcomeResp.Info, - welcomeResp.Timestamp, - welcomeResp.Limit.Remaining) - } - b.Websocket.Wg.Add(1) go b.wsReadData() + ctx := context.TODO() + if err := b.wsOpenStream(ctx, b.Websocket.Conn, wsPublicStream); err != nil { + return err + } + if b.Websocket.CanUseAuthenticatedEndpoints() { - err = b.websocketSendAuth(context.TODO()) - if err != nil { + if err := b.websocketSendAuth(ctx); err != nil { b.Websocket.SetCanUseAuthenticatedEndpoints(false) log.Errorf(log.ExchangeSys, "%v - authentication failed: %v\n", b.Name, err) } @@ -114,6 +109,31 @@ func (b *Bitmex) WsConnect() error { return nil } +const ( + wsPublicStream = "public" + wsPrivateStream = "private" + wsSubscribeOp = "subscribe" + wsUnsubscribeOp = "unsubscribe" + wsMsgPacket = 0 + wsOpenPacket = 1 + wsClosePacket = 2 +) + +func (b *Bitmex) wsOpenStream(ctx context.Context, c stream.Connection, name string) error { + resp, err := c.SendMessageReturnResponse(ctx, request.Unset, "open:"+name, []any{wsOpenPacket, name, name}) + if err != nil { + return err + } + var welcomeResp WebsocketWelcome + if err := json.Unmarshal(resp, &welcomeResp); err != nil { + return err + } + if b.Verbose { + log.Debugf(log.ExchangeSys, "Successfully connected to Bitmex %s websocket API at time: %s Limit: %d", name, welcomeResp.Timestamp, welcomeResp.Limit.Remaining) + } + return nil +} + // wsReadData receives and passes on websocket messages for processing func (b *Bitmex) wsReadData() { defer b.Websocket.Wg.Done() @@ -131,338 +151,292 @@ func (b *Bitmex) wsReadData() { } func (b *Bitmex) wsHandleData(respRaw []byte) error { - quickCapture := make(map[string]interface{}) - err := json.Unmarshal(respRaw, &quickCapture) + var err error + msg, _, _, err := jsonparser.Get(respRaw, "[3]") if err != nil { - return err + return fmt.Errorf("unknown message format: %s", respRaw) + } + // We don't need to know about errors, since we're looking optimistically into the json + op, _ := jsonparser.GetString(msg, "request", "op") + errMsg, _ := jsonparser.GetString(msg, "error") + success, _ := jsonparser.GetBoolean(msg, "success") + version, _ := jsonparser.GetString(msg, "version") + switch { + case version != "": + op = "open" + fallthrough + case errMsg != "", success: + streamID, e2 := jsonparser.GetString(respRaw, "[1]") + if e2 != nil { + return fmt.Errorf("%w parsing stream", e2) + } + if !b.Websocket.Match.IncomingWithData(op+":"+streamID, msg) { + return fmt.Errorf("%w: %s:%s", stream.ErrNoMessageListener, op, streamID) + } + return nil } - var respError WebsocketErrorResponse - if _, ok := quickCapture["status"]; ok { - err = json.Unmarshal(respRaw, &respError) - if err != nil { + tableName, err := jsonparser.GetString(msg, "table") + if err != nil { + // Anything that's not a table isn't expected + return fmt.Errorf("unknown message format: %s", msg) + } + + switch tableName { + case bitmexWSOrderbookL2, bitmexWSOrderbookL225, bitmexWSOrderbookL10: + var orderbooks OrderBookData + if err := json.Unmarshal(msg, &orderbooks); err != nil { return err } - } + if len(orderbooks.Data) == 0 { + return fmt.Errorf("empty orderbook data received: %s", msg) + } - if _, ok := quickCapture["success"]; ok { - var decodedResp WebsocketSubscribeResp - err = json.Unmarshal(respRaw, &decodedResp) + pair, a, err := b.GetPairAndAssetTypeRequestFormatted(orderbooks.Data[0].Symbol) if err != nil { return err } - if decodedResp.Success { - if len(quickCapture) == 3 { - if b.Verbose { - log.Debugf(log.ExchangeSys, "%s websocket: Successfully subscribed to %s", - b.Name, decodedResp.Subscribe) - } - } else { - b.Websocket.SetCanUseAuthenticatedEndpoints(true) - if b.Verbose { - log.Debugf(log.ExchangeSys, "%s websocket: Successfully authenticated websocket connection", - b.Name) - } + err = b.processOrderbook(orderbooks.Data, orderbooks.Action, pair, a) + if err != nil { + return err + } + case bitmexWSTrade: + if !b.IsSaveTradeDataEnabled() { + return nil + } + var tradeHolder TradeData + if err := json.Unmarshal(msg, &tradeHolder); err != nil { + return err + } + var trades []trade.Data + for i := range tradeHolder.Data { + if tradeHolder.Data[i].Price == 0 { + // Please note that indices (symbols starting with .) post trades at intervals to the trade feed. + // These have a size of 0 and are used only to indicate a changing price. + continue } + p, a, err := b.GetPairAndAssetTypeRequestFormatted(tradeHolder.Data[i].Symbol) + if err != nil { + return err + } + oSide, err := order.StringToOrderSide(tradeHolder.Data[i].Side) + if err != nil { + return err + } + + trades = append(trades, trade.Data{ + TID: tradeHolder.Data[i].TrdMatchID, + Exchange: b.Name, + CurrencyPair: p, + AssetType: a, + Side: oSide, + Price: tradeHolder.Data[i].Price, + Amount: float64(tradeHolder.Data[i].Size), + Timestamp: tradeHolder.Data[i].Timestamp, + }) + } + return b.AddTradesToBuffer(trades...) + case bitmexWSAnnouncement: + var announcement AnnouncementData + if err := json.Unmarshal(msg, &announcement); err != nil { + return err + } + + if announcement.Action == bitmexActionInitialData { return nil } - b.Websocket.DataHandler <- fmt.Errorf("%s websocket error: Unable to subscribe %s", - b.Name, decodedResp.Subscribe) - } else if _, ok := quickCapture["table"]; ok { - var decodedResp WebsocketMainResponse - err = json.Unmarshal(respRaw, &decodedResp) - if err != nil { + b.Websocket.DataHandler <- announcement.Data + case bitmexWSAffiliate: + var response WsAffiliateResponse + if err := json.Unmarshal(msg, &response); err != nil { + return err + } + b.Websocket.DataHandler <- response + case bitmexWSInstrument: + // ticker + case bitmexWSExecution: + // trades of an order + var response WsExecutionResponse + if err := json.Unmarshal(msg, &response); err != nil { return err } - switch decodedResp.Table { - case bitmexWSOrderbookL2, bitmexWSOrderbookL225, bitmexWSOrderbookL10: - var orderbooks OrderBookData - err = json.Unmarshal(respRaw, &orderbooks) - if err != nil { - return err - } - if len(orderbooks.Data) == 0 { - return fmt.Errorf("%s - Empty orderbook data received: %s", b.Name, respRaw) - } - var pair currency.Pair - var a asset.Item - pair, a, err = b.GetPairAndAssetTypeRequestFormatted(orderbooks.Data[0].Symbol) + for i := range response.Data { + p, a, err := b.GetPairAndAssetTypeRequestFormatted(response.Data[i].Symbol) if err != nil { return err } - - err = b.processOrderbook(orderbooks.Data, orderbooks.Action, pair, a) + oStatus, err := order.StringToOrderStatus(response.Data[i].OrdStatus) if err != nil { - return err - } - case bitmexWSTrade: - if !b.IsSaveTradeDataEnabled() { - return nil - } - var tradeHolder TradeData - err = json.Unmarshal(respRaw, &tradeHolder) - if err != nil { - return err - } - var trades []trade.Data - for i := range tradeHolder.Data { - if tradeHolder.Data[i].Price == 0 { - // Please note that indices (symbols starting with .) post trades at intervals to the trade feed. - // These have a size of 0 and are used only to indicate a changing price. - continue + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + OrderID: response.Data[i].OrderID, + Err: err, } - var p currency.Pair - var a asset.Item - p, a, err = b.GetPairAndAssetTypeRequestFormatted(tradeHolder.Data[i].Symbol) + } + oSide, err := order.StringToOrderSide(response.Data[i].Side) + if err != nil { + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + OrderID: response.Data[i].OrderID, + Err: err, + } + } + b.Websocket.DataHandler <- &order.Detail{ + Exchange: b.Name, + OrderID: response.Data[i].OrderID, + AccountID: strconv.FormatInt(response.Data[i].Account, 10), + AssetType: a, + Pair: p, + Status: oStatus, + Trades: []order.TradeHistory{ + { + Price: response.Data[i].Price, + Amount: response.Data[i].OrderQuantity, + Exchange: b.Name, + TID: response.Data[i].ExecID, + Side: oSide, + Timestamp: response.Data[i].Timestamp, + IsMaker: false, + }, + }, + } + } + case bitmexWSOrder: + var response WsOrderResponse + if err := json.Unmarshal(msg, &response); err != nil { + return err + } + switch response.Action { + case "update", "insert": + for x := range response.Data { + p, a, err := b.GetRequestFormattedPairAndAssetType(response.Data[x].Symbol) if err != nil { return err } - var oSide order.Side - oSide, err = order.StringToOrderSide(tradeHolder.Data[i].Side) + oSide, err := order.StringToOrderSide(response.Data[x].Side) if err != nil { b.Websocket.DataHandler <- order.ClassificationError{ Exchange: b.Name, + OrderID: response.Data[x].OrderID, Err: err, } } - - trades = append(trades, trade.Data{ - TID: tradeHolder.Data[i].TrdMatchID, - Exchange: b.Name, - CurrencyPair: p, - AssetType: a, - Side: oSide, - Price: tradeHolder.Data[i].Price, - Amount: float64(tradeHolder.Data[i].Size), - Timestamp: tradeHolder.Data[i].Timestamp, - }) - } - return b.AddTradesToBuffer(trades...) - case bitmexWSAnnouncement: - var announcement AnnouncementData - err = json.Unmarshal(respRaw, &announcement) - if err != nil { - return err - } - - if announcement.Action == bitmexActionInitialData { - return nil - } - - b.Websocket.DataHandler <- announcement.Data - case bitmexWSAffiliate: - var response WsAffiliateResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - b.Websocket.DataHandler <- response - case bitmexWSInstrument: - // ticker - case bitmexWSExecution: - // trades of an order - var response WsExecutionResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - - for i := range response.Data { - var p currency.Pair - var a asset.Item - p, a, err = b.GetPairAndAssetTypeRequestFormatted(response.Data[i].Symbol) - if err != nil { - return err - } - var oStatus order.Status - oStatus, err = order.StringToOrderStatus(response.Data[i].OrdStatus) + oType, err := order.StringToOrderType(response.Data[x].OrderType) if err != nil { b.Websocket.DataHandler <- order.ClassificationError{ Exchange: b.Name, - OrderID: response.Data[i].OrderID, + OrderID: response.Data[x].OrderID, Err: err, } } - var oSide order.Side - oSide, err = order.StringToOrderSide(response.Data[i].Side) + oStatus, err := order.StringToOrderStatus(response.Data[x].OrderStatus) if err != nil { b.Websocket.DataHandler <- order.ClassificationError{ Exchange: b.Name, - OrderID: response.Data[i].OrderID, + OrderID: response.Data[x].OrderID, Err: err, } } b.Websocket.DataHandler <- &order.Detail{ + Price: response.Data[x].Price, + Amount: response.Data[x].OrderQuantity, Exchange: b.Name, - OrderID: response.Data[i].OrderID, - AccountID: strconv.FormatInt(response.Data[i].Account, 10), - AssetType: a, - Pair: p, + OrderID: response.Data[x].OrderID, + AccountID: strconv.FormatInt(response.Data[x].Account, 10), + Type: oType, + Side: oSide, Status: oStatus, - Trades: []order.TradeHistory{ - { - Price: response.Data[i].Price, - Amount: response.Data[i].OrderQuantity, - Exchange: b.Name, - TID: response.Data[i].ExecID, - Side: oSide, - Timestamp: response.Data[i].Timestamp, - IsMaker: false, - }, - }, + AssetType: a, + Date: response.Data[x].TransactTime, + Pair: p, } } - case bitmexWSOrder: - var response WsOrderResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - switch response.Action { - case "update", "insert": - for x := range response.Data { - var p currency.Pair - var a asset.Item - p, a, err = b.GetRequestFormattedPairAndAssetType(response.Data[x].Symbol) - if err != nil { - return err - } - var oSide order.Side - oSide, err = order.StringToOrderSide(response.Data[x].Side) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - Err: err, - } - } - var oType order.Type - oType, err = order.StringToOrderType(response.Data[x].OrderType) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - Err: err, - } - } - var oStatus order.Status - oStatus, err = order.StringToOrderStatus(response.Data[x].OrderStatus) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - Err: err, - } - } - b.Websocket.DataHandler <- &order.Detail{ - Price: response.Data[x].Price, - Amount: response.Data[x].OrderQuantity, - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - AccountID: strconv.FormatInt(response.Data[x].Account, 10), - Type: oType, - Side: oSide, - Status: oStatus, - AssetType: a, - Date: response.Data[x].TransactTime, - Pair: p, + case "delete": + for x := range response.Data { + p, a, err := b.GetRequestFormattedPairAndAssetType(response.Data[x].Symbol) + if err != nil { + return err + } + var oSide order.Side + oSide, err = order.StringToOrderSide(response.Data[x].Side) + if err != nil { + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + OrderID: response.Data[x].OrderID, + Err: err, } } - case "delete": - for x := range response.Data { - var p currency.Pair - var a asset.Item - p, a, err = b.GetRequestFormattedPairAndAssetType(response.Data[x].Symbol) - if err != nil { - return err - } - var oSide order.Side - oSide, err = order.StringToOrderSide(response.Data[x].Side) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - Err: err, - } - } - var oType order.Type - oType, err = order.StringToOrderType(response.Data[x].OrderType) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - Err: err, - } - } - var oStatus order.Status - oStatus, err = order.StringToOrderStatus(response.Data[x].OrderStatus) - if err != nil { - b.Websocket.DataHandler <- order.ClassificationError{ - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - Err: err, - } - } - b.Websocket.DataHandler <- &order.Detail{ - Price: response.Data[x].Price, - Amount: response.Data[x].OrderQuantity, - Exchange: b.Name, - OrderID: response.Data[x].OrderID, - AccountID: strconv.FormatInt(response.Data[x].Account, 10), - Type: oType, - Side: oSide, - Status: oStatus, - AssetType: a, - Date: response.Data[x].TransactTime, - Pair: p, + var oType order.Type + oType, err = order.StringToOrderType(response.Data[x].OrderType) + if err != nil { + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + OrderID: response.Data[x].OrderID, + Err: err, } } - default: - b.Websocket.DataHandler <- fmt.Errorf("%s - Unsupported order update %+v", b.Name, response) + var oStatus order.Status + oStatus, err = order.StringToOrderStatus(response.Data[x].OrderStatus) + if err != nil { + b.Websocket.DataHandler <- order.ClassificationError{ + Exchange: b.Name, + OrderID: response.Data[x].OrderID, + Err: err, + } + } + b.Websocket.DataHandler <- &order.Detail{ + Price: response.Data[x].Price, + Amount: response.Data[x].OrderQuantity, + Exchange: b.Name, + OrderID: response.Data[x].OrderID, + AccountID: strconv.FormatInt(response.Data[x].Account, 10), + Type: oType, + Side: oSide, + Status: oStatus, + AssetType: a, + Date: response.Data[x].TransactTime, + Pair: p, + } } - case bitmexWSMargin: - var response WsMarginResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - b.Websocket.DataHandler <- response - case bitmexWSPosition: - var response WsPositionResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - - case bitmexWSPrivateNotifications: - var response WsPrivateNotificationsResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - b.Websocket.DataHandler <- response - case bitmexWSTransact: - var response WsTransactResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - b.Websocket.DataHandler <- response - case bitmexWSWallet: - var response WsWalletResponse - err = json.Unmarshal(respRaw, &response) - if err != nil { - return err - } - b.Websocket.DataHandler <- response default: - b.Websocket.DataHandler <- stream.UnhandledMessageWarning{Message: b.Name + stream.UnhandledMessage + string(respRaw)} - return nil + b.Websocket.DataHandler <- fmt.Errorf("%s - Unsupported order update %+v", b.Name, response) } + case bitmexWSMargin: + var response WsMarginResponse + if err := json.Unmarshal(msg, &response); err != nil { + return err + } + b.Websocket.DataHandler <- response + case bitmexWSPosition: + var response WsPositionResponse + if err := json.Unmarshal(msg, &response); err != nil { + return err + } + case bitmexWSPrivateNotifications: + var response WsPrivateNotificationsResponse + if err := json.Unmarshal(msg, &response); err != nil { + return err + } + b.Websocket.DataHandler <- response + case bitmexWSTransact: + var response WsTransactResponse + if err := json.Unmarshal(msg, &response); err != nil { + return err + } + b.Websocket.DataHandler <- response + case bitmexWSWallet: + var response WsWalletResponse + if err := json.Unmarshal(msg, &response); err != nil { + return err + } + b.Websocket.DataHandler <- response + default: + b.Websocket.DataHandler <- stream.UnhandledMessageWarning{Message: b.Name + stream.UnhandledMessage + string(msg)} } + return nil } @@ -543,96 +517,66 @@ func (b *Bitmex) processOrderbook(data []OrderBookL2, action string, p currency. return nil } -// generateSubscriptions returns Adds default subscriptions to websocket to be handled by ManageSubscriptions() +// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature func (b *Bitmex) generateSubscriptions() (subscription.List, error) { - authed := b.Websocket.CanUseAuthenticatedEndpoints() + return b.Features.Subscriptions.ExpandTemplates(b) +} - assetPairs := map[asset.Item]currency.Pairs{} - for _, a := range b.GetAssetTypes(true) { - p, err := b.GetEnabledPairs(a) - if err != nil { - return nil, err - } - f, err := b.GetPairFormat(a, true) - if err != nil { - return nil, err - } - assetPairs[a] = p.Format(f) - } - - subs := subscription.List{} - for _, baseSub := range b.Features.Subscriptions { - if !authed && baseSub.Authenticated { - continue - } - - if baseSub.Asset == asset.Empty { - // Skip pair handling for subs which don't have an asset - subs = append(subs, baseSub.Clone()) - continue - } - - for a, p := range assetPairs { - if baseSub.Channel == bitmexWSOrderbookL2 && a == asset.Index { - continue // There are no L2 orderbook for index assets - } - if baseSub.Asset != asset.All && baseSub.Asset != a { - continue - } - s := baseSub.Clone() - s.Asset = a - s.Pairs = p - subs = append(subs, s) - } - } - - return subs, nil +// GetSubscriptionTemplate returns a subscription channel template +func (b *Bitmex) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) { + return template.New("master.tmpl").Funcs(template.FuncMap{ + "channelName": channelName, + }).Parse(subTplText) } // Subscribe subscribes to a websocket channel func (b *Bitmex) Subscribe(subs subscription.List) error { - req := WebsocketRequest{ - Command: "subscribe", - } - for _, s := range subs { - for _, p := range s.Pairs { - cName := channelName(s.Channel) - req.Arguments = append(req.Arguments, cName+":"+p.String()) - } - } - err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req) - if err == nil { - err = b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, subs...) - } - return err + return common.AppendError( + b.ParallelChanOp(subs.Public(), func(l subscription.List) error { return b.manageSubs(wsSubscribeOp, l, wsPublicStream) }, len(subs)), + b.ParallelChanOp(subs.Private(), func(l subscription.List) error { return b.manageSubs(wsSubscribeOp, l, wsPrivateStream) }, len(subs)), + ) } // Unsubscribe sends a websocket message to stop receiving data from the channel func (b *Bitmex) Unsubscribe(subs subscription.List) error { - req := WebsocketRequest{ - Command: "unsubscribe", - } - - for _, s := range subs { - for _, p := range s.Pairs { - cName := channelName(s.Channel) - req.Arguments = append(req.Arguments, cName+":"+p.String()) - } - } - err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req) - if err == nil { - err = b.Websocket.RemoveSubscriptions(b.Websocket.Conn, subs...) - } - return err + return common.AppendError( + b.ParallelChanOp(subs.Public(), func(l subscription.List) error { return b.manageSubs(wsUnsubscribeOp, l, wsPublicStream) }, len(subs)), + b.ParallelChanOp(subs.Private(), func(l subscription.List) error { return b.manageSubs(wsUnsubscribeOp, l, wsPrivateStream) }, len(subs)), + ) } -// channelName converts global channel Names used in config of channel input into bitmex channel names -// returns the name unchanged if no match is found -func channelName(name string) string { - if s, ok := subscriptionNames[name]; ok { - return s +func (b *Bitmex) manageSubs(op string, subs subscription.List, stream string) error { + req := WebsocketRequest{ + Command: op, } - return name + exp := map[string]*subscription.Subscription{} + for _, s := range subs { + req.Arguments = append(req.Arguments, s.QualifiedChannel) + exp[s.QualifiedChannel] = s + } + packet := []any{wsMsgPacket, stream, stream, req} + resps, errs := b.Websocket.Conn.SendMessageReturnResponses(context.TODO(), request.Unset, op+":"+stream, packet, len(subs)) + for _, resp := range resps { + if errMsg, _ := jsonparser.GetString(resp, "error"); errMsg != "" { + errs = common.AppendError(errs, errors.New(errMsg)) + } else { + chanName, err := jsonparser.GetString(resp, op) + if err != nil { + errs = common.AppendError(errs, err) + } + s, ok := exp[chanName] + if !ok { + errs = common.AppendError(errs, fmt.Errorf("%w: %s", subscription.ErrNotFound, chanName)) + } else { + if op == wsSubscribeOp { + errs = common.AppendError(errs, b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, s)) + } else { + errs = common.AppendError(errs, b.Websocket.RemoveSubscriptions(b.Websocket.Conn, s)) + } + } + } + } + return errs } // WebsocketSendAuth sends an authenticated subscription @@ -641,26 +585,33 @@ func (b *Bitmex) websocketSendAuth(ctx context.Context) error { if err != nil { return err } - b.Websocket.SetCanUseAuthenticatedEndpoints(true) timestamp := time.Now().Add(time.Hour * 1).Unix() newTimestamp := strconv.FormatInt(timestamp, 10) - hmac, err := crypto.GetHMAC(crypto.HashSHA256, - []byte("GET/realtime"+newTimestamp), - []byte(creds.Secret)) + hmac, err := crypto.GetHMAC(crypto.HashSHA256, []byte("GET/realtime"+newTimestamp), []byte(creds.Secret)) if err != nil { return err } signature := crypto.HexEncodeToString(hmac) - var sendAuth WebsocketRequest - sendAuth.Command = "authKeyExpires" - sendAuth.Arguments = append(sendAuth.Arguments, creds.Key, timestamp, - signature) - err = b.Websocket.Conn.SendJSONMessage(ctx, request.Unset, sendAuth) + err = b.wsOpenStream(ctx, b.Websocket.Conn, wsPrivateStream) if err != nil { - b.Websocket.SetCanUseAuthenticatedEndpoints(false) return err } + req := WebsocketRequest{ + Command: "authKeyExpires", + Arguments: []any{creds.Key, timestamp, signature}, + } + packet := []any{wsMsgPacket, wsPrivateStream, wsPrivateStream, req} + resp, err := b.Websocket.Conn.SendMessageReturnResponse(ctx, request.Unset, req.Command+":"+wsPrivateStream, packet) + if err != nil { + return err + } + if errMsg, _ := jsonparser.GetString(resp, "error"); errMsg != "" { + return errors.New(errMsg) + } + if b.Verbose { + log.Debugf(log.ExchangeSys, "%s websocket: Successfully authenticated websocket connection", b.Name) + } return nil } @@ -678,3 +629,33 @@ func (b *Bitmex) GetActionFromString(s string) (orderbook.Action, error) { } return 0, fmt.Errorf("%s %w", s, orderbook.ErrInvalidAction) } + +// channelName returns the correct channel name for the asset +func channelName(s *subscription.Subscription, a asset.Item) string { + switch s.Channel { + case subscription.OrderbookChannel: + if a == asset.Index { + return "" // There are no L2 orderbook for index assets + } + return bitmexWSOrderbookL2 + case subscription.AllTradesChannel: + return bitmexWSTrade + } + return s.Channel +} + +const subTplText = ` +{{- if $.S.Asset }} + {{ range $asset, $pairs := $.AssetPairs }} + {{- with $name := channelName $.S $asset }} + {{- range $i, $p := $pairs -}} + {{- $name -}} : {{- $p -}} + {{ $.PairSeparator }} + {{- end }} + {{- end }} + {{ $.AssetSeparator }} + {{- end }} +{{- else -}} + {{ channelName $.S $.S.Asset }} +{{- end }} +` diff --git a/exchanges/bitmex/bitmex_websocket_types.go b/exchanges/bitmex/bitmex_websocket_types.go index 26a2a032..5ae86e38 100644 --- a/exchanges/bitmex/bitmex_websocket_types.go +++ b/exchanges/bitmex/bitmex_websocket_types.go @@ -1,11 +1,13 @@ package bitmex -import "time" +import ( + "time" +) // WebsocketRequest is the main request type type WebsocketRequest struct { - Command string `json:"op"` - Arguments []interface{} `json:"args"` + Command string `json:"op"` + Arguments []any `json:"args"` } // WebsocketErrorResponse main error response diff --git a/exchanges/bitmex/bitmex_wrapper.go b/exchanges/bitmex/bitmex_wrapper.go index 78a13945..eea24d81 100644 --- a/exchanges/bitmex/bitmex_wrapper.go +++ b/exchanges/bitmex/bitmex_wrapper.go @@ -28,7 +28,6 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" - "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" @@ -137,19 +136,7 @@ func (b *Bitmex) SetDefaults() { Enabled: exchange.FeaturesEnabled{ AutoPairUpdates: true, }, - Subscriptions: subscription.List{ - {Enabled: true, Channel: bitmexWSAnnouncement}, - {Enabled: true, Channel: bitmexWSOrderbookL2, Asset: asset.All}, - {Enabled: true, Channel: bitmexWSTrade, Asset: asset.All}, - {Enabled: true, Channel: bitmexWSAffiliate, Authenticated: true}, - {Enabled: true, Channel: bitmexWSOrder, Authenticated: true}, - {Enabled: true, Channel: bitmexWSMargin, Authenticated: true}, - {Enabled: true, Channel: bitmexWSPrivateNotifications, Authenticated: true}, - {Enabled: true, Channel: bitmexWSTransact, Authenticated: true}, - {Enabled: true, Channel: bitmexWSWallet, Authenticated: true}, - {Enabled: true, Channel: bitmexWSExecution, Authenticated: true, Asset: asset.PerpetualContract}, - {Enabled: true, Channel: bitmexWSPosition, Authenticated: true, Asset: asset.PerpetualContract}, - }, + Subscriptions: defaultSubscriptions.Clone(), } b.Requester, err = request.New(b.Name, diff --git a/exchanges/exchange.go b/exchanges/exchange.go index 9676364c..9b0ede6b 100644 --- a/exchanges/exchange.go +++ b/exchanges/exchange.go @@ -58,7 +58,6 @@ var ( errSymbolCannotBeMatched = errors.New("symbol cannot be matched") errSetDefaultsNotCalled = errors.New("set defaults not called") errExchangeIsNil = errors.New("exchange is nil") - errBatchSizeZero = errors.New("batch size cannot be 0") ) // SetRequester sets the instance of the requester @@ -1811,9 +1810,6 @@ func (b *Base) GetOpenInterest(context.Context, ...key.PairAsset) ([]futures.Ope func (b *Base) ParallelChanOp(channels subscription.List, m func(subscription.List) error, batchSize int) error { wg := sync.WaitGroup{} errC := make(chan error, len(channels)) - if batchSize == 0 { - return errBatchSizeZero - } for _, b := range common.Batch(channels, batchSize) { wg.Add(1) diff --git a/testdata/configtest.json b/testdata/configtest.json index 30c83b61..d9c80bf8 100644 --- a/testdata/configtest.json +++ b/testdata/configtest.json @@ -852,26 +852,9 @@ "websocketOrderbookBufferLimit": 5, "baseCurrencies": "USD", "currencyPairs": { - "assetTypes": [ - "perpetualcontract", - "futures", - "downsideprofitcontract", - "upsideprofitcontract" - ], "pairs": { - "downsideprofitcontract": { - "enabled": "XBT7D_D95", - "available": "XBT7D_D95", - "requestFormat": { - "uppercase": true, - "delimiter": "_" - }, - "configFormat": { - "uppercase": true, - "delimiter": "_" - } - }, "futures": { + "assetEnabled": true, "enabled": "BCHZ19", "available": "XRPZ19,BCHZ19,ADAZ19,EOSZ19,TRXZ19,XBTZ19,ETHZ19,LTCZ19", "requestFormat": { @@ -882,6 +865,7 @@ } }, "perpetualcontract": { + "assetEnabled": true, "enabled": "ETHUSD", "available": "XBTUSD,ETHUSD", "requestFormat": { @@ -890,18 +874,6 @@ "configFormat": { "uppercase": true } - }, - "upsideprofitcontract": { - "enabled": "XBT7D_U105", - "available": "XBT7D_U105", - "requestFormat": { - "uppercase": true, - "delimiter": "_" - }, - "configFormat": { - "uppercase": true, - "delimiter": "_" - } } } }, @@ -934,7 +906,7 @@ }, "enabled": { "autoPairUpdates": true, - "websocketAPI": false + "websocketAPI": true } }, "bankAccounts": [ @@ -951,7 +923,13 @@ "iban": "", "supportedCurrencies": "" } - ] + ], + "orderbook": { + "verificationBypass": false, + "websocketBufferLimit": 5, + "websocketBufferEnabled": false, + "publishPeriod": 10000000000 + } }, { "name": "Bitstamp",