Binance: Subscribe/unsubscribe response handling (#1444)

* Binance: Fix subscription failures ignored

* Testing: Fix race on shared config singleton

* Config: Privatise Global config var

We should *either* use a private var *or* use an accessor, but it
doesn't make sense to mix paradigms.
Since GetConfig() is well established this instead removes the limited uses of direct public access and adds a Setter

* Zip: Fix test failure on http mocks
This commit is contained in:
Gareth Kirwan
2024-02-02 09:27:17 +01:00
committed by GitHub
parent e16ee53746
commit 68a6f5828f
12 changed files with 192 additions and 183 deletions

View File

@@ -3,8 +3,12 @@ package archive
import (
"archive/zip"
"errors"
"io/fs"
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestUnZip(t *testing.T) {
@@ -33,60 +37,39 @@ func TestUnZip(t *testing.T) {
func TestZip(t *testing.T) {
tempDir := t.TempDir()
singleFile := filepath.Join("..", "..", "..", "testdata", "configtest.json")
outFile := filepath.Join(tempDir, "out.zip")
err := Zip(singleFile, outFile)
if err != nil {
t.Fatal(err)
}
err := Zip(filepath.Join("..", "..", "..", "testdata", "configtest.json"), outFile)
require.NoError(t, err, "Zip should not error")
o, err := UnZip(outFile, tempDir)
if err != nil {
t.Fatal(err)
}
if len(o) != 1 {
t.Fatalf("expected 1 files to be extracted received: %v ", len(o))
}
require.NoError(t, err, "UnZip should not error")
assert.Len(t, o, 1, "Should extract 1 file")
folder := filepath.Join("..", "..", "..", "testdata", "http_mock")
folder := filepath.Join("..", "..", "..", "testdata", "gctscript")
outFolderZip := filepath.Join(tempDir, "out_folder.zip")
err = Zip(folder, outFolderZip)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err, "Zip should not error")
o, err = UnZip(outFolderZip, tempDir)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err, "UnZip should not error")
var found bool
for i := range o {
if filepath.Base(o[i]) == "binance.json" {
if filepath.Base(o[i]) == "timer.gct" {
found = true
}
}
if !found {
t.Fatal("could not find file in zip")
}
if expected := 6; len(o) < expected {
t.Fatalf("expected at least %v files to be extracted, received: %v ", expected, len(o))
}
assert.True(t, found, "Should find a gctscript in the zip")
assert.GreaterOrEqual(t, len(o), 6, "Should extract at least 6 files")
folder = filepath.Join("..", "..", "..", "testdata", "invalid_file.json")
outFolderZip = filepath.Join(tempDir, "invalid.zip")
err = Zip(folder, outFolderZip)
if err == nil {
t.Fatal("expected IsNotExistError on invalid file")
}
err = Zip(folder, filepath.Join(tempDir, "invalid.zip"))
assert.ErrorIs(t, err, fs.ErrNotExist, "Zip should error correctly")
addFilesToZip = addFilesToZipTestWrapper
folder = filepath.Join("..", "..", "..", "testdata", "http_mock")
outFolderZip = filepath.Join(tempDir, "error_zip.zip")
err = Zip(folder, outFolderZip)
if err == nil {
t.Fatal("expected Zip() to fail due to invalid addFilesToZipTestWrapper()")
}
assert.ErrorContains(t, err, "specific error", "Zip should error correctly")
}
func addFilesToZipTestWrapper(_ *zip.Writer, _ string, _ bool) error {
return errors.New("error")
return errors.New("specific error")
}

View File

@@ -1850,9 +1850,18 @@ func (c *Config) UpdateConfig(configPath string, newCfg *Config, dryrun bool) er
return c.LoadConfig(configPath, dryrun)
}
// GetConfig returns a pointer to a configuration object
// GetConfig returns the global shared config instance
func GetConfig() *Config {
return &Cfg
m.Lock()
defer m.Unlock()
return &cfg
}
// SetConfig sets the global shared config instance
func SetConfig(c *Config) {
m.Lock()
defer m.Unlock()
cfg = *c
}
// RemoveExchange removes an exchange config

View File

@@ -76,13 +76,16 @@ const (
DefaultUnsetAccountPlan = "accountPlan"
)
// Variables here are used for configuration
// Public errors exported by this package
var (
Cfg Config
m sync.Mutex
ErrExchangeNotFound = errors.New("exchange not found")
)
var (
cfg Config
m sync.Mutex
)
// Config is the overarching object that holds all the information for
// prestart management of Portfolio, Communications, Webserver and Enabled
// Exchanges

View File

@@ -64,7 +64,7 @@ func New() (*Engine, error) {
newEngineMutex.Lock()
defer newEngineMutex.Unlock()
var b Engine
b.Config = &config.Cfg
b.Config = config.GetConfig()
err := b.Config.LoadConfig("", false)
if err != nil {

View File

@@ -10,53 +10,44 @@ import (
"os"
"testing"
"github.com/thrasher-corp/gocryptotrader/config"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
)
var mockTests = false
func TestMain(m *testing.M) {
cfg := config.GetConfig()
err := cfg.LoadConfig("../../testdata/configtest.json", true)
if err != nil {
log.Fatal("Binance load config error", err)
}
binanceConfig, err := cfg.GetExchangeConfig("Binance")
if err != nil {
log.Fatal("Binance Setup() init error", err)
b = new(Binance)
if err := testexch.TestInstance(b); err != nil {
log.Fatal(err)
}
if apiKey != "" && apiSecret != "" {
b.API.AuthenticatedSupport = true
b.API.CredentialsValidator.RequiresBase64DecodeSecret = false
b.SetCredentials(apiKey, apiSecret, "", "", "", "")
}
binanceConfig.API.AuthenticatedSupport = true
binanceConfig.API.Credentials.Key = apiKey
binanceConfig.API.Credentials.Secret = apiSecret
b.SetDefaults()
b.Websocket = sharedtestvalues.NewTestWebsocket()
if useTestNet {
err = b.API.Endpoints.SetRunning(exchange.RestUSDTMargined.String(), testnetFutures)
if err != nil {
log.Fatal("Binance setup error", err)
}
err = b.API.Endpoints.SetRunning(exchange.RestCoinMargined.String(), testnetFutures)
if err != nil {
log.Fatal("Binance setup error", err)
}
err = b.API.Endpoints.SetRunning(exchange.RestSpot.String(), testnetSpotURL)
if err != nil {
log.Fatal("Binance setup error", err)
for k, v := range map[exchange.URL]string{
exchange.RestUSDTMargined: testnetFutures,
exchange.RestCoinMargined: testnetFutures,
exchange.RestSpot: testnetSpotURL,
} {
if err := b.API.Endpoints.SetRunning(k.String(), v); err != nil {
log.Fatalf("Testnet `%s` URL error with `%s`: %s", k, v, err)
}
}
}
err = b.Setup(binanceConfig)
if err != nil {
log.Fatal("Binance setup error", err)
}
b.setupOrderbookManager()
b.Websocket.DataHandler = sharedtestvalues.GetWebsocketInterfaceChannelOverride()
log.Printf(sharedtestvalues.LiveTesting, b.Name)
err = b.UpdateTradablePairs(context.Background(), true)
if err != nil {
if err := b.UpdateTradablePairs(context.Background(), true); err != nil {
log.Fatal("Binance setup error", err)
}
os.Exit(m.Run())
}

View File

@@ -10,60 +10,29 @@ import (
"os"
"testing"
"github.com/thrasher-corp/gocryptotrader/config"
"github.com/thrasher-corp/gocryptotrader/exchanges/mock"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
)
const mockfile = "../../testdata/http_mock/binance/binance.json"
var mockTests = true
func TestMain(m *testing.M) {
if useTestNet {
log.Fatal("cannot use testnet with mock tests")
}
cfg := config.GetConfig()
err := cfg.LoadConfig("../../testdata/configtest.json", true)
if err != nil {
log.Fatal("Binance load config error", err)
b = new(Binance)
if err := testexch.TestInstance(b); err != nil {
log.Fatal(err)
}
binanceConfig, err := cfg.GetExchangeConfig("Binance")
if err != nil {
log.Fatal("Binance Setup() init error", err)
}
b.SkipAuthCheck = true
binanceConfig.API.AuthenticatedSupport = true
binanceConfig.API.Credentials.Key = apiKey
binanceConfig.API.Credentials.Secret = apiSecret
b.SetDefaults()
b.Websocket = sharedtestvalues.NewTestWebsocket()
err = b.Setup(binanceConfig)
if err != nil {
log.Fatal("Binance setup error", err)
if err := testexch.MockHTTPInstance(b); err != nil {
log.Fatal(err)
}
b.setupOrderbookManager()
serverDetails, newClient, err := mock.NewVCRServer(mockfile)
if err != nil {
log.Fatalf("Mock server error %s", err)
}
err = b.SetHTTPClient(newClient)
if err != nil {
log.Fatalf("Mock server error %s", err)
}
endpointMap := b.API.Endpoints.GetURLMap()
for k := range endpointMap {
err = b.API.Endpoints.SetRunning(k, serverDetails)
if err != nil {
log.Fatal(err)
}
}
log.Printf(sharedtestvalues.MockTesting, b.Name)
err = b.UpdateTradablePairs(context.Background(), true)
if err != nil {
if err := b.UpdateTradablePairs(context.Background(), true); err != nil {
log.Fatal(err)
}
os.Exit(m.Run())
}

View File

@@ -10,7 +10,9 @@ import (
"testing"
"time"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/common/key"
"github.com/thrasher-corp/gocryptotrader/core"
@@ -26,6 +28,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)
@@ -1952,34 +1955,47 @@ func TestGetDepositAddress(t *testing.T) {
}
}
func TestWSSubscriptionHandling(t *testing.T) {
func TestSubscribe(t *testing.T) {
t.Parallel()
pressXToJSON := []byte(`{
"method": "SUBSCRIBE",
"params": [
"btcusdt@aggTrade",
"btcusdt@depth"
],
"id": 1
}`)
err := b.wsHandleData(pressXToJSON)
if err != nil {
t.Error(err)
b := b
channels := []subscription.Subscription{
{Channel: "btcusdt@ticker"},
{Channel: "btcusdt@trade"},
}
if mockTests {
b = testexch.MockWSInstance[Binance](t, func(msg []byte, w *websocket.Conn) error {
var req WsPayload
err := json.Unmarshal(msg, &req)
require.NoError(t, err, "Unmarshal should not error")
require.Len(t, req.Params, len(channels), "Params should only have 2 channel") // Failure might mean mockWSInstance default Subs is not empty
assert.Equal(t, req.Params[0], channels[0].Channel, "Channel name should be correct")
assert.Equal(t, req.Params[1], channels[1].Channel, "Channel name should be correct")
return w.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf(`{"result":null,"id":%d}`, req.ID)))
})
} else {
testexch.SetupWs(t, b)
}
err := b.Subscribe(channels)
require.NoError(t, err, "Subscribe should not error")
err = b.Unsubscribe(channels)
require.NoError(t, err, "Unsubscribe should not error")
}
func TestWSUnsubscriptionHandling(t *testing.T) {
pressXToJSON := []byte(`{
"method": "UNSUBSCRIBE",
"params": [
"btcusdt@depth"
],
"id": 312
}`)
err := b.wsHandleData(pressXToJSON)
if err != nil {
t.Error(err)
func TestSubscribeBadResp(t *testing.T) {
t.Parallel()
channels := []subscription.Subscription{
{Channel: "moons@ticker"},
}
b := testexch.MockWSInstance[Binance](t, func(msg []byte, w *websocket.Conn) error { //nolint:govet // shadow
var req WsPayload
err := json.Unmarshal(msg, &req)
require.NoError(t, err, "Unmarshal should not error")
return w.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf(`{"result":{"error":"carrots"},"id":%d}`, req.ID)))
})
err := b.Subscribe(channels)
assert.ErrorIs(t, err, stream.ErrSubscriptionFailure, "Subscribe should error ErrSubscriptionFailure")
assert.ErrorIs(t, err, errUnknownError, "Subscribe should error errUnknownError")
assert.ErrorContains(t, err, "carrots", "Subscribe should error containing the carrots")
}
func TestWsTickerUpdate(t *testing.T) {

View File

@@ -10,6 +10,7 @@ import (
"strings"
"time"
"github.com/buger/jsonparser"
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
@@ -25,6 +26,10 @@ import (
const (
binanceDefaultWebsocketURL = "wss://stream.binance.com:9443/stream"
pingDelay = time.Minute * 9
wsSubscribeMethod = "SUBSCRIBE"
wsUnsubscribeMethod = "UNSUBSCRIBE"
wsListSubscriptionsMethod = "LIST_SUBSCRIPTIONS"
)
var listenKey string
@@ -39,6 +44,7 @@ var (
// maxWSOrderbookWorkers defines a max amount of workers allowed to execute
// jobs from the job channel
maxWSOrderbookWorkers = 10
errUnknownError = errors.New("unknown error")
)
// WsConnect initiates a websocket connection
@@ -164,21 +170,18 @@ func (b *Binance) wsHandleData(respRaw []byte) error {
return err
}
if id, err := jsonparser.GetInt(respRaw, "id"); err == nil {
if b.Websocket.Match.IncomingWithData(id, respRaw) {
return nil
}
}
if r, ok := multiStreamData["result"]; ok {
if r == nil {
return nil
}
}
if method, ok := multiStreamData["method"].(string); ok {
// TODO handle subscription handling
if strings.EqualFold(method, "subscribe") {
return nil
}
if strings.EqualFold(method, "unsubscribe") {
return nil
}
}
if newdata, ok := multiStreamData["data"].(map[string]interface{}); ok {
if e, ok := newdata["e"].(string); ok {
switch e {
@@ -601,52 +604,87 @@ func channelName(s *subscription.Subscription) (string, error) {
}
// Subscribe subscribes to a set of channels
func (b *Binance) Subscribe(channelsToSubscribe []subscription.Subscription) error {
payload := WsPayload{
Method: "SUBSCRIBE",
}
for i := range channelsToSubscribe {
payload.Params = append(payload.Params, channelsToSubscribe[i].Channel)
if i%50 == 0 && i != 0 {
err := b.Websocket.Conn.SendJSONMessage(payload)
if err != nil {
return err
}
payload.Params = []string{}
func (b *Binance) Subscribe(channels []subscription.Subscription) error {
return b.ParallelChanOp(channels, b.subscribeToChan, 50)
}
// subscribeToChan handles a single subscription and parses the result
// on success it adds the subscription to the websocket
func (b *Binance) subscribeToChan(chans []subscription.Subscription) error {
id := b.Websocket.Conn.GenerateMessageID(false)
cNames := make([]string, len(chans))
for i := range chans {
c := chans[i]
cNames[i] = c.Channel
c.State = subscription.SubscribingState
if err := b.Websocket.AddSubscription(&c); err != nil {
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Pair, err)
}
}
if len(payload.Params) > 0 {
err := b.Websocket.Conn.SendJSONMessage(payload)
if err != nil {
return err
req := WsPayload{
Method: wsSubscribeMethod,
Params: cNames,
ID: id,
}
respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(id, req)
if err == nil {
if v, d, _, rErr := jsonparser.Get(respRaw, "result"); rErr != nil {
err = rErr
} else if d != jsonparser.Null { // null is the only expected and acceptable response
err = fmt.Errorf("%w: %s", errUnknownError, v)
}
}
b.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe...)
return nil
if err != nil {
b.Websocket.RemoveSubscriptions(chans...)
err = fmt.Errorf("%w: %w; Channels: %s", stream.ErrSubscriptionFailure, err, strings.Join(cNames, ", "))
b.Websocket.DataHandler <- err
} else {
b.Websocket.AddSuccessfulSubscriptions(chans...)
}
return err
}
// Unsubscribe unsubscribes from a set of channels
func (b *Binance) Unsubscribe(channelsToUnsubscribe []subscription.Subscription) error {
payload := WsPayload{
Method: "UNSUBSCRIBE",
func (b *Binance) Unsubscribe(channels []subscription.Subscription) error {
return b.ParallelChanOp(channels, b.unsubscribeFromChan, 50)
}
// unsubscribeFromChan sends a websocket message to stop receiving data from a channel
func (b *Binance) unsubscribeFromChan(chans []subscription.Subscription) error {
id := b.Websocket.Conn.GenerateMessageID(false)
cNames := make([]string, len(chans))
for i := range chans {
cNames[i] = chans[i].Channel
}
for i := range channelsToUnsubscribe {
payload.Params = append(payload.Params, channelsToUnsubscribe[i].Channel)
if i%50 == 0 && i != 0 {
err := b.Websocket.Conn.SendJSONMessage(payload)
if err != nil {
return err
}
payload.Params = []string{}
req := WsPayload{
Method: wsUnsubscribeMethod,
Params: cNames,
ID: id,
}
respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(id, req)
if err == nil {
if v, d, _, rErr := jsonparser.Get(respRaw, "result"); rErr != nil {
err = rErr
} else if d != jsonparser.Null { // null is the only expected and acceptable response
err = fmt.Errorf("%w: %s", errUnknownError, v)
}
}
if len(payload.Params) > 0 {
err := b.Websocket.Conn.SendJSONMessage(payload)
if err != nil {
return err
}
if err != nil {
err = fmt.Errorf("%w: %w; Channels: %s", stream.ErrUnsubscribeFailure, err, strings.Join(cNames, ", "))
b.Websocket.DataHandler <- err
} else {
b.Websocket.RemoveSubscriptions(chans...)
}
b.Websocket.RemoveSubscriptions(channelsToUnsubscribe...)
return nil
}

View File

@@ -22,7 +22,7 @@ import (
// TestInstance takes an empty exchange instance and loads config for it from testdata/configtest and connects a NewTestWebsocket
func TestInstance(e exchange.IBotExchange) error {
cfg := config.GetConfig()
cfg := &config.Config{}
err := cfg.LoadConfig("../../testdata/configtest.json", true)
if err != nil {
return fmt.Errorf("LoadConfig() error: %w", err)

View File

@@ -135,7 +135,7 @@ func main() {
if engine.Bot == nil || err != nil {
log.Fatalf("Unable to initialise bot engine. Error: %s\n", err)
}
config.Cfg = *engine.Bot.Config
config.SetConfig(engine.Bot.Config)
gctscript.Setup()

View File

@@ -495,7 +495,7 @@
},
"enabled": {
"autoPairUpdates": true,
"websocketAPI": false
"websocketAPI": true
}
},
"bankAccounts": [