diff --git a/exchange/websocket/connection.go b/exchange/websocket/connection.go index e1e7968e..45f2be50 100644 --- a/exchange/websocket/connection.go +++ b/exchange/websocket/connection.go @@ -282,7 +282,7 @@ func (c *connection) ReadMessage() Response { // method on WebsocketConnection type has been called and can // be skipped. select { - case c.readMessageErrors <- fmt.Errorf("%w: %w", err, errConnectionFault): + case c.readMessageErrors <- fmt.Errorf("%w: %w (%q)", err, errConnectionFault, c.URL): default: // bypass if there is no receiver, as this stops it returning // when shutdown is called. diff --git a/exchange/websocket/types.go b/exchange/websocket/types.go index 2ca3c012..87d26c9d 100644 --- a/exchange/websocket/types.go +++ b/exchange/websocket/types.go @@ -10,7 +10,6 @@ import ( // PingHandler container for ping handler settings type PingHandler struct { - Websocket bool UseGorillaHandler bool MessageType int Message []byte diff --git a/exchanges/gateio/gateio_test.go b/exchanges/gateio/gateio_test.go index 2fc2c60e..905bcb56 100644 --- a/exchanges/gateio/gateio_test.go +++ b/exchanges/gateio/gateio_test.go @@ -2274,6 +2274,12 @@ func TestOptionsPositionPushData(t *testing.T) { } } +func TestOptionsPongPushData(t *testing.T) { + t.Parallel() + err := e.WsHandleOptionsData(t.Context(), nil, []byte(`{"time":1756700469,"channel":"options.pong","event":"","result":null}`)) + require.NoError(t, err) +} + func TestGenerateSubscriptionsSpot(t *testing.T) { t.Parallel() diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index 403785b7..115491da 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -77,28 +77,26 @@ var subscriptionNames = map[string]string{ subscription.AllTradesChannel: spotTradesChannel, } -var standardMarginAssetTypes = []asset.Item{asset.Spot, asset.Margin, asset.CrossMargin} +var ( + standardMarginAssetTypes = []asset.Item{asset.Spot, asset.Margin, asset.CrossMargin} + validPingChannels = []string{optionsPingChannel, futuresPingChannel, spotPingChannel} +) + +var errInvalidPingChannel = errors.New("invalid ping channel") // WsConnectSpot initiates a websocket connection func (e *Exchange) WsConnectSpot(ctx context.Context, conn websocket.Connection) error { - err := e.CurrencyPairs.IsAssetEnabled(asset.Spot) + if err := e.CurrencyPairs.IsAssetEnabled(asset.Spot); err != nil { + return err + } + if err := conn.Dial(ctx, &gws.Dialer{}, http.Header{}); err != nil { + return err + } + pingHandler, err := getWSPingHandler(spotPingChannel) if err != nil { return err } - err = conn.Dial(ctx, &gws.Dialer{}, http.Header{}) - if err != nil { - return err - } - pingMessage, err := json.Marshal(WsInput{Channel: spotPingChannel}) - if err != nil { - return err - } - conn.SetupPingHandler(websocketRateLimitNotNeededEPL, websocket.PingHandler{ - Websocket: true, - Delay: time.Second * 15, - Message: pingMessage, - MessageType: gws.TextMessage, - }) + conn.SetupPingHandler(websocketRateLimitNotNeededEPL, pingHandler) return nil } @@ -1008,3 +1006,18 @@ type wsRespAckInspector struct{} func (wsRespAckInspector) IsFinal(data []byte) bool { return !strings.Contains(string(data), "ack") } + +func getWSPingHandler(channel string) (websocket.PingHandler, error) { + if !slices.Contains(validPingChannels, channel) { + return websocket.PingHandler{}, fmt.Errorf("%w: %q", errInvalidPingChannel, channel) + } + pingMessage, err := json.Marshal(WsInput{Channel: channel}) + if err != nil { + return websocket.PingHandler{}, err + } + return websocket.PingHandler{ + Delay: time.Second * 10, // Arbitrary reasonable delay + Message: pingMessage, + MessageType: gws.TextMessage, + }, nil +} diff --git a/exchanges/gateio/gateio_websocket_delivery_futures.go b/exchanges/gateio/gateio_websocket_delivery_futures.go index 2c6851c9..ea65bb04 100644 --- a/exchanges/gateio/gateio_websocket_delivery_futures.go +++ b/exchanges/gateio/gateio_websocket_delivery_futures.go @@ -9,7 +9,6 @@ import ( gws "github.com/gorilla/websocket" "github.com/thrasher-corp/gocryptotrader/currency" - "github.com/thrasher-corp/gocryptotrader/encoding/json" "github.com/thrasher-corp/gocryptotrader/exchange/websocket" "github.com/thrasher-corp/gocryptotrader/exchanges/account" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" @@ -44,20 +43,11 @@ func (e *Exchange) WsDeliveryFuturesConnect(ctx context.Context, conn websocket. if err := conn.Dial(ctx, &gws.Dialer{}, http.Header{}); err != nil { return err } - pingMessage, err := json.Marshal(WsInput{ - ID: conn.GenerateMessageID(false), - Time: time.Now().Unix(), // TODO: Func for dynamic time as this will be the same time for every ping message. - Channel: futuresPingChannel, - }) + pingHandler, err := getWSPingHandler(futuresPingChannel) if err != nil { return err } - conn.SetupPingHandler(websocketRateLimitNotNeededEPL, websocket.PingHandler{ - Websocket: true, - Delay: time.Second * 5, - MessageType: gws.PingMessage, - Message: pingMessage, - }) + conn.SetupPingHandler(websocketRateLimitNotNeededEPL, pingHandler) return nil } diff --git a/exchanges/gateio/gateio_websocket_futures.go b/exchanges/gateio/gateio_websocket_futures.go index 6cdba717..57d2f80c 100644 --- a/exchanges/gateio/gateio_websocket_futures.go +++ b/exchanges/gateio/gateio_websocket_futures.go @@ -69,20 +69,11 @@ func (e *Exchange) WsFuturesConnect(ctx context.Context, conn websocket.Connecti if err := conn.Dial(ctx, &gws.Dialer{}, http.Header{}); err != nil { return err } - pingMessage, err := json.Marshal(WsInput{ - ID: conn.GenerateMessageID(false), - Time: time.Now().Unix(), // TODO: Func for dynamic time as this will be the same time for every ping message. - Channel: futuresPingChannel, - }) + pingHandler, err := getWSPingHandler(futuresPingChannel) if err != nil { return err } - conn.SetupPingHandler(websocketRateLimitNotNeededEPL, websocket.PingHandler{ - Websocket: true, - MessageType: gws.PingMessage, - Delay: time.Second * 15, - Message: pingMessage, - }) + conn.SetupPingHandler(websocketRateLimitNotNeededEPL, pingHandler) return nil } @@ -193,6 +184,8 @@ func (e *Exchange) WsHandleFuturesData(ctx context.Context, conn websocket.Conne return e.processFuturesPositionsNotification(respRaw) case futuresAutoOrdersChannel: return e.processFuturesAutoOrderPushData(respRaw) + case "futures.pong": + return nil default: e.Websocket.DataHandler <- websocket.UnhandledMessageWarning{ Message: e.Name + websocket.UnhandledMessage + string(respRaw), diff --git a/exchanges/gateio/gateio_websocket_option.go b/exchanges/gateio/gateio_websocket_option.go index 4c4799db..8b5d191b 100644 --- a/exchanges/gateio/gateio_websocket_option.go +++ b/exchanges/gateio/gateio_websocket_option.go @@ -68,28 +68,17 @@ var defaultOptionsSubscriptions = []string{ // WsOptionsConnect initiates a websocket connection to options websocket endpoints. func (e *Exchange) WsOptionsConnect(ctx context.Context, conn websocket.Connection) error { - err := e.CurrencyPairs.IsAssetEnabled(asset.Options) + if err := e.CurrencyPairs.IsAssetEnabled(asset.Options); err != nil { + return err + } + if err := conn.Dial(ctx, &gws.Dialer{}, http.Header{}); err != nil { + return err + } + pingHandler, err := getWSPingHandler(optionsPingChannel) if err != nil { return err } - err = conn.Dial(ctx, &gws.Dialer{}, http.Header{}) - if err != nil { - return err - } - pingMessage, err := json.Marshal(WsInput{ - ID: conn.GenerateMessageID(false), - Time: time.Now().Unix(), // TODO: Func for dynamic time as this will be the same time for every ping message. - Channel: optionsPingChannel, - }) - if err != nil { - return err - } - conn.SetupPingHandler(websocketRateLimitNotNeededEPL, websocket.PingHandler{ - Websocket: true, - Delay: time.Second * 5, - MessageType: gws.PingMessage, - Message: pingMessage, - }) + conn.SetupPingHandler(websocketRateLimitNotNeededEPL, pingHandler) return nil } @@ -344,6 +333,8 @@ func (e *Exchange) WsHandleOptionsData(ctx context.Context, conn websocket.Conne return e.processBalancePushData(ctx, respRaw, asset.Options) case optionsPositionsChannel: return e.processOptionsPositionPushData(respRaw) + case "options.pong": + return nil default: e.Websocket.DataHandler <- websocket.UnhandledMessageWarning{ Message: e.Name + websocket.UnhandledMessage + string(respRaw), diff --git a/exchanges/gateio/gateio_websocket_test.go b/exchanges/gateio/gateio_websocket_test.go new file mode 100644 index 00000000..c9404b3b --- /dev/null +++ b/exchanges/gateio/gateio_websocket_test.go @@ -0,0 +1,33 @@ +package gateio + +import ( + "testing" + "time" + + gws "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" +) + +func TestGetWSPingHandler(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + channel string + err error + }{ + {optionsPingChannel, nil}, + {futuresPingChannel, nil}, + {spotPingChannel, nil}, + {"dong", errInvalidPingChannel}, + } { + got, err := getWSPingHandler(tc.channel) + if tc.err != nil { + require.ErrorIs(t, err, tc.err) + continue + } + require.NoError(t, err) + require.Equal(t, time.Second*10, got.Delay) + require.Equal(t, gws.TextMessage, got.MessageType) + require.Contains(t, string(got.Message), tc.channel) + } +} diff --git a/exchanges/gateio/testdata/wsFutures.json b/exchanges/gateio/testdata/wsFutures.json index c4cfb803..f8fb7fe1 100644 --- a/exchanges/gateio/testdata/wsFutures.json +++ b/exchanges/gateio/testdata/wsFutures.json @@ -13,3 +13,4 @@ {"time":1678468497,"time_ms":1678468497232,"channel":"futures.order_book","event":"all","result":{"t":1678468497168,"id":4010394406,"contract":"BTC_USD","asks":[{"p":"19909","s":3100},{"p":"19909.1","s":5000},{"p":"19910","s":3100},{"p":"19914.4","s":4400},{"p":"19916.6","s":5000},{"p":"19917.2","s":8255},{"p":"19919.2","s":5000},{"p":"19920.3","s":11967},{"p":"19922.2","s":5000},{"p":"19924.2","s":5000},{"p":"19927.1","s":17129},{"p":"19927.2","s":5000},{"p":"19929","s":20864},{"p":"19929.3","s":5000},{"p":"19929.7","s":24683},{"p":"19930.3","s":750},{"p":"19931.4","s":5000},{"p":"19931.5","s":1},{"p":"19934.2","s":5000},{"p":"19935.4","s":1}],"bids":[{"p":"19901.2","s":5000},{"p":"19900.3","s":3100},{"p":"19900.2","s":5000},{"p":"19899.3","s":2983},{"p":"19899.2","s":6035},{"p":"19897.2","s":5000},{"p":"19895.7","s":5984},{"p":"19895","s":5000},{"p":"19892.9","s":195},{"p":"19892.8","s":5000},{"p":"19889.4","s":5000},{"p":"19889","s":8800},{"p":"19888.5","s":11968},{"p":"19887.1","s":5000},{"p":"19886.4","s":24683},{"p":"19885.7","s":1},{"p":"19883.8","s":5000},{"p":"19880.2","s":5000},{"p":"19878.2","s":5000},{"p":"19876.8","s":1}]}} {"time":1678469222,"time_ms":1678469222982,"channel":"futures.order_book_update","event":"update","result":{"t":1678469222617,"s":"BTC_USD","U":4010424331,"u":4010424361,"b":[{"p":"19860.7","s":5984},{"p":"19858.6","s":5000},{"p":"19845.4","s":20864},{"p":"19859.1","s":0},{"p":"19862.5","s":0},{"p":"19358","s":0},{"p":"19864.5","s":5000},{"p":"19840.7","s":0},{"p":"19863.6","s":3100},{"p":"19839.3","s":0},{"p":"19851.5","s":8800},{"p":"19720","s":0},{"p":"19333","s":0},{"p":"19852.7","s":5000},{"p":"19861.5","s":0},{"p":"19860.6","s":3100},{"p":"19833.6","s":0},{"p":"19360","s":0},{"p":"19863.5","s":5000},{"p":"19736.9","s":0},{"p":"19838.5","s":0},{"p":"19841.3","s":0},{"p":"19858.1","s":3100},{"p":"19710.9","s":0},{"p":"19342","s":0},{"p":"19852.1","s":11967},{"p":"19343","s":0},{"p":"19705","s":0},{"p":"19836.5","s":0},{"p":"19862.6","s":3100},{"p":"19729.6","s":0},{"p":"19849.9","s":5000}],"a":[{"p":"19900.5","s":0},{"p":"19883.1","s":11967},{"p":"19910.9","s":0},{"p":"19897.7","s":5000},{"p":"19875.9","s":5984},{"p":"19899.6","s":0},{"p":"19878","s":4400},{"p":"19877.6","s":0},{"p":"19889.5","s":5000},{"p":"19875.5","s":3100},{"p":"19875.3","s":0},{"p":"19878.5","s":0},{"p":"19895.2","s":0},{"p":"20284.6","s":0},{"p":"19880.7","s":5000},{"p":"19875.4","s":0},{"p":"19985.8","s":0},{"p":"19887.1","s":5000},{"p":"19896","s":1},{"p":"19869.3","s":0},{"p":"19900","s":0},{"p":"19875.6","s":5000},{"p":"19980.6","s":0},{"p":"19885.1","s":5000},{"p":"19877.7","s":5000},{"p":"20000","s":0},{"p":"19892.2","s":8255},{"p":"19886.8","s":0},{"p":"20257.4","s":0},{"p":"20280","s":0},{"p":"20002.5","s":0},{"p":"20263.1","s":0},{"p":"19900.2","s":0}]}} {"time":1678469467,"time_ms":1678469467981,"channel":"futures.candlesticks","event":"update","result":[{"t":1678469460,"v":0,"c":"19896","h":"19896","l":"19896","o":"19896","n":"1m_BTC_USD"}]} +{"time":1756700469,"channel":"futures.pong","event":"","result":null}