Websocket: Prevent panic on Disconnect/Connect (#1471)

* Websocket: Prevent panic on Disconnect/Connect

Previously we'd set the websocket to disconnected when *either* of the
Conn/AuthConn got a disconnect message.

This meant that:
* The other connection was still functioning
* A user would be free to call Connect()
* wsReadData() may not have exited

Any call to Connect would be accepted, and we'd probably get a panic
from the underlying shared/re-used gorilla websocket:
`panic: runtime error: slice bounds out of range [:13501] with capacity 8192`
when a new wsReadData goroutine is started and the old one tries to
ReadMessage as well, overallocating the buffer.

This wouldn't normally occur because trafficMonitor would see traffic
(on either connection) and then set the websocket back to connected.

The solution is to treat a Disconnect on either connection as a call to
Shutdown the whole websocket, and then trafficMonitor can reconnect it
properly.

Unit Testing for this has been difficult. So far I've had to rely on a
disruption inside websocket's connectionMonitor itself to reproduce the
panic, but from there it's been very consistent.

* Websocket: Fix race on DataHandler during shutdown

Previously we would encounter situations where shutdown would race and
fail TestConnectionMessageErrors, demonstrating that consumers might not
be told why their connection is closing.

* Do not drain DataHandler on dataMonitor shutdown
Consumers should be allowed to process whatever messages were in flight
prior to a socket shutdown

* Ensure all DataHandler messages are sent to ToRoutine before shutdown
This avoids a race where we'd read a message from DataHandler, but then
process a shutdown before trying to relay it. The relay is non-blocking
anyway, so we can trust that we'd pick up the Shutdown during the next
loop.

* Send errors to DataHandler before starting a shutdown
This just reduces the chance of a race on shutdown processing

* Websocket: Log counter of dropped messages
This commit is contained in:
Gareth Kirwan
2024-03-06 03:47:54 +01:00
committed by GitHub
parent 6ccb0e0c2f
commit 9854fe1b7c
2 changed files with 26 additions and 27 deletions

View File

@@ -348,18 +348,10 @@ func (w *Websocket) dataMonitor() {
go func() {
defer func() {
for {
// Bleeds data from the websocket connection if needed
select {
case <-w.DataHandler:
default:
w.setDataMonitorRunning(false)
w.Wg.Done()
return
}
}
w.setDataMonitorRunning(false)
w.Wg.Done()
}()
dropped := 0
for {
select {
case <-w.ShutdownC:
@@ -367,15 +359,16 @@ func (w *Websocket) dataMonitor() {
case d := <-w.DataHandler:
select {
case w.ToRoutine <- d:
case <-w.ShutdownC:
return
default:
log.Warnf(log.WebsocketMgr, "%s exchange backlog in websocket processing detected", w.exchangeName)
select {
case w.ToRoutine <- d:
case <-w.ShutdownC:
return
if dropped != 0 {
log.Infof(log.WebsocketMgr, "%s exchange websocket ToRoutine channel buffer recovered; %d messages were dropped", w.exchangeName, dropped)
dropped = 0
}
default:
if dropped == 0 {
// If this becomes prone to flapping we could drain the buffer, but that's extreme and we'd like to avoid it if possible
log.Warnf(log.WebsocketMgr, "%s exchange websocket ToRoutine channel buffer full; dropping messages", w.exchangeName)
}
dropped++
}
}
}
@@ -413,12 +406,15 @@ func (w *Websocket) connectionMonitor() error {
}
select {
case err := <-w.ReadMessageErrors:
w.DataHandler <- err
if IsDisconnectionError(err) {
log.Warnf(log.WebsocketMgr, "%v websocket has been disconnected. Reason: %v", w.exchangeName, err)
w.setState(disconnected)
if w.IsConnected() {
if shutdownErr := w.Shutdown(); shutdownErr != nil {
log.Errorf(log.WebsocketMgr, "%v websocket: connectionMonitor shutdown err: %s", w.exchangeName, shutdownErr)
}
}
}
w.DataHandler <- err
case <-timer.C:
if !w.IsConnecting() && !w.IsConnected() {
err := w.Connect()

View File

@@ -326,26 +326,29 @@ func TestConnectionMessageErrors(t *testing.T) {
err = ws.Connect()
require.NoError(t, err, "Connect must not error")
ws.TrafficAlert <- struct{}{}
c := func(tb *assert.CollectT) {
select {
case v := <-ws.ToRoutine:
case v, ok := <-ws.ToRoutine:
require.True(tb, ok, "ToRoutine should not be closed on us")
switch err := v.(type) {
case *websocket.CloseError:
assert.Equal(tb, "SpecialText", err.Text, "Should get correct Close Error")
case error:
assert.ErrorIs(tb, err, errDastardlyReason, "Should get the correct error")
default:
assert.Failf(tb, "Wrong data type sent to ToRoutine", "Got type: %T", err)
}
default:
assert.Fail(tb, "Nothing available on ToRoutine")
}
}
ws.TrafficAlert <- struct{}{}
ws.ReadMessageErrors <- errDastardlyReason
assert.EventuallyWithT(t, c, 900*time.Millisecond, 10*time.Millisecond, "Should get an error down the routine")
assert.EventuallyWithT(t, c, 2*time.Second, 10*time.Millisecond, "Should get an error down the routine")
ws.ReadMessageErrors <- &websocket.CloseError{Code: 1006, Text: "SpecialText"}
assert.EventuallyWithT(t, c, 900*time.Millisecond, 10*time.Millisecond, "Should get an error down the routine")
assert.EventuallyWithT(t, c, 2*time.Second, 10*time.Millisecond, "Should get an error down the routine")
}
func TestWebsocket(t *testing.T) {