diff --git a/config/config.go b/config/config.go index 3bb42c56..d197ec35 100644 --- a/config/config.go +++ b/config/config.go @@ -41,6 +41,7 @@ const ( configDefaultWebsocketResponseCheckTimeout = time.Millisecond * 30 configDefaultWebsocketResponseMaxLimit = time.Second * 7 configDefaultWebsocketOrderbookBufferLimit = 5 + configDefaultWebsocketTrafficTimeout = time.Second * 30 configMaxAuthFailures = 3 defaultNTPAllowedDifference = 50000000 defaultNTPAllowedNegativeDifference = 50000000 @@ -1024,6 +1025,11 @@ func (c *Config) CheckExchangeConfigValues() error { c.Exchanges[i].Name, configDefaultWebsocketResponseMaxLimit) c.Exchanges[i].WebsocketResponseMaxLimit = configDefaultWebsocketResponseMaxLimit } + if c.Exchanges[i].WebsocketTrafficTimeout <= 0 { + log.Warnf(log.ExchangeSys, "Exchange %s Websocket response traffic timeout value not set, defaulting to %v.", + c.Exchanges[i].Name, configDefaultWebsocketTrafficTimeout) + c.Exchanges[i].WebsocketTrafficTimeout = configDefaultWebsocketTrafficTimeout + } if c.Exchanges[i].WebsocketOrderbookBufferLimit <= 0 { log.Warnf(log.ExchangeSys, "Exchange %s Websocket orderbook buffer limit value not set, defaulting to %v.", c.Exchanges[i].Name, configDefaultWebsocketOrderbookBufferLimit) diff --git a/config/config_test.go b/config/config_test.go index 419e1951..65249bfd 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -1451,6 +1451,7 @@ func TestCheckExchangeConfigValues(t *testing.T) { cfg.Exchanges[0].WebsocketResponseMaxLimit = 0 cfg.Exchanges[0].WebsocketResponseCheckTimeout = 0 cfg.Exchanges[0].WebsocketOrderbookBufferLimit = 0 + cfg.Exchanges[0].WebsocketTrafficTimeout = 0 cfg.Exchanges[0].HTTPTimeout = 0 err = cfg.CheckExchangeConfigValues() if err != nil { @@ -1465,6 +1466,10 @@ func TestCheckExchangeConfigValues(t *testing.T) { t.Errorf("expected exchange %s to have updated WebsocketOrderbookBufferLimit value", cfg.Exchanges[0].Name) } + if cfg.Exchanges[0].WebsocketTrafficTimeout == 0 { + t.Errorf("expected exchange %s to have updated WebsocketTrafficTimeout value", + cfg.Exchanges[0].Name) + } if cfg.Exchanges[0].HTTPTimeout == 0 { t.Errorf("expected exchange %s to have updated HTTPTimeout value", cfg.Exchanges[0].Name) diff --git a/config/config_types.go b/config/config_types.go index 33ea91f5..5c07f39e 100644 --- a/config/config_types.go +++ b/config/config_types.go @@ -59,6 +59,7 @@ type ExchangeConfig struct { HTTPRateLimiter *HTTPRateLimitConfig `json:"httpRateLimiter,omitempty"` WebsocketResponseCheckTimeout time.Duration `json:"websocketResponseCheckTimeout"` WebsocketResponseMaxLimit time.Duration `json:"websocketResponseMaxLimit"` + WebsocketTrafficTimeout time.Duration `json:"websocketTrafficTimeout"` WebsocketOrderbookBufferLimit int `json:"websocketOrderbookBufferLimit"` ProxyAddress string `json:"proxyAddress,omitempty"` BaseCurrencies currency.Currencies `json:"baseCurrencies"` diff --git a/engine/exchange.go b/engine/exchange.go index 3faaa836..5da34cdf 100644 --- a/engine/exchange.go +++ b/engine/exchange.go @@ -228,7 +228,6 @@ func LoadExchange(name string, useWG bool, wg *sync.WaitGroup) error { if exchCfg.Features.Supports.RESTCapabilities.AutoPairUpdates { exchCfg.Features.Enabled.AutoPairUpdates = false } - } } diff --git a/engine/routines.go b/engine/routines.go index 29466566..bcbc6902 100644 --- a/engine/routines.go +++ b/engine/routines.go @@ -354,39 +354,12 @@ func Websocketshutdown(ws *wshandler.Websocket) error { } } -// streamDiversion is a diversion switch from websocket to REST or other -// alternative feed -func streamDiversion(ws *wshandler.Websocket) { - wg.Add(1) - defer wg.Done() - - for { - select { - case <-shutdowner: - return - - case <-ws.Connected: - if Bot.Settings.Verbose { - log.Debugf(log.WebsocketMgr, "exchange %s websocket feed connected\n", ws.GetName()) - } - - case <-ws.Disconnected: - if Bot.Settings.Verbose { - log.Debugf(log.WebsocketMgr, "exchange %s websocket feed disconnected, switching to REST functionality\n", - ws.GetName()) - } - } - } -} - // WebsocketDataHandler handles websocket data coming from a websocket feed // associated with an exchange func WebsocketDataHandler(ws *wshandler.Websocket) { wg.Add(1) defer wg.Done() - go streamDiversion(ws) - for { select { case <-shutdowner: @@ -407,14 +380,7 @@ func WebsocketDataHandler(ws *wshandler.Websocket) { } case error: - switch { - case strings.Contains(d.Error(), "close 1006"): - go ws.WebsocketReset() - continue - default: - log.Errorf(log.WebsocketMgr, "routines.go exchange %s websocket error - %s", ws.GetName(), data) - } - + log.Errorf(log.WebsocketMgr, "routines.go exchange %s websocket error - %s", ws.GetName(), data) case wshandler.TradeData: // Trade Data // if Bot.Settings.Verbose { diff --git a/engine/syncer.go b/engine/syncer.go index 3cb15be2..598661f5 100644 --- a/engine/syncer.go +++ b/engine/syncer.go @@ -286,7 +286,7 @@ func (e *ExchangeCurrencyPairSyncer) worker() { supportsRESTTickerBatching := Bot.Exchanges[x].SupportsRESTTickerBatchUpdates() var usingREST bool var usingWebsocket bool - + var switchedToRest bool if Bot.Exchanges[x].SupportsWebsocket() && Bot.Exchanges[x].IsWebsocketEnabled() { ws, err := Bot.Exchanges[x].GetWebsocket() if err != nil { @@ -346,7 +346,12 @@ func (e *ExchangeCurrencyPairSyncer) worker() { log.Errorf(log.SyncMgr, "failed to get item. Err: %s\n", err) continue } - + if switchedToRest && usingWebsocket { + log.Infof(log.SyncMgr, + "%s %s: Websocket re-enabled, switching from rest to websocket\n", + c.Exchange, FormatCurrency(p).String()) + switchedToRest = false + } if e.Cfg.SyncTicker { if !e.isProcessing(exchangeName, c.Pair, c.AssetType, SyncItemTicker) { if c.Ticker.LastUpdated.IsZero() || time.Since(c.Ticker.LastUpdated) > defaultSyncerTimeout { @@ -362,6 +367,7 @@ func (e *ExchangeCurrencyPairSyncer) worker() { log.Warnf(log.SyncMgr, "%s %s: No ticker update after 10 seconds, switching from websocket to rest\n", c.Exchange, FormatCurrency(p).String()) + switchedToRest = true e.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemTicker, false) } } @@ -425,6 +431,7 @@ func (e *ExchangeCurrencyPairSyncer) worker() { log.Warnf(log.SyncMgr, "%s %s: No orderbook update after 15 seconds, switching from websocket to rest\n", c.Exchange, FormatCurrency(c.Pair).String()) + switchedToRest = true e.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemOrderbook, false) } } @@ -491,7 +498,7 @@ func (e *ExchangeCurrencyPairSyncer) Start() { usingREST = true } - if !ws.IsConnected() { + if !ws.IsConnected() && !ws.IsConnecting() { go WebsocketDataHandler(ws) err = ws.Connect() diff --git a/exchanges/alphapoint/alphapoint_websocket.go b/exchanges/alphapoint/alphapoint_websocket.go index 58c6803a..05f389b2 100644 --- a/exchanges/alphapoint/alphapoint_websocket.go +++ b/exchanges/alphapoint/alphapoint_websocket.go @@ -38,6 +38,7 @@ func (a *Alphapoint) WebsocketClient() { for a.Enabled { msgType, resp, err := a.WebsocketConn.ReadMessage() if err != nil { + a.Websocket.ReadMessageErrors <- err log.Error(log.ExchangeSys, err) break } diff --git a/exchanges/binance/binance_websocket.go b/exchanges/binance/binance_websocket.go index e8b24c3b..ec48658b 100644 --- a/exchanges/binance/binance_websocket.go +++ b/exchanges/binance/binance_websocket.go @@ -21,8 +21,8 @@ const ( binanceDefaultWebsocketURL = "wss://stream.binance.com:9443" ) -// WSConnect intiates a websocket connection -func (b *Binance) WSConnect() error { +// WsConnect intiates a websocket connection +func (b *Binance) WsConnect() error { if !b.Websocket.IsEnabled() || !b.IsEnabled() { return errors.New(wshandler.WebsocketNotEnabled) } @@ -87,7 +87,7 @@ func (b *Binance) WsHandleData() { default: read, err := b.WebsocketConn.ReadMessage() if err != nil { - b.Websocket.DataHandler <- err + b.Websocket.ReadMessageErrors <- err return } b.Websocket.TrafficAlert <- struct{}{} @@ -248,7 +248,7 @@ func (b *Binance) SeedLocalCache(p currency.Pair) error { newOrderBook.Pair = p newOrderBook.AssetType = asset.Spot - return b.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false) + return b.Websocket.Orderbook.LoadSnapshot(&newOrderBook) } // UpdateLocalCache updates and returns the most recent iteration of the orderbook diff --git a/exchanges/binance/binance_wrapper.go b/exchanges/binance/binance_wrapper.go index 707b9858..442dcfb8 100644 --- a/exchanges/binance/binance_wrapper.go +++ b/exchanges/binance/binance_wrapper.go @@ -113,15 +113,18 @@ func (b *Binance) Setup(exch *config.ExchangeConfig) error { return err } - err = b.Websocket.Setup(b.WSConnect, - nil, - nil, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - binanceDefaultWebsocketURL, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = b.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: binanceDefaultWebsocketURL, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: b.WsConnect, + }) + if err != nil { return err } diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index 557553e5..b9b5f569 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -133,6 +133,7 @@ func (b *Bitfinex) WsConnect() error { resp, err := b.WebsocketConn.ReadMessage() if err != nil { + b.Websocket.ReadMessageErrors <- err return fmt.Errorf("%v unable to read from Websocket. Error: %s", b.Name, err) } b.Websocket.TrafficAlert <- struct{}{} @@ -177,7 +178,7 @@ func (b *Bitfinex) WsDataHandler() { default: stream, err := b.WebsocketConn.ReadMessage() if err != nil { - b.Websocket.DataHandler <- err + b.Websocket.ReadMessageErrors <- err return } b.Websocket.TrafficAlert <- struct{}{} @@ -481,7 +482,7 @@ func (b *Bitfinex) WsInsertSnapshot(p currency.Pair, assetType asset.Item, books newOrderBook.AssetType = assetType newOrderBook.Bids = bid newOrderBook.Pair = p - err := b.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false) + err := b.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { return fmt.Errorf("bitfinex.go error - %s", err) } diff --git a/exchanges/bitfinex/bitfinex_wrapper.go b/exchanges/bitfinex/bitfinex_wrapper.go index 4fbe2455..1a8229ed 100644 --- a/exchanges/bitfinex/bitfinex_wrapper.go +++ b/exchanges/bitfinex/bitfinex_wrapper.go @@ -114,15 +114,19 @@ func (b *Bitfinex) Setup(exch *config.ExchangeConfig) error { return err } - err = b.Websocket.Setup(b.WsConnect, - b.Subscribe, - b.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - bitfinexWebsocket, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = b.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: bitfinexWebsocket, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: b.WsConnect, + Subscriber: b.Subscribe, + UnSubscriber: b.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/bitmex/bitmex_websocket.go b/exchanges/bitmex/bitmex_websocket.go index 4af5f0f9..3524550c 100644 --- a/exchanges/bitmex/bitmex_websocket.go +++ b/exchanges/bitmex/bitmex_websocket.go @@ -66,8 +66,8 @@ var ( pongChan = make(chan int, 1) ) -// WsConnector initiates a new websocket connection -func (b *Bitmex) WsConnector() error { +// WsConnect initiates a new websocket connection +func (b *Bitmex) WsConnect() error { if !b.Websocket.IsEnabled() || !b.IsEnabled() { return errors.New(wshandler.WebsocketNotEnabled) } @@ -79,6 +79,7 @@ func (b *Bitmex) WsConnector() error { p, err := b.WebsocketConn.ReadMessage() if err != nil { + b.Websocket.ReadMessageErrors <- err return err } b.Websocket.TrafficAlert <- struct{}{} @@ -360,7 +361,7 @@ func (b *Bitmex) processOrderbook(data []OrderBookL2, action string, currencyPai newOrderBook.Bids = bids newOrderBook.AssetType = assetType newOrderBook.Pair = currencyPair - err := b.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false) + err := b.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { return fmt.Errorf("bitmex_websocket.go process orderbook error - %s", err) diff --git a/exchanges/bitmex/bitmex_wrapper.go b/exchanges/bitmex/bitmex_wrapper.go index 167be7e4..68e0a016 100644 --- a/exchanges/bitmex/bitmex_wrapper.go +++ b/exchanges/bitmex/bitmex_wrapper.go @@ -137,15 +137,19 @@ func (b *Bitmex) Setup(exch *config.ExchangeConfig) error { return err } - err = b.Websocket.Setup(b.WsConnector, - b.Subscribe, - b.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - bitmexWSURL, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = b.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: bitmexWSURL, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: b.WsConnect, + Subscriber: b.Subscribe, + UnSubscriber: b.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/bitstamp/bitstamp_websocket.go b/exchanges/bitstamp/bitstamp_websocket.go index 09d06ac4..5260dfd2 100644 --- a/exchanges/bitstamp/bitstamp_websocket.go +++ b/exchanges/bitstamp/bitstamp_websocket.go @@ -62,7 +62,7 @@ func (b *Bitstamp) WsHandleData() { default: resp, err := b.WebsocketConn.ReadMessage() if err != nil { - b.Websocket.DataHandler <- err + b.Websocket.ReadMessageErrors <- err return } b.Websocket.TrafficAlert <- struct{}{} @@ -78,7 +78,7 @@ func (b *Bitstamp) WsHandleData() { if b.Verbose { log.Debugf(log.ExchangeSys, "%v - Websocket reconnection request received", b.GetName()) } - go b.Websocket.WebsocketReset() + go b.Websocket.Shutdown() // Connection monitor will reconnect case "data": wsOrderBookTemp := websocketOrderBookResponse{} @@ -248,7 +248,7 @@ func (b *Bitstamp) seedOrderBook() error { newOrderBook.Pair = p[x] newOrderBook.AssetType = asset.Spot - err = b.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false) + err = b.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { return err } diff --git a/exchanges/bitstamp/bitstamp_wrapper.go b/exchanges/bitstamp/bitstamp_wrapper.go index 790ca785..5f8cdc9f 100644 --- a/exchanges/bitstamp/bitstamp_wrapper.go +++ b/exchanges/bitstamp/bitstamp_wrapper.go @@ -110,15 +110,19 @@ func (b *Bitstamp) Setup(exch *config.ExchangeConfig) error { return err } - err = b.Websocket.Setup(b.WsConnect, - b.Subscribe, - b.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - bitstampWSURL, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = b.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: bitstampWSURL, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: b.WsConnect, + Subscriber: b.Subscribe, + UnSubscriber: b.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/btse/btse_websocket.go b/exchanges/btse/btse_websocket.go index 1ce85e65..b5476a45 100644 --- a/exchanges/btse/btse_websocket.go +++ b/exchanges/btse/btse_websocket.go @@ -54,7 +54,7 @@ func (b *BTSE) WsHandleData() { default: resp, err := b.WebsocketConn.ReadMessage() if err != nil { - b.Websocket.DataHandler <- err + b.Websocket.ReadMessageErrors <- err return } b.Websocket.TrafficAlert <- struct{}{} @@ -162,7 +162,7 @@ func (b *BTSE) wsProcessSnapshot(snapshot *websocketOrderbookSnapshot) error { base.LastUpdated = time.Now() base.ExchangeName = b.Name - err := b.Websocket.Orderbook.LoadSnapshot(&base, true) + err := b.Websocket.Orderbook.LoadSnapshot(&base) if err != nil { return err } diff --git a/exchanges/btse/btse_wrapper.go b/exchanges/btse/btse_wrapper.go index 32a102be..3cec4218 100644 --- a/exchanges/btse/btse_wrapper.go +++ b/exchanges/btse/btse_wrapper.go @@ -109,15 +109,19 @@ func (b *BTSE) Setup(exch *config.ExchangeConfig) error { return err } - err = b.Websocket.Setup(b.WsConnect, - b.Subscribe, - b.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - btseWebsocket, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = b.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: btseWebsocket, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: b.WsConnect, + Subscriber: b.Subscribe, + UnSubscriber: b.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/coinbasepro/coinbasepro_websocket.go b/exchanges/coinbasepro/coinbasepro_websocket.go index 74c3b9d0..ff34058e 100644 --- a/exchanges/coinbasepro/coinbasepro_websocket.go +++ b/exchanges/coinbasepro/coinbasepro_websocket.go @@ -54,7 +54,7 @@ func (c *CoinbasePro) WsHandleData() { default: resp, err := c.WebsocketConn.ReadMessage() if err != nil { - c.Websocket.DataHandler <- err + c.Websocket.ReadMessageErrors <- err return } c.Websocket.TrafficAlert <- struct{}{} @@ -217,7 +217,7 @@ func (c *CoinbasePro) ProcessSnapshot(snapshot *WebsocketOrderbookSnapshot) erro base.AssetType = asset.Spot base.Pair = pair - err := c.Websocket.Orderbook.LoadSnapshot(&base, false) + err := c.Websocket.Orderbook.LoadSnapshot(&base) if err != nil { return err } diff --git a/exchanges/coinbasepro/coinbasepro_wrapper.go b/exchanges/coinbasepro/coinbasepro_wrapper.go index 3c207972..3079db1d 100644 --- a/exchanges/coinbasepro/coinbasepro_wrapper.go +++ b/exchanges/coinbasepro/coinbasepro_wrapper.go @@ -115,15 +115,19 @@ func (c *CoinbasePro) Setup(exch *config.ExchangeConfig) error { return err } - err = c.Websocket.Setup(c.WsConnect, - c.Subscribe, - c.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - coinbaseproWebsocketURL, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = c.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: coinbaseproWebsocketURL, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: c.WsConnect, + Subscriber: c.Subscribe, + UnSubscriber: c.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/coinut/coinut_websocket.go b/exchanges/coinut/coinut_websocket.go index bae03986..d12e03aa 100644 --- a/exchanges/coinut/coinut_websocket.go +++ b/exchanges/coinut/coinut_websocket.go @@ -77,7 +77,7 @@ func (c *COINUT) WsHandleData() { default: resp, err := c.WebsocketConn.ReadMessage() if err != nil { - c.Websocket.DataHandler <- err + c.Websocket.ReadMessageErrors <- err return } c.Websocket.TrafficAlert <- struct{}{} @@ -289,7 +289,7 @@ func (c *COINUT) WsProcessOrderbookSnapshot(ob *WsOrderbookSnapshot) error { ) newOrderBook.AssetType = asset.Spot - return c.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false) + return c.Websocket.Orderbook.LoadSnapshot(&newOrderBook) } // WsProcessOrderbookUpdate process an orderbook update diff --git a/exchanges/coinut/coinut_wrapper.go b/exchanges/coinut/coinut_wrapper.go index 78e60142..08cc097c 100644 --- a/exchanges/coinut/coinut_wrapper.go +++ b/exchanges/coinut/coinut_wrapper.go @@ -116,15 +116,19 @@ func (c *COINUT) Setup(exch *config.ExchangeConfig) error { return err } - err = c.Websocket.Setup(c.WsConnect, - c.Subscribe, - c.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - coinutWebsocketURL, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = c.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: coinutWebsocketURL, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: c.WsConnect, + Subscriber: c.Subscribe, + UnSubscriber: c.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/exchange.go b/exchanges/exchange.go index 59f3a9f8..f218aa4a 100644 --- a/exchanges/exchange.go +++ b/exchanges/exchange.go @@ -449,7 +449,7 @@ func (e *Base) SetupDefaults(exch *config.ExchangeConfig) error { } if e.Features.Supports.Websocket { - e.Websocket.SetWsStatusAndConnection(exch.Features.Enabled.Websocket) + e.Websocket.Initialise() } return nil } diff --git a/exchanges/exchange_test.go b/exchanges/exchange_test.go index 96074091..ca3060da 100644 --- a/exchanges/exchange_test.go +++ b/exchanges/exchange_test.go @@ -1233,7 +1233,10 @@ func TestIsWebsocketEnabled(t *testing.T) { } b.Websocket = wshandler.New() - b.Websocket.Setup(nil, nil, nil, "", true, false, "", "", false) + err := b.Websocket.Setup(&wshandler.WebsocketSetup{Enabled: true}) + if err != nil { + t.Error(err) + } if !b.IsWebsocketEnabled() { t.Error("websocket should be enabled") } diff --git a/exchanges/gateio/gateio_types.go b/exchanges/gateio/gateio_types.go index 585bf48f..8336d8e5 100644 --- a/exchanges/gateio/gateio_types.go +++ b/exchanges/gateio/gateio_types.go @@ -462,7 +462,7 @@ type WebSocketOrderQueryRecords struct { // WebsocketAuthenticationResponse contains the result of a login request type WebsocketAuthenticationResponse struct { - Error string `json:"error"` + Error string `json:"error,omitempty"` Result struct { Status string `json:"status"` } `json:"result"` diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index 60853d4a..8bcb8b76 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -92,7 +92,7 @@ func (g *Gateio) WsHandleData() { default: resp, err := g.WebsocketConn.ReadMessage() if err != nil { - g.Websocket.DataHandler <- err + g.Websocket.ReadMessageErrors <- err return } g.Websocket.TrafficAlert <- struct{}{} @@ -238,8 +238,7 @@ func (g *Gateio) WsHandleData() { newOrderBook.AssetType = asset.Spot newOrderBook.Pair = currency.NewPairFromString(c) - err = g.Websocket.Orderbook.LoadSnapshot(&newOrderBook, - true) + err = g.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { g.Websocket.DataHandler <- err } diff --git a/exchanges/gateio/gateio_wrapper.go b/exchanges/gateio/gateio_wrapper.go index 7bd1db10..d3d602a0 100644 --- a/exchanges/gateio/gateio_wrapper.go +++ b/exchanges/gateio/gateio_wrapper.go @@ -117,15 +117,19 @@ func (g *Gateio) Setup(exch *config.ExchangeConfig) error { return err } - err = g.Websocket.Setup(g.WsConnect, - g.Subscribe, - g.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - gateioWebsocketEndpoint, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = g.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: gateioWebsocketEndpoint, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: g.WsConnect, + Subscriber: g.Subscribe, + UnSubscriber: g.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/gemini/gemini_websocket.go b/exchanges/gemini/gemini_websocket.go index fc64e050..9d47d9c2 100644 --- a/exchanges/gemini/gemini_websocket.go +++ b/exchanges/gemini/gemini_websocket.go @@ -282,8 +282,7 @@ func (g *Gemini) wsProcessUpdate(result WsMarketUpdateResponse, pair currency.Pa newOrderBook.Bids = bids newOrderBook.AssetType = asset.Spot newOrderBook.Pair = pair - err := g.Websocket.Orderbook.LoadSnapshot(&newOrderBook, - false) + err := g.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { g.Websocket.DataHandler <- err return diff --git a/exchanges/gemini/gemini_wrapper.go b/exchanges/gemini/gemini_wrapper.go index 0e4d76f9..dd2338a9 100644 --- a/exchanges/gemini/gemini_wrapper.go +++ b/exchanges/gemini/gemini_wrapper.go @@ -116,15 +116,17 @@ func (g *Gemini) Setup(exch *config.ExchangeConfig) error { g.API.Endpoints.URL = geminiSandboxAPIURL } - err = g.Websocket.Setup(g.WsConnect, - nil, - nil, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - geminiWebsocketEndpoint, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = g.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: geminiWebsocketEndpoint, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: g.WsConnect, + }) if err != nil { return err } diff --git a/exchanges/hitbtc/hitbtc_websocket.go b/exchanges/hitbtc/hitbtc_websocket.go index 4e058a05..5cfda89a 100644 --- a/exchanges/hitbtc/hitbtc_websocket.go +++ b/exchanges/hitbtc/hitbtc_websocket.go @@ -65,7 +65,7 @@ func (h *HitBTC) WsHandleData() { default: resp, err := h.WebsocketConn.ReadMessage() if err != nil { - h.Websocket.DataHandler <- err + h.Websocket.ReadMessageErrors <- err return } h.Websocket.TrafficAlert <- struct{}{} @@ -251,7 +251,7 @@ func (h *HitBTC) WsProcessOrderbookSnapshot(ob WsOrderbook) error { newOrderBook.AssetType = asset.Spot newOrderBook.Pair = p - err := h.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false) + err := h.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { return err } diff --git a/exchanges/hitbtc/hitbtc_wrapper.go b/exchanges/hitbtc/hitbtc_wrapper.go index a7cc2534..f1fcaa90 100644 --- a/exchanges/hitbtc/hitbtc_wrapper.go +++ b/exchanges/hitbtc/hitbtc_wrapper.go @@ -115,15 +115,19 @@ func (h *HitBTC) Setup(exch *config.ExchangeConfig) error { return err } - err = h.Websocket.Setup(h.WsConnect, - h.Subscribe, - h.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - hitbtcWebsocketAddress, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = h.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: hitbtcWebsocketAddress, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: h.WsConnect, + Subscriber: h.Subscribe, + UnSubscriber: h.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/huobi/huobi_websocket.go b/exchanges/huobi/huobi_websocket.go index ce063bd5..fb5897ec 100644 --- a/exchanges/huobi/huobi_websocket.go +++ b/exchanges/huobi/huobi_websocket.go @@ -313,7 +313,7 @@ func (h *HUOBI) WsProcessOrderbook(update *WsDepth, symbol string) error { newOrderBook.Asks = asks newOrderBook.Bids = bids newOrderBook.Pair = p - err := h.Websocket.Orderbook.LoadSnapshot(&newOrderBook, true) + err := h.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { return err } diff --git a/exchanges/huobi/huobi_wrapper.go b/exchanges/huobi/huobi_wrapper.go index f2678e9e..8aa6ccce 100644 --- a/exchanges/huobi/huobi_wrapper.go +++ b/exchanges/huobi/huobi_wrapper.go @@ -119,15 +119,19 @@ func (h *HUOBI) Setup(exch *config.ExchangeConfig) error { h.API.PEMKeySupport = exch.API.PEMKeySupport h.API.Credentials.PEMKey = exch.API.Credentials.PEMKey - err = h.Websocket.Setup(h.WsConnect, - h.Subscribe, - h.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - wsMarketURL, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = h.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: wsMarketURL, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: h.WsConnect, + Subscriber: h.Subscribe, + UnSubscriber: h.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index 7d855966..025d5aea 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -110,9 +110,7 @@ func (k *Kraken) WsHandleData() { default: resp, err := k.WebsocketConn.ReadMessage() if err != nil { - k.Websocket.DataHandler <- fmt.Errorf("%v WsHandleData: %v", - k.Name, - err) + k.Websocket.ReadMessageErrors <- err return } k.Websocket.TrafficAlert <- struct{}{} @@ -384,7 +382,7 @@ func (k *Kraken) wsProcessOrderBookPartial(channelData *WebsocketChannelData, ob } } base.LastUpdated = highestLastUpdate - err := k.Websocket.Orderbook.LoadSnapshot(&base, true) + err := k.Websocket.Orderbook.LoadSnapshot(&base) if err != nil { k.Websocket.DataHandler <- err return @@ -509,7 +507,7 @@ func (k *Kraken) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscrip Subscription: WebsocketSubscriptionData{ Name: channelToSubscribe.Channel, }, - RequestID: k.WebsocketConn.GenerateMessageID(true), + RequestID: k.WebsocketConn.GenerateMessageID(false), } _, err := k.WebsocketConn.SendMessageReturnResponse(resp.RequestID, resp) return err @@ -523,7 +521,7 @@ func (k *Kraken) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubscr Subscription: WebsocketSubscriptionData{ Name: channelToSubscribe.Channel, }, - RequestID: k.WebsocketConn.GenerateMessageID(true), + RequestID: k.WebsocketConn.GenerateMessageID(false), } _, err := k.WebsocketConn.SendMessageReturnResponse(resp.RequestID, resp) return err diff --git a/exchanges/kraken/kraken_wrapper.go b/exchanges/kraken/kraken_wrapper.go index 5d4912b6..3763a672 100644 --- a/exchanges/kraken/kraken_wrapper.go +++ b/exchanges/kraken/kraken_wrapper.go @@ -119,15 +119,19 @@ func (k *Kraken) Setup(exch *config.ExchangeConfig) error { return err } - err = k.Websocket.Setup(k.WsConnect, - k.Subscribe, - k.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - krakenWSURL, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = k.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: krakenWSURL, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: k.WsConnect, + Subscriber: k.Subscribe, + UnSubscriber: k.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/lakebtc/lakebtc_websocket.go b/exchanges/lakebtc/lakebtc_websocket.go index 690e2f41..3b1abae5 100644 --- a/exchanges/lakebtc/lakebtc_websocket.go +++ b/exchanges/lakebtc/lakebtc_websocket.go @@ -205,7 +205,7 @@ func (l *LakeBTC) processOrderbook(obUpdate, channel string) error { Price: price, }) } - return l.Websocket.Orderbook.LoadSnapshot(&book, true) + return l.Websocket.Orderbook.LoadSnapshot(&book) } func (l *LakeBTC) getCurrencyFromChannel(channel string) currency.Pair { diff --git a/exchanges/lakebtc/lakebtc_wrapper.go b/exchanges/lakebtc/lakebtc_wrapper.go index f865d0e6..c7c9be68 100644 --- a/exchanges/lakebtc/lakebtc_wrapper.go +++ b/exchanges/lakebtc/lakebtc_wrapper.go @@ -110,15 +110,18 @@ func (l *LakeBTC) Setup(exch *config.ExchangeConfig) error { return err } - err = l.Websocket.Setup(l.WsConnect, - l.Subscribe, - nil, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - lakeBTCWSURL, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = l.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: lakeBTCWSURL, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: l.WsConnect, + Subscriber: l.Subscribe, + }) if err != nil { return err } diff --git a/exchanges/okgroup/okgroup_websocket.go b/exchanges/okgroup/okgroup_websocket.go index 558252d6..fe10fee8 100644 --- a/exchanges/okgroup/okgroup_websocket.go +++ b/exchanges/okgroup/okgroup_websocket.go @@ -193,6 +193,9 @@ func (o *OKGroup) wsPingHandler(wg *sync.WaitGroup) { return case <-ticker.C: + if !o.Websocket.IsConnected() { + continue + } err := o.WebsocketConn.Connection.WriteMessage(websocket.TextMessage, []byte("ping")) if o.Verbose { log.Debugf(log.ExchangeSys, "%v sending ping", o.GetName()) @@ -221,7 +224,7 @@ func (o *OKGroup) WsHandleData(wg *sync.WaitGroup) { default: resp, err := o.WebsocketConn.ReadMessage() if err != nil { - o.Websocket.DataHandler <- err + o.Websocket.ReadMessageErrors <- err return } o.Websocket.TrafficAlert <- struct{}{} @@ -475,7 +478,7 @@ func (o *OKGroup) WsProcessPartialOrderBook(wsEventData *WebsocketDataWrapper, i ExchangeName: o.GetName(), } - err := o.Websocket.Orderbook.LoadSnapshot(&newOrderBook, true) + err := o.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { return err } diff --git a/exchanges/okgroup/okgroup_wrapper.go b/exchanges/okgroup/okgroup_wrapper.go index 863af600..a0ae6828 100644 --- a/exchanges/okgroup/okgroup_wrapper.go +++ b/exchanges/okgroup/okgroup_wrapper.go @@ -31,15 +31,18 @@ func (o *OKGroup) Setup(exch *config.ExchangeConfig) error { return err } - err = o.Websocket.Setup(o.WsConnect, - o.Subscribe, - o.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - o.API.Endpoints.WebsocketURL, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = o.Websocket.Setup(&wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: o.API.Endpoints.WebsocketURL, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: o.WsConnect, + Subscriber: o.Subscribe, + UnSubscriber: o.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/poloniex/poloniex_websocket.go b/exchanges/poloniex/poloniex_websocket.go index a6ef2ed1..50962417 100644 --- a/exchanges/poloniex/poloniex_websocket.go +++ b/exchanges/poloniex/poloniex_websocket.go @@ -88,7 +88,7 @@ func (p *Poloniex) WsHandleData() { default: resp, err := p.WebsocketConn.ReadMessage() if err != nil { - p.Websocket.DataHandler <- err + p.Websocket.ReadMessageErrors <- err return } p.Websocket.TrafficAlert <- struct{}{} @@ -330,7 +330,7 @@ func (p *Poloniex) WsProcessOrderbookSnapshot(ob []interface{}, symbol string) e newOrderBook.AssetType = asset.Spot newOrderBook.Pair = currency.NewPairFromString(symbol) - return p.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false) + return p.Websocket.Orderbook.LoadSnapshot(&newOrderBook) } // WsProcessOrderbookUpdate processes new orderbook updates diff --git a/exchanges/poloniex/poloniex_wrapper.go b/exchanges/poloniex/poloniex_wrapper.go index 13611997..2febf687 100644 --- a/exchanges/poloniex/poloniex_wrapper.go +++ b/exchanges/poloniex/poloniex_wrapper.go @@ -113,15 +113,19 @@ func (p *Poloniex) Setup(exch *config.ExchangeConfig) error { return err } - err = p.Websocket.Setup(p.WsConnect, - p.Subscribe, - p.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - poloniexWebsocketAddress, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = p.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: poloniexWebsocketAddress, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: p.WsConnect, + Subscriber: p.Subscribe, + UnSubscriber: p.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/websocket/wshandler/wshandler.go b/exchanges/websocket/wshandler/wshandler.go index 931ee5c8..1bc9502d 100644 --- a/exchanges/websocket/wshandler/wshandler.go +++ b/exchanges/websocket/wshandler/wshandler.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io/ioutil" + "net" "net/http" "net/url" "strings" @@ -31,42 +32,28 @@ func New() *Websocket { } // Setup sets main variables for websocket connection -func (w *Websocket) Setup(connector func() error, - subscriber func(channelToSubscribe WebsocketChannelSubscription) error, - unsubscriber func(channelToUnsubscribe WebsocketChannelSubscription) error, - exchangeName string, - wsEnabled, - verbose bool, - defaultURL, - runningURL string, - authenticatedWebsocketAPISupport bool) error { - +func (w *Websocket) Setup(setupData *WebsocketSetup) error { w.DataHandler = make(chan interface{}, 1) - w.Connected = make(chan struct{}, 1) - w.Disconnected = make(chan struct{}, 1) w.TrafficAlert = make(chan struct{}, 1) - w.verbose = verbose - - w.SetChannelSubscriber(subscriber) - w.SetChannelUnsubscriber(unsubscriber) - err := w.SetWsStatusAndConnection(wsEnabled) + w.verbose = setupData.Verbose + w.SetChannelSubscriber(setupData.Subscriber) + w.SetChannelUnsubscriber(setupData.UnSubscriber) + w.enabled = setupData.Enabled + w.SetDefaultURL(setupData.DefaultURL) + w.SetConnector(setupData.Connector) + w.SetWebsocketURL(setupData.RunningURL) + w.SetExchangeName(setupData.ExchangeName) + w.SetCanUseAuthenticatedEndpoints(setupData.AuthenticatedWebsocketAPISupport) + w.trafficTimeout = setupData.WebsocketTimeout + err := w.Initialise() if err != nil { return err } - w.SetDefaultURL(defaultURL) - w.SetConnector(connector) - w.SetWebsocketURL(runningURL) - w.SetExchangeName(exchangeName) - w.SetCanUseAuthenticatedEndpoints(authenticatedWebsocketAPISupport) - - w.init = false - w.noConnectionCheckLimit = 5 - w.reconnectionLimit = 10 return nil } -// Connect intiates a websocket connection by using a package defined connection +// Connect initiates a websocket connection by using a package defined connection // function func (w *Websocket) Connect() error { w.m.Lock() @@ -75,32 +62,33 @@ func (w *Websocket) Connect() error { if !w.IsEnabled() { return errors.New(WebsocketNotEnabled) } - - if w.connected { - w.connecting = false - return errors.New("exchange_websocket.go error - already connected, cannot connect again") + if w.IsConnecting() { + return fmt.Errorf("%v Websocket already attempting to connect", + w.exchangeName) } - - w.connecting = true + if w.IsConnected() { + return fmt.Errorf("%v Websocket already connected", + w.exchangeName) + } + w.setConnectingStatus(true) w.ShutdownC = make(chan struct{}, 1) + w.ReadMessageErrors = make(chan error, 1) err := w.connector() if err != nil { - w.connecting = false - return fmt.Errorf("exchange_websocket.go connection error %s", - err) + w.setConnectingStatus(false) + return fmt.Errorf("%v Error connecting %s", + w.exchangeName, err) } - if !w.connected { - w.Connected <- struct{}{} - w.connected = true - w.connecting = false - } + w.setConnectedStatus(true) + w.setConnectingStatus(false) + w.setInit(true) var anotherWG sync.WaitGroup anotherWG.Add(1) go w.trafficMonitor(&anotherWG) anotherWG.Wait() - if !w.connectionMonitorRunning { + if !w.IsConnectionMonitorRunning() { go w.connectionMonitor() } if w.SupportsFunctionality(WebsocketSubscribeSupported) || w.SupportsFunctionality(WebsocketUnsubscribeSupported) { @@ -112,88 +100,82 @@ func (w *Websocket) Connect() error { // connectionMonitor ensures that the WS keeps connecting func (w *Websocket) connectionMonitor() { - w.m.Lock() - w.connectionMonitorRunning = true - w.m.Unlock() + if w.IsConnectionMonitorRunning() { + return + } + w.setConnectionMonitorRunning(true) + timer := time.NewTimer(connectionMonitorDelay) + defer func() { - w.connectionMonitorRunning = false + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + w.setConnectionMonitorRunning(false) + if w.verbose { + log.Debugf(log.WebsocketMgr, "%v websocket connection monitor exiting", + w.exchangeName) + } }() for { - time.Sleep(connectionMonitorDelay) - w.m.Lock() - if !w.enabled { - w.m.Unlock() - w.DataHandler <- fmt.Errorf("%v connectionMonitor: websocket disabled, shutting down", w.exchangeName) - err := w.Shutdown() - if err != nil { - log.Error(log.WebsocketMgr, err) + if w.verbose { + log.Debugf(log.WebsocketMgr, "%v running connection monitor cycle", + w.exchangeName) + } + if !w.IsEnabled() { + if w.verbose { + log.Debugf(log.WebsocketMgr, "%v connectionMonitor: websocket disabled, shutting down", w.exchangeName) + } + if w.IsConnected() { + err := w.Shutdown() + if err != nil { + log.Error(log.WebsocketMgr, err) + } } if w.verbose { - log.Debugf(log.WebsocketMgr, "%v connectionMonitor exiting", + log.Debugf(log.WebsocketMgr, "%v websocket connection monitor exiting", w.exchangeName) } return } - w.m.Unlock() - err := w.checkConnection() - if err != nil { - log.Error(log.WebsocketMgr, err) - } - } -} - -// checkConnection ensures the connection is maintained -// Will reconnect on disconnect -func (w *Websocket) checkConnection() error { - if w.verbose { - log.Debugf(log.WebsocketMgr, "%v checking connection", w.exchangeName) - } - switch { - case !w.IsConnected() && !w.IsConnecting(): - w.m.Lock() - defer w.m.Unlock() - if w.verbose { - log.Debugf(log.WebsocketMgr, "%v no connection. Attempt %v/%v", w.exchangeName, w.noConnectionChecks, w.noConnectionCheckLimit) - } - if w.noConnectionChecks >= w.noConnectionCheckLimit { - if w.verbose { - log.Debugf(log.WebsocketMgr, "%v resetting connection", w.exchangeName) + select { + case err := <-w.ReadMessageErrors: + // check if this error is a disconnection error + if isDisconnectionError(err) { + w.setConnectedStatus(false) + w.setConnectingStatus(false) + w.setInit(false) + if w.verbose { + log.Debugf(log.WebsocketMgr, "%v websocket has been disconnected. Reason: %v", + w.exchangeName, err) + } + err = w.Connect() + if err != nil { + log.Error(log.WebsocketMgr, err) + } + } else { + // pass off non disconnect errors to datahandler to manage + w.DataHandler <- err } - w.connecting = true - go w.WebsocketReset() - w.noConnectionChecks = 0 + case <-timer.C: + if !w.IsConnecting() && !w.IsConnected() { + err := w.Connect() + if err != nil { + log.Error(log.WebsocketMgr, err) + } + } + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(connectionMonitorDelay) } - w.noConnectionChecks++ - case w.IsConnecting(): - if w.reconnectionChecks >= w.reconnectionLimit { - return fmt.Errorf("%v websocket failed to reconnect after %v seconds", - w.exchangeName, - w.reconnectionLimit*int(connectionMonitorDelay.Seconds())) - } - if w.verbose { - log.Debugf(log.WebsocketMgr, "%v Busy reconnecting", w.exchangeName) - } - w.reconnectionChecks++ - default: - w.noConnectionChecks = 0 - w.reconnectionChecks = 0 } - return nil -} - -// IsConnected exposes websocket connection status -func (w *Websocket) IsConnected() bool { - w.m.Lock() - defer w.m.Unlock() - return w.connected -} - -// IsConnecting checks whether websocket is busy connecting -func (w *Websocket) IsConnecting() bool { - w.m.Lock() - defer w.m.Unlock() - return w.connecting } // Shutdown attempts to shut down a websocket connection and associated routines @@ -204,124 +186,145 @@ func (w *Websocket) Shutdown() error { w.Orderbook.FlushCache() w.m.Unlock() }() - if !w.connected && w.ShutdownC == nil { + if !w.IsConnected() { return fmt.Errorf("%v cannot shutdown a disconnected websocket", w.exchangeName) } if w.verbose { log.Debugf(log.WebsocketMgr, "%v shutting down websocket channels", w.exchangeName) } - timer := time.NewTimer(15 * time.Second) - c := make(chan struct{}, 1) - - go func(c chan struct{}) { - close(w.ShutdownC) - w.Wg.Wait() - if w.verbose { - log.Debugf(log.WebsocketMgr, "%v completed websocket channel shutdown", w.exchangeName) - } - c <- struct{}{} - }(c) - - select { - case <-c: - w.connected = false - return nil - case <-timer.C: - return fmt.Errorf("%s websocket routines failed to shutdown after 15 seconds", - w.GetName()) + close(w.ShutdownC) + w.Wg.Wait() + w.setConnectedStatus(false) + w.setConnectingStatus(false) + if w.verbose { + log.Debugf(log.WebsocketMgr, "%v completed websocket channel shutdown", w.exchangeName) } + return nil } -// WebsocketReset sends the shutdown command, waits for channel/func closure and then reconnects -func (w *Websocket) WebsocketReset() { - err := w.Shutdown() - if err != nil { - // does not return here to allow connection to be made if already shut down - w.DataHandler <- fmt.Errorf("%v shutdown error: %v", w.exchangeName, err) - } - log.Infof(log.WebsocketMgr, "%v reconnecting to websocket", w.exchangeName) - w.m.Lock() - w.init = true - w.m.Unlock() - err = w.Connect() - if err != nil { - w.DataHandler <- fmt.Errorf("%v connection error: %v", w.exchangeName, err) - } -} - -// trafficMonitor monitors traffic and switches connection modes for websocket +// trafficMonitor uses a timer of WebsocketTrafficLimitTime and once it expires +// Will reconnect if the TrafficAlert channel has not received any data +// The trafficTimer will reset on each traffic alert func (w *Websocket) trafficMonitor(wg *sync.WaitGroup) { w.Wg.Add(1) - wg.Done() // Makes sure we are unlocking after we add to waitgroup + wg.Done() + trafficTimer := time.NewTimer(w.trafficTimeout) defer func() { - if w.connected { - w.Disconnected <- struct{}{} + if !trafficTimer.Stop() { + select { + case <-trafficTimer.C: + default: + } } + w.setTrafficMonitorRunning(false) w.Wg.Done() }() - - // Define an initial traffic timer which will be a delay then fall over to - // WebsocketTrafficLimitTime after first response - trafficTimer := time.NewTimer(5 * time.Second) + if w.IsTrafficMonitorRunning() { + return + } + w.setTrafficMonitorRunning(true) for { select { - case <-w.ShutdownC: // Returns on shutdown channel close + case <-w.ShutdownC: if w.verbose { log.Debugf(log.WebsocketMgr, "%v trafficMonitor shutdown message received", w.exchangeName) } return - case <-w.TrafficAlert: // Resets timer on traffic - w.m.Lock() - if !w.connected { - w.Connected <- struct{}{} - w.connected = true + case <-w.TrafficAlert: + if !trafficTimer.Stop() { + select { + case <-trafficTimer.C: + default: + } } - w.m.Unlock() - trafficTimer.Reset(WebsocketTrafficLimitTime) + trafficTimer.Reset(w.trafficTimeout) case <-trafficTimer.C: // Falls through when timer runs out - newtimer := time.NewTimer(10 * time.Second) // New secondary timer set if w.verbose { - log.Debugf(log.WebsocketMgr, "%v has not received a traffic alert in 5 seconds.", w.exchangeName) - } - w.m.Lock() - if w.connected { - // If connected divert traffic to rest - w.Disconnected <- struct{}{} - w.connected = false - } - w.m.Unlock() - - select { - case <-w.ShutdownC: // Returns on shutdown channel close - w.m.Lock() - w.connected = false - w.m.Unlock() - return - - case <-newtimer.C: // If secondary timer runs state timeout is sent to the data handler - if w.verbose { - log.Debugf(log.WebsocketMgr, "%v has not received a traffic alert in 15 seconds, exiting", w.exchangeName) - } - w.DataHandler <- fmt.Errorf("trafficMonitor %v", WebsocketStateTimeout) - return - - case <-w.TrafficAlert: // If in this time response traffic comes through - trafficTimer.Reset(WebsocketTrafficLimitTime) - w.m.Lock() - if !w.connected { - // If not connected dive rt traffic from REST to websocket - w.Connected <- struct{}{} - if w.verbose { - log.Debugf(log.WebsocketMgr, "%v has received a traffic alert. Setting status to connected", w.exchangeName) - } - w.connected = true - } - w.m.Unlock() + log.Warnf(log.WebsocketMgr, "%v has not received a traffic alert in %v. Reconnecting", w.exchangeName, w.trafficTimeout) } + go w.Shutdown() } } } +func (w *Websocket) setConnectedStatus(b bool) { + w.connectionMutex.Lock() + w.connected = b + w.connectionMutex.Unlock() +} + +// IsConnected returns status of connection +func (w *Websocket) IsConnected() bool { + w.connectionMutex.RLock() + defer w.connectionMutex.RUnlock() + return w.connected +} + +func (w *Websocket) setConnectingStatus(b bool) { + w.connectionMutex.Lock() + w.connecting = b + w.connectionMutex.Unlock() +} + +// IsConnecting returns status of connecting +func (w *Websocket) IsConnecting() bool { + w.connectionMutex.RLock() + defer w.connectionMutex.RUnlock() + return w.connecting +} + +func (w *Websocket) setEnabled(b bool) { + w.connectionMutex.Lock() + w.enabled = b + w.connectionMutex.Unlock() +} + +// IsEnabled returns status of enabled +func (w *Websocket) IsEnabled() bool { + w.connectionMutex.RLock() + defer w.connectionMutex.RUnlock() + return w.enabled +} + +func (w *Websocket) setInit(b bool) { + w.connectionMutex.Lock() + w.init = b + w.connectionMutex.Unlock() +} + +// IsInit returns status of init +func (w *Websocket) IsInit() bool { + w.connectionMutex.RLock() + defer w.connectionMutex.RUnlock() + return w.init +} + +func (w *Websocket) setTrafficMonitorRunning(b bool) { + w.connectionMutex.Lock() + w.trafficMonitorRunning = b + w.connectionMutex.Unlock() +} + +// IsTrafficMonitorRunning returns status of the traffic monitor +func (w *Websocket) IsTrafficMonitorRunning() bool { + w.connectionMutex.RLock() + defer w.connectionMutex.RUnlock() + return w.trafficMonitorRunning +} + +func (w *Websocket) setConnectionMonitorRunning(b bool) { + w.connectionMutex.Lock() + w.connectionMonitorRunning = b + w.connectionMutex.Unlock() +} + +// IsConnectionMonitorRunning returns status of connection monitor +func (w *Websocket) IsConnectionMonitorRunning() bool { + w.connectionMutex.RLock() + defer w.connectionMutex.RUnlock() + return w.connectionMonitorRunning +} + // SetWebsocketURL sets websocket URL func (w *Websocket) SetWebsocketURL(websocketURL string) { if websocketURL == "" || websocketURL == config.WebsocketURLNonDefaultMessage { @@ -336,55 +339,28 @@ func (w *Websocket) GetWebsocketURL() string { return w.runningURL } -// SetWsStatusAndConnection sets if websocket is enabled -// it will also connect/disconnect the websocket connection -func (w *Websocket) SetWsStatusAndConnection(enabled bool) error { - w.m.Lock() - if w.enabled == enabled { - if w.init { - w.m.Unlock() +// Initialise verifies status and connects +func (w *Websocket) Initialise() error { + if w.IsEnabled() { + if w.IsInit() { return nil } - w.m.Unlock() - return fmt.Errorf("exchange_websocket.go error - already set as %t", - enabled) + return fmt.Errorf("%v Websocket already initialised", + w.exchangeName) } - w.enabled = enabled - if !w.init { - if enabled { - if w.connected { - w.m.Unlock() - return nil - } - w.m.Unlock() - return w.Connect() - } - - if !w.connected { - w.m.Unlock() - return nil - } - w.m.Unlock() - return w.Shutdown() - } - w.m.Unlock() + w.setEnabled(w.enabled) return nil } -// IsEnabled returns bool -func (w *Websocket) IsEnabled() bool { - return w.enabled -} - // SetProxyAddress sets websocket proxy address func (w *Websocket) SetProxyAddress(proxyAddr string) error { if w.proxyAddr == proxyAddr { - return errors.New("exchange_websocket.go error - Setting proxy address - same address") + return fmt.Errorf("%v Cannot set proxy address to the same address '%v'", w.exchangeName, w.proxyAddr) } w.proxyAddr = proxyAddr - if !w.init && w.enabled { - if w.connected { + if !w.IsInit() && w.IsEnabled() { + if w.IsConnected() { err := w.Shutdown() if err != nil { return err @@ -532,19 +508,28 @@ func (w *Websocket) manageSubscriptions() { for { select { case <-w.ShutdownC: + w.subscriptionMutex.Lock() w.subscribedChannels = []WebsocketChannelSubscription{} + w.subscriptionMutex.Unlock() if w.verbose { log.Debugf(log.WebsocketMgr, "%v shutdown manageSubscriptions", w.exchangeName) } return default: time.Sleep(manageSubscriptionsDelay) + if !w.IsConnected() { + w.subscriptionMutex.Lock() + w.subscribedChannels = []WebsocketChannelSubscription{} + w.subscriptionMutex.Unlock() + + continue + } if w.verbose { log.Debugf(log.WebsocketMgr, "%v checking subscriptions", w.exchangeName) } // Subscribe to channels Pending a subscription if w.SupportsFunctionality(WebsocketSubscribeSupported) { - err := w.subscribeToChannels() + err := w.appendSubscribedChannels() if err != nil { w.DataHandler <- err } @@ -559,11 +544,11 @@ func (w *Websocket) manageSubscriptions() { } } -// subscribeToChannels compares channelsToSubscribe to subscribedChannels +// appendSubscribedChannels compares channelsToSubscribe to subscribedChannels // and subscribes to any channels not present in subscribedChannels -func (w *Websocket) subscribeToChannels() error { - w.subscriptionLock.Lock() - defer w.subscriptionLock.Unlock() +func (w *Websocket) appendSubscribedChannels() error { + w.subscriptionMutex.Lock() + defer w.subscriptionMutex.Unlock() for i := 0; i < len(w.channelsToSubscribe); i++ { channelIsSubscribed := false for j := 0; j < len(w.subscribedChannels); j++ { @@ -589,8 +574,8 @@ func (w *Websocket) subscribeToChannels() error { // unsubscribeToChannels compares subscribedChannels to channelsToSubscribe // and unsubscribes to any channels not present in channelsToSubscribe func (w *Websocket) unsubscribeToChannels() error { - w.subscriptionLock.Lock() - defer w.subscriptionLock.Unlock() + w.subscriptionMutex.Lock() + defer w.subscriptionMutex.Unlock() for i := 0; i < len(w.subscribedChannels); i++ { subscriptionFound := false for j := 0; j < len(w.channelsToSubscribe); j++ { @@ -622,8 +607,8 @@ func (w *Websocket) RemoveSubscribedChannels(channels []WebsocketChannelSubscrip // removeChannelToSubscribe removes an entry from w.channelsToSubscribe // so an unsubscribe event can be triggered func (w *Websocket) removeChannelToSubscribe(subscribedChannel WebsocketChannelSubscription) { - w.subscriptionLock.Lock() - defer w.subscriptionLock.Unlock() + w.subscriptionMutex.Lock() + defer w.subscriptionMutex.Unlock() channelLength := len(w.channelsToSubscribe) i := 0 for j := 0; j < len(w.channelsToSubscribe); j++ { @@ -644,8 +629,8 @@ func (w *Websocket) removeChannelToSubscribe(subscribedChannel WebsocketChannelS // ResubscribeToChannel calls unsubscribe func and // removes it from subscribedChannels to trigger a subscribe event func (w *Websocket) ResubscribeToChannel(subscribedChannel WebsocketChannelSubscription) { - w.subscriptionLock.Lock() - defer w.subscriptionLock.Unlock() + w.subscriptionMutex.Lock() + defer w.subscriptionMutex.Unlock() err := w.channelUnsubscriber(subscribedChannel) if err != nil { w.DataHandler <- err @@ -675,7 +660,6 @@ func (w *Websocket) SubscribeToChannels(channels []WebsocketChannelSubscription) w.channelsToSubscribe = append(w.channelsToSubscribe, channels[i]) } } - w.noConnectionChecks = 0 } // Equal two WebsocketChannelSubscription to determine equality @@ -693,16 +677,16 @@ func (w *Websocket) GetSubscriptions() []WebsocketChannelSubscription { // SetCanUseAuthenticatedEndpoints sets canUseAuthenticatedEndpoints val in // a thread safe manner func (w *Websocket) SetCanUseAuthenticatedEndpoints(val bool) { - w.subscriptionLock.Lock() - defer w.subscriptionLock.Unlock() + w.subscriptionMutex.Lock() + defer w.subscriptionMutex.Unlock() w.canUseAuthenticatedEndpoints = val } // CanUseAuthenticatedEndpoints gets canUseAuthenticatedEndpoints val in // a thread safe manner func (w *Websocket) CanUseAuthenticatedEndpoints() bool { - w.subscriptionLock.Lock() - defer w.subscriptionLock.Unlock() + w.subscriptionMutex.Lock() + defer w.subscriptionMutex.Unlock() return w.canUseAuthenticatedEndpoints } @@ -735,6 +719,10 @@ func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header } return fmt.Errorf("%v Error: %v", w.URL, err) } + if w.Verbose { + log.Infof(log.WebsocketMgr, "%v Websocket connected", w.ExchangeName) + } + w.setConnectedStatus(true) return nil } @@ -742,6 +730,9 @@ func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header func (w *WebsocketConnection) SendMessage(data interface{}) error { w.Lock() defer w.Unlock() + if !w.IsConnected() { + return fmt.Errorf("%v cannot send message to a disconnected websocket", w.ExchangeName) + } json, err := common.JSONEncode(data) if err != nil { return err @@ -801,10 +792,26 @@ func (w *WebsocketConnection) WaitForResult(id int64, wg *sync.WaitGroup) { } } +func (w *WebsocketConnection) setConnectedStatus(b bool) { + w.connectionMutex.Lock() + w.connected = b + w.connectionMutex.Unlock() +} + +// IsConnected exposes websocket connection status +func (w *WebsocketConnection) IsConnected() bool { + w.connectionMutex.RLock() + defer w.connectionMutex.RUnlock() + return w.connected +} + // ReadMessage reads messages, can handle text, gzip and binary func (w *WebsocketConnection) ReadMessage() (WebsocketResponse, error) { mType, resp, err := w.Connection.ReadMessage() if err != nil { + if isDisconnectionError(err) { + w.setConnectedStatus(false) + } return WebsocketResponse{}, err } var standardMessage []byte @@ -866,3 +873,15 @@ func (w *WebsocketConnection) GenerateMessageID(useNano bool) int64 { } return time.Now().Unix() } + +// isDisconnectionError Determines if the error sent over chan ReadMessageErrors is a disconnection error +func isDisconnectionError(err error) bool { + if websocket.IsUnexpectedCloseError(err) { + return true + } + switch err.(type) { + case *websocket.CloseError, *net.OpError: + return true + } + return false +} diff --git a/exchanges/websocket/wshandler/wshandler_test.go b/exchanges/websocket/wshandler/wshandler_test.go index f3699a8b..931ec661 100644 --- a/exchanges/websocket/wshandler/wshandler_test.go +++ b/exchanges/websocket/wshandler/wshandler_test.go @@ -1,38 +1,151 @@ package wshandler import ( - "fmt" + "bytes" + "compress/flate" + "compress/gzip" + "errors" + "net" + "net/http" + "os" "strings" + "sync" "testing" "time" + + "github.com/gorilla/websocket" + "github.com/thrasher-corp/gocryptotrader/common" + "github.com/thrasher-corp/gocryptotrader/currency" ) -var ws *Websocket +func TestTrafficMonitorTimeout(t *testing.T) { + ws := New() + err := ws.Setup( + &WebsocketSetup{ + Enabled: true, + AuthenticatedWebsocketAPISupport: true, + WebsocketTimeout: 10000, + DefaultURL: "testDefaultURL", + ExchangeName: "exchangeName", + RunningURL: "testRunningURL", + Connector: func() error { return nil }, + Subscriber: func(test WebsocketChannelSubscription) error { return nil }, + UnSubscriber: func(test WebsocketChannelSubscription) error { return nil }, + }) + if err != nil { + t.Error(err) + } + ws.setConnectedStatus(true) + ws.TrafficAlert = make(chan struct{}, 2) + ws.ShutdownC = make(chan struct{}) + var anotherWG sync.WaitGroup + anotherWG.Add(1) + go ws.trafficMonitor(&anotherWG) + anotherWG.Wait() + ws.TrafficAlert <- struct{}{} + trafficTimer := time.NewTimer(5 * time.Second) + select { + case <-trafficTimer.C: + t.Error("should be exiting") + default: + ws.Wg.Wait() + } +} -func TestWebsocketInit(t *testing.T) { - ws = New() - if ws == nil { - t.Error("test failed - Websocket New() error") +func TestIsDisconnectionError(t *testing.T) { + isADisconnectionError := isDisconnectionError(errors.New("errorText")) + if isADisconnectionError { + t.Error("Its not") + } + isADisconnectionError = isDisconnectionError(&websocket.CloseError{ + Code: 1006, + Text: "errorText", + }) + if !isADisconnectionError { + t.Error("It is") + } + + isADisconnectionError = isDisconnectionError(&net.OpError{ + Op: "", + Net: "", + Source: nil, + Addr: nil, + Err: errors.New("errorText"), + }) + if !isADisconnectionError { + t.Error("It is") + } +} + +func TestConnectionMessageErrors(t *testing.T) { + ws := New() + ws.connected = true + ws.enabled = true + ws.ReadMessageErrors = make(chan error) + ws.DataHandler = make(chan interface{}) + ws.ShutdownC = make(chan struct{}) + ws.connector = func() error { return nil } + go ws.connectionMonitor() + timer := time.NewTimer(900 * time.Millisecond) + ws.ReadMessageErrors <- errors.New("errorText") + select { + case err := <-ws.DataHandler: + if err.(error).Error() != "errorText" { + t.Errorf("Expected 'errorText', received %v", err) + } + case <-timer.C: + t.Error("Timeout waiting for datahandler to receive error") + } + timer = time.NewTimer(900 * time.Millisecond) + ws.ReadMessageErrors <- &websocket.CloseError{ + Code: 1006, + Text: "errorText", + } +outer: + for { + select { + case <-ws.DataHandler: + t.Fatal("Error is a disconnection error") + case <-timer.C: + break outer + } } } func TestWebsocket(t *testing.T) { - if err := ws.SetProxyAddress("testProxy"); err != nil { + ws := Websocket{} + ws.setInit(true) + err := ws.Setup(&WebsocketSetup{ + ExchangeName: "test", + Enabled: true, + }) + if err != nil && err.Error() != "test Websocket already initialised" { + t.Errorf("Expected 'test Websocket already initialised', received %v", err) + } + + ws = *New() + err = ws.SetProxyAddress("testProxy") + if err != nil { t.Error("test failed - SetProxyAddress", err) } - ws.Setup(func() error { return nil }, - func(test WebsocketChannelSubscription) error { return nil }, - func(test WebsocketChannelSubscription) error { return nil }, - "testName", - true, - false, - "testDefaultURL", - "testRunningURL", - false) + err = ws.Setup( + &WebsocketSetup{ + Enabled: true, + AuthenticatedWebsocketAPISupport: true, + WebsocketTimeout: 2, + DefaultURL: "testDefaultURL", + ExchangeName: "exchangeName", + RunningURL: "testRunningURL", + Connector: func() error { return nil }, + Subscriber: func(test WebsocketChannelSubscription) error { return nil }, + UnSubscriber: func(test WebsocketChannelSubscription) error { return nil }, + }) + if err != nil { + t.Error(err) + } - // Test variable setting and retreival - if ws.GetName() != "testName" { + if ws.GetName() != "exchangeName" { t.Error("test failed - WebsocketSetup") } @@ -52,25 +165,11 @@ func TestWebsocket(t *testing.T) { t.Error("test failed - WebsocketSetup") } - // Test websocket connect and shutdown functions - comms := make(chan struct{}, 1) - go func() { - var count int - for { - if count == 4 { - close(comms) - return - } - select { - case <-ws.Connected: - count++ - case <-ws.Disconnected: - count++ - } - } - }() + if ws.trafficTimeout != time.Duration(2) { + t.Error("test failed - WebsocketSetup") + } // -- Not connected shutdown - err := ws.Shutdown() + err = ws.Shutdown() if err == nil { t.Fatal("test failed - should not be connected to able to shut down") } @@ -80,70 +179,61 @@ func TestWebsocket(t *testing.T) { if err != nil { t.Fatal("test failed - WebsocketSetup", err) } - + ws.SetWebsocketURL("ws://demos.kaazing.com/echo") // -- Already connected connect err = ws.Connect() if err == nil { t.Fatal("test failed - should not connect, already connected") } - - ws.SetWebsocketURL("") - - // -- Set true when already true - err = ws.SetWsStatusAndConnection(true) - if err == nil { - t.Fatal("test failed - setting enabled should not work") - } - - // -- Set false normal - err = ws.SetWsStatusAndConnection(false) - if err != nil { - t.Fatal("test failed - setting enabled should not work") - } - - // -- Set true normal - err = ws.SetWsStatusAndConnection(true) - if err != nil { - t.Fatal("test failed - setting enabled should not work") - } - // -- Normal shutdown err = ws.Shutdown() if err != nil { t.Fatal("test failed - WebsocketSetup", err) } - - timer := time.NewTimer(5 * time.Second) - select { - case <-comms: - case <-timer.C: - t.Fatal("test failed - WebsocketSetup - timeout") - } + ws.Wg.Wait() } func TestFunctionality(t *testing.T) { - var w Websocket - - if w.FormatFunctionality() != NoWebsocketSupportText { + ws := New() + if ws.FormatFunctionality() != NoWebsocketSupportText { t.Fatalf("Test Failed - FormatFunctionality error expected %s but received %s", - NoWebsocketSupportText, w.FormatFunctionality()) + NoWebsocketSupportText, ws.FormatFunctionality()) } - w.Functionality = 1 << 31 + ws.Functionality = 1 << 31 - if w.FormatFunctionality() != UnknownWebsocketFunctionality+"[1<<31]" { + if ws.FormatFunctionality() != UnknownWebsocketFunctionality+"[1<<31]" { t.Fatal("Test Failed - GetFunctionality error incorrect error returned") } - w.Functionality = WebsocketOrderbookSupported + ws.Functionality = WebsocketOrderbookSupported - if w.GetFunctionality() != WebsocketOrderbookSupported { + if ws.GetFunctionality() != WebsocketOrderbookSupported { t.Fatal("Test Failed - GetFunctionality error incorrect bitmask returned") } - if !w.SupportsFunctionality(WebsocketOrderbookSupported) { + if !ws.SupportsFunctionality(WebsocketOrderbookSupported) { t.Fatal("Test Failed - SupportsFunctionality error should be true") } + + ws.Functionality = WebsocketTickerSupported | WebsocketOrderbookSupported | WebsocketKlineSupported | + WebsocketTradeDataSupported | WebsocketAccountSupported | WebsocketAllowsRequests | + WebsocketSubscribeSupported | WebsocketUnsubscribeSupported | WebsocketAuthenticatedEndpointsSupported | + WebsocketAccountDataSupported | WebsocketSubmitOrderSupported | WebsocketCancelOrderSupported | + WebsocketWithdrawSupported | WebsocketMessageCorrelationSupported | WebsocketSequenceNumberSupported | + WebsocketDeadMansSwitchSupported + formatted := ws.FormatFunctionality() + + if !strings.Contains(formatted, WebsocketTickerSupportedText) || !strings.Contains(formatted, WebsocketOrderbookSupportedText) || + !strings.Contains(formatted, WebsocketKlineSupportedText) || !strings.Contains(formatted, WebsocketTradeDataSupportedText) || + !strings.Contains(formatted, WebsocketAccountSupportedText) || !strings.Contains(formatted, WebsocketAllowsRequestsText) || + !strings.Contains(formatted, WebsocketSubscribeSupportedText) || !strings.Contains(formatted, WebsocketUnsubscribeSupportedText) || + !strings.Contains(formatted, WebsocketAuthenticatedEndpointsSupportedText) || !strings.Contains(formatted, WebsocketAccountDataSupportedText) || + !strings.Contains(formatted, WebsocketSubmitOrderSupportedText) || !strings.Contains(formatted, WebsocketCancelOrderSupportedText) || + !strings.Contains(formatted, WebsocketWithdrawSupportedText) || !strings.Contains(formatted, WebsocketMessageCorrelationSupportedText) || + !strings.Contains(formatted, WebsocketSequenceNumberSupportedText) || !strings.Contains(formatted, WebsocketDeadMansSwitchSupportedText) { + t.Error("Failed to format and include supported websocket features") + } } // placeholderSubscriber basic function to test subscriptions @@ -162,12 +252,32 @@ func TestSubscribe(t *testing.T) { subscribedChannels: []WebsocketChannelSubscription{}, } w.SetChannelSubscriber(placeholderSubscriber) - w.subscribeToChannels() + err := w.appendSubscribedChannels() + if err != nil { + t.Error(err) + } if len(w.subscribedChannels) != 1 { t.Errorf("Subscription did not occur") } } +// TestSubscribe logic test +func TestSubscribeToChannels(t *testing.T) { + w := Websocket{ + channelsToSubscribe: []WebsocketChannelSubscription{ + { + Channel: "hello", + }, + }, + subscribedChannels: []WebsocketChannelSubscription{}, + } + w.SetChannelSubscriber(placeholderSubscriber) + w.SubscribeToChannels([]WebsocketChannelSubscription{{Channel: "hello"}, {Channel: "hello2"}}) + if len(w.channelsToSubscribe) != 2 { + t.Errorf("Subscription did not occur") + } +} + // TestUnsubscribe logic test func TestUnsubscribe(t *testing.T) { w := Websocket{ @@ -179,7 +289,10 @@ func TestUnsubscribe(t *testing.T) { }, } w.SetChannelUnsubscriber(placeholderSubscriber) - w.unsubscribeToChannels() + err := w.unsubscribeToChannels() + if err != nil { + t.Error(err) + } if len(w.subscribedChannels) != 0 { t.Errorf("Unsubscription did not occur") } @@ -200,7 +313,10 @@ func TestSubscriptionWithExistingEntry(t *testing.T) { }, } w.SetChannelSubscriber(placeholderSubscriber) - w.subscribeToChannels() + err := w.appendSubscribedChannels() + if err != nil { + t.Error(err) + } if len(w.subscribedChannels) != 1 { t.Errorf("Subscription should not have occurred") } @@ -221,7 +337,10 @@ func TestUnsubscriptionWithExistingEntry(t *testing.T) { }, } w.SetChannelUnsubscriber(placeholderSubscriber) - w.unsubscribeToChannels() + err := w.unsubscribeToChannels() + if err != nil { + t.Error(err) + } if len(w.subscribedChannels) != 1 { t.Errorf("Unsubscription should not have occurred") } @@ -230,67 +349,49 @@ func TestUnsubscriptionWithExistingEntry(t *testing.T) { // TestManageSubscriptionsStartStop logic test func TestManageSubscriptionsStartStop(t *testing.T) { w := Websocket{ - ShutdownC: make(chan struct{}, 1), + ShutdownC: make(chan struct{}), Functionality: WebsocketSubscribeSupported | WebsocketUnsubscribeSupported, } go w.manageSubscriptions() - time.Sleep(time.Second) close(w.ShutdownC) + w.Wg.Wait() +} + +// TestManageSubscriptionsStartStop logic test +func TestManageSubscriptions(t *testing.T) { + w := Websocket{ + ShutdownC: make(chan struct{}), + Functionality: WebsocketSubscribeSupported | WebsocketUnsubscribeSupported, + subscribedChannels: []WebsocketChannelSubscription{ + { + Channel: "hello", + }, + }, + } + w.SetChannelUnsubscriber(placeholderSubscriber) + w.SetChannelSubscriber(placeholderSubscriber) + w.setConnectedStatus(true) + go w.manageSubscriptions() + time.Sleep(8 * time.Second) + w.setConnectedStatus(false) + time.Sleep(manageSubscriptionsDelay) + w.subscriptionMutex.Lock() + if len(w.subscribedChannels) > 0 { + t.Error("Expected empty subscribed channels") + } + w.subscriptionMutex.Unlock() } // TestConnectionMonitorNoConnection logic test func TestConnectionMonitorNoConnection(t *testing.T) { - w := Websocket{} - w.DataHandler = make(chan interface{}, 1) - w.ShutdownC = make(chan struct{}, 1) - w.exchangeName = "hello" - go w.connectionMonitor() - err := <-w.DataHandler - if !strings.EqualFold(err.(error).Error(), - fmt.Sprintf("%v connectionMonitor: websocket disabled, shutting down", w.exchangeName)) { - t.Errorf("expecting error 'connectionMonitor: websocket disabled, shutting down', received '%v'", err) - } -} - -// TestWsNoConnectionTolerance logic test -func TestWsNoConnectionTolerance(t *testing.T) { - w := Websocket{} - w.DataHandler = make(chan interface{}, 1) - w.ShutdownC = make(chan struct{}, 1) - w.enabled = true - w.noConnectionCheckLimit = 500 - w.checkConnection() - if w.noConnectionChecks == 0 { - t.Errorf("Expected noConnectionTolerance to increment, received '%v'", w.noConnectionChecks) - } -} - -// TestConnecting logic test -func TestConnecting(t *testing.T) { - w := Websocket{} - w.DataHandler = make(chan interface{}, 1) - w.ShutdownC = make(chan struct{}, 1) - w.enabled = true - w.connecting = true - w.reconnectionLimit = 500 - w.checkConnection() - if w.reconnectionChecks != 1 { - t.Errorf("Expected reconnectionLimit to increment, received '%v'", w.reconnectionChecks) - } -} - -// TestReconnectionLimit logic test -func TestReconnectionLimit(t *testing.T) { - w := Websocket{} - w.DataHandler = make(chan interface{}, 1) - w.ShutdownC = make(chan struct{}, 1) - w.enabled = true - w.connecting = true - w.reconnectionChecks = 99 - w.reconnectionLimit = 1 - err := w.checkConnection() - if err == nil { - t.Error("Expected error") + ws := New() + ws.DataHandler = make(chan interface{}, 1) + ws.ShutdownC = make(chan struct{}, 1) + ws.exchangeName = "hello" + ws.trafficTimeout = 1 + go ws.connectionMonitor() + if ws.IsConnectionMonitorRunning() { + t.Fatal("Should have exited") } } @@ -360,26 +461,259 @@ func TestSliceCopyDoesntImpactBoth(t *testing.T) { }, } w.SetChannelUnsubscriber(placeholderSubscriber) - w.unsubscribeToChannels() + err := w.unsubscribeToChannels() + if err != nil { + t.Error(err) + } if len(w.subscribedChannels) != 2 { t.Errorf("Unsubscription did not occur") } w.subscribedChannels[0].Channel = "test" if strings.EqualFold(w.subscribedChannels[0].Channel, w.channelsToSubscribe[0].Channel) { - t.Errorf("Slice has not been copies appropriately") + t.Errorf("Slice has not been copied appropriately") + } +} + +// TestSliceCopyDoesntImpactBoth logic test +func TestGetSubscriptions(t *testing.T) { + w := Websocket{ + subscribedChannels: []WebsocketChannelSubscription{ + { + Channel: "hello3", + }, + }, + } + + subs := w.GetSubscriptions() + subs[0].Channel = "noHELLO" + if strings.EqualFold(w.subscribedChannels[0].Channel, subs[0].Channel) { + t.Error("Subscriptions was not copied properly") } } // TestSetCanUseAuthenticatedEndpoints logic test func TestSetCanUseAuthenticatedEndpoints(t *testing.T) { - w := Websocket{} - result := w.CanUseAuthenticatedEndpoints() + ws := New() + result := ws.CanUseAuthenticatedEndpoints() if result { t.Error("expected `canUseAuthenticatedEndpoints` to be false") } - w.SetCanUseAuthenticatedEndpoints(true) - result = w.CanUseAuthenticatedEndpoints() + ws.SetCanUseAuthenticatedEndpoints(true) + result = ws.CanUseAuthenticatedEndpoints() if !result { t.Error("expected `canUseAuthenticatedEndpoints` to be true") } } + +func TestRemoveSubscribedChannels(t *testing.T) { + w := Websocket{ + channelsToSubscribe: []WebsocketChannelSubscription{ + { + Channel: "hello3", + }, + }, + } + + w.RemoveSubscribedChannels([]WebsocketChannelSubscription{{Channel: "hello3"}}) + if len(w.channelsToSubscribe) == 1 { + t.Error("Did not remove subscription") + } +} + +const ( + websocketTestURL = "wss://www.bitmex.com/realtime" + returnResponseURL = "wss://ws.kraken.com" + useProxyTests = false // Disabled by default. Freely available proxy servers that work all the time are difficult to find + proxyURL = "http://212.186.171.4:80" // Replace with a usable proxy server +) + +var wc *WebsocketConnection +var dialer websocket.Dialer + +type testStruct struct { + Error error + WC WebsocketConnection +} + +type testRequest struct { + Event string `json:"event"` + RequestID int64 `json:"reqid,omitempty"` + Pairs []string `json:"pair"` + Subscription testRequestData `json:"subscription,omitempty"` +} + +// testRequestData contains details on WS channel +type testRequestData struct { + Name string `json:"name,omitempty"` + Interval int64 `json:"interval,omitempty"` + Depth int64 `json:"depth,omitempty"` +} + +type testResponse struct { + RequestID int64 `json:"reqid,omitempty"` +} + +// TestMain setup test +func TestMain(m *testing.M) { + wc = &WebsocketConnection{ + ExchangeName: "test", + URL: returnResponseURL, + ResponseMaxLimit: 7000000000, + ResponseCheckTimeout: 30000000, + } + os.Exit(m.Run()) +} + +// TestDial logic test +func TestDial(t *testing.T) { + var testCases = []testStruct{ + {Error: nil, WC: WebsocketConnection{ExchangeName: "test1", Verbose: true, URL: websocketTestURL, RateLimit: 10, ResponseCheckTimeout: 30000000, ResponseMaxLimit: 7000000000}}, + {Error: errors.New(" Error: malformed ws or wss URL"), WC: WebsocketConnection{ExchangeName: "test2", Verbose: true, URL: "", ResponseCheckTimeout: 30000000, ResponseMaxLimit: 7000000000}}, + {Error: nil, WC: WebsocketConnection{ExchangeName: "test3", Verbose: true, URL: websocketTestURL, ProxyURL: proxyURL, ResponseCheckTimeout: 30000000, ResponseMaxLimit: 7000000000}}, + } + for i := 0; i < len(testCases); i++ { + testData := &testCases[i] + t.Run(testData.WC.ExchangeName, func(t *testing.T) { + if testData.WC.ProxyURL != "" && !useProxyTests { + t.Skip("Proxy testing not enabled, skipping") + } + err := testData.WC.Dial(&dialer, http.Header{}) + if err != nil { + if testData.Error != nil && err.Error() == testData.Error.Error() { + return + } + t.Fatal(err) + } + }) + } +} + +// TestSendMessage logic test +func TestSendMessage(t *testing.T) { + var testCases = []testStruct{ + {Error: nil, WC: WebsocketConnection{ExchangeName: "test1", Verbose: true, URL: websocketTestURL, RateLimit: 10, ResponseCheckTimeout: 30000000, ResponseMaxLimit: 7000000000}}, + {Error: errors.New(" Error: malformed ws or wss URL"), WC: WebsocketConnection{ExchangeName: "test2", Verbose: true, URL: "", ResponseCheckTimeout: 30000000, ResponseMaxLimit: 7000000000}}, + {Error: nil, WC: WebsocketConnection{ExchangeName: "test3", Verbose: true, URL: websocketTestURL, ProxyURL: proxyURL, ResponseCheckTimeout: 30000000, ResponseMaxLimit: 7000000000}}, + } + for i := 0; i < len(testCases); i++ { + testData := &testCases[i] + t.Run(testData.WC.ExchangeName, func(t *testing.T) { + if testData.WC.ProxyURL != "" && !useProxyTests { + t.Skip("Proxy testing not enabled, skipping") + } + err := testData.WC.Dial(&dialer, http.Header{}) + if err != nil { + if testData.Error != nil && err.Error() == testData.Error.Error() { + return + } + t.Fatal(err) + } + err = testData.WC.SendMessage("ping") + if err != nil { + t.Error(err) + } + }) + } +} + +// TestSendMessageWithResponse logic test +func TestSendMessageWithResponse(t *testing.T) { + if wc.ProxyURL != "" && !useProxyTests { + t.Skip("Proxy testing not enabled, skipping") + } + err := wc.Dial(&dialer, http.Header{}) + if err != nil { + t.Fatal(err) + } + go readMessages(wc, t) + + request := testRequest{ + Event: "subscribe", + Pairs: []string{currency.NewPairWithDelimiter("XBT", "USD", "/").String()}, + Subscription: testRequestData{ + Name: "ticker", + }, + RequestID: wc.GenerateMessageID(false), + } + _, err = wc.SendMessageReturnResponse(request.RequestID, request) + if err != nil { + t.Error(err) + } +} + +// TestParseBinaryResponse logic test +func TestParseBinaryResponse(t *testing.T) { + var b bytes.Buffer + w := gzip.NewWriter(&b) + _, err := w.Write([]byte("hello")) + if err != nil { + t.Error(err) + } + err = w.Close() + if err != nil { + t.Error(err) + } + var resp []byte + resp, err = wc.parseBinaryResponse(b.Bytes()) + if err != nil { + t.Error(err) + } + if !strings.EqualFold(string(resp), "hello") { + t.Errorf("GZip conversion failed. Received: '%v', Expected: 'hello'", string(resp)) + } + + var b2 bytes.Buffer + w2, err2 := flate.NewWriter(&b2, 1) + if err2 != nil { + t.Error(err2) + } + _, err2 = w2.Write([]byte("hello")) + if err2 != nil { + t.Error(err) + } + err2 = w2.Close() + if err2 != nil { + t.Error(err) + } + resp2, err3 := wc.parseBinaryResponse(b2.Bytes()) + if err3 != nil { + t.Error(err3) + } + if !strings.EqualFold(string(resp2), "hello") { + t.Errorf("GZip conversion failed. Received: '%v', Expected: 'hello'", string(resp2)) + } +} + +// TestAddResponseWithID logic test +func TestAddResponseWithID(t *testing.T) { + wc.IDResponses = nil + wc.AddResponseWithID(0, []byte("hi")) + wc.AddResponseWithID(1, []byte("hi")) +} + +// readMessages helper func +func readMessages(wc *WebsocketConnection, t *testing.T) { + timer := time.NewTimer(20 * time.Second) + for { + select { + case <-timer.C: + return + default: + resp, err := wc.ReadMessage() + if err != nil { + t.Error(err) + return + } + var incoming testResponse + err = common.JSONDecode(resp.Raw, &incoming) + if err != nil { + t.Error(err) + return + } + if incoming.RequestID > 0 { + wc.AddResponseWithID(incoming.RequestID, resp.Raw) + return + } + } + } +} diff --git a/exchanges/websocket/wshandler/wshandler_types.go b/exchanges/websocket/wshandler/wshandler_types.go index 529c124d..6469efe4 100644 --- a/exchanges/websocket/wshandler/wshandler_types.go +++ b/exchanges/websocket/wshandler/wshandler_types.go @@ -48,51 +48,40 @@ const ( WebsocketMessageCorrelationSupportedText = "WEBSOCKET MESSAGE CORRELATION SUPPORTED" WebsocketSequenceNumberSupportedText = "WEBSOCKET SEQUENCE NUMBER SUPPORTED" WebsocketDeadMansSwitchSupportedText = "WEBSOCKET DEAD MANS SWITCH SUPPORTED" - // WebsocketNotEnabled alerts of a disabled websocket - WebsocketNotEnabled = "exchange_websocket_not_enabled" - // WebsocketTrafficLimitTime defines a standard time for no traffic from the - // websocket connection - WebsocketTrafficLimitTime = 5 * time.Second - websocketRestablishConnection = time.Second - manageSubscriptionsDelay = 5 * time.Second + WebsocketNotEnabled = "exchange_websocket_not_enabled" + manageSubscriptionsDelay = 5 * time.Second // connection monitor time delays and limits connectionMonitorDelay = 2 * time.Second - // WebsocketStateTimeout defines a const for when a websocket connection - // times out, will be handled by the routine management system - WebsocketStateTimeout = "TIMEOUT" ) // Websocket defines a return type for websocket connections via the interface // wrapper for routine processing in routines.go type Websocket struct { - proxyAddr string - defaultURL string - runningURL string - exchangeName string - enabled bool - init bool - connected bool - connecting bool - verbose bool - connector func() error - m sync.Mutex - subscriptionLock sync.Mutex - connectionMonitorRunning bool - reconnectionLimit int - noConnectionChecks int - reconnectionChecks int - noConnectionCheckLimit int - subscribedChannels []WebsocketChannelSubscription - channelsToSubscribe []WebsocketChannelSubscription - channelSubscriber func(channelToSubscribe WebsocketChannelSubscription) error - channelUnsubscriber func(channelToUnsubscribe WebsocketChannelSubscription) error - // Connected denotes a channel switch for diversion of request flow - Connected chan struct{} - // Disconnected denotes a channel switch for diversion of request flow - Disconnected chan struct{} - // DataHandler pipes websocket data to an exchange websocket data handler - DataHandler chan interface{} + // Functionality defines websocket stream capabilities + Functionality uint32 + canUseAuthenticatedEndpoints bool + enabled bool + init bool + connected bool + connecting bool + trafficMonitorRunning bool + verbose bool + connectionMonitorRunning bool + trafficTimeout time.Duration + proxyAddr string + defaultURL string + runningURL string + exchangeName string + m sync.Mutex + subscriptionMutex sync.Mutex + connectionMutex sync.RWMutex + connector func() error + subscribedChannels []WebsocketChannelSubscription + channelsToSubscribe []WebsocketChannelSubscription + channelSubscriber func(channelToSubscribe WebsocketChannelSubscription) error + channelUnsubscriber func(channelToUnsubscribe WebsocketChannelSubscription) error + DataHandler chan interface{} // ShutdownC is the main shutdown channel which controls all websocket go funcs ShutdownC chan struct{} // Orderbook is a local cache of orderbooks @@ -102,9 +91,21 @@ type Websocket struct { Wg sync.WaitGroup // TrafficAlert monitors if there is a halt in traffic throughput TrafficAlert chan struct{} - // Functionality defines websocket stream capabilities - Functionality uint32 - canUseAuthenticatedEndpoints bool + // ReadMessageErrors will received all errors from ws.ReadMessage() and verify if its a disconnection + ReadMessageErrors chan error +} + +type WebsocketSetup struct { + Enabled bool + Verbose bool + AuthenticatedWebsocketAPISupport bool + WebsocketTimeout time.Duration + DefaultURL string + ExchangeName string + RunningURL string + Connector func() error + Subscriber func(channelToSubscribe WebsocketChannelSubscription) error + UnSubscriber func(channelToUnsubscribe WebsocketChannelSubscription) error } // WebsocketChannelSubscription container for websocket subscriptions @@ -187,16 +188,19 @@ type WebsocketPositionUpdated struct { // WebsocketConnection contains all the data needed to send a message to a WS type WebsocketConnection struct { sync.Mutex - Verbose bool - RateLimit float64 - ExchangeName string - URL string - ProxyURL string - Wg sync.WaitGroup - Connection *websocket.Conn - Shutdown chan struct{} + Verbose bool + connected bool + connectionMutex sync.RWMutex + RateLimit float64 + ExchangeName string + URL string + ProxyURL string + Wg sync.WaitGroup + Connection *websocket.Conn + Shutdown chan struct{} // These are the request IDs and the corresponding response JSON IDResponses map[int64][]byte ResponseCheckTimeout time.Duration ResponseMaxLimit time.Duration + TrafficTimeout time.Duration } diff --git a/exchanges/websocket/wsorderbook/wsorderbook.go b/exchanges/websocket/wsorderbook/wsorderbook.go index 66b59d13..3446ddbd 100644 --- a/exchanges/websocket/wsorderbook/wsorderbook.go +++ b/exchanges/websocket/wsorderbook/wsorderbook.go @@ -201,7 +201,7 @@ func (w *WebsocketOrderbookLocal) updateByIDAndAction(orderbookUpdate *Websocket // LoadSnapshot loads initial snapshot of ob data, overwrite allows full // ob to be completely rewritten because the exchange is a doing a full // update not an incremental one -func (w *WebsocketOrderbookLocal) LoadSnapshot(newOrderbook *orderbook.Base, overwrite bool) error { +func (w *WebsocketOrderbookLocal) LoadSnapshot(newOrderbook *orderbook.Base) error { if len(newOrderbook.Asks) == 0 || len(newOrderbook.Bids) == 0 { return fmt.Errorf("%v snapshot ask and bids are nil", w.exchangeName) } @@ -216,11 +216,8 @@ func (w *WebsocketOrderbookLocal) LoadSnapshot(newOrderbook *orderbook.Base, ove if w.ob[newOrderbook.Pair][newOrderbook.AssetType] != nil && (len(w.ob[newOrderbook.Pair][newOrderbook.AssetType].Asks) > 0 || len(w.ob[newOrderbook.Pair][newOrderbook.AssetType].Bids) > 0) { - if overwrite { - w.ob[newOrderbook.Pair][newOrderbook.AssetType] = newOrderbook - return newOrderbook.Process() - } - return fmt.Errorf("%v snapshot instance already found", w.exchangeName) + w.ob[newOrderbook.Pair][newOrderbook.AssetType] = newOrderbook + return newOrderbook.Process() } w.ob[newOrderbook.Pair][newOrderbook.AssetType] = newOrderbook return newOrderbook.Process() diff --git a/exchanges/websocket/wsorderbook/wsorderbook_test.go b/exchanges/websocket/wsorderbook/wsorderbook_test.go index b3edf789..acc00095 100644 --- a/exchanges/websocket/wsorderbook/wsorderbook_test.go +++ b/exchanges/websocket/wsorderbook/wsorderbook_test.go @@ -38,7 +38,7 @@ func createSnapshot() (obl *WebsocketOrderbookLocal, curr currency.Pair, asks, b snapShot1.AssetType = asset.Spot snapShot1.Pair = curr obl = &WebsocketOrderbookLocal{} - err = obl.LoadSnapshot(&snapShot1, false) + err = obl.LoadSnapshot(&snapShot1) return } @@ -382,8 +382,7 @@ func TestRunSnapshotWithNoData(t *testing.T) { snapShot1.Pair = curr snapShot1.ExchangeName = "test" obl.exchangeName = "test" - err := obl.LoadSnapshot(&snapShot1, - false) + err := obl.LoadSnapshot(&snapShot1) if err == nil { t.Fatal("expected an error loading a snapshot") } @@ -392,8 +391,8 @@ func TestRunSnapshotWithNoData(t *testing.T) { } } -// TestLoadSnapshotWithOverride logic test -func TestLoadSnapshotWithOverride(t *testing.T) { +// TestLoadSnapshot logic test +func TestLoadSnapshot(t *testing.T) { var obl WebsocketOrderbookLocal var snapShot1 orderbook.Base curr := currency.NewPairFromString("BTCUSD") @@ -407,21 +406,13 @@ func TestLoadSnapshotWithOverride(t *testing.T) { snapShot1.Bids = bids snapShot1.AssetType = asset.Spot snapShot1.Pair = curr - err := obl.LoadSnapshot(&snapShot1, false) - if err != nil { - t.Error(err) - } - err = obl.LoadSnapshot(&snapShot1, false) - if err == nil { - t.Error("expected error: 'snapshot instance already found'") - } - err = obl.LoadSnapshot(&snapShot1, true) + err := obl.LoadSnapshot(&snapShot1) if err != nil { t.Error(err) } } -// TestInsertWithIDs logic test +// TestFlushCache logic test func TestFlushCache(t *testing.T) { obl, curr, _, _, err := createSnapshot() if err != nil { @@ -473,7 +464,7 @@ func TestInsertingSnapShots(t *testing.T) { snapShot1.Bids = bids snapShot1.AssetType = asset.Spot snapShot1.Pair = currency.NewPairFromString("BTCUSD") - err := obl.LoadSnapshot(&snapShot1, false) + err := obl.LoadSnapshot(&snapShot1) if err != nil { t.Fatal(err) } @@ -510,7 +501,7 @@ func TestInsertingSnapShots(t *testing.T) { snapShot2.Bids = bids snapShot2.AssetType = asset.Spot snapShot2.Pair = currency.NewPairFromString("LTCUSD") - err = obl.LoadSnapshot(&snapShot2, false) + err = obl.LoadSnapshot(&snapShot2) if err != nil { t.Fatal(err) } @@ -547,7 +538,7 @@ func TestInsertingSnapShots(t *testing.T) { snapShot3.Bids = bids snapShot3.AssetType = "FUTURES" snapShot3.Pair = currency.NewPairFromString("LTCUSD") - err = obl.LoadSnapshot(&snapShot3, false) + err = obl.LoadSnapshot(&snapShot3) if err != nil { t.Fatal(err) } diff --git a/exchanges/zb/zb_websocket.go b/exchanges/zb/zb_websocket.go index 7cd23866..f52c0aed 100644 --- a/exchanges/zb/zb_websocket.go +++ b/exchanges/zb/zb_websocket.go @@ -58,7 +58,7 @@ func (z *ZB) WsHandleData() { default: resp, err := z.WebsocketConn.ReadMessage() if err != nil { - z.Websocket.DataHandler <- err + z.Websocket.ReadMessageErrors <- err return } z.Websocket.TrafficAlert <- struct{}{} @@ -143,8 +143,7 @@ func (z *ZB) WsHandleData() { newOrderBook.AssetType = asset.Spot newOrderBook.Pair = cPair - err = z.Websocket.Orderbook.LoadSnapshot(&newOrderBook, - true) + err = z.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { z.Websocket.DataHandler <- err continue diff --git a/exchanges/zb/zb_wrapper.go b/exchanges/zb/zb_wrapper.go index e415043d..37a2a637 100644 --- a/exchanges/zb/zb_wrapper.go +++ b/exchanges/zb/zb_wrapper.go @@ -117,15 +117,18 @@ func (z *ZB) Setup(exch *config.ExchangeConfig) error { return err } - err = z.Websocket.Setup(z.WsConnect, - z.Subscribe, - nil, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - zbWebsocketAPI, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = z.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: zbWebsocketAPI, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: z.WsConnect, + Subscriber: z.Subscribe, + }) if err != nil { return err }