GateIO: Fix and standardise ping handling (#2023)

* gateio: standardise ping handlers (cherry-pick)

* Add tests and expand incoming cases for proof

* lint: fix and add commentary on ping delay

* Update exchange/websocket/connection.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/gateio/gateio_websocket.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* linter: fix

* Update exchange/websocket/connection.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* fix test

* glorious: insane catch

---------

Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>
This commit is contained in:
Ryan O'Hara-Reid
2025-09-10 21:02:09 +10:00
committed by GitHub
parent 6b48bb58cc
commit 16f543666f
9 changed files with 86 additions and 60 deletions

View File

@@ -282,7 +282,7 @@ func (c *connection) ReadMessage() Response {
// method on WebsocketConnection type has been called and can
// be skipped.
select {
case c.readMessageErrors <- fmt.Errorf("%w: %w", err, errConnectionFault):
case c.readMessageErrors <- fmt.Errorf("%w: %w (%q)", err, errConnectionFault, c.URL):
default:
// bypass if there is no receiver, as this stops it returning
// when shutdown is called.

View File

@@ -10,7 +10,6 @@ import (
// PingHandler container for ping handler settings
type PingHandler struct {
Websocket bool
UseGorillaHandler bool
MessageType int
Message []byte

View File

@@ -2274,6 +2274,12 @@ func TestOptionsPositionPushData(t *testing.T) {
}
}
func TestOptionsPongPushData(t *testing.T) {
t.Parallel()
err := e.WsHandleOptionsData(t.Context(), nil, []byte(`{"time":1756700469,"channel":"options.pong","event":"","result":null}`))
require.NoError(t, err)
}
func TestGenerateSubscriptionsSpot(t *testing.T) {
t.Parallel()

View File

@@ -77,28 +77,26 @@ var subscriptionNames = map[string]string{
subscription.AllTradesChannel: spotTradesChannel,
}
var standardMarginAssetTypes = []asset.Item{asset.Spot, asset.Margin, asset.CrossMargin}
var (
standardMarginAssetTypes = []asset.Item{asset.Spot, asset.Margin, asset.CrossMargin}
validPingChannels = []string{optionsPingChannel, futuresPingChannel, spotPingChannel}
)
var errInvalidPingChannel = errors.New("invalid ping channel")
// WsConnectSpot initiates a websocket connection
func (e *Exchange) WsConnectSpot(ctx context.Context, conn websocket.Connection) error {
err := e.CurrencyPairs.IsAssetEnabled(asset.Spot)
if err := e.CurrencyPairs.IsAssetEnabled(asset.Spot); err != nil {
return err
}
if err := conn.Dial(ctx, &gws.Dialer{}, http.Header{}); err != nil {
return err
}
pingHandler, err := getWSPingHandler(spotPingChannel)
if err != nil {
return err
}
err = conn.Dial(ctx, &gws.Dialer{}, http.Header{})
if err != nil {
return err
}
pingMessage, err := json.Marshal(WsInput{Channel: spotPingChannel})
if err != nil {
return err
}
conn.SetupPingHandler(websocketRateLimitNotNeededEPL, websocket.PingHandler{
Websocket: true,
Delay: time.Second * 15,
Message: pingMessage,
MessageType: gws.TextMessage,
})
conn.SetupPingHandler(websocketRateLimitNotNeededEPL, pingHandler)
return nil
}
@@ -1008,3 +1006,18 @@ type wsRespAckInspector struct{}
func (wsRespAckInspector) IsFinal(data []byte) bool {
return !strings.Contains(string(data), "ack")
}
func getWSPingHandler(channel string) (websocket.PingHandler, error) {
if !slices.Contains(validPingChannels, channel) {
return websocket.PingHandler{}, fmt.Errorf("%w: %q", errInvalidPingChannel, channel)
}
pingMessage, err := json.Marshal(WsInput{Channel: channel})
if err != nil {
return websocket.PingHandler{}, err
}
return websocket.PingHandler{
Delay: time.Second * 10, // Arbitrary reasonable delay
Message: pingMessage,
MessageType: gws.TextMessage,
}, nil
}

View File

@@ -9,7 +9,6 @@ import (
gws "github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/encoding/json"
"github.com/thrasher-corp/gocryptotrader/exchange/websocket"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
@@ -44,20 +43,11 @@ func (e *Exchange) WsDeliveryFuturesConnect(ctx context.Context, conn websocket.
if err := conn.Dial(ctx, &gws.Dialer{}, http.Header{}); err != nil {
return err
}
pingMessage, err := json.Marshal(WsInput{
ID: conn.GenerateMessageID(false),
Time: time.Now().Unix(), // TODO: Func for dynamic time as this will be the same time for every ping message.
Channel: futuresPingChannel,
})
pingHandler, err := getWSPingHandler(futuresPingChannel)
if err != nil {
return err
}
conn.SetupPingHandler(websocketRateLimitNotNeededEPL, websocket.PingHandler{
Websocket: true,
Delay: time.Second * 5,
MessageType: gws.PingMessage,
Message: pingMessage,
})
conn.SetupPingHandler(websocketRateLimitNotNeededEPL, pingHandler)
return nil
}

View File

@@ -69,20 +69,11 @@ func (e *Exchange) WsFuturesConnect(ctx context.Context, conn websocket.Connecti
if err := conn.Dial(ctx, &gws.Dialer{}, http.Header{}); err != nil {
return err
}
pingMessage, err := json.Marshal(WsInput{
ID: conn.GenerateMessageID(false),
Time: time.Now().Unix(), // TODO: Func for dynamic time as this will be the same time for every ping message.
Channel: futuresPingChannel,
})
pingHandler, err := getWSPingHandler(futuresPingChannel)
if err != nil {
return err
}
conn.SetupPingHandler(websocketRateLimitNotNeededEPL, websocket.PingHandler{
Websocket: true,
MessageType: gws.PingMessage,
Delay: time.Second * 15,
Message: pingMessage,
})
conn.SetupPingHandler(websocketRateLimitNotNeededEPL, pingHandler)
return nil
}
@@ -193,6 +184,8 @@ func (e *Exchange) WsHandleFuturesData(ctx context.Context, conn websocket.Conne
return e.processFuturesPositionsNotification(respRaw)
case futuresAutoOrdersChannel:
return e.processFuturesAutoOrderPushData(respRaw)
case "futures.pong":
return nil
default:
e.Websocket.DataHandler <- websocket.UnhandledMessageWarning{
Message: e.Name + websocket.UnhandledMessage + string(respRaw),

View File

@@ -68,28 +68,17 @@ var defaultOptionsSubscriptions = []string{
// WsOptionsConnect initiates a websocket connection to options websocket endpoints.
func (e *Exchange) WsOptionsConnect(ctx context.Context, conn websocket.Connection) error {
err := e.CurrencyPairs.IsAssetEnabled(asset.Options)
if err := e.CurrencyPairs.IsAssetEnabled(asset.Options); err != nil {
return err
}
if err := conn.Dial(ctx, &gws.Dialer{}, http.Header{}); err != nil {
return err
}
pingHandler, err := getWSPingHandler(optionsPingChannel)
if err != nil {
return err
}
err = conn.Dial(ctx, &gws.Dialer{}, http.Header{})
if err != nil {
return err
}
pingMessage, err := json.Marshal(WsInput{
ID: conn.GenerateMessageID(false),
Time: time.Now().Unix(), // TODO: Func for dynamic time as this will be the same time for every ping message.
Channel: optionsPingChannel,
})
if err != nil {
return err
}
conn.SetupPingHandler(websocketRateLimitNotNeededEPL, websocket.PingHandler{
Websocket: true,
Delay: time.Second * 5,
MessageType: gws.PingMessage,
Message: pingMessage,
})
conn.SetupPingHandler(websocketRateLimitNotNeededEPL, pingHandler)
return nil
}
@@ -344,6 +333,8 @@ func (e *Exchange) WsHandleOptionsData(ctx context.Context, conn websocket.Conne
return e.processBalancePushData(ctx, respRaw, asset.Options)
case optionsPositionsChannel:
return e.processOptionsPositionPushData(respRaw)
case "options.pong":
return nil
default:
e.Websocket.DataHandler <- websocket.UnhandledMessageWarning{
Message: e.Name + websocket.UnhandledMessage + string(respRaw),

View File

@@ -0,0 +1,33 @@
package gateio
import (
"testing"
"time"
gws "github.com/gorilla/websocket"
"github.com/stretchr/testify/require"
)
func TestGetWSPingHandler(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
channel string
err error
}{
{optionsPingChannel, nil},
{futuresPingChannel, nil},
{spotPingChannel, nil},
{"dong", errInvalidPingChannel},
} {
got, err := getWSPingHandler(tc.channel)
if tc.err != nil {
require.ErrorIs(t, err, tc.err)
continue
}
require.NoError(t, err)
require.Equal(t, time.Second*10, got.Delay)
require.Equal(t, gws.TextMessage, got.MessageType)
require.Contains(t, string(got.Message), tc.channel)
}
}

View File

@@ -13,3 +13,4 @@
{"time":1678468497,"time_ms":1678468497232,"channel":"futures.order_book","event":"all","result":{"t":1678468497168,"id":4010394406,"contract":"BTC_USD","asks":[{"p":"19909","s":3100},{"p":"19909.1","s":5000},{"p":"19910","s":3100},{"p":"19914.4","s":4400},{"p":"19916.6","s":5000},{"p":"19917.2","s":8255},{"p":"19919.2","s":5000},{"p":"19920.3","s":11967},{"p":"19922.2","s":5000},{"p":"19924.2","s":5000},{"p":"19927.1","s":17129},{"p":"19927.2","s":5000},{"p":"19929","s":20864},{"p":"19929.3","s":5000},{"p":"19929.7","s":24683},{"p":"19930.3","s":750},{"p":"19931.4","s":5000},{"p":"19931.5","s":1},{"p":"19934.2","s":5000},{"p":"19935.4","s":1}],"bids":[{"p":"19901.2","s":5000},{"p":"19900.3","s":3100},{"p":"19900.2","s":5000},{"p":"19899.3","s":2983},{"p":"19899.2","s":6035},{"p":"19897.2","s":5000},{"p":"19895.7","s":5984},{"p":"19895","s":5000},{"p":"19892.9","s":195},{"p":"19892.8","s":5000},{"p":"19889.4","s":5000},{"p":"19889","s":8800},{"p":"19888.5","s":11968},{"p":"19887.1","s":5000},{"p":"19886.4","s":24683},{"p":"19885.7","s":1},{"p":"19883.8","s":5000},{"p":"19880.2","s":5000},{"p":"19878.2","s":5000},{"p":"19876.8","s":1}]}}
{"time":1678469222,"time_ms":1678469222982,"channel":"futures.order_book_update","event":"update","result":{"t":1678469222617,"s":"BTC_USD","U":4010424331,"u":4010424361,"b":[{"p":"19860.7","s":5984},{"p":"19858.6","s":5000},{"p":"19845.4","s":20864},{"p":"19859.1","s":0},{"p":"19862.5","s":0},{"p":"19358","s":0},{"p":"19864.5","s":5000},{"p":"19840.7","s":0},{"p":"19863.6","s":3100},{"p":"19839.3","s":0},{"p":"19851.5","s":8800},{"p":"19720","s":0},{"p":"19333","s":0},{"p":"19852.7","s":5000},{"p":"19861.5","s":0},{"p":"19860.6","s":3100},{"p":"19833.6","s":0},{"p":"19360","s":0},{"p":"19863.5","s":5000},{"p":"19736.9","s":0},{"p":"19838.5","s":0},{"p":"19841.3","s":0},{"p":"19858.1","s":3100},{"p":"19710.9","s":0},{"p":"19342","s":0},{"p":"19852.1","s":11967},{"p":"19343","s":0},{"p":"19705","s":0},{"p":"19836.5","s":0},{"p":"19862.6","s":3100},{"p":"19729.6","s":0},{"p":"19849.9","s":5000}],"a":[{"p":"19900.5","s":0},{"p":"19883.1","s":11967},{"p":"19910.9","s":0},{"p":"19897.7","s":5000},{"p":"19875.9","s":5984},{"p":"19899.6","s":0},{"p":"19878","s":4400},{"p":"19877.6","s":0},{"p":"19889.5","s":5000},{"p":"19875.5","s":3100},{"p":"19875.3","s":0},{"p":"19878.5","s":0},{"p":"19895.2","s":0},{"p":"20284.6","s":0},{"p":"19880.7","s":5000},{"p":"19875.4","s":0},{"p":"19985.8","s":0},{"p":"19887.1","s":5000},{"p":"19896","s":1},{"p":"19869.3","s":0},{"p":"19900","s":0},{"p":"19875.6","s":5000},{"p":"19980.6","s":0},{"p":"19885.1","s":5000},{"p":"19877.7","s":5000},{"p":"20000","s":0},{"p":"19892.2","s":8255},{"p":"19886.8","s":0},{"p":"20257.4","s":0},{"p":"20280","s":0},{"p":"20002.5","s":0},{"p":"20263.1","s":0},{"p":"19900.2","s":0}]}}
{"time":1678469467,"time_ms":1678469467981,"channel":"futures.candlesticks","event":"update","result":[{"t":1678469460,"v":0,"c":"19896","h":"19896","l":"19896","o":"19896","n":"1m_BTC_USD"}]}
{"time":1756700469,"channel":"futures.pong","event":"","result":null}