websocket/exchanges: populate context before multi connection upgrade (#1933)

* websocket/exchanges: populate context before multi connection upgrade

* fix test

* linter: fix

* gk: dial

* gk: nits rm param names

---------

Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
This commit is contained in:
Ryan O'Hara-Reid
2025-06-17 13:43:00 +10:00
committed by GitHub
parent 2958e64afe
commit 3e80f1b9e5
64 changed files with 1160 additions and 1124 deletions

View File

@@ -41,10 +41,11 @@ func TestMain(m *testing.M) {
}
}
b.setupOrderbookManager()
ctx := context.Background()
b.setupOrderbookManager(ctx)
b.Websocket.DataHandler = sharedtestvalues.GetWebsocketInterfaceChannelOverride()
log.Printf(sharedtestvalues.LiveTesting, b.Name)
if err := b.UpdateTradablePairs(context.Background(), true); err != nil {
if err := b.UpdateTradablePairs(ctx, true); err != nil {
log.Fatal("Binance setup error", err)
}

View File

@@ -29,8 +29,9 @@ func TestMain(m *testing.M) {
log.Fatal(err)
}
b.setupOrderbookManager()
if err := b.UpdateTradablePairs(context.Background(), true); err != nil {
ctx := context.Background()
b.setupOrderbookManager(ctx)
if err := b.UpdateTradablePairs(ctx, true); err != nil {
log.Fatal(err)
}

View File

@@ -2092,7 +2092,7 @@ func TestWsDepthUpdate(t *testing.T) {
t.Parallel()
b := new(Binance) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.Setup(b), "Test instance Setup must not error")
b.setupOrderbookManager()
b.setupOrderbookManager(t.Context())
seedLastUpdateID := int64(161)
book := OrderBook{
Asks: []OrderbookItem{
@@ -2454,7 +2454,7 @@ func TestProcessOrderbookUpdate(t *testing.T) {
t.Parallel()
b := new(Binance) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.Setup(b), "Test instance Setup must not error")
b.setupOrderbookManager()
b.setupOrderbookManager(t.Context())
p := currency.NewBTCUSDT()
var depth WebsocketDepthStream
err := json.Unmarshal(websocketDepthUpdate, &depth)

View File

@@ -51,6 +51,7 @@ var (
// WsConnect initiates a websocket connection
func (b *Binance) WsConnect() error {
ctx := context.TODO()
if !b.Websocket.IsEnabled() || !b.IsEnabled() {
return websocket.ErrWebsocketNotEnabled
}
@@ -60,7 +61,7 @@ func (b *Binance) WsConnect() error {
dialer.Proxy = http.ProxyFromEnvironment
var err error
if b.Websocket.CanUseAuthenticatedEndpoints() {
listenKey, err = b.GetWsAuthStreamKey(context.TODO())
listenKey, err = b.GetWsAuthStreamKey(ctx)
if err != nil {
b.Websocket.SetCanUseAuthenticatedEndpoints(false)
log.Errorf(log.ExchangeSys,
@@ -78,7 +79,7 @@ func (b *Binance) WsConnect() error {
}
}
err = b.Websocket.Conn.Dial(&dialer, http.Header{})
err = b.Websocket.Conn.Dial(ctx, &dialer, http.Header{})
if err != nil {
return fmt.Errorf("%v - Unable to connect to Websocket. Error: %s",
b.Name,
@@ -86,7 +87,7 @@ func (b *Binance) WsConnect() error {
}
if b.Websocket.CanUseAuthenticatedEndpoints() {
go b.KeepAuthKeyAlive()
go b.KeepAuthKeyAlive(ctx)
}
b.Websocket.Conn.SetupPingHandler(request.Unset, websocket.PingHandler{
@@ -98,11 +99,11 @@ func (b *Binance) WsConnect() error {
b.Websocket.Wg.Add(1)
go b.wsReadData()
b.setupOrderbookManager()
b.setupOrderbookManager(ctx)
return nil
}
func (b *Binance) setupOrderbookManager() {
func (b *Binance) setupOrderbookManager(ctx context.Context) {
if b.obm == nil {
b.obm = &orderbookManager{
state: make(map[currency.Code]map[currency.Code]map[asset.Item]*update),
@@ -123,13 +124,13 @@ func (b *Binance) setupOrderbookManager() {
for range maxWSOrderbookWorkers {
// 10 workers for synchronising book
b.SynchroniseWebsocketOrderbook()
b.SynchroniseWebsocketOrderbook(ctx)
}
}
// KeepAuthKeyAlive will continuously send messages to
// keep the WS auth key active
func (b *Binance) KeepAuthKeyAlive() {
func (b *Binance) KeepAuthKeyAlive(ctx context.Context) {
b.Websocket.Wg.Add(1)
defer b.Websocket.Wg.Done()
ticks := time.NewTicker(time.Minute * 30)
@@ -139,7 +140,7 @@ func (b *Binance) KeepAuthKeyAlive() {
ticks.Stop()
return
case <-ticks.C:
err := b.MaintainWsAuthStreamKey(context.TODO())
err := b.MaintainWsAuthStreamKey(ctx)
if err != nil {
b.Websocket.DataHandler <- err
log.Warnf(log.ExchangeSys, "%s - Unable to renew auth websocket token, may experience shutdown", b.Name)
@@ -558,16 +559,18 @@ func formatChannelInterval(s *subscription.Subscription) string {
// Subscribe subscribes to a set of channels
func (b *Binance) Subscribe(channels subscription.List) error {
return b.ParallelChanOp(channels, func(l subscription.List) error { return b.manageSubs(wsSubscribeMethod, l) }, 50)
ctx := context.TODO()
return b.ParallelChanOp(ctx, channels, func(ctx context.Context, l subscription.List) error { return b.manageSubs(ctx, wsSubscribeMethod, l) }, 50)
}
// Unsubscribe unsubscribes from a set of channels
func (b *Binance) Unsubscribe(channels subscription.List) error {
return b.ParallelChanOp(channels, func(l subscription.List) error { return b.manageSubs(wsUnsubscribeMethod, l) }, 50)
ctx := context.TODO()
return b.ParallelChanOp(ctx, channels, func(ctx context.Context, l subscription.List) error { return b.manageSubs(ctx, wsUnsubscribeMethod, l) }, 50)
}
// manageSubs subscribes or unsubscribes from a list of subscriptions
func (b *Binance) manageSubs(op string, subs subscription.List) error {
func (b *Binance) manageSubs(ctx context.Context, op string, subs subscription.List) error {
if op == wsSubscribeMethod {
if err := b.Websocket.AddSubscriptions(b.Websocket.Conn, subs...); err != nil { // Note: AddSubscription will set state to subscribing
return err
@@ -584,7 +587,7 @@ func (b *Binance) manageSubs(op string, subs subscription.List) error {
Params: subs.QualifiedChannels(),
}
respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, req.ID, req)
respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(ctx, request.Unset, req.ID, req)
if err == nil {
if v, d, _, rErr := jsonparser.Get(respRaw, "result"); rErr != nil {
err = rErr
@@ -699,7 +702,7 @@ func (o *orderbookManager) setNeedsFetchingBook(pair currency.Pair) error {
// SynchroniseWebsocketOrderbook synchronises full orderbook for currency pair
// asset
func (b *Binance) SynchroniseWebsocketOrderbook() {
func (b *Binance) SynchroniseWebsocketOrderbook(ctx context.Context) {
b.Websocket.Wg.Add(1)
go func() {
defer b.Websocket.Wg.Done()
@@ -714,7 +717,7 @@ func (b *Binance) SynchroniseWebsocketOrderbook() {
}
}
case j := <-b.obm.jobs:
err := b.processJob(j.Pair)
err := b.processJob(ctx, j.Pair)
if err != nil {
log.Errorf(log.WebsocketMgr,
"%s processing websocket orderbook error %v",
@@ -726,8 +729,8 @@ func (b *Binance) SynchroniseWebsocketOrderbook() {
}
// processJob fetches and processes orderbook updates
func (b *Binance) processJob(p currency.Pair) error {
err := b.SeedLocalCache(context.TODO(), p)
func (b *Binance) processJob(ctx context.Context, p currency.Pair) error {
err := b.SeedLocalCache(ctx, p)
if err != nil {
return fmt.Errorf("%s %s seeding local cache for orderbook error: %v",
p, asset.Spot, err)