From 68a6f5828fd8fb2d5679d1482f801f8b4c2d99e4 Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Fri, 2 Feb 2024 09:27:17 +0100 Subject: [PATCH] Binance: Subscribe/unsubscribe response handling (#1444) * Binance: Fix subscription failures ignored * Testing: Fix race on shared config singleton * Config: Privatise Global config var We should *either* use a private var *or* use an accessor, but it doesn't make sense to mix paradigms. Since GetConfig() is well established this instead removes the limited uses of direct public access and adds a Setter * Zip: Fix test failure on http mocks --- common/file/archive/zip_test.go | 53 +++----- config/config.go | 13 +- config/config_types.go | 9 +- engine/engine.go | 2 +- exchanges/binance/binance_live_test.go | 53 +++----- exchanges/binance/binance_mock_test.go | 51 ++----- exchanges/binance/binance_test.go | 62 +++++---- exchanges/binance/binance_websocket.go | 126 ++++++++++++------ .../binance/testdata/http.json | 0 internal/testing/exchange/exchange.go | 2 +- main.go | 2 +- testdata/configtest.json | 2 +- 12 files changed, 192 insertions(+), 183 deletions(-) rename testdata/http_mock/binance/binance.json => exchanges/binance/testdata/http.json (100%) diff --git a/common/file/archive/zip_test.go b/common/file/archive/zip_test.go index 12c3bce6..466338e5 100644 --- a/common/file/archive/zip_test.go +++ b/common/file/archive/zip_test.go @@ -3,8 +3,12 @@ package archive import ( "archive/zip" "errors" + "io/fs" "path/filepath" "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestUnZip(t *testing.T) { @@ -33,60 +37,39 @@ func TestUnZip(t *testing.T) { func TestZip(t *testing.T) { tempDir := t.TempDir() - singleFile := filepath.Join("..", "..", "..", "testdata", "configtest.json") outFile := filepath.Join(tempDir, "out.zip") - err := Zip(singleFile, outFile) - if err != nil { - t.Fatal(err) - } + err := Zip(filepath.Join("..", "..", "..", "testdata", "configtest.json"), outFile) + require.NoError(t, err, "Zip should not error") o, err := UnZip(outFile, tempDir) - if err != nil { - t.Fatal(err) - } - if len(o) != 1 { - t.Fatalf("expected 1 files to be extracted received: %v ", len(o)) - } + require.NoError(t, err, "UnZip should not error") + assert.Len(t, o, 1, "Should extract 1 file") - folder := filepath.Join("..", "..", "..", "testdata", "http_mock") + folder := filepath.Join("..", "..", "..", "testdata", "gctscript") outFolderZip := filepath.Join(tempDir, "out_folder.zip") err = Zip(folder, outFolderZip) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err, "Zip should not error") o, err = UnZip(outFolderZip, tempDir) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err, "UnZip should not error") var found bool for i := range o { - if filepath.Base(o[i]) == "binance.json" { + if filepath.Base(o[i]) == "timer.gct" { found = true } } - if !found { - t.Fatal("could not find file in zip") - } - - if expected := 6; len(o) < expected { - t.Fatalf("expected at least %v files to be extracted, received: %v ", expected, len(o)) - } + assert.True(t, found, "Should find a gctscript in the zip") + assert.GreaterOrEqual(t, len(o), 6, "Should extract at least 6 files") folder = filepath.Join("..", "..", "..", "testdata", "invalid_file.json") - outFolderZip = filepath.Join(tempDir, "invalid.zip") - err = Zip(folder, outFolderZip) - if err == nil { - t.Fatal("expected IsNotExistError on invalid file") - } + err = Zip(folder, filepath.Join(tempDir, "invalid.zip")) + assert.ErrorIs(t, err, fs.ErrNotExist, "Zip should error correctly") addFilesToZip = addFilesToZipTestWrapper folder = filepath.Join("..", "..", "..", "testdata", "http_mock") outFolderZip = filepath.Join(tempDir, "error_zip.zip") err = Zip(folder, outFolderZip) - if err == nil { - t.Fatal("expected Zip() to fail due to invalid addFilesToZipTestWrapper()") - } + assert.ErrorContains(t, err, "specific error", "Zip should error correctly") } func addFilesToZipTestWrapper(_ *zip.Writer, _ string, _ bool) error { - return errors.New("error") + return errors.New("specific error") } diff --git a/config/config.go b/config/config.go index 7e45a26f..9a86be22 100644 --- a/config/config.go +++ b/config/config.go @@ -1850,9 +1850,18 @@ func (c *Config) UpdateConfig(configPath string, newCfg *Config, dryrun bool) er return c.LoadConfig(configPath, dryrun) } -// GetConfig returns a pointer to a configuration object +// GetConfig returns the global shared config instance func GetConfig() *Config { - return &Cfg + m.Lock() + defer m.Unlock() + return &cfg +} + +// SetConfig sets the global shared config instance +func SetConfig(c *Config) { + m.Lock() + defer m.Unlock() + cfg = *c } // RemoveExchange removes an exchange config diff --git a/config/config_types.go b/config/config_types.go index 9d6855d3..ab662f14 100644 --- a/config/config_types.go +++ b/config/config_types.go @@ -76,13 +76,16 @@ const ( DefaultUnsetAccountPlan = "accountPlan" ) -// Variables here are used for configuration +// Public errors exported by this package var ( - Cfg Config - m sync.Mutex ErrExchangeNotFound = errors.New("exchange not found") ) +var ( + cfg Config + m sync.Mutex +) + // Config is the overarching object that holds all the information for // prestart management of Portfolio, Communications, Webserver and Enabled // Exchanges diff --git a/engine/engine.go b/engine/engine.go index daa6bd8d..20f88cbd 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -64,7 +64,7 @@ func New() (*Engine, error) { newEngineMutex.Lock() defer newEngineMutex.Unlock() var b Engine - b.Config = &config.Cfg + b.Config = config.GetConfig() err := b.Config.LoadConfig("", false) if err != nil { diff --git a/exchanges/binance/binance_live_test.go b/exchanges/binance/binance_live_test.go index 98f6d79f..2ccf48c2 100644 --- a/exchanges/binance/binance_live_test.go +++ b/exchanges/binance/binance_live_test.go @@ -10,53 +10,44 @@ import ( "os" "testing" - "github.com/thrasher-corp/gocryptotrader/config" exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues" + testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" ) var mockTests = false func TestMain(m *testing.M) { - cfg := config.GetConfig() - err := cfg.LoadConfig("../../testdata/configtest.json", true) - if err != nil { - log.Fatal("Binance load config error", err) - } - binanceConfig, err := cfg.GetExchangeConfig("Binance") - if err != nil { - log.Fatal("Binance Setup() init error", err) + + b = new(Binance) + if err := testexch.TestInstance(b); err != nil { + log.Fatal(err) + } + + if apiKey != "" && apiSecret != "" { + b.API.AuthenticatedSupport = true + b.API.CredentialsValidator.RequiresBase64DecodeSecret = false + b.SetCredentials(apiKey, apiSecret, "", "", "", "") } - binanceConfig.API.AuthenticatedSupport = true - binanceConfig.API.Credentials.Key = apiKey - binanceConfig.API.Credentials.Secret = apiSecret - b.SetDefaults() - b.Websocket = sharedtestvalues.NewTestWebsocket() if useTestNet { - err = b.API.Endpoints.SetRunning(exchange.RestUSDTMargined.String(), testnetFutures) - if err != nil { - log.Fatal("Binance setup error", err) - } - err = b.API.Endpoints.SetRunning(exchange.RestCoinMargined.String(), testnetFutures) - if err != nil { - log.Fatal("Binance setup error", err) - } - err = b.API.Endpoints.SetRunning(exchange.RestSpot.String(), testnetSpotURL) - if err != nil { - log.Fatal("Binance setup error", err) + for k, v := range map[exchange.URL]string{ + exchange.RestUSDTMargined: testnetFutures, + exchange.RestCoinMargined: testnetFutures, + exchange.RestSpot: testnetSpotURL, + } { + if err := b.API.Endpoints.SetRunning(k.String(), v); err != nil { + log.Fatalf("Testnet `%s` URL error with `%s`: %s", k, v, err) + } } } - err = b.Setup(binanceConfig) - if err != nil { - log.Fatal("Binance setup error", err) - } + b.setupOrderbookManager() b.Websocket.DataHandler = sharedtestvalues.GetWebsocketInterfaceChannelOverride() log.Printf(sharedtestvalues.LiveTesting, b.Name) - err = b.UpdateTradablePairs(context.Background(), true) - if err != nil { + if err := b.UpdateTradablePairs(context.Background(), true); err != nil { log.Fatal("Binance setup error", err) } + os.Exit(m.Run()) } diff --git a/exchanges/binance/binance_mock_test.go b/exchanges/binance/binance_mock_test.go index 5a3cd9d9..85cb95b1 100644 --- a/exchanges/binance/binance_mock_test.go +++ b/exchanges/binance/binance_mock_test.go @@ -10,60 +10,29 @@ import ( "os" "testing" - "github.com/thrasher-corp/gocryptotrader/config" - "github.com/thrasher-corp/gocryptotrader/exchanges/mock" - "github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues" + testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" ) -const mockfile = "../../testdata/http_mock/binance/binance.json" - var mockTests = true func TestMain(m *testing.M) { if useTestNet { log.Fatal("cannot use testnet with mock tests") } - cfg := config.GetConfig() - err := cfg.LoadConfig("../../testdata/configtest.json", true) - if err != nil { - log.Fatal("Binance load config error", err) + + b = new(Binance) + if err := testexch.TestInstance(b); err != nil { + log.Fatal(err) } - binanceConfig, err := cfg.GetExchangeConfig("Binance") - if err != nil { - log.Fatal("Binance Setup() init error", err) - } - b.SkipAuthCheck = true - binanceConfig.API.AuthenticatedSupport = true - binanceConfig.API.Credentials.Key = apiKey - binanceConfig.API.Credentials.Secret = apiSecret - b.SetDefaults() - b.Websocket = sharedtestvalues.NewTestWebsocket() - err = b.Setup(binanceConfig) - if err != nil { - log.Fatal("Binance setup error", err) + + if err := testexch.MockHTTPInstance(b); err != nil { + log.Fatal(err) } b.setupOrderbookManager() - - serverDetails, newClient, err := mock.NewVCRServer(mockfile) - if err != nil { - log.Fatalf("Mock server error %s", err) - } - err = b.SetHTTPClient(newClient) - if err != nil { - log.Fatalf("Mock server error %s", err) - } - endpointMap := b.API.Endpoints.GetURLMap() - for k := range endpointMap { - err = b.API.Endpoints.SetRunning(k, serverDetails) - if err != nil { - log.Fatal(err) - } - } - log.Printf(sharedtestvalues.MockTesting, b.Name) - err = b.UpdateTradablePairs(context.Background(), true) - if err != nil { + if err := b.UpdateTradablePairs(context.Background(), true); err != nil { log.Fatal(err) } + os.Exit(m.Run()) } diff --git a/exchanges/binance/binance_test.go b/exchanges/binance/binance_test.go index 847b17c1..629ed3a9 100644 --- a/exchanges/binance/binance_test.go +++ b/exchanges/binance/binance_test.go @@ -10,7 +10,9 @@ import ( "testing" "time" + "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/key" "github.com/thrasher-corp/gocryptotrader/core" @@ -26,6 +28,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" + testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" "github.com/thrasher-corp/gocryptotrader/portfolio/withdraw" ) @@ -1952,34 +1955,47 @@ func TestGetDepositAddress(t *testing.T) { } } -func TestWSSubscriptionHandling(t *testing.T) { +func TestSubscribe(t *testing.T) { t.Parallel() - pressXToJSON := []byte(`{ - "method": "SUBSCRIBE", - "params": [ - "btcusdt@aggTrade", - "btcusdt@depth" - ], - "id": 1 -}`) - err := b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) + b := b + channels := []subscription.Subscription{ + {Channel: "btcusdt@ticker"}, + {Channel: "btcusdt@trade"}, } + if mockTests { + b = testexch.MockWSInstance[Binance](t, func(msg []byte, w *websocket.Conn) error { + var req WsPayload + err := json.Unmarshal(msg, &req) + require.NoError(t, err, "Unmarshal should not error") + require.Len(t, req.Params, len(channels), "Params should only have 2 channel") // Failure might mean mockWSInstance default Subs is not empty + assert.Equal(t, req.Params[0], channels[0].Channel, "Channel name should be correct") + assert.Equal(t, req.Params[1], channels[1].Channel, "Channel name should be correct") + return w.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf(`{"result":null,"id":%d}`, req.ID))) + }) + } else { + testexch.SetupWs(t, b) + } + err := b.Subscribe(channels) + require.NoError(t, err, "Subscribe should not error") + err = b.Unsubscribe(channels) + require.NoError(t, err, "Unsubscribe should not error") } -func TestWSUnsubscriptionHandling(t *testing.T) { - pressXToJSON := []byte(`{ - "method": "UNSUBSCRIBE", - "params": [ - "btcusdt@depth" - ], - "id": 312 -}`) - err := b.wsHandleData(pressXToJSON) - if err != nil { - t.Error(err) +func TestSubscribeBadResp(t *testing.T) { + t.Parallel() + channels := []subscription.Subscription{ + {Channel: "moons@ticker"}, } + b := testexch.MockWSInstance[Binance](t, func(msg []byte, w *websocket.Conn) error { //nolint:govet // shadow + var req WsPayload + err := json.Unmarshal(msg, &req) + require.NoError(t, err, "Unmarshal should not error") + return w.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf(`{"result":{"error":"carrots"},"id":%d}`, req.ID))) + }) + err := b.Subscribe(channels) + assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Subscribe should error ErrSubscriptionFailure") + assert.ErrorIs(t, err, errUnknownError, "Subscribe should error errUnknownError") + assert.ErrorContains(t, err, "carrots", "Subscribe should error containing the carrots") } func TestWsTickerUpdate(t *testing.T) { diff --git a/exchanges/binance/binance_websocket.go b/exchanges/binance/binance_websocket.go index bffe11b0..0a3fb38e 100644 --- a/exchanges/binance/binance_websocket.go +++ b/exchanges/binance/binance_websocket.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/buger/jsonparser" "github.com/gorilla/websocket" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" @@ -25,6 +26,10 @@ import ( const ( binanceDefaultWebsocketURL = "wss://stream.binance.com:9443/stream" pingDelay = time.Minute * 9 + + wsSubscribeMethod = "SUBSCRIBE" + wsUnsubscribeMethod = "UNSUBSCRIBE" + wsListSubscriptionsMethod = "LIST_SUBSCRIPTIONS" ) var listenKey string @@ -39,6 +44,7 @@ var ( // maxWSOrderbookWorkers defines a max amount of workers allowed to execute // jobs from the job channel maxWSOrderbookWorkers = 10 + errUnknownError = errors.New("unknown error") ) // WsConnect initiates a websocket connection @@ -164,21 +170,18 @@ func (b *Binance) wsHandleData(respRaw []byte) error { return err } + if id, err := jsonparser.GetInt(respRaw, "id"); err == nil { + if b.Websocket.Match.IncomingWithData(id, respRaw) { + return nil + } + } + if r, ok := multiStreamData["result"]; ok { if r == nil { return nil } } - if method, ok := multiStreamData["method"].(string); ok { - // TODO handle subscription handling - if strings.EqualFold(method, "subscribe") { - return nil - } - if strings.EqualFold(method, "unsubscribe") { - return nil - } - } if newdata, ok := multiStreamData["data"].(map[string]interface{}); ok { if e, ok := newdata["e"].(string); ok { switch e { @@ -601,52 +604,87 @@ func channelName(s *subscription.Subscription) (string, error) { } // Subscribe subscribes to a set of channels -func (b *Binance) Subscribe(channelsToSubscribe []subscription.Subscription) error { - payload := WsPayload{ - Method: "SUBSCRIBE", - } - for i := range channelsToSubscribe { - payload.Params = append(payload.Params, channelsToSubscribe[i].Channel) - if i%50 == 0 && i != 0 { - err := b.Websocket.Conn.SendJSONMessage(payload) - if err != nil { - return err - } - payload.Params = []string{} +func (b *Binance) Subscribe(channels []subscription.Subscription) error { + return b.ParallelChanOp(channels, b.subscribeToChan, 50) +} + +// subscribeToChan handles a single subscription and parses the result +// on success it adds the subscription to the websocket +func (b *Binance) subscribeToChan(chans []subscription.Subscription) error { + id := b.Websocket.Conn.GenerateMessageID(false) + + cNames := make([]string, len(chans)) + for i := range chans { + c := chans[i] + cNames[i] = c.Channel + c.State = subscription.SubscribingState + if err := b.Websocket.AddSubscription(&c); err != nil { + return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Pair, err) } } - if len(payload.Params) > 0 { - err := b.Websocket.Conn.SendJSONMessage(payload) - if err != nil { - return err + + req := WsPayload{ + Method: wsSubscribeMethod, + Params: cNames, + ID: id, + } + + respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(id, req) + if err == nil { + if v, d, _, rErr := jsonparser.Get(respRaw, "result"); rErr != nil { + err = rErr + } else if d != jsonparser.Null { // null is the only expected and acceptable response + err = fmt.Errorf("%w: %s", errUnknownError, v) } } - b.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe...) - return nil + + if err != nil { + b.Websocket.RemoveSubscriptions(chans...) + err = fmt.Errorf("%w: %w; Channels: %s", stream.ErrSubscriptionFailure, err, strings.Join(cNames, ", ")) + b.Websocket.DataHandler <- err + } else { + b.Websocket.AddSuccessfulSubscriptions(chans...) + } + + return err } // Unsubscribe unsubscribes from a set of channels -func (b *Binance) Unsubscribe(channelsToUnsubscribe []subscription.Subscription) error { - payload := WsPayload{ - Method: "UNSUBSCRIBE", +func (b *Binance) Unsubscribe(channels []subscription.Subscription) error { + return b.ParallelChanOp(channels, b.unsubscribeFromChan, 50) +} + +// unsubscribeFromChan sends a websocket message to stop receiving data from a channel +func (b *Binance) unsubscribeFromChan(chans []subscription.Subscription) error { + id := b.Websocket.Conn.GenerateMessageID(false) + + cNames := make([]string, len(chans)) + for i := range chans { + cNames[i] = chans[i].Channel } - for i := range channelsToUnsubscribe { - payload.Params = append(payload.Params, channelsToUnsubscribe[i].Channel) - if i%50 == 0 && i != 0 { - err := b.Websocket.Conn.SendJSONMessage(payload) - if err != nil { - return err - } - payload.Params = []string{} + + req := WsPayload{ + Method: wsUnsubscribeMethod, + Params: cNames, + ID: id, + } + + respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(id, req) + if err == nil { + if v, d, _, rErr := jsonparser.Get(respRaw, "result"); rErr != nil { + err = rErr + } else if d != jsonparser.Null { // null is the only expected and acceptable response + err = fmt.Errorf("%w: %s", errUnknownError, v) } } - if len(payload.Params) > 0 { - err := b.Websocket.Conn.SendJSONMessage(payload) - if err != nil { - return err - } + + if err != nil { + err = fmt.Errorf("%w: %w; Channels: %s", stream.ErrUnsubscribeFailure, err, strings.Join(cNames, ", ")) + b.Websocket.DataHandler <- err + } else { + b.Websocket.RemoveSubscriptions(chans...) } - b.Websocket.RemoveSubscriptions(channelsToUnsubscribe...) + return nil } diff --git a/testdata/http_mock/binance/binance.json b/exchanges/binance/testdata/http.json similarity index 100% rename from testdata/http_mock/binance/binance.json rename to exchanges/binance/testdata/http.json diff --git a/internal/testing/exchange/exchange.go b/internal/testing/exchange/exchange.go index b5ddbef4..8a943677 100644 --- a/internal/testing/exchange/exchange.go +++ b/internal/testing/exchange/exchange.go @@ -22,7 +22,7 @@ import ( // TestInstance takes an empty exchange instance and loads config for it from testdata/configtest and connects a NewTestWebsocket func TestInstance(e exchange.IBotExchange) error { - cfg := config.GetConfig() + cfg := &config.Config{} err := cfg.LoadConfig("../../testdata/configtest.json", true) if err != nil { return fmt.Errorf("LoadConfig() error: %w", err) diff --git a/main.go b/main.go index 8d71e29b..d84ac9cf 100644 --- a/main.go +++ b/main.go @@ -135,7 +135,7 @@ func main() { if engine.Bot == nil || err != nil { log.Fatalf("Unable to initialise bot engine. Error: %s\n", err) } - config.Cfg = *engine.Bot.Config + config.SetConfig(engine.Bot.Config) gctscript.Setup() diff --git a/testdata/configtest.json b/testdata/configtest.json index 0d31f12a..e61225b4 100644 --- a/testdata/configtest.json +++ b/testdata/configtest.json @@ -495,7 +495,7 @@ }, "enabled": { "autoPairUpdates": true, - "websocketAPI": false + "websocketAPI": true } }, "bankAccounts": [