From 9854fe1b7c825404562c710a410f8eb95030d873 Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Wed, 6 Mar 2024 03:47:54 +0100 Subject: [PATCH] Websocket: Prevent panic on Disconnect/Connect (#1471) * Websocket: Prevent panic on Disconnect/Connect Previously we'd set the websocket to disconnected when *either* of the Conn/AuthConn got a disconnect message. This meant that: * The other connection was still functioning * A user would be free to call Connect() * wsReadData() may not have exited Any call to Connect would be accepted, and we'd probably get a panic from the underlying shared/re-used gorilla websocket: `panic: runtime error: slice bounds out of range [:13501] with capacity 8192` when a new wsReadData goroutine is started and the old one tries to ReadMessage as well, overallocating the buffer. This wouldn't normally occur because trafficMonitor would see traffic (on either connection) and then set the websocket to being connected again. The solution is to treat a Disconnect on either websocket as a call to Shutdown the whole websocket, and then trafficMonitor can reconnect it properly. Unit Testing for this has been difficult. So far I've had to rely on a disruption inside websocket's connectionMonitor itself to reproduce the panic, but from there it's been very consistent. * Websocket: Fix race on DataHandler during shutdown Previously we would encounter situations where shutdown would race and fail TestConnectionMessageErrors, demonstrating that consumers might not be told why their connection is closing. * Do not drain DataHandler on dataMonitor shutdown Consumers should be allowed to process whatever messages were in flight prior to a socket shutdown * Ensure all DataHandler messages are sent to ToRoutine before shutdown This avoids a race where we'd read a message from DataHandler, but then process a shutdown before trying to relay it. The relay is non-blocking anyway, so we can trust that we'd pick up the Shutdown during the next loop. 
* Send errors to DataHandler before starting a shutdown This just reduces the chance of a race on shutdown processing * Websocket: Log counter of dropped messages --- exchanges/stream/websocket.go | 40 ++++++++++++++---------------- exchanges/stream/websocket_test.go | 13 ++++++---- 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index 404d76d8..e7585e94 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -348,18 +348,10 @@ func (w *Websocket) dataMonitor() { go func() { defer func() { - for { - // Bleeds data from the websocket connection if needed - select { - case <-w.DataHandler: - default: - w.setDataMonitorRunning(false) - w.Wg.Done() - return - } - } + w.setDataMonitorRunning(false) + w.Wg.Done() }() - + dropped := 0 for { select { case <-w.ShutdownC: @@ -367,15 +359,16 @@ func (w *Websocket) dataMonitor() { case d := <-w.DataHandler: select { case w.ToRoutine <- d: - case <-w.ShutdownC: - return - default: - log.Warnf(log.WebsocketMgr, "%s exchange backlog in websocket processing detected", w.exchangeName) - select { - case w.ToRoutine <- d: - case <-w.ShutdownC: - return + if dropped != 0 { + log.Infof(log.WebsocketMgr, "%s exchange websocket ToRoutine channel buffer recovered; %d messages were dropped", w.exchangeName, dropped) + dropped = 0 } + default: + if dropped == 0 { + // If this becomes prone to flapping we could drain the buffer, but that's extreme and we'd like to avoid it if possible + log.Warnf(log.WebsocketMgr, "%s exchange websocket ToRoutine channel buffer full; dropping messages", w.exchangeName) + } + dropped++ } } } @@ -413,12 +406,15 @@ func (w *Websocket) connectionMonitor() error { } select { case err := <-w.ReadMessageErrors: + w.DataHandler <- err if IsDisconnectionError(err) { log.Warnf(log.WebsocketMgr, "%v websocket has been disconnected. 
Reason: %v", w.exchangeName, err) - w.setState(disconnected) + if w.IsConnected() { + if shutdownErr := w.Shutdown(); shutdownErr != nil { + log.Errorf(log.WebsocketMgr, "%v websocket: connectionMonitor shutdown err: %s", w.exchangeName, shutdownErr) + } + } } - - w.DataHandler <- err case <-timer.C: if !w.IsConnecting() && !w.IsConnected() { err := w.Connect() diff --git a/exchanges/stream/websocket_test.go b/exchanges/stream/websocket_test.go index 0d4e9c02..5e4891f8 100644 --- a/exchanges/stream/websocket_test.go +++ b/exchanges/stream/websocket_test.go @@ -326,26 +326,29 @@ func TestConnectionMessageErrors(t *testing.T) { err = ws.Connect() require.NoError(t, err, "Connect must not error") - ws.TrafficAlert <- struct{}{} - c := func(tb *assert.CollectT) { select { - case v := <-ws.ToRoutine: + case v, ok := <-ws.ToRoutine: + require.True(tb, ok, "ToRoutine should not be closed on us") switch err := v.(type) { case *websocket.CloseError: assert.Equal(tb, "SpecialText", err.Text, "Should get correct Close Error") case error: assert.ErrorIs(tb, err, errDastardlyReason, "Should get the correct error") + default: + assert.Failf(tb, "Wrong data type sent to ToRoutine", "Got type: %T", err) } default: + assert.Fail(tb, "Nothing available on ToRoutine") } } + ws.TrafficAlert <- struct{}{} ws.ReadMessageErrors <- errDastardlyReason - assert.EventuallyWithT(t, c, 900*time.Millisecond, 10*time.Millisecond, "Should get an error down the routine") + assert.EventuallyWithT(t, c, 2*time.Second, 10*time.Millisecond, "Should get an error down the routine") ws.ReadMessageErrors <- &websocket.CloseError{Code: 1006, Text: "SpecialText"} - assert.EventuallyWithT(t, c, 900*time.Millisecond, 10*time.Millisecond, "Should get an error down the routine") + assert.EventuallyWithT(t, c, 2*time.Second, 10*time.Millisecond, "Should get an error down the routine") } func TestWebsocket(t *testing.T) {