From 66fbd43cf0843c1d08ab1aca52a260d567adbe77 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Fri, 3 Sep 2021 17:21:23 +1000 Subject: [PATCH] websocket: fix deadlock when enabling/disabling via gctrpc (#754) * websocket: select case error if no receiver, add in functionality to reset to initial sync for books on a new websocket connection * websocket: fix tests * websocket: log error instead of losing it * websocket: fix whoopsie * exchanges: fix test * websocket: force requirement of specific functionality * exchanges: fix tests * exchanges/websocket: move waitgroup add before scheduling across exchanges * gateio: add feature subscribe * bithumb/bittrex: include connection state reset, fix reconnection bug for Bithumb * huobi: Add listen to shutdown to routine so it actually returns and stops being a naughty boy. * huobi: add missing waitgroup add. * exchanges: bleed comms channels * binance: fix reconnection bug with buffer * bithumb: fix reconnection bug with ws orderbook when websocket is diabled/enabled * bithumb/bittrex: add bleeders for ws orderbook jobs * linter: fix * kraken: reduce code block from double assertion * This bug ruined my day. * glorious: error checking * zb: add correct path for websocket connection * exchange: Add verbosity when config conflicts and overwrites default values * zb: add https to path * exchanges: glorious nits * stream: Add checkAndSetMonitoring to reduce potential routine bundling, increase timeout and check state in tests * stream: remove check that is not needed. * glorious: nits addr. * lint: test --- exchanges/binance/binance_test.go | 3 + exchanges/binance/binance_types.go | 9 +- exchanges/binance/binance_websocket.go | 127 +++++++++++++++--- exchanges/bitfinex/bitfinex_websocket.go | 29 +++- exchanges/bithumb/bithumb_websocket.go | 21 ++- exchanges/bithumb/bithumb_websocket_types.go | 9 +- exchanges/bithumb/bithumb_ws_orderbook.go | 123 ++++++++++++++--- exchanges/bitmex/bitmex_websocket.go | 2 +- exchanges/bitstamp/bitstamp_websocket.go | 4 +- exchanges/bittrex/bittrex_websocket.go | 3 +- exchanges/bittrex/bittrex_ws_orderbook.go | 28 +++- exchanges/btcmarkets/btcmarkets_websocket.go | 3 +- exchanges/btse/btse_websocket.go | 3 +- .../coinbasepro/coinbasepro_websocket.go | 7 +- exchanges/coinbene/coinbene_websocket.go | 3 +- exchanges/coinut/coinut_websocket.go | 3 +- exchanges/exchange.go | 96 +++++++++++-- exchanges/exchange_test.go | 70 ++++++++-- exchanges/exchange_types.go | 17 ++- exchanges/ftx/ftx_websocket.go | 3 +- exchanges/gateio/gateio_websocket.go | 3 +- exchanges/gateio/gateio_wrapper.go | 1 + exchanges/gemini/gemini_websocket.go | 18 ++- exchanges/hitbtc/hitbtc_websocket.go | 4 +- exchanges/huobi/huobi_websocket.go | 33 ++++- exchanges/kraken/kraken.go | 28 ++-- exchanges/kraken/kraken_test.go | 9 ++ exchanges/kraken/kraken_websocket.go | 19 ++- exchanges/okgroup/okgroup_websocket.go | 3 +- exchanges/poloniex/poloniex_websocket.go | 3 +- exchanges/stream/websocket.go | 61 ++++++--- exchanges/stream/websocket_connection.go | 11 +- exchanges/stream/websocket_test.go | 38 +++++- exchanges/zb/zb.go | 2 +- exchanges/zb/zb_websocket.go | 5 +- 35 files changed, 636 insertions(+), 165 deletions(-) diff --git a/exchanges/binance/binance_test.go b/exchanges/binance/binance_test.go index 2168ac09..325e5f46 100644 --- a/exchanges/binance/binance_test.go +++ b/exchanges/binance/binance_test.go @@ -2152,10 +2152,13 @@ func TestWsDepthUpdate(t *testing.T) { t.Error(err) } + b.obm.state[currency.BTC][currency.USDT][asset.Spot].fetchingBook = false + ob, err := b.Websocket.Orderbook.GetOrderbook(p, asset.Spot) if err != nil { t.Fatal(err) } + if exp, got := seedLastUpdateID, ob.LastUpdateID; got != exp { t.Fatalf("Unexpected Last update id of orderbook for old update. Exp: %d, got: %d", exp, got) } diff --git a/exchanges/binance/binance_types.go b/exchanges/binance/binance_types.go index cf7b9176..242d983a 100644 --- a/exchanges/binance/binance_types.go +++ b/exchanges/binance/binance_types.go @@ -827,10 +827,11 @@ type orderbookManager struct { } type update struct { - buffer chan *WebsocketDepthStream - fetchingBook bool - initialSync bool - lastUpdateID int64 + buffer chan *WebsocketDepthStream + fetchingBook bool + initialSync bool + needsFetchingBook bool + lastUpdateID int64 } // job defines a synchonisation job that tells a go routine to fetch an diff --git a/exchanges/binance/binance_websocket.go b/exchanges/binance/binance_websocket.go index e5058e17..f030a066 100644 --- a/exchanges/binance/binance_websocket.go +++ b/exchanges/binance/binance_websocket.go @@ -86,7 +86,9 @@ func (b *Binance) WsConnect() error { Delay: pingDelay, }) + b.Websocket.Wg.Add(1) go b.wsReadData() + b.setupOrderbookManager() return nil } @@ -97,12 +99,23 @@ func (b *Binance) setupOrderbookManager() { state: make(map[currency.Code]map[currency.Code]map[asset.Item]*update), jobs: make(chan job, maxWSOrderbookJobs), } - - for i := 0; i < maxWSOrderbookWorkers; i++ { - // 10 workers for synchronising book - b.SynchroniseWebsocketOrderbook() + } else { + // Change state on reconnect for initial sync. + for _, m1 := range b.obm.state { + for _, m2 := range m1 { + for _, update := range m2 { + update.initialSync = true + update.needsFetchingBook = true + update.lastUpdateID = 0 + } + } } } + + for i := 0; i < maxWSOrderbookWorkers; i++ { + // 10 workers for synchronising book + b.SynchroniseWebsocketOrderbook() + } } // KeepAuthKeyAlive will continuously send messages to @@ -129,7 +142,6 @@ func (b *Binance) KeepAuthKeyAlive() { // wsReadData receives and passes on websocket messages for processing func (b *Binance) wsReadData() { - b.Websocket.Wg.Add(1) defer b.Websocket.Wg.Done() for { @@ -662,20 +674,59 @@ func (b *Binance) ProcessUpdate(cp currency.Pair, a asset.Item, ws *WebsocketDep // applyBufferUpdate applies the buffer to the orderbook or initiates a new // orderbook sync by the REST protocol which is off handed to go routine. func (b *Binance) applyBufferUpdate(pair currency.Pair) error { - fetching, err := b.obm.checkIsFetchingBook(pair) + fetching, needsFetching, err := b.obm.handleFetchingBook(pair) if err != nil { return err } if fetching { return nil } - - recent, err := b.Websocket.Orderbook.GetOrderbook(pair, asset.Spot) - if err != nil || (recent.Asks == nil && recent.Bids == nil) { + if needsFetching { + if b.Verbose { + log.Debugf(log.WebsocketMgr, "%s Orderbook: Fetching via REST\n", b.Name) + } return b.obm.fetchBookViaREST(pair) } - return b.obm.checkAndProcessUpdate(b.ProcessUpdate, pair, recent) + recent, err := b.Websocket.Orderbook.GetOrderbook(pair, asset.Spot) + if err != nil { + log.Errorf( + log.WebsocketMgr, + "%s error fetching recent orderbook when applying updates: %s\n", + b.Name, + err) + } + + if recent != nil { + err = b.obm.checkAndProcessUpdate(b.ProcessUpdate, pair, recent) + if err != nil { + log.Errorf( + log.WebsocketMgr, + "%s error processing update - initiating new orderbook sync via REST: %s\n", + b.Name, + err) + err = b.obm.setNeedsFetchingBook(pair) + if err != nil { + return err + } + } + } + + return nil +} + +// setNeedsFetchingBook completes the book fetching initiation. +func (o *orderbookManager) setNeedsFetchingBook(pair currency.Pair) error { + o.Lock() + defer o.Unlock() + state, ok := o.state[pair.Base][pair.Quote][asset.Spot] + if !ok { + return fmt.Errorf("could not match pair %s and asset type %s in hash table", + pair, + asset.Spot) + } + state.needsFetchingBook = true + return nil } // SynchroniseWebsocketOrderbook synchronises full orderbook for currency pair @@ -686,6 +737,14 @@ func (b *Binance) SynchroniseWebsocketOrderbook() { defer b.Websocket.Wg.Done() for { select { + case <-b.Websocket.ShutdownC: + for { + select { + case <-b.obm.jobs: + default: + return + } + } case j := <-b.obm.jobs: err := b.processJob(j.Pair) if err != nil { @@ -693,8 +752,6 @@ func (b *Binance) SynchroniseWebsocketOrderbook() { "%s processing websocket orderbook error %v", b.Name, err) } - case <-b.Websocket.ShutdownC: - return } } }() @@ -762,9 +819,10 @@ func (o *orderbookManager) stageWsUpdate(u *WebsocketDepthStream, pair currency. state = &update{ // 100ms update assuming we might have up to a 10 second delay. // There could be a potential 100 updates for the currency. - buffer: make(chan *WebsocketDepthStream, maxWSUpdateBuffer), - fetchingBook: false, - initialSync: true, + buffer: make(chan *WebsocketDepthStream, maxWSUpdateBuffer), + fetchingBook: false, + initialSync: true, + needsFetchingBook: true, } m2[a] = state } @@ -788,19 +846,30 @@ func (o *orderbookManager) stageWsUpdate(u *WebsocketDepthStream, pair currency. } } -// checkIsFetchingBook checks status if the book is currently being via the REST -// protocol. -func (o *orderbookManager) checkIsFetchingBook(pair currency.Pair) (bool, error) { +// handleFetchingBook checks if a full book is being fetched or needs to be +// fetched +func (o *orderbookManager) handleFetchingBook(pair currency.Pair) (fetching, needsFetching bool, err error) { o.Lock() defer o.Unlock() state, ok := o.state[pair.Base][pair.Quote][asset.Spot] if !ok { return false, + false, fmt.Errorf("check is fetching book cannot match currency pair %s asset type %s", pair, asset.Spot) } - return state.fetchingBook, nil + + if state.fetchingBook { + return true, false, nil + } + + if state.needsFetchingBook { + state.needsFetchingBook = false + state.fetchingBook = true + return false, true, nil + } + return false, false, nil } // stopFetchingBook completes the book fetching. @@ -960,5 +1029,25 @@ bufferEmpty: // disable rest orderbook synchronisation _ = o.stopFetchingBook(pair) _ = o.completeInitialSync(pair) + _ = o.stopNeedsFetchingBook(pair) + return nil +} + +// stopNeedsFetchingBook completes the book fetching initiation. +func (o *orderbookManager) stopNeedsFetchingBook(pair currency.Pair) error { + o.Lock() + defer o.Unlock() + state, ok := o.state[pair.Base][pair.Quote][asset.Spot] + if !ok { + return fmt.Errorf("could not match pair %s and asset type %s in hash table", + pair, + asset.Spot) + } + if !state.needsFetchingBook { + return fmt.Errorf("needs fetching book already set to false for %s %s", + pair, + asset.Spot) + } + state.needsFetchingBook = false return nil } diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index beff583f..c511e1fd 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -52,6 +52,7 @@ func (b *Bitfinex) WsConnect() error { err) } + b.Websocket.Wg.Add(1) go b.wsReadData(b.Websocket.Conn) if b.Websocket.CanUseAuthenticatedEndpoints() { @@ -63,6 +64,7 @@ func (b *Bitfinex) WsConnect() error { err) b.Websocket.SetCanUseAuthenticatedEndpoints(false) } + b.Websocket.Wg.Add(1) go b.wsReadData(b.Websocket.AuthConn) err = b.WsSendAuth() if err != nil { @@ -74,13 +76,13 @@ func (b *Bitfinex) WsConnect() error { } } + b.Websocket.Wg.Add(1) go b.WsDataHandler() return nil } // wsReadData receives and passes on websocket messages for processing func (b *Bitfinex) wsReadData(ws stream.Connection) { - b.Websocket.Wg.Add(1) defer b.Websocket.Wg.Done() for { resp := ws.ReadMessage() @@ -93,19 +95,34 @@ func (b *Bitfinex) wsReadData(ws stream.Connection) { // WsDataHandler handles data from wsReadData func (b *Bitfinex) WsDataHandler() { - b.Websocket.Wg.Add(1) defer b.Websocket.Wg.Done() for { select { - case resp := <-comms: - if resp.Type == websocket.TextMessage { + case <-b.Websocket.ShutdownC: + select { + case resp := <-comms: err := b.wsHandleData(resp.Raw) if err != nil { - b.Websocket.DataHandler <- err + select { + case b.Websocket.DataHandler <- err: + default: + log.Errorf(log.WebsocketMgr, + "%s websocket handle data error: %v", + b.Name, + err) + } } + default: } - case <-b.Websocket.ShutdownC: return + case resp := <-comms: + if resp.Type != websocket.TextMessage { + continue + } + err := b.wsHandleData(resp.Raw) + if err != nil { + b.Websocket.DataHandler <- err + } } } } diff --git a/exchanges/bithumb/bithumb_websocket.go b/exchanges/bithumb/bithumb_websocket.go index 96285cd4..c4994d61 100644 --- a/exchanges/bithumb/bithumb_websocket.go +++ b/exchanges/bithumb/bithumb_websocket.go @@ -41,24 +41,31 @@ func (b *Bithumb) WsConnect() error { b.Name, err) } + + b.Websocket.Wg.Add(1) go b.wsReadData() + b.setupOrderbookManager() return nil } // wsReadData receives and passes on websocket messages for processing func (b *Bithumb) wsReadData() { - b.Websocket.Wg.Add(1) defer b.Websocket.Wg.Done() for { - resp := b.Websocket.Conn.ReadMessage() - if resp.Raw == nil { + select { + case <-b.Websocket.ShutdownC: return - } - err := b.wsHandleData(resp.Raw) - if err != nil { - b.Websocket.DataHandler <- err + default: + resp := b.Websocket.Conn.ReadMessage() + if resp.Raw == nil { + return + } + err := b.wsHandleData(resp.Raw) + if err != nil { + b.Websocket.DataHandler <- err + } } } } diff --git a/exchanges/bithumb/bithumb_websocket_types.go b/exchanges/bithumb/bithumb_websocket_types.go index 45814945..9ff0ff8c 100644 --- a/exchanges/bithumb/bithumb_websocket_types.go +++ b/exchanges/bithumb/bithumb_websocket_types.go @@ -87,10 +87,11 @@ type orderbookManager struct { } type update struct { - buffer chan *WsOrderbooks - fetchingBook bool - initialSync bool - lastUpdated time.Time + buffer chan *WsOrderbooks + fetchingBook bool + initialSync bool + needsFetchingBook bool + lastUpdated time.Time } // job defines a synchonisation job that tells a go routine to fetch an diff --git a/exchanges/bithumb/bithumb_ws_orderbook.go b/exchanges/bithumb/bithumb_ws_orderbook.go index 9e16ba89..cd9a7c54 100644 --- a/exchanges/bithumb/bithumb_ws_orderbook.go +++ b/exchanges/bithumb/bithumb_ws_orderbook.go @@ -67,7 +67,7 @@ func (b *Bithumb) UpdateLocalBuffer(wsdp *WsOrderbooks) (bool, error) { // applyBufferUpdate applies the buffer to the orderbook or initiates a new // orderbook sync by the REST protocol which is off handed to go routine. func (b *Bithumb) applyBufferUpdate(pair currency.Pair) error { - fetching, err := b.obm.checkIsFetchingBook(pair) + fetching, needsFetching, err := b.obm.handleFetchingBook(pair) if err != nil { return err } @@ -75,12 +75,38 @@ func (b *Bithumb) applyBufferUpdate(pair currency.Pair) error { return nil } - recent, err := b.Websocket.Orderbook.GetOrderbook(pair, asset.Spot) - if err != nil || (recent.Asks == nil && recent.Bids == nil) { + if needsFetching { + if b.Verbose { + log.Debugf(log.WebsocketMgr, "%s Orderbook: Fetching via REST\n", b.Name) + } return b.obm.fetchBookViaREST(pair) } - return b.obm.checkAndProcessUpdate(b.processBooks, pair, recent) + recent, err := b.Websocket.Orderbook.GetOrderbook(pair, asset.Spot) + if err != nil { + log.Errorf( + log.WebsocketMgr, + "%s error fetching recent orderbook when applying updates: %s\n", + b.Name, + err) + } + + if recent != nil { + err = b.obm.checkAndProcessUpdate(b.processBooks, pair, recent) + if err != nil { + log.Errorf( + log.WebsocketMgr, + "%s error processing update - initiating new orderbook sync via REST: %s\n", + b.Name, + err) + err = b.obm.setNeedsFetchingBook(pair) + if err != nil { + return err + } + } + } + + return nil } // SynchroniseWebsocketOrderbook synchronises full orderbook for currency pair @@ -91,6 +117,14 @@ func (b *Bithumb) SynchroniseWebsocketOrderbook() { defer b.Websocket.Wg.Done() for { select { + case <-b.Websocket.ShutdownC: + for { + select { + case <-b.obm.jobs: + default: + return + } + } case j := <-b.obm.jobs: err := b.processJob(j.Pair) if err != nil { @@ -98,8 +132,6 @@ func (b *Bithumb) SynchroniseWebsocketOrderbook() { "%s processing websocket orderbook error %v", b.Name, err) } - case <-b.Websocket.ShutdownC: - return } } }() @@ -149,12 +181,23 @@ func (b *Bithumb) setupOrderbookManager() { if b.obm.state == nil { b.obm.state = make(map[currency.Code]map[currency.Code]map[asset.Item]*update) b.obm.jobs = make(chan job, maxWSOrderbookJobs) - - for i := 0; i < maxWSOrderbookWorkers; i++ { - // 10 workers for synchronising book - b.SynchroniseWebsocketOrderbook() + } else { + // Change state on reconnect for initial sync. + for _, m1 := range b.obm.state { + for _, m2 := range m1 { + for _, update := range m2 { + update.initialSync = true + update.needsFetchingBook = true + update.lastUpdated = time.Time{} + } + } } } + + for i := 0; i < maxWSOrderbookWorkers; i++ { + // 10 workers for synchronising book + b.SynchroniseWebsocketOrderbook() + } } // stageWsUpdate stages websocket update to roll through updates that need to @@ -177,9 +220,10 @@ func (o *orderbookManager) stageWsUpdate(u *WsOrderbooks, pair currency.Pair, a state, ok := m2[a] if !ok { state = &update{ - buffer: make(chan *WsOrderbooks, maxWSUpdateBuffer), - fetchingBook: false, - initialSync: true, + buffer: make(chan *WsOrderbooks, maxWSUpdateBuffer), + fetchingBook: false, + initialSync: true, + needsFetchingBook: true, } m2[a] = state } @@ -201,19 +245,30 @@ func (o *orderbookManager) stageWsUpdate(u *WsOrderbooks, pair currency.Pair, a } } -// checkIsFetchingBook checks status if the book is currently being via the REST -// protocol. -func (o *orderbookManager) checkIsFetchingBook(pair currency.Pair) (bool, error) { +// handleFetchingBook checks if a full book is being fetched or needs to be +// fetched +func (o *orderbookManager) handleFetchingBook(pair currency.Pair) (fetching, needsFetching bool, err error) { o.Lock() defer o.Unlock() state, ok := o.state[pair.Base][pair.Quote][asset.Spot] if !ok { return false, + false, fmt.Errorf("check is fetching book cannot match currency pair %s asset type %s", pair, asset.Spot) } - return state.fetchingBook, nil + + if state.fetchingBook { + return true, false, nil + } + + if state.needsFetchingBook { + state.needsFetchingBook = false + state.fetchingBook = true + return false, true, nil + } + return false, false, nil } // stopFetchingBook completes the book fetching. @@ -354,6 +409,7 @@ bufferEmpty: // disable rest orderbook synchronisation _ = o.stopFetchingBook(pair) _ = o.completeInitialSync(pair) + _ = o.stopNeedsFetchingBook(pair) return nil } @@ -389,3 +445,36 @@ func (b *Bithumb) SeedLocalCacheWithBook(p currency.Pair, o *Orderbook) error { newOrderBook.VerifyOrderbook = b.CanVerifyOrderbook return b.Websocket.Orderbook.LoadSnapshot(&newOrderBook) } + +// setNeedsFetchingBook completes the book fetching initiation. +func (o *orderbookManager) setNeedsFetchingBook(pair currency.Pair) error { + o.Lock() + defer o.Unlock() + state, ok := o.state[pair.Base][pair.Quote][asset.Spot] + if !ok { + return fmt.Errorf("could not match pair %s and asset type %s in hash table", + pair, + asset.Spot) + } + state.needsFetchingBook = true + return nil +} + +// stopNeedsFetchingBook completes the book fetching initiation. +func (o *orderbookManager) stopNeedsFetchingBook(pair currency.Pair) error { + o.Lock() + defer o.Unlock() + state, ok := o.state[pair.Base][pair.Quote][asset.Spot] + if !ok { + return fmt.Errorf("could not match pair %s and asset type %s in hash table", + pair, + asset.Spot) + } + if !state.needsFetchingBook { + return fmt.Errorf("needs fetching book already set to false for %s %s", + pair, + asset.Spot) + } + state.needsFetchingBook = false + return nil +} diff --git a/exchanges/bitmex/bitmex_websocket.go b/exchanges/bitmex/bitmex_websocket.go index d37d2490..ab2a390d 100644 --- a/exchanges/bitmex/bitmex_websocket.go +++ b/exchanges/bitmex/bitmex_websocket.go @@ -94,6 +94,7 @@ func (b *Bitmex) WsConnect() error { welcomeResp.Limit.Remaining) } + b.Websocket.Wg.Add(1) go b.wsReadData() err = b.websocketSendAuth() @@ -114,7 +115,6 @@ func (b *Bitmex) WsConnect() error { // wsReadData receives and passes on websocket messages for processing func (b *Bitmex) wsReadData() { - b.Websocket.Wg.Add(1) defer b.Websocket.Wg.Done() for { diff --git a/exchanges/bitstamp/bitstamp_websocket.go b/exchanges/bitstamp/bitstamp_websocket.go index 831ef642..ed3b7ca8 100644 --- a/exchanges/bitstamp/bitstamp_websocket.go +++ b/exchanges/bitstamp/bitstamp_websocket.go @@ -40,13 +40,15 @@ func (b *Bitstamp) WsConnect() error { if err != nil { b.Websocket.DataHandler <- err } + + b.Websocket.Wg.Add(1) go b.wsReadData() + return nil } // wsReadData receives and passes on websocket messages for processing func (b *Bitstamp) wsReadData() { - b.Websocket.Wg.Add(1) defer b.Websocket.Wg.Done() for { diff --git a/exchanges/bittrex/bittrex_websocket.go b/exchanges/bittrex/bittrex_websocket.go index 2a566015..55d4520f 100644 --- a/exchanges/bittrex/bittrex_websocket.go +++ b/exchanges/bittrex/bittrex_websocket.go @@ -108,7 +108,9 @@ func (b *Bittrex) WsConnect() error { // This reader routine is called prior to initiating a subscription for // efficient processing. + b.Websocket.Wg.Add(1) go b.wsReadData() + b.setupOrderbookManager() b.tickerCache = &TickerCache{ MarketSummaries: make(map[string]*MarketSummaryData), @@ -376,7 +378,6 @@ func (b *Bittrex) unsubscribeSlice(channelsToUnsubscribe []stream.ChannelSubscri // wsReadData gets and passes on websocket messages for processing func (b *Bittrex) wsReadData() { - b.Websocket.Wg.Add(1) defer b.Websocket.Wg.Done() for { diff --git a/exchanges/bittrex/bittrex_ws_orderbook.go b/exchanges/bittrex/bittrex_ws_orderbook.go index b13c6f61..35f90245 100644 --- a/exchanges/bittrex/bittrex_ws_orderbook.go +++ b/exchanges/bittrex/bittrex_ws_orderbook.go @@ -28,12 +28,22 @@ func (b *Bittrex) setupOrderbookManager() { state: make(map[currency.Code]map[currency.Code]map[asset.Item]*update), jobs: make(chan job, maxWSOrderbookJobs), } - - for i := 0; i < maxWSOrderbookWorkers; i++ { - // 10 workers for synchronising book - b.SynchroniseWebsocketOrderbook() + } else { + // Change state on reconnect for initial sync. + for _, m1 := range b.obm.state { + for _, m2 := range m1 { + for _, update := range m2 { + update.initialSync = true + update.needsFetchingBook = true + } + } } } + + for i := 0; i < maxWSOrderbookWorkers; i++ { + // 10 workers for synchronising book + b.SynchroniseWebsocketOrderbook() + } } // ProcessUpdateOB processes the websocket orderbook update @@ -183,6 +193,14 @@ func (b *Bittrex) SynchroniseWebsocketOrderbook() { defer b.Websocket.Wg.Done() for { select { + case <-b.Websocket.ShutdownC: + for { + select { + case <-b.obm.jobs: + default: + return + } + } case j := <-b.obm.jobs: err := b.processJob(j.Pair) if err != nil { @@ -190,8 +208,6 @@ func (b *Bittrex) SynchroniseWebsocketOrderbook() { "%s processing websocket orderbook error %v", b.Name, err) } - case <-b.Websocket.ShutdownC: - return } } }() diff --git a/exchanges/btcmarkets/btcmarkets_websocket.go b/exchanges/btcmarkets/btcmarkets_websocket.go index 88b686b7..14ace369 100644 --- a/exchanges/btcmarkets/btcmarkets_websocket.go +++ b/exchanges/btcmarkets/btcmarkets_websocket.go @@ -39,13 +39,14 @@ func (b *BTCMarkets) WsConnect() error { if b.Verbose { log.Debugf(log.ExchangeSys, "%s Connected to Websocket.\n", b.Name) } + + b.Websocket.Wg.Add(1) go b.wsReadData() return nil } // wsReadData receives and passes on websocket messages for processing func (b *BTCMarkets) wsReadData() { - b.Websocket.Wg.Add(1) defer b.Websocket.Wg.Done() for { diff --git a/exchanges/btse/btse_websocket.go b/exchanges/btse/btse_websocket.go index 39543a49..7ce64984 100644 --- a/exchanges/btse/btse_websocket.go +++ b/exchanges/btse/btse_websocket.go @@ -41,7 +41,9 @@ func (b *BTSE) WsConnect() error { Delay: btseWebsocketTimer, }) + b.Websocket.Wg.Add(1) go b.wsReadData() + if b.GetAuthenticatedAPISupport(exchange.WebsocketAuthentication) { err = b.WsAuthenticate() if err != nil { @@ -97,7 +99,6 @@ func stringToOrderStatus(status string) (order.Status, error) { // wsReadData receives and passes on websocket messages for processing func (b *BTSE) wsReadData() { - b.Websocket.Wg.Add(1) defer b.Websocket.Wg.Done() for { diff --git a/exchanges/coinbasepro/coinbasepro_websocket.go b/exchanges/coinbasepro/coinbasepro_websocket.go index fb9fe940..9f7be856 100644 --- a/exchanges/coinbasepro/coinbasepro_websocket.go +++ b/exchanges/coinbasepro/coinbasepro_websocket.go @@ -38,17 +38,14 @@ func (c *CoinbasePro) WsConnect() error { return err } + c.Websocket.Wg.Add(1) go c.wsReadData() return nil } // wsReadData receives and passes on websocket messages for processing func (c *CoinbasePro) wsReadData() { - c.Websocket.Wg.Add(1) - - defer func() { - c.Websocket.Wg.Done() - }() + defer c.Websocket.Wg.Done() for { resp := c.Websocket.Conn.ReadMessage() diff --git a/exchanges/coinbene/coinbene_websocket.go b/exchanges/coinbene/coinbene_websocket.go index 302ecac1..82392741 100644 --- a/exchanges/coinbene/coinbene_websocket.go +++ b/exchanges/coinbene/coinbene_websocket.go @@ -41,7 +41,9 @@ func (c *Coinbene) WsConnect() error { return err } + c.Websocket.Wg.Add(1) go c.wsReadData() + if c.GetAuthenticatedAPISupport(exchange.WebsocketAuthentication) { err = c.Login() if err != nil { @@ -101,7 +103,6 @@ func (c *Coinbene) GenerateAuthSubs() ([]stream.ChannelSubscription, error) { // wsReadData receives and passes on websocket messages for processing func (c *Coinbene) wsReadData() { - c.Websocket.Wg.Add(1) defer c.Websocket.Wg.Done() for { resp := c.Websocket.Conn.ReadMessage() diff --git a/exchanges/coinut/coinut_websocket.go b/exchanges/coinut/coinut_websocket.go index b15e99a0..1449fa1a 100644 --- a/exchanges/coinut/coinut_websocket.go +++ b/exchanges/coinut/coinut_websocket.go @@ -48,6 +48,8 @@ func (c *COINUT) WsConnect() error { if err != nil { return err } + + c.Websocket.Wg.Add(1) go c.wsReadData() if !c.instrumentMap.IsLoaded() { @@ -71,7 +73,6 @@ func (c *COINUT) WsConnect() error { // wsReadData receives and passes on websocket messages for processing func (c *COINUT) wsReadData() { - c.Websocket.Wg.Add(1) defer c.Websocket.Wg.Done() for { diff --git a/exchanges/exchange.go b/exchanges/exchange.go index 107d686f..f1ee3d82 100644 --- a/exchanges/exchange.go +++ b/exchanges/exchange.go @@ -40,7 +40,9 @@ const ( var ( // ErrAuthenticatedRequestWithoutCredentialsSet error message for authenticated request without credentials set ErrAuthenticatedRequestWithoutCredentialsSet = errors.New("authenticated HTTP request called but not supported due to unset/default API keys") - errTransportNotSet = errors.New("transport not set, cannot set timeout") + + errEndpointStringNotFound = errors.New("endpoint string not found") + errTransportNotSet = errors.New("transport not set, cannot set timeout") ) func (b *Base) checkAndInitRequester() { @@ -844,7 +846,39 @@ func (b *Base) SetAPIURL() error { val == config.WebsocketURLNonDefaultMessage { continue } + + var u URL + u, err = getURLTypeFromString(key) + if err != nil { + return err + } + + var defaultURL string + defaultURL, err = b.API.Endpoints.GetURL(u) + if err != nil { + log.Warnf( + log.ExchangeSys, + "%s: Config cannot match with default endpoint URL: [%s] with key: [%s], please remove or update core support endpoints.", + b.Name, + val, + u) + continue + } + + if defaultURL == val { + continue + } + + log.Warnf( + log.ExchangeSys, + "%s: Config is overwriting default endpoint URL values from: [%s] to: [%s] for: [%s]", + b.Name, + defaultURL, + val, + u) + checkInsecureEndpoint(val) + err = b.API.Endpoints.SetRunning(key, val) if err != nil { return err @@ -1255,36 +1289,70 @@ func (b *Base) FormatSymbol(pair currency.Pair, assetType asset.Item) (string, e func (u URL) String() string { switch u { case RestSpot: - return "RestSpotURL" + return restSpotURL case RestSpotSupplementary: - return "RestSpotSupplementaryURL" + return restSpotSupplementaryURL case RestUSDTMargined: - return "RestUSDTMarginedFuturesURL" + return restUSDTMarginedFuturesURL case RestCoinMargined: - return "RestCoinMarginedFuturesURL" + return restCoinMarginedFuturesURL case RestFutures: - return "RestFuturesURL" + return restFuturesURL case RestSandbox: - return "RestSandboxURL" + return restSandboxURL case RestSwap: - return "RestSwapURL" + return restSwapURL case WebsocketSpot: - return "WebsocketSpotURL" + return websocketSpotURL case WebsocketSpotSupplementary: - return "WebsocketSpotSupplementaryURL" + return websocketSpotSupplementaryURL case ChainAnalysis: - return "ChainAnalysisURL" + return chainAnalysisURL case EdgeCase1: - return "EdgeCase1URL" + return edgeCase1URL case EdgeCase2: - return "EdgeCase2URL" + return edgeCase2URL case EdgeCase3: - return "EdgeCase3URL" + return edgeCase3URL default: return "" } } +// getURLTypeFromString returns URL type from the endpoint string association +func getURLTypeFromString(ep string) (URL, error) { + switch ep { + case restSpotURL: + return RestSpot, nil + case restSpotSupplementaryURL: + return RestSpotSupplementary, nil + case restUSDTMarginedFuturesURL: + return RestUSDTMargined, nil + case restCoinMarginedFuturesURL: + return RestCoinMargined, nil + case restFuturesURL: + return RestFutures, nil + case restSandboxURL: + return RestSandbox, nil + case restSwapURL: + return RestSwap, nil + case websocketSpotURL: + return WebsocketSpot, nil + case websocketSpotSupplementaryURL: + return WebsocketSpotSupplementary, nil + case chainAnalysisURL: + return ChainAnalysis, nil + case edgeCase1URL: + return EdgeCase1, nil + case edgeCase2URL: + return EdgeCase2, nil + case edgeCase3URL: + return EdgeCase3, nil + default: + return Invalid, fmt.Errorf("%w for %s", errEndpointStringNotFound, ep) + } +} + // UpdateOrderExecutionLimits updates order execution limits this is overridable func (b *Base) UpdateOrderExecutionLimits(a asset.Item) error { return common.ErrNotYetImplemented diff --git a/exchanges/exchange_test.go b/exchanges/exchange_test.go index 1e0526d7..f274b10c 100644 --- a/exchanges/exchange_test.go +++ b/exchanges/exchange_test.go @@ -1320,13 +1320,15 @@ func TestSetupDefaults(t *testing.T) { b.Websocket = stream.New() b.Features.Supports.Websocket = true err = b.Websocket.Setup(&stream.WebsocketSetup{ - Enabled: false, - WebsocketTimeout: time.Second * 30, - Features: &protocol.Features{}, - DefaultURL: "ws://something.com", - RunningURL: "ws://something.com", - ExchangeName: "test", - Connector: func() error { return nil }, + Enabled: false, + WebsocketTimeout: time.Second * 30, + Features: &protocol.Features{}, + DefaultURL: "ws://something.com", + RunningURL: "ws://something.com", + ExchangeName: "test", + Connector: func() error { return nil }, + GenerateSubscriptions: func() ([]stream.ChannelSubscription, error) { return []stream.ChannelSubscription{}, nil }, + Subscriber: func(cs []stream.ChannelSubscription) error { return nil }, }) if err != nil { t.Fatal(err) @@ -1666,13 +1668,15 @@ func TestIsWebsocketEnabled(t *testing.T) { b.Websocket = stream.New() err := b.Websocket.Setup(&stream.WebsocketSetup{ - Enabled: true, - WebsocketTimeout: time.Second * 30, - Features: &protocol.Features{}, - DefaultURL: "ws://something.com", - RunningURL: "ws://something.com", - ExchangeName: "test", - Connector: func() error { return nil }, + Enabled: true, + WebsocketTimeout: time.Second * 30, + Features: &protocol.Features{}, + DefaultURL: "ws://something.com", + RunningURL: "ws://something.com", + ExchangeName: "test", + Connector: func() error { return nil }, + GenerateSubscriptions: func() ([]stream.ChannelSubscription, error) { return nil, nil }, + Subscriber: func(cs []stream.ChannelSubscription) error { return nil }, }) if err != nil { t.Error(err) @@ -2403,3 +2407,41 @@ func TestAssetWebsocketFunctionality(t *testing.T) { t.Fatal("error asset is not turned off, unexpected response") } } + +func TestGetGetURLTypeFromString(t *testing.T) { + testCases := []struct { + Endpoint string + Expected URL + Error error + }{ + {Endpoint: "RestSpotURL", Expected: RestSpot}, + {Endpoint: "RestSpotSupplementaryURL", Expected: RestSpotSupplementary}, + {Endpoint: "RestUSDTMarginedFuturesURL", Expected: RestUSDTMargined}, + {Endpoint: "RestCoinMarginedFuturesURL", Expected: RestCoinMargined}, + {Endpoint: "RestFuturesURL", Expected: RestFutures}, + {Endpoint: "RestSandboxURL", Expected: RestSandbox}, + {Endpoint: "RestSwapURL", Expected: RestSwap}, + {Endpoint: "WebsocketSpotURL", Expected: WebsocketSpot}, + {Endpoint: "WebsocketSpotSupplementaryURL", Expected: WebsocketSpotSupplementary}, + {Endpoint: "ChainAnalysisURL", Expected: ChainAnalysis}, + {Endpoint: "EdgeCase1URL", Expected: EdgeCase1}, + {Endpoint: "EdgeCase2URL", Expected: EdgeCase2}, + {Endpoint: "EdgeCase3URL", Expected: EdgeCase3}, + {Endpoint: "sillyMcSillyBilly", Expected: 0, Error: errEndpointStringNotFound}, + } + + for _, tt := range testCases { + tt := tt + t.Run(tt.Endpoint, func(t *testing.T) { + t.Parallel() + u, err := getURLTypeFromString(tt.Endpoint) + if !errors.Is(err, tt.Error) { + t.Fatalf("received: %v but expected: %v", err, tt.Error) + } + + if u != tt.Expected { + t.Fatalf("received: %v but expected: %v", u, tt.Expected) + } + }) + } +} diff --git a/exchanges/exchange_types.go b/exchanges/exchange_types.go index b2872478..daa60aca 100644 --- a/exchanges/exchange_types.go +++ b/exchanges/exchange_types.go @@ -235,7 +235,8 @@ type Base struct { // url lookup consts const ( - RestSpot URL = iota + Invalid URL = iota + RestSpot RestSpotSupplementary RestUSDTMargined RestCoinMargined @@ -248,6 +249,20 @@ const ( EdgeCase1 EdgeCase2 EdgeCase3 + + restSpotURL = "RestSpotURL" + restSpotSupplementaryURL = "RestSpotSupplementaryURL" + restUSDTMarginedFuturesURL = "RestUSDTMarginedFuturesURL" + restCoinMarginedFuturesURL = "RestCoinMarginedFuturesURL" + restFuturesURL = "RestFuturesURL" + restSandboxURL = "RestSandboxURL" + restSwapURL = "RestSwapURL" + websocketSpotURL = "WebsocketSpotURL" + websocketSpotSupplementaryURL = "WebsocketSpotSupplementaryURL" + chainAnalysisURL = "ChainAnalysisURL" + edgeCase1URL = "EdgeCase1URL" + edgeCase2URL = "EdgeCase2URL" + edgeCase3URL = "EdgeCase3URL" ) var keyURLs = []URL{RestSpot, diff --git a/exchanges/ftx/ftx_websocket.go b/exchanges/ftx/ftx_websocket.go index 8496a2a3..78c6a1ee 100644 --- a/exchanges/ftx/ftx_websocket.go +++ b/exchanges/ftx/ftx_websocket.go @@ -60,7 +60,9 @@ func (f *FTX) WsConnect() error { log.Debugf(log.ExchangeSys, "%s Connected to Websocket.\n", f.Name) } + f.Websocket.Wg.Add(1) go f.wsReadData() + if f.GetAuthenticatedAPISupport(exchange.WebsocketAuthentication) { err = f.WsAuth() if err != nil { @@ -210,7 +212,6 @@ func (f *FTX) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, erro // wsReadData gets and passes on websocket messages for processing func (f *FTX) wsReadData() { - f.Websocket.Wg.Add(1) defer f.Websocket.Wg.Done() for { diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index ee2b7ed5..b96a2f61 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -39,6 +39,8 @@ func (g *Gateio) WsConnect() error { if err != nil { return err } + + g.Websocket.Wg.Add(1) go g.wsReadData() if g.GetAuthenticatedAPISupport(exchange.WebsocketAuthentication) { @@ -101,7 +103,6 @@ func (g *Gateio) wsServerSignIn() error { // wsReadData receives and passes on websocket messages for processing func (g *Gateio) wsReadData() { - g.Websocket.Wg.Add(1) defer g.Websocket.Wg.Done() for { diff --git a/exchanges/gateio/gateio_wrapper.go b/exchanges/gateio/gateio_wrapper.go index 7fdda57a..c72b7464 100644 --- a/exchanges/gateio/gateio_wrapper.go +++ b/exchanges/gateio/gateio_wrapper.go @@ -99,6 +99,7 @@ func (g *Gateio) SetDefaults() { MessageCorrelation: true, GetOrder: true, AccountBalance: true, + Subscribe: true, }, WithdrawPermissions: exchange.AutoWithdrawCrypto | exchange.NoFiatWithdrawals, diff --git a/exchanges/gemini/gemini_websocket.go b/exchanges/gemini/gemini_websocket.go index 8c324b22..45f84ee9 100644 --- a/exchanges/gemini/gemini_websocket.go +++ b/exchanges/gemini/gemini_websocket.go @@ -47,6 +47,7 @@ func (g *Gemini) WsConnect() error { return err } + g.Websocket.Wg.Add(2) go g.wsReadData() go g.wsFunnelConnectionData(g.Websocket.Conn) @@ -218,7 +219,6 @@ func (g *Gemini) WsAuth(dialer *websocket.Dialer) error { // wsFunnelConnectionData receives data from multiple connections and passes it to wsReadData func (g *Gemini) wsFunnelConnectionData(ws stream.Connection) { - g.Websocket.Wg.Add(1) defer g.Websocket.Wg.Done() for { resp := ws.ReadMessage() @@ -231,11 +231,25 @@ func (g *Gemini) wsFunnelConnectionData(ws stream.Connection) { // wsReadData receives and passes on websocket messages for processing func (g *Gemini) wsReadData() { - g.Websocket.Wg.Add(1) defer g.Websocket.Wg.Done() for { select { case <-g.Websocket.ShutdownC: + select { + case resp := <-comms: + err := g.wsHandleData(resp.Raw) + if err != nil { + select { + case g.Websocket.DataHandler <- err: + default: + log.Errorf(log.WebsocketMgr, + "%s websocket handle data error: %v", + g.Name, + err) + } + } + default: + } return case resp := <-comms: err := g.wsHandleData(resp.Raw) diff --git a/exchanges/hitbtc/hitbtc_websocket.go b/exchanges/hitbtc/hitbtc_websocket.go index 547f40d0..ee27c1fe 100644 --- a/exchanges/hitbtc/hitbtc_websocket.go +++ b/exchanges/hitbtc/hitbtc_websocket.go @@ -41,7 +41,10 @@ func (h *HitBTC) WsConnect() error { if err != nil { return err } + + h.Websocket.Wg.Add(1) go h.wsReadData() + err = h.wsLogin() if err != nil { log.Errorf(log.ExchangeSys, "%v - authentication failed: %v\n", h.Name, err) @@ -52,7 +55,6 @@ func (h *HitBTC) WsConnect() error { // wsReadData receives and passes on websocket messages for processing func (h *HitBTC) wsReadData() { - h.Websocket.Wg.Add(1) defer h.Websocket.Wg.Done() for { diff --git a/exchanges/huobi/huobi_websocket.go b/exchanges/huobi/huobi_websocket.go index be4155de..5e4c7836 100644 --- a/exchanges/huobi/huobi_websocket.go +++ b/exchanges/huobi/huobi_websocket.go @@ -83,6 +83,7 @@ func (h *HUOBI) WsConnect() error { h.Websocket.SetCanUseAuthenticatedEndpoints(false) } + h.Websocket.Wg.Add(1) go h.wsReadData() return nil } @@ -92,6 +93,7 @@ func (h *HUOBI) wsDial(dialer *websocket.Dialer) error { if err != nil { return err } + h.Websocket.Wg.Add(1) go h.wsFunnelConnectionData(h.Websocket.Conn, wsMarketURL) return nil } @@ -105,6 +107,8 @@ func (h *HUOBI) wsAuthenticatedDial(dialer *websocket.Dialer) error { if err != nil { return err } + + h.Websocket.Wg.Add(1) go h.wsFunnelConnectionData(h.Websocket.AuthConn, wsAccountsOrdersURL) return nil } @@ -112,7 +116,6 @@ func (h *HUOBI) wsAuthenticatedDial(dialer *websocket.Dialer) error { // wsFunnelConnectionData manages data from multiple endpoints and passes it to // a channel func (h *HUOBI) wsFunnelConnectionData(ws stream.Connection, url string) { - h.Websocket.Wg.Add(1) defer h.Websocket.Wg.Done() for { resp := ws.ReadMessage() @@ -125,13 +128,31 @@ func (h *HUOBI) wsFunnelConnectionData(ws stream.Connection, url string) { // wsReadData receives and passes on websocket messages for processing func (h *HUOBI) wsReadData() { - h.Websocket.Wg.Add(1) defer h.Websocket.Wg.Done() for { - resp := <-comms - err := h.wsHandleData(resp.Raw) - if err != nil { - h.Websocket.DataHandler <- err + select { + case <-h.Websocket.ShutdownC: + select { + case resp := <-comms: + err := h.wsHandleData(resp.Raw) + if err != nil { + select { + case h.Websocket.DataHandler <- err: + default: + log.Errorf(log.WebsocketMgr, + "%s websocket handle data error: %v", + h.Name, + err) + } + } + default: + } + return + case resp := <-comms: + err := h.wsHandleData(resp.Raw) + if err != nil { + h.Websocket.DataHandler <- err + } } } } diff --git a/exchanges/kraken/kraken.go b/exchanges/kraken/kraken.go index 70df0075..392b19bc 100644 --- a/exchanges/kraken/kraken.go +++ b/exchanges/kraken/kraken.go @@ -350,24 +350,20 @@ func (k *Kraken) GetTrades(symbol currency.Pair) ([]RecentTrades, error) { var dataError interface{} dataError, ok = data["error"] if ok { - var dataErrorInterface interface{} - dataErrorInterface, ok = dataError.(interface{}) + var errorList []interface{} + errorList, ok = dataError.([]interface{}) if ok { - var errorList []interface{} - errorList, ok = dataErrorInterface.([]interface{}) - if ok { - var errs common.Errors - for i := range errorList { - var errString string - errString, ok = errorList[i].(string) - if !ok { - continue - } - errs = append(errs, errors.New(errString)) - } - if len(errs) > 0 { - return nil, errs + var errs common.Errors + for i := range errorList { + var errString string + errString, ok = errorList[i].(string) + if !ok { + continue } + errs = append(errs, errors.New(errString)) + } + if len(errs) > 0 { + return nil, errs } } } diff --git a/exchanges/kraken/kraken_test.go b/exchanges/kraken/kraken_test.go index 704b6119..d42b2b04 100644 --- a/exchanges/kraken/kraken_test.go +++ b/exchanges/kraken/kraken_test.go @@ -522,6 +522,15 @@ func TestGetTrades(t *testing.T) { if err != nil { t.Error("GetTrades() error", err) } + + cp, err = currency.NewPairFromString("XXXXX") + if err != nil { + t.Error(err) + } + _, err = k.GetTrades(cp) + if err == nil { + t.Error("GetTrades() error: expecting error") + } } // TestGetSpread API endpoint test diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index 522730c0..dbee8a1f 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -96,6 +96,7 @@ func (k *Kraken) WsConnect() error { } comms := make(chan stream.Response) + k.Websocket.Wg.Add(2) go k.wsReadData(comms) go k.wsFunnelConnectionData(k.Websocket.Conn, comms) @@ -116,6 +117,7 @@ func (k *Kraken) WsConnect() error { k.Name, err) } else { + k.Websocket.Wg.Add(1) go k.wsFunnelConnectionData(k.Websocket.AuthConn, comms) err = k.wsAuthPingHandler() if err != nil { @@ -140,7 +142,6 @@ func (k *Kraken) WsConnect() error { // wsFunnelConnectionData funnels both auth and public ws data into one manageable place func (k *Kraken) wsFunnelConnectionData(ws stream.Connection, comms chan stream.Response) { - k.Websocket.Wg.Add(1) defer k.Websocket.Wg.Done() for { resp := ws.ReadMessage() @@ -153,12 +154,26 @@ func (k *Kraken) wsFunnelConnectionData(ws stream.Connection, comms chan stream. // wsReadData receives and passes on websocket messages for processing func (k *Kraken) wsReadData(comms chan stream.Response) { - k.Websocket.Wg.Add(1) defer k.Websocket.Wg.Done() for { select { case <-k.Websocket.ShutdownC: + select { + case resp := <-comms: + err := k.wsHandleData(resp.Raw) + if err != nil { + select { + case k.Websocket.DataHandler <- err: + default: + log.Errorf(log.WebsocketMgr, + "%s websocket handle data error: %v", + k.Name, + err) + } + } + default: + } return case resp := <-comms: err := k.wsHandleData(resp.Raw) diff --git a/exchanges/okgroup/okgroup_websocket.go b/exchanges/okgroup/okgroup_websocket.go index 59400976..92d6c5eb 100644 --- a/exchanges/okgroup/okgroup_websocket.go +++ b/exchanges/okgroup/okgroup_websocket.go @@ -192,7 +192,9 @@ func (o *OKGroup) WsConnect() error { o.Websocket.GetWebsocketURL()) } + o.Websocket.Wg.Add(1) go o.WsReadData() + if o.GetAuthenticatedAPISupport(exchange.WebsocketAuthentication) { err = o.WsLogin() if err != nil { @@ -238,7 +240,6 @@ func (o *OKGroup) WsLogin() error { // WsReadData receives and passes on websocket messages for processing func (o *OKGroup) WsReadData() { - o.Websocket.Wg.Add(1) defer o.Websocket.Wg.Done() for { diff --git a/exchanges/poloniex/poloniex_websocket.go b/exchanges/poloniex/poloniex_websocket.go index c3cf2771..5d72c3fa 100644 --- a/exchanges/poloniex/poloniex_websocket.go +++ b/exchanges/poloniex/poloniex_websocket.go @@ -68,6 +68,7 @@ func (p *Poloniex) WsConnect() error { return err } + p.Websocket.Wg.Add(1) go p.wsReadData() return nil @@ -100,9 +101,7 @@ func (p *Poloniex) loadCurrencyDetails() error { // wsReadData handles data from the websocket connection func (p *Poloniex) wsReadData() { - p.Websocket.Wg.Add(1) defer p.Websocket.Wg.Done() - for { resp := p.Websocket.Conn.ReadMessage() if resp.Raw == nil { diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index 5c5df81f..ad9d8144 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -27,6 +27,7 @@ var ( errClosedConnection = errors.New("use of closed network connection") // ErrSubscriptionFailure defines an error when a subscription fails ErrSubscriptionFailure = errors.New("subscription failure") + errAlreadyRunning = errors.New("connection monitor is already running") ) // New initialises the websocket struct @@ -43,6 +44,11 @@ func New() *Websocket { } } +var ( + errSubscriberUnset = errors.New("subscriber function needs to be set") + errGenerateSubsciptionsUnset = errors.New("generate subscriptions function needs to be set") +) + // Setup sets main variables for websocket connection func (w *Websocket) Setup(s *WebsocketSetup) error { if w == nil { @@ -65,8 +71,8 @@ func (w *Websocket) Setup(s *WebsocketSetup) error { w.features = s.Features - if w.features.Subscribe && s.Subscriber == nil { - return errors.New("features have been set yet channel subscriber is not set") + if s.Subscriber == nil { + return errSubscriberUnset } w.Subscriber = s.Subscriber @@ -75,6 +81,9 @@ func (w *Websocket) Setup(s *WebsocketSetup) error { } w.Unsubscriber = s.UnSubscriber + if s.GenerateSubscriptions == nil { + return errGenerateSubsciptionsUnset + } w.GenerateSubs = s.GenerateSubscriptions w.enabled = s.Enabled @@ -206,24 +215,28 @@ func (w *Websocket) Connect() error { w.setConnectingStatus(false) w.setInit(true) - if !w.IsConnectionMonitorRunning() { - w.connectionMonitor() + err = w.connectionMonitor() + if err != nil { + log.Errorf(log.WebsocketMgr, + "%s cannot start websocket connection monitor %v", + w.GetName(), + err) } - // Resubscribe after re-connection - if len(w.subscriptions) != 0 { - err = w.Subscriber(w.subscriptions) - if err != nil { - return fmt.Errorf("%v %w: %v", w.exchangeName, ErrSubscriptionFailure, err) - } + subs, err := w.GenerateSubs() // regenerate state on new connection + if err != nil { + return fmt.Errorf("%v %w: %v", w.exchangeName, ErrSubscriptionFailure, err) + } + err = w.Subscriber(subs) + if err != nil { + return fmt.Errorf("%v %w: %v", w.exchangeName, ErrSubscriptionFailure, err) } - return nil } // Disable disables the exchange websocket protocol func (w *Websocket) Disable() error { - if !w.IsConnected() || !w.IsEnabled() { + if !w.IsEnabled() { return fmt.Errorf("websocket is already disabled for exchange %s", w.exchangeName) } @@ -290,11 +303,11 @@ func (w *Websocket) dataMonitor() { } // connectionMonitor ensures that the WS keeps connecting -func (w *Websocket) connectionMonitor() { - if w.IsConnectionMonitorRunning() { - return +func (w *Websocket) connectionMonitor() error { + if w.checkAndSetMonitorRunning() { + return errAlreadyRunning } - w.setConnectionMonitorRunning(true) + go func() { timer := time.NewTimer(connectionMonitorDelay) @@ -354,6 +367,7 @@ func (w *Websocket) connectionMonitor() { } } }() + return nil } // Shutdown attempts to shut down a websocket connection and associated routines @@ -527,7 +541,10 @@ func (w *Websocket) trafficMonitor() { // Routine pausing mechanism go func(p chan<- struct{}) { time.Sleep(defaultTrafficPeriod) - p <- struct{}{} + select { + case p <- struct{}{}: + default: + } }(pause) select { case <-w.ShutdownC: @@ -607,6 +624,16 @@ func (w *Websocket) IsTrafficMonitorRunning() bool { return w.trafficMonitorRunning } +func (w *Websocket) checkAndSetMonitorRunning() (alreadyRunning bool) { + w.connectionMutex.Lock() + defer w.connectionMutex.Unlock() + if w.connectionMonitorRunning { + return true + } + w.connectionMonitorRunning = true + return false +} + func (w *Websocket) setConnectionMonitorRunning(b bool) { w.connectionMutex.Lock() w.connectionMonitorRunning = b diff --git a/exchanges/stream/websocket_connection.go b/exchanges/stream/websocket_connection.go index 8e1179ee..a720cbb7 100644 --- a/exchanges/stream/websocket_connection.go +++ b/exchanges/stream/websocket_connection.go @@ -206,7 +206,16 @@ func (w *WebsocketConnection) ReadMessage() Response { if err != nil { if isDisconnectionError(err) { w.setConnectedStatus(false) - w.readMessageErrors <- err + select { + case w.readMessageErrors <- err: + default: + // bypass if there is no receiver, as this stops it returning + // when shutdown is called. + log.Warnf(log.WebsocketMgr, + "%s failed to relay error: %v", + w.ExchangeName, + err) + } } return Response{} } diff --git a/exchanges/stream/websocket_test.go b/exchanges/stream/websocket_test.go index c45486e2..7d5aaac1 100644 --- a/exchanges/stream/websocket_test.go +++ b/exchanges/stream/websocket_test.go @@ -153,8 +153,14 @@ func TestSetup(t *testing.T) { } websocketSetup.WebsocketTimeout = time.Minute err = w.Setup(websocketSetup) - if err != nil { - t.Fatal(err) + if !errors.Is(err, errGenerateSubsciptionsUnset) { + t.Fatalf("received: %v but expected: %v", err, errGenerateSubsciptionsUnset) + } + + websocketSetup.GenerateSubscriptions = func() ([]ChannelSubscription, error) { return nil, nil } + err = w.Setup(websocketSetup) + if !errors.Is(err, nil) { + t.Fatalf("received: %v but expected: %v", err, nil) } } @@ -544,23 +550,33 @@ func TestConnectionMonitorNoConnection(t *testing.T) { ws.DataHandler = make(chan interface{}, 1) ws.ShutdownC = make(chan struct{}, 1) ws.exchangeName = "hello" - ws.trafficTimeout = 1 ws.Wg = &sync.WaitGroup{} - ws.connectionMonitor() + ws.enabled = true + err := ws.connectionMonitor() + if !errors.Is(err, nil) { + t.Fatalf("received: %v, but expected: %v", err, nil) + } if !ws.IsConnectionMonitorRunning() { t.Fatal("Should not have exited") } - ws.connectionMonitor() // This one should exit + err = ws.connectionMonitor() + if !errors.Is(err, errAlreadyRunning) { + t.Fatalf("received: %v, but expected: %v", err, errAlreadyRunning) + } if !ws.IsConnectionMonitorRunning() { t.Fatal("Should not have exited") } - time.Sleep(time.Millisecond * 100) + ws.setEnabled(false) + time.Sleep(time.Second * 2) if ws.IsConnectionMonitorRunning() { t.Fatal("Should have exited") } ws.setConnectedStatus(true) // attempt shutdown when not enabled ws.setConnectingStatus(true) // throw a spanner in the works - ws.connectionMonitor() + err = ws.connectionMonitor() + if !errors.Is(err, nil) { + t.Fatalf("received: %v, but expected: %v", err, nil) + } if !ws.IsConnectionMonitorRunning() { t.Fatal("Should not have exited") } @@ -1083,6 +1099,9 @@ func TestFlushChannels(t *testing.T) { // Disable pair and flush system newgen.EnabledPairs = []currency.Pair{ currency.NewPair(currency.BTC, currency.AUD)} + web.GenerateSubs = func() ([]ChannelSubscription, error) { + return []ChannelSubscription{{Channel: "test"}}, nil + } err = web.FlushChannels() if err != nil { t.Fatal(err) @@ -1181,7 +1200,12 @@ func TestEnable(t *testing.T) { connector: connect, Wg: new(sync.WaitGroup), ShutdownC: make(chan struct{}), + GenerateSubs: func() ([]ChannelSubscription, error) { + return []ChannelSubscription{{Channel: "test"}}, nil + }, + Subscriber: func(cs []ChannelSubscription) error { return nil }, } + err := web.Enable() if err != nil { t.Fatal(err) diff --git a/exchanges/zb/zb.go b/exchanges/zb/zb.go index c306717f..b88915b2 100644 --- a/exchanges/zb/zb.go +++ b/exchanges/zb/zb.go @@ -18,7 +18,7 @@ import ( ) const ( - zbTradeURL = "http://api.zb.land" + zbTradeURL = "https://api.zb.land" zbMarketURL = "https://trade.zb.land/api" zbAPIVersion = "v1" zbData = "data" diff --git a/exchanges/zb/zb_websocket.go b/exchanges/zb/zb_websocket.go index d842df09..661b3c3d 100644 --- a/exchanges/zb/zb_websocket.go +++ b/exchanges/zb/zb_websocket.go @@ -24,7 +24,7 @@ import ( ) const ( - zbWebsocketAPI = "wss://api.zb.live/websocket" + zbWebsocketAPI = "wss://api.zb.land/websocket" zWebsocketAddChannel = "addChannel" zbWebsocketRateLimit = 20 ) @@ -40,6 +40,7 @@ func (z *ZB) WsConnect() error { return err } + z.Websocket.Wg.Add(1) go z.wsReadData() return nil } @@ -47,9 +48,7 @@ func (z *ZB) WsConnect() error { // wsReadData handles all the websocket data coming from the websocket // connection func (z *ZB) wsReadData() { - z.Websocket.Wg.Add(1) defer z.Websocket.Wg.Done() - for { resp := z.Websocket.Conn.ReadMessage() if resp.Raw == nil {