Okx: Fix panic during shutdown due to race (#1240)

* Okx: Fix panic during shutdown due to race

Error:
```
panic: runtime error: slice bounds out of range [:13340] with capacity 8192
bufio.(*Reader).Read(0x14000038540, {0x1400059cffc?, 0x4?, 0x0?})
        /usr/local/go/src/bufio/bufio.go:250 +0x33c
github.com/gorilla/websocket.(*messageReader).Read(0x1400098bb78, {0x1400059cffc, 0x4, 0x4})
        /Users/gbjk/go/pkg/mod/github.com/gorilla/websocket@v1.5.0/conn.go:1050 +0x208
io.ReadAll({0x12f36c618, 0x1400098bb78})
        /usr/local/go/src/io/io.go:701 +0xe4
io/ioutil.ReadAll(...)
        /usr/local/go/src/io/ioutil/ioutil.go:27
github.com/gorilla/websocket.(*Conn).ReadMessage(0x3?)
        /Users/gbjk/go/pkg/mod/github.com/gorilla/websocket@v1.5.0/conn.go:1097 +0x54
github.com/thrasher-corp/gocryptotrader/exchanges/stream.(*WebsocketConnection).ReadMessage(0x140006622d0)
        /Users/gbjk/go/pkg/mod/github.com/gbjk/gocryptotrader@v0.0.0-20230619070715-ae6f283f6be6/exchanges/stream/websocket_connection.go:217 +0x30
github.com/thrasher-corp/gocryptotrader/exchanges/okx.(*Okx).wsFunnelConnectionData(0x140019321e0?, {0x106909450, 0x140006622d0})
        /Users/gbjk/go/pkg/mod/github.com/gbjk/gocryptotrader@v0.0.0-20230619070715-ae6f283f6be6/exchanges/okx/okx_websocket.go:346 +0x94
created by github.com/thrasher-corp/gocryptotrader/exchanges/okx.(*Okx).WsConnect
        /Users/gbjk/go/pkg/mod/github.com/gbjk/gocryptotrader@v0.0.0-20230619070715-ae6f283f6be6/exchanges/okx/okx_websocket.go:234 +0x134
exit status 2
```
This happens when there's a race in calls to bufio because it over-reads. See [this comment](https://github.com/golang/go/issues/42289#issuecomment-723393783)
Detected using go -race:
```
WARNING: DATA RACE
Read at 0x00c000818bc0 by goroutine 2156:
  github.com/gorilla/websocket.(*Conn).NextReader()
      /Users/gbjk/go/pkg/mod/github.com/gorilla/websocket@v1.5.0/conn.go:1000 +0x38
  github.com/gorilla/websocket.(*Conn).ReadMessage()
      /Users/gbjk/go/pkg/mod/github.com/gorilla/websocket@v1.5.0/conn.go:1093 +0x28
  github.com/thrasher-corp/gocryptotrader/exchanges/stream.(*WebsocketConnection).ReadMessage()
      /Users/gbjk/go/pkg/mod/github.com/gbjk/gocryptotrader@v0.0.0-20230619070715-ae6f283f6be6/exchanges/stream/websocket_connection.go:217 +0x44
  github.com/thrasher-corp/gocryptotrader/exchanges/okx.(*Okx).wsFunnelConnectionData()
```

Because we started a new wsFunnelConnectionData for each re-connect.

This bug might apply to other exchanges.

* Okx: Fix websocket waitgroup going negative

Move the waitgroup additions to the actual places that use them

* Okx: Add nolint for revive

* Okx: Move wg Adds to outside goros

There is a risk of a race condition if we let the goros Add themselves.

* Okx: Simplify websocket reading

This fixes the issue that the WsRead and Multiplexer were intrinsically
linked to the websocket, even though they need to survive both
disconnects and Disable/Enables.

Messages are now handled in a goro, which means they might not be
sequential, but there's a very high chance that messages of the same
codepath will be handled sequentially. So orderbook, ticked and order
messages should be sequential

* Okx: Switch to blocking processing of ws msgs

* Okx: Remove nolint from Setup

Actioning a review comment: @gloriousCode prefers to avoid having to nolint this in favour of
a func call return.

* Okx: Remove redundant Wg use inside WsReadData

* Okx: Fix WsMultiplexer Re-Run() shutdown
This commit is contained in:
Gareth Kirwan
2023-07-17 04:21:28 +01:00
committed by GitHub
parent da38b4bdf7
commit ce3d29f5d5
3 changed files with 42 additions and 50 deletions

View File

@@ -3171,6 +3171,7 @@ type wsRequestDataChannelsMultiplexer struct {
Register chan *wsRequestInfo
Unregister chan string
Message chan *wsIncomingData
shutdown chan bool
}
// wsSubscriptionParameters represents toggling boolean values for subscription parameters.

View File

@@ -27,8 +27,6 @@ import (
var (
errInvalidChecksum = errors.New("invalid checksum")
// responseStream a channel thought which the data coming from the two websocket connection will go through.
responseStream = make(chan stream.Response)
)
var (
@@ -226,10 +224,8 @@ func (ok *Okx) WsConnect() error {
if err != nil {
return err
}
ok.Websocket.Wg.Add(2)
go ok.wsFunnelConnectionData(ok.Websocket.Conn)
go ok.WsReadData()
go ok.WsResponseMultiplexer.Run()
ok.Websocket.Wg.Add(1)
go ok.wsReadData(ok.Websocket.Conn)
if ok.Verbose {
log.Debugf(log.ExchangeSys, "Successful connection to %v\n",
ok.Websocket.GetWebsocketURL())
@@ -262,7 +258,7 @@ func (ok *Okx) WsAuth(ctx context.Context, dialer *websocket.Dialer) error {
return fmt.Errorf("%v Websocket connection %v error. Error %v", ok.Name, okxAPIWebsocketPrivateURL, err)
}
ok.Websocket.Wg.Add(1)
go ok.wsFunnelConnectionData(ok.Websocket.AuthConn)
go ok.wsReadData(ok.Websocket.AuthConn)
ok.Websocket.AuthConn.SetupPingHandler(stream.PingHandler{
MessageType: websocket.TextMessage,
Message: pingMsg,
@@ -335,16 +331,17 @@ func (ok *Okx) WsAuth(ctx context.Context, dialer *websocket.Dialer) error {
}
}
// wsFunnelConnectionData receives data from multiple connection and pass the data
// to wsRead through a channel responseStream
func (ok *Okx) wsFunnelConnectionData(ws stream.Connection) {
// wsReadData sends msgs from public and auth websockets to data handler
func (ok *Okx) wsReadData(ws stream.Connection) {
defer ok.Websocket.Wg.Done()
for {
resp := ws.ReadMessage()
if resp.Raw == nil {
return
}
responseStream <- stream.Response{Raw: resp.Raw}
if err := ok.WsHandleData(resp.Raw); err != nil {
ok.Websocket.DataHandler <- err
}
}
}
@@ -531,34 +528,6 @@ func (ok *Okx) handleSubscription(operation string, subscriptions []stream.Chann
return nil
}
// WsReadData read coming messages thought the websocket connection and process the data.
func (ok *Okx) WsReadData() {
defer ok.Websocket.Wg.Done()
for {
select {
case <-ok.Websocket.ShutdownC:
select {
case resp := <-responseStream:
err := ok.WsHandleData(resp.Raw)
if err != nil {
select {
case ok.Websocket.DataHandler <- err:
default:
log.Errorf(log.WebsocketMgr, "%s websocket handle data error: %v", ok.Name, err)
}
}
default:
}
return
case resp := <-responseStream:
err := ok.WsHandleData(resp.Raw)
if err != nil {
ok.Websocket.DataHandler <- err
}
}
}
}
// WsHandleData will read websocket raw data and pass to appropriate handler
func (ok *Okx) WsHandleData(respRaw []byte) error {
var resp wsIncomingData
@@ -1675,6 +1644,10 @@ func (m *wsRequestDataChannelsMultiplexer) Run() {
tickerData := time.NewTicker(time.Second)
for {
select {
case <-m.shutdown:
// We've consumed the shutdown, so create a new chan for subsequent runs
m.shutdown = make(chan bool)
return
case <-tickerData.C:
for x, myChan := range m.WsResponseChannelsMap {
if myChan == nil {
@@ -1709,6 +1682,12 @@ func (m *wsRequestDataChannelsMultiplexer) Run() {
}
}
// Shutdown causes the multiplexer to exit its Run loop
// All channels are left open, but websocket shutdown first will ensure no more messages block on multiplexer reading
func (m *wsRequestDataChannelsMultiplexer) Shutdown() {
close(m.shutdown)
}
// wsChannelSubscription sends a subscription or unsubscription request for different channels through the websocket stream.
func (ok *Okx) wsChannelSubscription(operation, channel string, assetType asset.Item, pair currency.Pair, tInstrumentType, tInstrumentID, tUnderlying bool) error {
if operation != operationSubscribe && operation != operationUnsubscribe {

View File

@@ -174,16 +174,14 @@ func (ok *Okx) SetDefaults() {
// Setup takes in the supplied exchange configuration details and sets params
func (ok *Okx) Setup(exch *config.Exchange) error {
err := exch.Validate()
if err != nil {
if err := exch.Validate(); err != nil {
return err
}
if !exch.Enabled {
ok.SetEnabled(false)
return nil
}
err = ok.SetupDefaults(exch)
if err != nil {
if err := ok.SetupDefaults(exch); err != nil {
return err
}
@@ -192,13 +190,14 @@ func (ok *Okx) Setup(exch *config.Exchange) error {
Register: make(chan *wsRequestInfo),
Unregister: make(chan string),
Message: make(chan *wsIncomingData),
shutdown: make(chan bool),
}
wsRunningEndpoint, err := ok.API.Endpoints.GetURL(exchange.WebsocketSpot)
if err != nil {
return err
}
err = ok.Websocket.Setup(&stream.WebsocketSetup{
if err := ok.Websocket.Setup(&stream.WebsocketSetup{
ExchangeConfig: exch,
DefaultURL: okxAPIWebsocketPublicURL,
RunningURL: wsRunningEndpoint,
@@ -211,20 +210,21 @@ func (ok *Okx) Setup(exch *config.Exchange) error {
OrderbookBufferConfig: buffer.Config{
Checksum: ok.CalculateUpdateOrderbookChecksum,
},
})
if err != nil {
}); err != nil {
return err
}
err = ok.Websocket.SetupNewConnection(stream.ConnectionSetup{
go ok.WsResponseMultiplexer.Run()
if err := ok.Websocket.SetupNewConnection(stream.ConnectionSetup{
URL: okxAPIWebsocketPublicURL,
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: okxWebsocketResponseMaxLimit,
RateLimit: 500,
})
if err != nil {
}); err != nil {
return err
}
return ok.Websocket.SetupNewConnection(stream.ConnectionSetup{
URL: okxAPIWebsocketPrivateURL,
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
@@ -277,6 +277,18 @@ func (ok *Okx) Run(ctx context.Context) {
}
}
// Shutdown calls Base.Shutdown and then shuts down the response multiplexer
func (ok *Okx) Shutdown() error {
if err := ok.Base.Shutdown(); err != nil {
return err
}
// Must happen after the Websocket shutdown in Base.Shutdown, so there are no new blocking writes to the multiplexer
ok.WsResponseMultiplexer.Shutdown()
return nil
}
// GetServerTime returns the current exchange server time.
func (ok *Okx) GetServerTime(ctx context.Context, _ asset.Item) (time.Time, error) {
return ok.GetSystemTime(ctx)