Feature+Bugfix: Engine websocket management (#360)

* Initial commit tearing down the websocket connection management. The purpose is to remove the traffic monitoring and dropping as syncer.go is a better manager

* Adds a readwrite mutex and helper functions to minimise inline lock/unlocks and prevent races

* Creates new WebsocketType struct to contain all parameters required. Deletes WebsocketReset. Utilises ReadMessageErrors channel for all websocket readmessages to analyse when an error returned is due to a disconnect

* Fixes issue with syncer trying to connect while connecting

* Simplifies initialisation function for websocket. Reconnects and resubscribes after disconnection

* Adds WebsocketTimeout config value to dictate when the websocket traffic monitor should die. Default to two minutes of no traffic activity. Increases test coverage and updates existing tests to work with new technologic. RE-ADDS TESTS I ACCIDENTALLY DELETED FROM PREVIOUS PR

* Removes snapshot override as its always necessary when considering reconnections. Increases test coverage. Re-adds tests that were ACCIDENTALLY DELETED. Removes unused websocket channels. Bug fix for traffic monitor to shutdown via goroutine instead of killing itself

* Fixes gateio bug for authentication errors when null. Adds little entry to syncer for when websocket is switched to rest and then back, you get a log notifying of the return. Fixes okgroup bug where ws message is sent on a disconnected ws, causing panic. Renames setConnectionStatus to setConnectedStatus. Puts connection monitor log behind verbose bool

* Fixes lingering races. Fixes bug where websocket was enabled whether you liked it or not. Removes demonstration test

* Fixes log message, renames unc, removes comments

* Fixes data race

* Removes verbosity, ensures shutdown sets connection status appropriately

* Removes go routine causing CPU spike. Stops timers properly and resets timers properly

* Renames `WsEnabled` to `Enabled`. Increases test coverage. Fixes typos. Handles unhandled errors

* The forgotten lint

* With using RWlocks, removes the channel nil check and relies on !w.IsConnected() to prevent a shutdown from recurring

* Removes extra closure step in the defer as it causes all the issues

* Prevents timer channel hangups. Minimises use of websocket Connect(). Expands disconnection error definition. Removes routine disconnection error handling. Ensures only one traffic monitor can ever be run. Renames subscriptionLock to subscriptionMutext for consistency

* Extends timeout to 30 seconds to cover for non-popular exchanges and non-popular currencies

* Updates test from rebase to use new websocket setup function

* Fixes test to ensure it tests what it says it does
This commit is contained in:
Scott
2019-10-02 09:06:52 +10:00
committed by Adrian Gallagher
parent 580190638c
commit c2a33300f5
47 changed files with 1080 additions and 689 deletions

View File

@@ -41,6 +41,7 @@ const (
configDefaultWebsocketResponseCheckTimeout = time.Millisecond * 30
configDefaultWebsocketResponseMaxLimit = time.Second * 7
configDefaultWebsocketOrderbookBufferLimit = 5
configDefaultWebsocketTrafficTimeout = time.Second * 30
configMaxAuthFailures = 3
defaultNTPAllowedDifference = 50000000
defaultNTPAllowedNegativeDifference = 50000000
@@ -1024,6 +1025,11 @@ func (c *Config) CheckExchangeConfigValues() error {
c.Exchanges[i].Name, configDefaultWebsocketResponseMaxLimit)
c.Exchanges[i].WebsocketResponseMaxLimit = configDefaultWebsocketResponseMaxLimit
}
if c.Exchanges[i].WebsocketTrafficTimeout <= 0 {
log.Warnf(log.ExchangeSys, "Exchange %s Websocket response traffic timeout value not set, defaulting to %v.",
c.Exchanges[i].Name, configDefaultWebsocketTrafficTimeout)
c.Exchanges[i].WebsocketTrafficTimeout = configDefaultWebsocketTrafficTimeout
}
if c.Exchanges[i].WebsocketOrderbookBufferLimit <= 0 {
log.Warnf(log.ExchangeSys, "Exchange %s Websocket orderbook buffer limit value not set, defaulting to %v.",
c.Exchanges[i].Name, configDefaultWebsocketOrderbookBufferLimit)

View File

@@ -1451,6 +1451,7 @@ func TestCheckExchangeConfigValues(t *testing.T) {
cfg.Exchanges[0].WebsocketResponseMaxLimit = 0
cfg.Exchanges[0].WebsocketResponseCheckTimeout = 0
cfg.Exchanges[0].WebsocketOrderbookBufferLimit = 0
cfg.Exchanges[0].WebsocketTrafficTimeout = 0
cfg.Exchanges[0].HTTPTimeout = 0
err = cfg.CheckExchangeConfigValues()
if err != nil {
@@ -1465,6 +1466,10 @@ func TestCheckExchangeConfigValues(t *testing.T) {
t.Errorf("expected exchange %s to have updated WebsocketOrderbookBufferLimit value",
cfg.Exchanges[0].Name)
}
if cfg.Exchanges[0].WebsocketTrafficTimeout == 0 {
t.Errorf("expected exchange %s to have updated WebsocketTrafficTimeout value",
cfg.Exchanges[0].Name)
}
if cfg.Exchanges[0].HTTPTimeout == 0 {
t.Errorf("expected exchange %s to have updated HTTPTimeout value",
cfg.Exchanges[0].Name)

View File

@@ -59,6 +59,7 @@ type ExchangeConfig struct {
HTTPRateLimiter *HTTPRateLimitConfig `json:"httpRateLimiter,omitempty"`
WebsocketResponseCheckTimeout time.Duration `json:"websocketResponseCheckTimeout"`
WebsocketResponseMaxLimit time.Duration `json:"websocketResponseMaxLimit"`
WebsocketTrafficTimeout time.Duration `json:"websocketTrafficTimeout"`
WebsocketOrderbookBufferLimit int `json:"websocketOrderbookBufferLimit"`
ProxyAddress string `json:"proxyAddress,omitempty"`
BaseCurrencies currency.Currencies `json:"baseCurrencies"`

View File

@@ -228,7 +228,6 @@ func LoadExchange(name string, useWG bool, wg *sync.WaitGroup) error {
if exchCfg.Features.Supports.RESTCapabilities.AutoPairUpdates {
exchCfg.Features.Enabled.AutoPairUpdates = false
}
}
}

View File

@@ -354,39 +354,12 @@ func Websocketshutdown(ws *wshandler.Websocket) error {
}
}
// streamDiversion is a diversion switch from websocket to REST or other
// alternative feed
func streamDiversion(ws *wshandler.Websocket) {
wg.Add(1)
defer wg.Done()
for {
select {
case <-shutdowner:
return
case <-ws.Connected:
if Bot.Settings.Verbose {
log.Debugf(log.WebsocketMgr, "exchange %s websocket feed connected\n", ws.GetName())
}
case <-ws.Disconnected:
if Bot.Settings.Verbose {
log.Debugf(log.WebsocketMgr, "exchange %s websocket feed disconnected, switching to REST functionality\n",
ws.GetName())
}
}
}
}
// WebsocketDataHandler handles websocket data coming from a websocket feed
// associated with an exchange
func WebsocketDataHandler(ws *wshandler.Websocket) {
wg.Add(1)
defer wg.Done()
go streamDiversion(ws)
for {
select {
case <-shutdowner:
@@ -407,14 +380,7 @@ func WebsocketDataHandler(ws *wshandler.Websocket) {
}
case error:
switch {
case strings.Contains(d.Error(), "close 1006"):
go ws.WebsocketReset()
continue
default:
log.Errorf(log.WebsocketMgr, "routines.go exchange %s websocket error - %s", ws.GetName(), data)
}
log.Errorf(log.WebsocketMgr, "routines.go exchange %s websocket error - %s", ws.GetName(), data)
case wshandler.TradeData:
// Trade Data
// if Bot.Settings.Verbose {

View File

@@ -286,7 +286,7 @@ func (e *ExchangeCurrencyPairSyncer) worker() {
supportsRESTTickerBatching := Bot.Exchanges[x].SupportsRESTTickerBatchUpdates()
var usingREST bool
var usingWebsocket bool
var switchedToRest bool
if Bot.Exchanges[x].SupportsWebsocket() && Bot.Exchanges[x].IsWebsocketEnabled() {
ws, err := Bot.Exchanges[x].GetWebsocket()
if err != nil {
@@ -346,7 +346,12 @@ func (e *ExchangeCurrencyPairSyncer) worker() {
log.Errorf(log.SyncMgr, "failed to get item. Err: %s\n", err)
continue
}
if switchedToRest && usingWebsocket {
log.Infof(log.SyncMgr,
"%s %s: Websocket re-enabled, switching from rest to websocket\n",
c.Exchange, FormatCurrency(p).String())
switchedToRest = false
}
if e.Cfg.SyncTicker {
if !e.isProcessing(exchangeName, c.Pair, c.AssetType, SyncItemTicker) {
if c.Ticker.LastUpdated.IsZero() || time.Since(c.Ticker.LastUpdated) > defaultSyncerTimeout {
@@ -362,6 +367,7 @@ func (e *ExchangeCurrencyPairSyncer) worker() {
log.Warnf(log.SyncMgr,
"%s %s: No ticker update after 10 seconds, switching from websocket to rest\n",
c.Exchange, FormatCurrency(p).String())
switchedToRest = true
e.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemTicker, false)
}
}
@@ -425,6 +431,7 @@ func (e *ExchangeCurrencyPairSyncer) worker() {
log.Warnf(log.SyncMgr,
"%s %s: No orderbook update after 15 seconds, switching from websocket to rest\n",
c.Exchange, FormatCurrency(c.Pair).String())
switchedToRest = true
e.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemOrderbook, false)
}
}
@@ -491,7 +498,7 @@ func (e *ExchangeCurrencyPairSyncer) Start() {
usingREST = true
}
if !ws.IsConnected() {
if !ws.IsConnected() && !ws.IsConnecting() {
go WebsocketDataHandler(ws)
err = ws.Connect()

View File

@@ -38,6 +38,7 @@ func (a *Alphapoint) WebsocketClient() {
for a.Enabled {
msgType, resp, err := a.WebsocketConn.ReadMessage()
if err != nil {
a.Websocket.ReadMessageErrors <- err
log.Error(log.ExchangeSys, err)
break
}

View File

@@ -21,8 +21,8 @@ const (
binanceDefaultWebsocketURL = "wss://stream.binance.com:9443"
)
// WSConnect intiates a websocket connection
func (b *Binance) WSConnect() error {
// WsConnect intiates a websocket connection
func (b *Binance) WsConnect() error {
if !b.Websocket.IsEnabled() || !b.IsEnabled() {
return errors.New(wshandler.WebsocketNotEnabled)
}
@@ -87,7 +87,7 @@ func (b *Binance) WsHandleData() {
default:
read, err := b.WebsocketConn.ReadMessage()
if err != nil {
b.Websocket.DataHandler <- err
b.Websocket.ReadMessageErrors <- err
return
}
b.Websocket.TrafficAlert <- struct{}{}
@@ -248,7 +248,7 @@ func (b *Binance) SeedLocalCache(p currency.Pair) error {
newOrderBook.Pair = p
newOrderBook.AssetType = asset.Spot
return b.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false)
return b.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
}
// UpdateLocalCache updates and returns the most recent iteration of the orderbook

View File

@@ -113,15 +113,18 @@ func (b *Binance) Setup(exch *config.ExchangeConfig) error {
return err
}
err = b.Websocket.Setup(b.WSConnect,
nil,
nil,
exch.Name,
exch.Features.Enabled.Websocket,
exch.Verbose,
binanceDefaultWebsocketURL,
exch.API.Endpoints.WebsocketURL,
exch.API.AuthenticatedWebsocketSupport)
err = b.Websocket.Setup(
&wshandler.WebsocketSetup{
Enabled: exch.Features.Enabled.Websocket,
Verbose: exch.Verbose,
AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport,
WebsocketTimeout: exch.WebsocketTrafficTimeout,
DefaultURL: binanceDefaultWebsocketURL,
ExchangeName: exch.Name,
RunningURL: exch.API.Endpoints.WebsocketURL,
Connector: b.WsConnect,
})
if err != nil {
return err
}

View File

@@ -133,6 +133,7 @@ func (b *Bitfinex) WsConnect() error {
resp, err := b.WebsocketConn.ReadMessage()
if err != nil {
b.Websocket.ReadMessageErrors <- err
return fmt.Errorf("%v unable to read from Websocket. Error: %s", b.Name, err)
}
b.Websocket.TrafficAlert <- struct{}{}
@@ -177,7 +178,7 @@ func (b *Bitfinex) WsDataHandler() {
default:
stream, err := b.WebsocketConn.ReadMessage()
if err != nil {
b.Websocket.DataHandler <- err
b.Websocket.ReadMessageErrors <- err
return
}
b.Websocket.TrafficAlert <- struct{}{}
@@ -481,7 +482,7 @@ func (b *Bitfinex) WsInsertSnapshot(p currency.Pair, assetType asset.Item, books
newOrderBook.AssetType = assetType
newOrderBook.Bids = bid
newOrderBook.Pair = p
err := b.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false)
err := b.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
if err != nil {
return fmt.Errorf("bitfinex.go error - %s", err)
}

View File

@@ -114,15 +114,19 @@ func (b *Bitfinex) Setup(exch *config.ExchangeConfig) error {
return err
}
err = b.Websocket.Setup(b.WsConnect,
b.Subscribe,
b.Unsubscribe,
exch.Name,
exch.Features.Enabled.Websocket,
exch.Verbose,
bitfinexWebsocket,
exch.API.Endpoints.WebsocketURL,
exch.API.AuthenticatedWebsocketSupport)
err = b.Websocket.Setup(
&wshandler.WebsocketSetup{
Enabled: exch.Features.Enabled.Websocket,
Verbose: exch.Verbose,
AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport,
WebsocketTimeout: exch.WebsocketTrafficTimeout,
DefaultURL: bitfinexWebsocket,
ExchangeName: exch.Name,
RunningURL: exch.API.Endpoints.WebsocketURL,
Connector: b.WsConnect,
Subscriber: b.Subscribe,
UnSubscriber: b.Unsubscribe,
})
if err != nil {
return err
}

View File

@@ -66,8 +66,8 @@ var (
pongChan = make(chan int, 1)
)
// WsConnector initiates a new websocket connection
func (b *Bitmex) WsConnector() error {
// WsConnect initiates a new websocket connection
func (b *Bitmex) WsConnect() error {
if !b.Websocket.IsEnabled() || !b.IsEnabled() {
return errors.New(wshandler.WebsocketNotEnabled)
}
@@ -79,6 +79,7 @@ func (b *Bitmex) WsConnector() error {
p, err := b.WebsocketConn.ReadMessage()
if err != nil {
b.Websocket.ReadMessageErrors <- err
return err
}
b.Websocket.TrafficAlert <- struct{}{}
@@ -360,7 +361,7 @@ func (b *Bitmex) processOrderbook(data []OrderBookL2, action string, currencyPai
newOrderBook.Bids = bids
newOrderBook.AssetType = assetType
newOrderBook.Pair = currencyPair
err := b.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false)
err := b.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
if err != nil {
return fmt.Errorf("bitmex_websocket.go process orderbook error - %s",
err)

View File

@@ -137,15 +137,19 @@ func (b *Bitmex) Setup(exch *config.ExchangeConfig) error {
return err
}
err = b.Websocket.Setup(b.WsConnector,
b.Subscribe,
b.Unsubscribe,
exch.Name,
exch.Features.Enabled.Websocket,
exch.Verbose,
bitmexWSURL,
exch.API.Endpoints.WebsocketURL,
exch.API.AuthenticatedWebsocketSupport)
err = b.Websocket.Setup(
&wshandler.WebsocketSetup{
Enabled: exch.Features.Enabled.Websocket,
Verbose: exch.Verbose,
AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport,
WebsocketTimeout: exch.WebsocketTrafficTimeout,
DefaultURL: bitmexWSURL,
ExchangeName: exch.Name,
RunningURL: exch.API.Endpoints.WebsocketURL,
Connector: b.WsConnect,
Subscriber: b.Subscribe,
UnSubscriber: b.Unsubscribe,
})
if err != nil {
return err
}

View File

@@ -62,7 +62,7 @@ func (b *Bitstamp) WsHandleData() {
default:
resp, err := b.WebsocketConn.ReadMessage()
if err != nil {
b.Websocket.DataHandler <- err
b.Websocket.ReadMessageErrors <- err
return
}
b.Websocket.TrafficAlert <- struct{}{}
@@ -78,7 +78,7 @@ func (b *Bitstamp) WsHandleData() {
if b.Verbose {
log.Debugf(log.ExchangeSys, "%v - Websocket reconnection request received", b.GetName())
}
go b.Websocket.WebsocketReset()
go b.Websocket.Shutdown() // Connection monitor will reconnect
case "data":
wsOrderBookTemp := websocketOrderBookResponse{}
@@ -248,7 +248,7 @@ func (b *Bitstamp) seedOrderBook() error {
newOrderBook.Pair = p[x]
newOrderBook.AssetType = asset.Spot
err = b.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false)
err = b.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
if err != nil {
return err
}

View File

@@ -110,15 +110,19 @@ func (b *Bitstamp) Setup(exch *config.ExchangeConfig) error {
return err
}
err = b.Websocket.Setup(b.WsConnect,
b.Subscribe,
b.Unsubscribe,
exch.Name,
exch.Features.Enabled.Websocket,
exch.Verbose,
bitstampWSURL,
exch.API.Endpoints.WebsocketURL,
exch.API.AuthenticatedWebsocketSupport)
err = b.Websocket.Setup(
&wshandler.WebsocketSetup{
Enabled: exch.Features.Enabled.Websocket,
Verbose: exch.Verbose,
AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport,
WebsocketTimeout: exch.WebsocketTrafficTimeout,
DefaultURL: bitstampWSURL,
ExchangeName: exch.Name,
RunningURL: exch.API.Endpoints.WebsocketURL,
Connector: b.WsConnect,
Subscriber: b.Subscribe,
UnSubscriber: b.Unsubscribe,
})
if err != nil {
return err
}

View File

@@ -54,7 +54,7 @@ func (b *BTSE) WsHandleData() {
default:
resp, err := b.WebsocketConn.ReadMessage()
if err != nil {
b.Websocket.DataHandler <- err
b.Websocket.ReadMessageErrors <- err
return
}
b.Websocket.TrafficAlert <- struct{}{}
@@ -162,7 +162,7 @@ func (b *BTSE) wsProcessSnapshot(snapshot *websocketOrderbookSnapshot) error {
base.LastUpdated = time.Now()
base.ExchangeName = b.Name
err := b.Websocket.Orderbook.LoadSnapshot(&base, true)
err := b.Websocket.Orderbook.LoadSnapshot(&base)
if err != nil {
return err
}

View File

@@ -109,15 +109,19 @@ func (b *BTSE) Setup(exch *config.ExchangeConfig) error {
return err
}
err = b.Websocket.Setup(b.WsConnect,
b.Subscribe,
b.Unsubscribe,
exch.Name,
exch.Features.Enabled.Websocket,
exch.Verbose,
btseWebsocket,
exch.API.Endpoints.WebsocketURL,
exch.API.AuthenticatedWebsocketSupport)
err = b.Websocket.Setup(
&wshandler.WebsocketSetup{
Enabled: exch.Features.Enabled.Websocket,
Verbose: exch.Verbose,
AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport,
WebsocketTimeout: exch.WebsocketTrafficTimeout,
DefaultURL: btseWebsocket,
ExchangeName: exch.Name,
RunningURL: exch.API.Endpoints.WebsocketURL,
Connector: b.WsConnect,
Subscriber: b.Subscribe,
UnSubscriber: b.Unsubscribe,
})
if err != nil {
return err
}

View File

@@ -54,7 +54,7 @@ func (c *CoinbasePro) WsHandleData() {
default:
resp, err := c.WebsocketConn.ReadMessage()
if err != nil {
c.Websocket.DataHandler <- err
c.Websocket.ReadMessageErrors <- err
return
}
c.Websocket.TrafficAlert <- struct{}{}
@@ -217,7 +217,7 @@ func (c *CoinbasePro) ProcessSnapshot(snapshot *WebsocketOrderbookSnapshot) erro
base.AssetType = asset.Spot
base.Pair = pair
err := c.Websocket.Orderbook.LoadSnapshot(&base, false)
err := c.Websocket.Orderbook.LoadSnapshot(&base)
if err != nil {
return err
}

View File

@@ -115,15 +115,19 @@ func (c *CoinbasePro) Setup(exch *config.ExchangeConfig) error {
return err
}
err = c.Websocket.Setup(c.WsConnect,
c.Subscribe,
c.Unsubscribe,
exch.Name,
exch.Features.Enabled.Websocket,
exch.Verbose,
coinbaseproWebsocketURL,
exch.API.Endpoints.WebsocketURL,
exch.API.AuthenticatedWebsocketSupport)
err = c.Websocket.Setup(
&wshandler.WebsocketSetup{
Enabled: exch.Features.Enabled.Websocket,
Verbose: exch.Verbose,
AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport,
WebsocketTimeout: exch.WebsocketTrafficTimeout,
DefaultURL: coinbaseproWebsocketURL,
ExchangeName: exch.Name,
RunningURL: exch.API.Endpoints.WebsocketURL,
Connector: c.WsConnect,
Subscriber: c.Subscribe,
UnSubscriber: c.Unsubscribe,
})
if err != nil {
return err
}

View File

@@ -77,7 +77,7 @@ func (c *COINUT) WsHandleData() {
default:
resp, err := c.WebsocketConn.ReadMessage()
if err != nil {
c.Websocket.DataHandler <- err
c.Websocket.ReadMessageErrors <- err
return
}
c.Websocket.TrafficAlert <- struct{}{}
@@ -289,7 +289,7 @@ func (c *COINUT) WsProcessOrderbookSnapshot(ob *WsOrderbookSnapshot) error {
)
newOrderBook.AssetType = asset.Spot
return c.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false)
return c.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
}
// WsProcessOrderbookUpdate process an orderbook update

View File

@@ -116,15 +116,19 @@ func (c *COINUT) Setup(exch *config.ExchangeConfig) error {
return err
}
err = c.Websocket.Setup(c.WsConnect,
c.Subscribe,
c.Unsubscribe,
exch.Name,
exch.Features.Enabled.Websocket,
exch.Verbose,
coinutWebsocketURL,
exch.API.Endpoints.WebsocketURL,
exch.API.AuthenticatedWebsocketSupport)
err = c.Websocket.Setup(
&wshandler.WebsocketSetup{
Enabled: exch.Features.Enabled.Websocket,
Verbose: exch.Verbose,
AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport,
WebsocketTimeout: exch.WebsocketTrafficTimeout,
DefaultURL: coinutWebsocketURL,
ExchangeName: exch.Name,
RunningURL: exch.API.Endpoints.WebsocketURL,
Connector: c.WsConnect,
Subscriber: c.Subscribe,
UnSubscriber: c.Unsubscribe,
})
if err != nil {
return err
}

View File

@@ -449,7 +449,7 @@ func (e *Base) SetupDefaults(exch *config.ExchangeConfig) error {
}
if e.Features.Supports.Websocket {
e.Websocket.SetWsStatusAndConnection(exch.Features.Enabled.Websocket)
e.Websocket.Initialise()
}
return nil
}

View File

@@ -1233,7 +1233,10 @@ func TestIsWebsocketEnabled(t *testing.T) {
}
b.Websocket = wshandler.New()
b.Websocket.Setup(nil, nil, nil, "", true, false, "", "", false)
err := b.Websocket.Setup(&wshandler.WebsocketSetup{Enabled: true})
if err != nil {
t.Error(err)
}
if !b.IsWebsocketEnabled() {
t.Error("websocket should be enabled")
}

View File

@@ -462,7 +462,7 @@ type WebSocketOrderQueryRecords struct {
// WebsocketAuthenticationResponse contains the result of a login request
type WebsocketAuthenticationResponse struct {
Error string `json:"error"`
Error string `json:"error,omitempty"`
Result struct {
Status string `json:"status"`
} `json:"result"`

View File

@@ -92,7 +92,7 @@ func (g *Gateio) WsHandleData() {
default:
resp, err := g.WebsocketConn.ReadMessage()
if err != nil {
g.Websocket.DataHandler <- err
g.Websocket.ReadMessageErrors <- err
return
}
g.Websocket.TrafficAlert <- struct{}{}
@@ -238,8 +238,7 @@ func (g *Gateio) WsHandleData() {
newOrderBook.AssetType = asset.Spot
newOrderBook.Pair = currency.NewPairFromString(c)
err = g.Websocket.Orderbook.LoadSnapshot(&newOrderBook,
true)
err = g.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
if err != nil {
g.Websocket.DataHandler <- err
}

View File

@@ -117,15 +117,19 @@ func (g *Gateio) Setup(exch *config.ExchangeConfig) error {
return err
}
err = g.Websocket.Setup(g.WsConnect,
g.Subscribe,
g.Unsubscribe,
exch.Name,
exch.Features.Enabled.Websocket,
exch.Verbose,
gateioWebsocketEndpoint,
exch.API.Endpoints.WebsocketURL,
exch.API.AuthenticatedWebsocketSupport)
err = g.Websocket.Setup(
&wshandler.WebsocketSetup{
Enabled: exch.Features.Enabled.Websocket,
Verbose: exch.Verbose,
AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport,
WebsocketTimeout: exch.WebsocketTrafficTimeout,
DefaultURL: gateioWebsocketEndpoint,
ExchangeName: exch.Name,
RunningURL: exch.API.Endpoints.WebsocketURL,
Connector: g.WsConnect,
Subscriber: g.Subscribe,
UnSubscriber: g.Unsubscribe,
})
if err != nil {
return err
}

View File

@@ -282,8 +282,7 @@ func (g *Gemini) wsProcessUpdate(result WsMarketUpdateResponse, pair currency.Pa
newOrderBook.Bids = bids
newOrderBook.AssetType = asset.Spot
newOrderBook.Pair = pair
err := g.Websocket.Orderbook.LoadSnapshot(&newOrderBook,
false)
err := g.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
if err != nil {
g.Websocket.DataHandler <- err
return

View File

@@ -116,15 +116,17 @@ func (g *Gemini) Setup(exch *config.ExchangeConfig) error {
g.API.Endpoints.URL = geminiSandboxAPIURL
}
err = g.Websocket.Setup(g.WsConnect,
nil,
nil,
exch.Name,
exch.Features.Enabled.Websocket,
exch.Verbose,
geminiWebsocketEndpoint,
exch.API.Endpoints.WebsocketURL,
exch.API.AuthenticatedWebsocketSupport)
err = g.Websocket.Setup(
&wshandler.WebsocketSetup{
Enabled: exch.Features.Enabled.Websocket,
Verbose: exch.Verbose,
AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport,
WebsocketTimeout: exch.WebsocketTrafficTimeout,
DefaultURL: geminiWebsocketEndpoint,
ExchangeName: exch.Name,
RunningURL: exch.API.Endpoints.WebsocketURL,
Connector: g.WsConnect,
})
if err != nil {
return err
}

View File

@@ -65,7 +65,7 @@ func (h *HitBTC) WsHandleData() {
default:
resp, err := h.WebsocketConn.ReadMessage()
if err != nil {
h.Websocket.DataHandler <- err
h.Websocket.ReadMessageErrors <- err
return
}
h.Websocket.TrafficAlert <- struct{}{}
@@ -251,7 +251,7 @@ func (h *HitBTC) WsProcessOrderbookSnapshot(ob WsOrderbook) error {
newOrderBook.AssetType = asset.Spot
newOrderBook.Pair = p
err := h.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false)
err := h.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
if err != nil {
return err
}

View File

@@ -115,15 +115,19 @@ func (h *HitBTC) Setup(exch *config.ExchangeConfig) error {
return err
}
err = h.Websocket.Setup(h.WsConnect,
h.Subscribe,
h.Unsubscribe,
exch.Name,
exch.Features.Enabled.Websocket,
exch.Verbose,
hitbtcWebsocketAddress,
exch.API.Endpoints.WebsocketURL,
exch.API.AuthenticatedWebsocketSupport)
err = h.Websocket.Setup(
&wshandler.WebsocketSetup{
Enabled: exch.Features.Enabled.Websocket,
Verbose: exch.Verbose,
AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport,
WebsocketTimeout: exch.WebsocketTrafficTimeout,
DefaultURL: hitbtcWebsocketAddress,
ExchangeName: exch.Name,
RunningURL: exch.API.Endpoints.WebsocketURL,
Connector: h.WsConnect,
Subscriber: h.Subscribe,
UnSubscriber: h.Unsubscribe,
})
if err != nil {
return err
}

View File

@@ -313,7 +313,7 @@ func (h *HUOBI) WsProcessOrderbook(update *WsDepth, symbol string) error {
newOrderBook.Asks = asks
newOrderBook.Bids = bids
newOrderBook.Pair = p
err := h.Websocket.Orderbook.LoadSnapshot(&newOrderBook, true)
err := h.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
if err != nil {
return err
}

View File

@@ -119,15 +119,19 @@ func (h *HUOBI) Setup(exch *config.ExchangeConfig) error {
h.API.PEMKeySupport = exch.API.PEMKeySupport
h.API.Credentials.PEMKey = exch.API.Credentials.PEMKey
err = h.Websocket.Setup(h.WsConnect,
h.Subscribe,
h.Unsubscribe,
exch.Name,
exch.Features.Enabled.Websocket,
exch.Verbose,
wsMarketURL,
exch.API.Endpoints.WebsocketURL,
exch.API.AuthenticatedWebsocketSupport)
err = h.Websocket.Setup(
&wshandler.WebsocketSetup{
Enabled: exch.Features.Enabled.Websocket,
Verbose: exch.Verbose,
AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport,
WebsocketTimeout: exch.WebsocketTrafficTimeout,
DefaultURL: wsMarketURL,
ExchangeName: exch.Name,
RunningURL: exch.API.Endpoints.WebsocketURL,
Connector: h.WsConnect,
Subscriber: h.Subscribe,
UnSubscriber: h.Unsubscribe,
})
if err != nil {
return err
}

View File

@@ -110,9 +110,7 @@ func (k *Kraken) WsHandleData() {
default:
resp, err := k.WebsocketConn.ReadMessage()
if err != nil {
k.Websocket.DataHandler <- fmt.Errorf("%v WsHandleData: %v",
k.Name,
err)
k.Websocket.ReadMessageErrors <- err
return
}
k.Websocket.TrafficAlert <- struct{}{}
@@ -384,7 +382,7 @@ func (k *Kraken) wsProcessOrderBookPartial(channelData *WebsocketChannelData, ob
}
}
base.LastUpdated = highestLastUpdate
err := k.Websocket.Orderbook.LoadSnapshot(&base, true)
err := k.Websocket.Orderbook.LoadSnapshot(&base)
if err != nil {
k.Websocket.DataHandler <- err
return
@@ -509,7 +507,7 @@ func (k *Kraken) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscrip
Subscription: WebsocketSubscriptionData{
Name: channelToSubscribe.Channel,
},
RequestID: k.WebsocketConn.GenerateMessageID(true),
RequestID: k.WebsocketConn.GenerateMessageID(false),
}
_, err := k.WebsocketConn.SendMessageReturnResponse(resp.RequestID, resp)
return err
@@ -523,7 +521,7 @@ func (k *Kraken) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubscr
Subscription: WebsocketSubscriptionData{
Name: channelToSubscribe.Channel,
},
RequestID: k.WebsocketConn.GenerateMessageID(true),
RequestID: k.WebsocketConn.GenerateMessageID(false),
}
_, err := k.WebsocketConn.SendMessageReturnResponse(resp.RequestID, resp)
return err

View File

@@ -119,15 +119,19 @@ func (k *Kraken) Setup(exch *config.ExchangeConfig) error {
return err
}
err = k.Websocket.Setup(k.WsConnect,
k.Subscribe,
k.Unsubscribe,
exch.Name,
exch.Features.Enabled.Websocket,
exch.Verbose,
krakenWSURL,
exch.API.Endpoints.WebsocketURL,
exch.API.AuthenticatedWebsocketSupport)
err = k.Websocket.Setup(
&wshandler.WebsocketSetup{
Enabled: exch.Features.Enabled.Websocket,
Verbose: exch.Verbose,
AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport,
WebsocketTimeout: exch.WebsocketTrafficTimeout,
DefaultURL: krakenWSURL,
ExchangeName: exch.Name,
RunningURL: exch.API.Endpoints.WebsocketURL,
Connector: k.WsConnect,
Subscriber: k.Subscribe,
UnSubscriber: k.Unsubscribe,
})
if err != nil {
return err
}

View File

@@ -205,7 +205,7 @@ func (l *LakeBTC) processOrderbook(obUpdate, channel string) error {
Price: price,
})
}
return l.Websocket.Orderbook.LoadSnapshot(&book, true)
return l.Websocket.Orderbook.LoadSnapshot(&book)
}
func (l *LakeBTC) getCurrencyFromChannel(channel string) currency.Pair {

View File

@@ -110,15 +110,18 @@ func (l *LakeBTC) Setup(exch *config.ExchangeConfig) error {
return err
}
err = l.Websocket.Setup(l.WsConnect,
l.Subscribe,
nil,
exch.Name,
exch.Features.Enabled.Websocket,
exch.Verbose,
lakeBTCWSURL,
exch.API.Endpoints.WebsocketURL,
exch.API.AuthenticatedWebsocketSupport)
err = l.Websocket.Setup(
&wshandler.WebsocketSetup{
Enabled: exch.Features.Enabled.Websocket,
Verbose: exch.Verbose,
AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport,
WebsocketTimeout: exch.WebsocketTrafficTimeout,
DefaultURL: lakeBTCWSURL,
ExchangeName: exch.Name,
RunningURL: exch.API.Endpoints.WebsocketURL,
Connector: l.WsConnect,
Subscriber: l.Subscribe,
})
if err != nil {
return err
}

View File

@@ -193,6 +193,9 @@ func (o *OKGroup) wsPingHandler(wg *sync.WaitGroup) {
return
case <-ticker.C:
if !o.Websocket.IsConnected() {
continue
}
err := o.WebsocketConn.Connection.WriteMessage(websocket.TextMessage, []byte("ping"))
if o.Verbose {
log.Debugf(log.ExchangeSys, "%v sending ping", o.GetName())
@@ -221,7 +224,7 @@ func (o *OKGroup) WsHandleData(wg *sync.WaitGroup) {
default:
resp, err := o.WebsocketConn.ReadMessage()
if err != nil {
o.Websocket.DataHandler <- err
o.Websocket.ReadMessageErrors <- err
return
}
o.Websocket.TrafficAlert <- struct{}{}
@@ -475,7 +478,7 @@ func (o *OKGroup) WsProcessPartialOrderBook(wsEventData *WebsocketDataWrapper, i
ExchangeName: o.GetName(),
}
err := o.Websocket.Orderbook.LoadSnapshot(&newOrderBook, true)
err := o.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
if err != nil {
return err
}

View File

@@ -31,15 +31,18 @@ func (o *OKGroup) Setup(exch *config.ExchangeConfig) error {
return err
}
err = o.Websocket.Setup(o.WsConnect,
o.Subscribe,
o.Unsubscribe,
exch.Name,
exch.Features.Enabled.Websocket,
exch.Verbose,
o.API.Endpoints.WebsocketURL,
exch.API.Endpoints.WebsocketURL,
exch.API.AuthenticatedWebsocketSupport)
err = o.Websocket.Setup(&wshandler.WebsocketSetup{
Enabled: exch.Features.Enabled.Websocket,
Verbose: exch.Verbose,
AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport,
WebsocketTimeout: exch.WebsocketTrafficTimeout,
DefaultURL: o.API.Endpoints.WebsocketURL,
ExchangeName: exch.Name,
RunningURL: exch.API.Endpoints.WebsocketURL,
Connector: o.WsConnect,
Subscriber: o.Subscribe,
UnSubscriber: o.Unsubscribe,
})
if err != nil {
return err
}

View File

@@ -88,7 +88,7 @@ func (p *Poloniex) WsHandleData() {
default:
resp, err := p.WebsocketConn.ReadMessage()
if err != nil {
p.Websocket.DataHandler <- err
p.Websocket.ReadMessageErrors <- err
return
}
p.Websocket.TrafficAlert <- struct{}{}
@@ -330,7 +330,7 @@ func (p *Poloniex) WsProcessOrderbookSnapshot(ob []interface{}, symbol string) e
newOrderBook.AssetType = asset.Spot
newOrderBook.Pair = currency.NewPairFromString(symbol)
return p.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false)
return p.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
}
// WsProcessOrderbookUpdate processes new orderbook updates

View File

@@ -113,15 +113,19 @@ func (p *Poloniex) Setup(exch *config.ExchangeConfig) error {
return err
}
err = p.Websocket.Setup(p.WsConnect,
p.Subscribe,
p.Unsubscribe,
exch.Name,
exch.Features.Enabled.Websocket,
exch.Verbose,
poloniexWebsocketAddress,
exch.API.Endpoints.WebsocketURL,
exch.API.AuthenticatedWebsocketSupport)
err = p.Websocket.Setup(
&wshandler.WebsocketSetup{
Enabled: exch.Features.Enabled.Websocket,
Verbose: exch.Verbose,
AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport,
WebsocketTimeout: exch.WebsocketTrafficTimeout,
DefaultURL: poloniexWebsocketAddress,
ExchangeName: exch.Name,
RunningURL: exch.API.Endpoints.WebsocketURL,
Connector: p.WsConnect,
Subscriber: p.Subscribe,
UnSubscriber: p.Unsubscribe,
})
if err != nil {
return err
}

View File

@@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"strings"
@@ -31,42 +32,28 @@ func New() *Websocket {
}
// Setup sets main variables for websocket connection
func (w *Websocket) Setup(connector func() error,
subscriber func(channelToSubscribe WebsocketChannelSubscription) error,
unsubscriber func(channelToUnsubscribe WebsocketChannelSubscription) error,
exchangeName string,
wsEnabled,
verbose bool,
defaultURL,
runningURL string,
authenticatedWebsocketAPISupport bool) error {
func (w *Websocket) Setup(setupData *WebsocketSetup) error {
w.DataHandler = make(chan interface{}, 1)
w.Connected = make(chan struct{}, 1)
w.Disconnected = make(chan struct{}, 1)
w.TrafficAlert = make(chan struct{}, 1)
w.verbose = verbose
w.SetChannelSubscriber(subscriber)
w.SetChannelUnsubscriber(unsubscriber)
err := w.SetWsStatusAndConnection(wsEnabled)
w.verbose = setupData.Verbose
w.SetChannelSubscriber(setupData.Subscriber)
w.SetChannelUnsubscriber(setupData.UnSubscriber)
w.enabled = setupData.Enabled
w.SetDefaultURL(setupData.DefaultURL)
w.SetConnector(setupData.Connector)
w.SetWebsocketURL(setupData.RunningURL)
w.SetExchangeName(setupData.ExchangeName)
w.SetCanUseAuthenticatedEndpoints(setupData.AuthenticatedWebsocketAPISupport)
w.trafficTimeout = setupData.WebsocketTimeout
err := w.Initialise()
if err != nil {
return err
}
w.SetDefaultURL(defaultURL)
w.SetConnector(connector)
w.SetWebsocketURL(runningURL)
w.SetExchangeName(exchangeName)
w.SetCanUseAuthenticatedEndpoints(authenticatedWebsocketAPISupport)
w.init = false
w.noConnectionCheckLimit = 5
w.reconnectionLimit = 10
return nil
}
// Connect intiates a websocket connection by using a package defined connection
// Connect initiates a websocket connection by using a package defined connection
// function
func (w *Websocket) Connect() error {
w.m.Lock()
@@ -75,32 +62,33 @@ func (w *Websocket) Connect() error {
if !w.IsEnabled() {
return errors.New(WebsocketNotEnabled)
}
if w.connected {
w.connecting = false
return errors.New("exchange_websocket.go error - already connected, cannot connect again")
if w.IsConnecting() {
return fmt.Errorf("%v Websocket already attempting to connect",
w.exchangeName)
}
w.connecting = true
if w.IsConnected() {
return fmt.Errorf("%v Websocket already connected",
w.exchangeName)
}
w.setConnectingStatus(true)
w.ShutdownC = make(chan struct{}, 1)
w.ReadMessageErrors = make(chan error, 1)
err := w.connector()
if err != nil {
w.connecting = false
return fmt.Errorf("exchange_websocket.go connection error %s",
err)
w.setConnectingStatus(false)
return fmt.Errorf("%v Error connecting %s",
w.exchangeName, err)
}
if !w.connected {
w.Connected <- struct{}{}
w.connected = true
w.connecting = false
}
w.setConnectedStatus(true)
w.setConnectingStatus(false)
w.setInit(true)
var anotherWG sync.WaitGroup
anotherWG.Add(1)
go w.trafficMonitor(&anotherWG)
anotherWG.Wait()
if !w.connectionMonitorRunning {
if !w.IsConnectionMonitorRunning() {
go w.connectionMonitor()
}
if w.SupportsFunctionality(WebsocketSubscribeSupported) || w.SupportsFunctionality(WebsocketUnsubscribeSupported) {
@@ -112,88 +100,82 @@ func (w *Websocket) Connect() error {
// connectionMonitor ensures that the WS keeps connecting
func (w *Websocket) connectionMonitor() {
w.m.Lock()
w.connectionMonitorRunning = true
w.m.Unlock()
if w.IsConnectionMonitorRunning() {
return
}
w.setConnectionMonitorRunning(true)
timer := time.NewTimer(connectionMonitorDelay)
defer func() {
w.connectionMonitorRunning = false
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
w.setConnectionMonitorRunning(false)
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v websocket connection monitor exiting",
w.exchangeName)
}
}()
for {
time.Sleep(connectionMonitorDelay)
w.m.Lock()
if !w.enabled {
w.m.Unlock()
w.DataHandler <- fmt.Errorf("%v connectionMonitor: websocket disabled, shutting down", w.exchangeName)
err := w.Shutdown()
if err != nil {
log.Error(log.WebsocketMgr, err)
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v running connection monitor cycle",
w.exchangeName)
}
if !w.IsEnabled() {
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v connectionMonitor: websocket disabled, shutting down", w.exchangeName)
}
if w.IsConnected() {
err := w.Shutdown()
if err != nil {
log.Error(log.WebsocketMgr, err)
}
}
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v connectionMonitor exiting",
log.Debugf(log.WebsocketMgr, "%v websocket connection monitor exiting",
w.exchangeName)
}
return
}
w.m.Unlock()
err := w.checkConnection()
if err != nil {
log.Error(log.WebsocketMgr, err)
}
}
}
// checkConnection ensures the connection is maintained
// Will reconnect on disconnect
func (w *Websocket) checkConnection() error {
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v checking connection", w.exchangeName)
}
switch {
case !w.IsConnected() && !w.IsConnecting():
w.m.Lock()
defer w.m.Unlock()
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v no connection. Attempt %v/%v", w.exchangeName, w.noConnectionChecks, w.noConnectionCheckLimit)
}
if w.noConnectionChecks >= w.noConnectionCheckLimit {
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v resetting connection", w.exchangeName)
select {
case err := <-w.ReadMessageErrors:
// check if this error is a disconnection error
if isDisconnectionError(err) {
w.setConnectedStatus(false)
w.setConnectingStatus(false)
w.setInit(false)
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v websocket has been disconnected. Reason: %v",
w.exchangeName, err)
}
err = w.Connect()
if err != nil {
log.Error(log.WebsocketMgr, err)
}
} else {
// pass off non disconnect errors to datahandler to manage
w.DataHandler <- err
}
w.connecting = true
go w.WebsocketReset()
w.noConnectionChecks = 0
case <-timer.C:
if !w.IsConnecting() && !w.IsConnected() {
err := w.Connect()
if err != nil {
log.Error(log.WebsocketMgr, err)
}
}
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(connectionMonitorDelay)
}
w.noConnectionChecks++
case w.IsConnecting():
if w.reconnectionChecks >= w.reconnectionLimit {
return fmt.Errorf("%v websocket failed to reconnect after %v seconds",
w.exchangeName,
w.reconnectionLimit*int(connectionMonitorDelay.Seconds()))
}
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v Busy reconnecting", w.exchangeName)
}
w.reconnectionChecks++
default:
w.noConnectionChecks = 0
w.reconnectionChecks = 0
}
return nil
}
// IsConnected exposes websocket connection status
func (w *Websocket) IsConnected() bool {
w.m.Lock()
defer w.m.Unlock()
return w.connected
}
// IsConnecting checks whether websocket is busy connecting
func (w *Websocket) IsConnecting() bool {
w.m.Lock()
defer w.m.Unlock()
return w.connecting
}
// Shutdown attempts to shut down a websocket connection and associated routines
@@ -204,124 +186,145 @@ func (w *Websocket) Shutdown() error {
w.Orderbook.FlushCache()
w.m.Unlock()
}()
if !w.connected && w.ShutdownC == nil {
if !w.IsConnected() {
return fmt.Errorf("%v cannot shutdown a disconnected websocket", w.exchangeName)
}
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v shutting down websocket channels", w.exchangeName)
}
timer := time.NewTimer(15 * time.Second)
c := make(chan struct{}, 1)
go func(c chan struct{}) {
close(w.ShutdownC)
w.Wg.Wait()
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v completed websocket channel shutdown", w.exchangeName)
}
c <- struct{}{}
}(c)
select {
case <-c:
w.connected = false
return nil
case <-timer.C:
return fmt.Errorf("%s websocket routines failed to shutdown after 15 seconds",
w.GetName())
close(w.ShutdownC)
w.Wg.Wait()
w.setConnectedStatus(false)
w.setConnectingStatus(false)
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v completed websocket channel shutdown", w.exchangeName)
}
return nil
}
// WebsocketReset sends the shutdown command, waits for channel/func closure and then reconnects
func (w *Websocket) WebsocketReset() {
err := w.Shutdown()
if err != nil {
// does not return here to allow connection to be made if already shut down
w.DataHandler <- fmt.Errorf("%v shutdown error: %v", w.exchangeName, err)
}
log.Infof(log.WebsocketMgr, "%v reconnecting to websocket", w.exchangeName)
w.m.Lock()
w.init = true
w.m.Unlock()
err = w.Connect()
if err != nil {
w.DataHandler <- fmt.Errorf("%v connection error: %v", w.exchangeName, err)
}
}
// trafficMonitor monitors traffic and switches connection modes for websocket
// trafficMonitor uses a timer of WebsocketTrafficLimitTime and once it expires
// Will reconnect if the TrafficAlert channel has not received any data
// The trafficTimer will reset on each traffic alert
func (w *Websocket) trafficMonitor(wg *sync.WaitGroup) {
w.Wg.Add(1)
wg.Done() // Makes sure we are unlocking after we add to waitgroup
wg.Done()
trafficTimer := time.NewTimer(w.trafficTimeout)
defer func() {
if w.connected {
w.Disconnected <- struct{}{}
if !trafficTimer.Stop() {
select {
case <-trafficTimer.C:
default:
}
}
w.setTrafficMonitorRunning(false)
w.Wg.Done()
}()
// Define an initial traffic timer which will be a delay then fall over to
// WebsocketTrafficLimitTime after first response
trafficTimer := time.NewTimer(5 * time.Second)
if w.IsTrafficMonitorRunning() {
return
}
w.setTrafficMonitorRunning(true)
for {
select {
case <-w.ShutdownC: // Returns on shutdown channel close
case <-w.ShutdownC:
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v trafficMonitor shutdown message received", w.exchangeName)
}
return
case <-w.TrafficAlert: // Resets timer on traffic
w.m.Lock()
if !w.connected {
w.Connected <- struct{}{}
w.connected = true
case <-w.TrafficAlert:
if !trafficTimer.Stop() {
select {
case <-trafficTimer.C:
default:
}
}
w.m.Unlock()
trafficTimer.Reset(WebsocketTrafficLimitTime)
trafficTimer.Reset(w.trafficTimeout)
case <-trafficTimer.C: // Falls through when timer runs out
newtimer := time.NewTimer(10 * time.Second) // New secondary timer set
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v has not received a traffic alert in 5 seconds.", w.exchangeName)
}
w.m.Lock()
if w.connected {
// If connected divert traffic to rest
w.Disconnected <- struct{}{}
w.connected = false
}
w.m.Unlock()
select {
case <-w.ShutdownC: // Returns on shutdown channel close
w.m.Lock()
w.connected = false
w.m.Unlock()
return
case <-newtimer.C: // If secondary timer runs state timeout is sent to the data handler
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v has not received a traffic alert in 15 seconds, exiting", w.exchangeName)
}
w.DataHandler <- fmt.Errorf("trafficMonitor %v", WebsocketStateTimeout)
return
case <-w.TrafficAlert: // If in this time response traffic comes through
trafficTimer.Reset(WebsocketTrafficLimitTime)
w.m.Lock()
if !w.connected {
// If not connected dive rt traffic from REST to websocket
w.Connected <- struct{}{}
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v has received a traffic alert. Setting status to connected", w.exchangeName)
}
w.connected = true
}
w.m.Unlock()
log.Warnf(log.WebsocketMgr, "%v has not received a traffic alert in %v. Reconnecting", w.exchangeName, w.trafficTimeout)
}
go w.Shutdown()
}
}
}
func (w *Websocket) setConnectedStatus(b bool) {
w.connectionMutex.Lock()
w.connected = b
w.connectionMutex.Unlock()
}
// IsConnected returns status of connection
func (w *Websocket) IsConnected() bool {
w.connectionMutex.RLock()
defer w.connectionMutex.RUnlock()
return w.connected
}
func (w *Websocket) setConnectingStatus(b bool) {
w.connectionMutex.Lock()
w.connecting = b
w.connectionMutex.Unlock()
}
// IsConnecting returns status of connecting
func (w *Websocket) IsConnecting() bool {
w.connectionMutex.RLock()
defer w.connectionMutex.RUnlock()
return w.connecting
}
func (w *Websocket) setEnabled(b bool) {
w.connectionMutex.Lock()
w.enabled = b
w.connectionMutex.Unlock()
}
// IsEnabled returns status of enabled
func (w *Websocket) IsEnabled() bool {
w.connectionMutex.RLock()
defer w.connectionMutex.RUnlock()
return w.enabled
}
func (w *Websocket) setInit(b bool) {
w.connectionMutex.Lock()
w.init = b
w.connectionMutex.Unlock()
}
// IsInit returns status of init
func (w *Websocket) IsInit() bool {
w.connectionMutex.RLock()
defer w.connectionMutex.RUnlock()
return w.init
}
func (w *Websocket) setTrafficMonitorRunning(b bool) {
w.connectionMutex.Lock()
w.trafficMonitorRunning = b
w.connectionMutex.Unlock()
}
// IsTrafficMonitorRunning returns status of the traffic monitor
func (w *Websocket) IsTrafficMonitorRunning() bool {
w.connectionMutex.RLock()
defer w.connectionMutex.RUnlock()
return w.trafficMonitorRunning
}
func (w *Websocket) setConnectionMonitorRunning(b bool) {
w.connectionMutex.Lock()
w.connectionMonitorRunning = b
w.connectionMutex.Unlock()
}
// IsConnectionMonitorRunning returns status of connection monitor
func (w *Websocket) IsConnectionMonitorRunning() bool {
w.connectionMutex.RLock()
defer w.connectionMutex.RUnlock()
return w.connectionMonitorRunning
}
// SetWebsocketURL sets websocket URL
func (w *Websocket) SetWebsocketURL(websocketURL string) {
if websocketURL == "" || websocketURL == config.WebsocketURLNonDefaultMessage {
@@ -336,55 +339,28 @@ func (w *Websocket) GetWebsocketURL() string {
return w.runningURL
}
// SetWsStatusAndConnection sets if websocket is enabled
// it will also connect/disconnect the websocket connection
func (w *Websocket) SetWsStatusAndConnection(enabled bool) error {
w.m.Lock()
if w.enabled == enabled {
if w.init {
w.m.Unlock()
// Initialise verifies status and connects
func (w *Websocket) Initialise() error {
if w.IsEnabled() {
if w.IsInit() {
return nil
}
w.m.Unlock()
return fmt.Errorf("exchange_websocket.go error - already set as %t",
enabled)
return fmt.Errorf("%v Websocket already initialised",
w.exchangeName)
}
w.enabled = enabled
if !w.init {
if enabled {
if w.connected {
w.m.Unlock()
return nil
}
w.m.Unlock()
return w.Connect()
}
if !w.connected {
w.m.Unlock()
return nil
}
w.m.Unlock()
return w.Shutdown()
}
w.m.Unlock()
w.setEnabled(w.enabled)
return nil
}
// IsEnabled returns bool
func (w *Websocket) IsEnabled() bool {
return w.enabled
}
// SetProxyAddress sets websocket proxy address
func (w *Websocket) SetProxyAddress(proxyAddr string) error {
if w.proxyAddr == proxyAddr {
return errors.New("exchange_websocket.go error - Setting proxy address - same address")
return fmt.Errorf("%v Cannot set proxy address to the same address '%v'", w.exchangeName, w.proxyAddr)
}
w.proxyAddr = proxyAddr
if !w.init && w.enabled {
if w.connected {
if !w.IsInit() && w.IsEnabled() {
if w.IsConnected() {
err := w.Shutdown()
if err != nil {
return err
@@ -532,19 +508,28 @@ func (w *Websocket) manageSubscriptions() {
for {
select {
case <-w.ShutdownC:
w.subscriptionMutex.Lock()
w.subscribedChannels = []WebsocketChannelSubscription{}
w.subscriptionMutex.Unlock()
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v shutdown manageSubscriptions", w.exchangeName)
}
return
default:
time.Sleep(manageSubscriptionsDelay)
if !w.IsConnected() {
w.subscriptionMutex.Lock()
w.subscribedChannels = []WebsocketChannelSubscription{}
w.subscriptionMutex.Unlock()
continue
}
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v checking subscriptions", w.exchangeName)
}
// Subscribe to channels Pending a subscription
if w.SupportsFunctionality(WebsocketSubscribeSupported) {
err := w.subscribeToChannels()
err := w.appendSubscribedChannels()
if err != nil {
w.DataHandler <- err
}
@@ -559,11 +544,11 @@ func (w *Websocket) manageSubscriptions() {
}
}
// subscribeToChannels compares channelsToSubscribe to subscribedChannels
// appendSubscribedChannels compares channelsToSubscribe to subscribedChannels
// and subscribes to any channels not present in subscribedChannels
func (w *Websocket) subscribeToChannels() error {
w.subscriptionLock.Lock()
defer w.subscriptionLock.Unlock()
func (w *Websocket) appendSubscribedChannels() error {
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
for i := 0; i < len(w.channelsToSubscribe); i++ {
channelIsSubscribed := false
for j := 0; j < len(w.subscribedChannels); j++ {
@@ -589,8 +574,8 @@ func (w *Websocket) subscribeToChannels() error {
// unsubscribeToChannels compares subscribedChannels to channelsToSubscribe
// and unsubscribes to any channels not present in channelsToSubscribe
func (w *Websocket) unsubscribeToChannels() error {
w.subscriptionLock.Lock()
defer w.subscriptionLock.Unlock()
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
for i := 0; i < len(w.subscribedChannels); i++ {
subscriptionFound := false
for j := 0; j < len(w.channelsToSubscribe); j++ {
@@ -622,8 +607,8 @@ func (w *Websocket) RemoveSubscribedChannels(channels []WebsocketChannelSubscrip
// removeChannelToSubscribe removes an entry from w.channelsToSubscribe
// so an unsubscribe event can be triggered
func (w *Websocket) removeChannelToSubscribe(subscribedChannel WebsocketChannelSubscription) {
w.subscriptionLock.Lock()
defer w.subscriptionLock.Unlock()
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
channelLength := len(w.channelsToSubscribe)
i := 0
for j := 0; j < len(w.channelsToSubscribe); j++ {
@@ -644,8 +629,8 @@ func (w *Websocket) removeChannelToSubscribe(subscribedChannel WebsocketChannelS
// ResubscribeToChannel calls unsubscribe func and
// removes it from subscribedChannels to trigger a subscribe event
func (w *Websocket) ResubscribeToChannel(subscribedChannel WebsocketChannelSubscription) {
w.subscriptionLock.Lock()
defer w.subscriptionLock.Unlock()
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
err := w.channelUnsubscriber(subscribedChannel)
if err != nil {
w.DataHandler <- err
@@ -675,7 +660,6 @@ func (w *Websocket) SubscribeToChannels(channels []WebsocketChannelSubscription)
w.channelsToSubscribe = append(w.channelsToSubscribe, channels[i])
}
}
w.noConnectionChecks = 0
}
// Equal two WebsocketChannelSubscription to determine equality
@@ -693,16 +677,16 @@ func (w *Websocket) GetSubscriptions() []WebsocketChannelSubscription {
// SetCanUseAuthenticatedEndpoints sets canUseAuthenticatedEndpoints val in
// a thread safe manner
func (w *Websocket) SetCanUseAuthenticatedEndpoints(val bool) {
w.subscriptionLock.Lock()
defer w.subscriptionLock.Unlock()
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
w.canUseAuthenticatedEndpoints = val
}
// CanUseAuthenticatedEndpoints gets canUseAuthenticatedEndpoints val in
// a thread safe manner
func (w *Websocket) CanUseAuthenticatedEndpoints() bool {
w.subscriptionLock.Lock()
defer w.subscriptionLock.Unlock()
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
return w.canUseAuthenticatedEndpoints
}
@@ -735,6 +719,10 @@ func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header
}
return fmt.Errorf("%v Error: %v", w.URL, err)
}
if w.Verbose {
log.Infof(log.WebsocketMgr, "%v Websocket connected", w.ExchangeName)
}
w.setConnectedStatus(true)
return nil
}
@@ -742,6 +730,9 @@ func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header
func (w *WebsocketConnection) SendMessage(data interface{}) error {
w.Lock()
defer w.Unlock()
if !w.IsConnected() {
return fmt.Errorf("%v cannot send message to a disconnected websocket", w.ExchangeName)
}
json, err := common.JSONEncode(data)
if err != nil {
return err
@@ -801,10 +792,26 @@ func (w *WebsocketConnection) WaitForResult(id int64, wg *sync.WaitGroup) {
}
}
func (w *WebsocketConnection) setConnectedStatus(b bool) {
w.connectionMutex.Lock()
w.connected = b
w.connectionMutex.Unlock()
}
// IsConnected exposes websocket connection status
func (w *WebsocketConnection) IsConnected() bool {
w.connectionMutex.RLock()
defer w.connectionMutex.RUnlock()
return w.connected
}
// ReadMessage reads messages, can handle text, gzip and binary
func (w *WebsocketConnection) ReadMessage() (WebsocketResponse, error) {
mType, resp, err := w.Connection.ReadMessage()
if err != nil {
if isDisconnectionError(err) {
w.setConnectedStatus(false)
}
return WebsocketResponse{}, err
}
var standardMessage []byte
@@ -866,3 +873,15 @@ func (w *WebsocketConnection) GenerateMessageID(useNano bool) int64 {
}
return time.Now().Unix()
}
// isDisconnectionError Determines if the error sent over chan ReadMessageErrors is a disconnection error
func isDisconnectionError(err error) bool {
if websocket.IsUnexpectedCloseError(err) {
return true
}
switch err.(type) {
case *websocket.CloseError, *net.OpError:
return true
}
return false
}

View File

@@ -1,38 +1,151 @@
package wshandler
import (
"fmt"
"bytes"
"compress/flate"
"compress/gzip"
"errors"
"net"
"net/http"
"os"
"strings"
"sync"
"testing"
"time"
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
)
var ws *Websocket
func TestTrafficMonitorTimeout(t *testing.T) {
ws := New()
err := ws.Setup(
&WebsocketSetup{
Enabled: true,
AuthenticatedWebsocketAPISupport: true,
WebsocketTimeout: 10000,
DefaultURL: "testDefaultURL",
ExchangeName: "exchangeName",
RunningURL: "testRunningURL",
Connector: func() error { return nil },
Subscriber: func(test WebsocketChannelSubscription) error { return nil },
UnSubscriber: func(test WebsocketChannelSubscription) error { return nil },
})
if err != nil {
t.Error(err)
}
ws.setConnectedStatus(true)
ws.TrafficAlert = make(chan struct{}, 2)
ws.ShutdownC = make(chan struct{})
var anotherWG sync.WaitGroup
anotherWG.Add(1)
go ws.trafficMonitor(&anotherWG)
anotherWG.Wait()
ws.TrafficAlert <- struct{}{}
trafficTimer := time.NewTimer(5 * time.Second)
select {
case <-trafficTimer.C:
t.Error("should be exiting")
default:
ws.Wg.Wait()
}
}
func TestWebsocketInit(t *testing.T) {
ws = New()
if ws == nil {
t.Error("test failed - Websocket New() error")
func TestIsDisconnectionError(t *testing.T) {
isADisconnectionError := isDisconnectionError(errors.New("errorText"))
if isADisconnectionError {
t.Error("Its not")
}
isADisconnectionError = isDisconnectionError(&websocket.CloseError{
Code: 1006,
Text: "errorText",
})
if !isADisconnectionError {
t.Error("It is")
}
isADisconnectionError = isDisconnectionError(&net.OpError{
Op: "",
Net: "",
Source: nil,
Addr: nil,
Err: errors.New("errorText"),
})
if !isADisconnectionError {
t.Error("It is")
}
}
func TestConnectionMessageErrors(t *testing.T) {
ws := New()
ws.connected = true
ws.enabled = true
ws.ReadMessageErrors = make(chan error)
ws.DataHandler = make(chan interface{})
ws.ShutdownC = make(chan struct{})
ws.connector = func() error { return nil }
go ws.connectionMonitor()
timer := time.NewTimer(900 * time.Millisecond)
ws.ReadMessageErrors <- errors.New("errorText")
select {
case err := <-ws.DataHandler:
if err.(error).Error() != "errorText" {
t.Errorf("Expected 'errorText', received %v", err)
}
case <-timer.C:
t.Error("Timeout waiting for datahandler to receive error")
}
timer = time.NewTimer(900 * time.Millisecond)
ws.ReadMessageErrors <- &websocket.CloseError{
Code: 1006,
Text: "errorText",
}
outer:
for {
select {
case <-ws.DataHandler:
t.Fatal("Error is a disconnection error")
case <-timer.C:
break outer
}
}
}
func TestWebsocket(t *testing.T) {
if err := ws.SetProxyAddress("testProxy"); err != nil {
ws := Websocket{}
ws.setInit(true)
err := ws.Setup(&WebsocketSetup{
ExchangeName: "test",
Enabled: true,
})
if err != nil && err.Error() != "test Websocket already initialised" {
t.Errorf("Expected 'test Websocket already initialised', received %v", err)
}
ws = *New()
err = ws.SetProxyAddress("testProxy")
if err != nil {
t.Error("test failed - SetProxyAddress", err)
}
ws.Setup(func() error { return nil },
func(test WebsocketChannelSubscription) error { return nil },
func(test WebsocketChannelSubscription) error { return nil },
"testName",
true,
false,
"testDefaultURL",
"testRunningURL",
false)
err = ws.Setup(
&WebsocketSetup{
Enabled: true,
AuthenticatedWebsocketAPISupport: true,
WebsocketTimeout: 2,
DefaultURL: "testDefaultURL",
ExchangeName: "exchangeName",
RunningURL: "testRunningURL",
Connector: func() error { return nil },
Subscriber: func(test WebsocketChannelSubscription) error { return nil },
UnSubscriber: func(test WebsocketChannelSubscription) error { return nil },
})
if err != nil {
t.Error(err)
}
// Test variable setting and retreival
if ws.GetName() != "testName" {
if ws.GetName() != "exchangeName" {
t.Error("test failed - WebsocketSetup")
}
@@ -52,25 +165,11 @@ func TestWebsocket(t *testing.T) {
t.Error("test failed - WebsocketSetup")
}
// Test websocket connect and shutdown functions
comms := make(chan struct{}, 1)
go func() {
var count int
for {
if count == 4 {
close(comms)
return
}
select {
case <-ws.Connected:
count++
case <-ws.Disconnected:
count++
}
}
}()
if ws.trafficTimeout != time.Duration(2) {
t.Error("test failed - WebsocketSetup")
}
// -- Not connected shutdown
err := ws.Shutdown()
err = ws.Shutdown()
if err == nil {
t.Fatal("test failed - should not be connected to able to shut down")
}
@@ -80,70 +179,61 @@ func TestWebsocket(t *testing.T) {
if err != nil {
t.Fatal("test failed - WebsocketSetup", err)
}
ws.SetWebsocketURL("ws://demos.kaazing.com/echo")
// -- Already connected connect
err = ws.Connect()
if err == nil {
t.Fatal("test failed - should not connect, already connected")
}
ws.SetWebsocketURL("")
// -- Set true when already true
err = ws.SetWsStatusAndConnection(true)
if err == nil {
t.Fatal("test failed - setting enabled should not work")
}
// -- Set false normal
err = ws.SetWsStatusAndConnection(false)
if err != nil {
t.Fatal("test failed - setting enabled should not work")
}
// -- Set true normal
err = ws.SetWsStatusAndConnection(true)
if err != nil {
t.Fatal("test failed - setting enabled should not work")
}
// -- Normal shutdown
err = ws.Shutdown()
if err != nil {
t.Fatal("test failed - WebsocketSetup", err)
}
timer := time.NewTimer(5 * time.Second)
select {
case <-comms:
case <-timer.C:
t.Fatal("test failed - WebsocketSetup - timeout")
}
ws.Wg.Wait()
}
func TestFunctionality(t *testing.T) {
var w Websocket
if w.FormatFunctionality() != NoWebsocketSupportText {
ws := New()
if ws.FormatFunctionality() != NoWebsocketSupportText {
t.Fatalf("Test Failed - FormatFunctionality error expected %s but received %s",
NoWebsocketSupportText, w.FormatFunctionality())
NoWebsocketSupportText, ws.FormatFunctionality())
}
w.Functionality = 1 << 31
ws.Functionality = 1 << 31
if w.FormatFunctionality() != UnknownWebsocketFunctionality+"[1<<31]" {
if ws.FormatFunctionality() != UnknownWebsocketFunctionality+"[1<<31]" {
t.Fatal("Test Failed - GetFunctionality error incorrect error returned")
}
w.Functionality = WebsocketOrderbookSupported
ws.Functionality = WebsocketOrderbookSupported
if w.GetFunctionality() != WebsocketOrderbookSupported {
if ws.GetFunctionality() != WebsocketOrderbookSupported {
t.Fatal("Test Failed - GetFunctionality error incorrect bitmask returned")
}
if !w.SupportsFunctionality(WebsocketOrderbookSupported) {
if !ws.SupportsFunctionality(WebsocketOrderbookSupported) {
t.Fatal("Test Failed - SupportsFunctionality error should be true")
}
ws.Functionality = WebsocketTickerSupported | WebsocketOrderbookSupported | WebsocketKlineSupported |
WebsocketTradeDataSupported | WebsocketAccountSupported | WebsocketAllowsRequests |
WebsocketSubscribeSupported | WebsocketUnsubscribeSupported | WebsocketAuthenticatedEndpointsSupported |
WebsocketAccountDataSupported | WebsocketSubmitOrderSupported | WebsocketCancelOrderSupported |
WebsocketWithdrawSupported | WebsocketMessageCorrelationSupported | WebsocketSequenceNumberSupported |
WebsocketDeadMansSwitchSupported
formatted := ws.FormatFunctionality()
if !strings.Contains(formatted, WebsocketTickerSupportedText) || !strings.Contains(formatted, WebsocketOrderbookSupportedText) ||
!strings.Contains(formatted, WebsocketKlineSupportedText) || !strings.Contains(formatted, WebsocketTradeDataSupportedText) ||
!strings.Contains(formatted, WebsocketAccountSupportedText) || !strings.Contains(formatted, WebsocketAllowsRequestsText) ||
!strings.Contains(formatted, WebsocketSubscribeSupportedText) || !strings.Contains(formatted, WebsocketUnsubscribeSupportedText) ||
!strings.Contains(formatted, WebsocketAuthenticatedEndpointsSupportedText) || !strings.Contains(formatted, WebsocketAccountDataSupportedText) ||
!strings.Contains(formatted, WebsocketSubmitOrderSupportedText) || !strings.Contains(formatted, WebsocketCancelOrderSupportedText) ||
!strings.Contains(formatted, WebsocketWithdrawSupportedText) || !strings.Contains(formatted, WebsocketMessageCorrelationSupportedText) ||
!strings.Contains(formatted, WebsocketSequenceNumberSupportedText) || !strings.Contains(formatted, WebsocketDeadMansSwitchSupportedText) {
t.Error("Failed to format and include supported websocket features")
}
}
// placeholderSubscriber basic function to test subscriptions
@@ -162,12 +252,32 @@ func TestSubscribe(t *testing.T) {
subscribedChannels: []WebsocketChannelSubscription{},
}
w.SetChannelSubscriber(placeholderSubscriber)
w.subscribeToChannels()
err := w.appendSubscribedChannels()
if err != nil {
t.Error(err)
}
if len(w.subscribedChannels) != 1 {
t.Errorf("Subscription did not occur")
}
}
// TestSubscribe logic test
func TestSubscribeToChannels(t *testing.T) {
w := Websocket{
channelsToSubscribe: []WebsocketChannelSubscription{
{
Channel: "hello",
},
},
subscribedChannels: []WebsocketChannelSubscription{},
}
w.SetChannelSubscriber(placeholderSubscriber)
w.SubscribeToChannels([]WebsocketChannelSubscription{{Channel: "hello"}, {Channel: "hello2"}})
if len(w.channelsToSubscribe) != 2 {
t.Errorf("Subscription did not occur")
}
}
// TestUnsubscribe logic test
func TestUnsubscribe(t *testing.T) {
w := Websocket{
@@ -179,7 +289,10 @@ func TestUnsubscribe(t *testing.T) {
},
}
w.SetChannelUnsubscriber(placeholderSubscriber)
w.unsubscribeToChannels()
err := w.unsubscribeToChannels()
if err != nil {
t.Error(err)
}
if len(w.subscribedChannels) != 0 {
t.Errorf("Unsubscription did not occur")
}
@@ -200,7 +313,10 @@ func TestSubscriptionWithExistingEntry(t *testing.T) {
},
}
w.SetChannelSubscriber(placeholderSubscriber)
w.subscribeToChannels()
err := w.appendSubscribedChannels()
if err != nil {
t.Error(err)
}
if len(w.subscribedChannels) != 1 {
t.Errorf("Subscription should not have occurred")
}
@@ -221,7 +337,10 @@ func TestUnsubscriptionWithExistingEntry(t *testing.T) {
},
}
w.SetChannelUnsubscriber(placeholderSubscriber)
w.unsubscribeToChannels()
err := w.unsubscribeToChannels()
if err != nil {
t.Error(err)
}
if len(w.subscribedChannels) != 1 {
t.Errorf("Unsubscription should not have occurred")
}
@@ -230,67 +349,49 @@ func TestUnsubscriptionWithExistingEntry(t *testing.T) {
// TestManageSubscriptionsStartStop logic test
func TestManageSubscriptionsStartStop(t *testing.T) {
w := Websocket{
ShutdownC: make(chan struct{}, 1),
ShutdownC: make(chan struct{}),
Functionality: WebsocketSubscribeSupported | WebsocketUnsubscribeSupported,
}
go w.manageSubscriptions()
time.Sleep(time.Second)
close(w.ShutdownC)
w.Wg.Wait()
}
// TestManageSubscriptionsStartStop logic test
func TestManageSubscriptions(t *testing.T) {
w := Websocket{
ShutdownC: make(chan struct{}),
Functionality: WebsocketSubscribeSupported | WebsocketUnsubscribeSupported,
subscribedChannels: []WebsocketChannelSubscription{
{
Channel: "hello",
},
},
}
w.SetChannelUnsubscriber(placeholderSubscriber)
w.SetChannelSubscriber(placeholderSubscriber)
w.setConnectedStatus(true)
go w.manageSubscriptions()
time.Sleep(8 * time.Second)
w.setConnectedStatus(false)
time.Sleep(manageSubscriptionsDelay)
w.subscriptionMutex.Lock()
if len(w.subscribedChannels) > 0 {
t.Error("Expected empty subscribed channels")
}
w.subscriptionMutex.Unlock()
}
// TestConnectionMonitorNoConnection logic test
func TestConnectionMonitorNoConnection(t *testing.T) {
w := Websocket{}
w.DataHandler = make(chan interface{}, 1)
w.ShutdownC = make(chan struct{}, 1)
w.exchangeName = "hello"
go w.connectionMonitor()
err := <-w.DataHandler
if !strings.EqualFold(err.(error).Error(),
fmt.Sprintf("%v connectionMonitor: websocket disabled, shutting down", w.exchangeName)) {
t.Errorf("expecting error 'connectionMonitor: websocket disabled, shutting down', received '%v'", err)
}
}
// TestWsNoConnectionTolerance logic test
func TestWsNoConnectionTolerance(t *testing.T) {
w := Websocket{}
w.DataHandler = make(chan interface{}, 1)
w.ShutdownC = make(chan struct{}, 1)
w.enabled = true
w.noConnectionCheckLimit = 500
w.checkConnection()
if w.noConnectionChecks == 0 {
t.Errorf("Expected noConnectionTolerance to increment, received '%v'", w.noConnectionChecks)
}
}
// TestConnecting logic test
func TestConnecting(t *testing.T) {
w := Websocket{}
w.DataHandler = make(chan interface{}, 1)
w.ShutdownC = make(chan struct{}, 1)
w.enabled = true
w.connecting = true
w.reconnectionLimit = 500
w.checkConnection()
if w.reconnectionChecks != 1 {
t.Errorf("Expected reconnectionLimit to increment, received '%v'", w.reconnectionChecks)
}
}
// TestReconnectionLimit logic test
func TestReconnectionLimit(t *testing.T) {
w := Websocket{}
w.DataHandler = make(chan interface{}, 1)
w.ShutdownC = make(chan struct{}, 1)
w.enabled = true
w.connecting = true
w.reconnectionChecks = 99
w.reconnectionLimit = 1
err := w.checkConnection()
if err == nil {
t.Error("Expected error")
ws := New()
ws.DataHandler = make(chan interface{}, 1)
ws.ShutdownC = make(chan struct{}, 1)
ws.exchangeName = "hello"
ws.trafficTimeout = 1
go ws.connectionMonitor()
if ws.IsConnectionMonitorRunning() {
t.Fatal("Should have exited")
}
}
@@ -360,26 +461,259 @@ func TestSliceCopyDoesntImpactBoth(t *testing.T) {
},
}
w.SetChannelUnsubscriber(placeholderSubscriber)
w.unsubscribeToChannels()
err := w.unsubscribeToChannels()
if err != nil {
t.Error(err)
}
if len(w.subscribedChannels) != 2 {
t.Errorf("Unsubscription did not occur")
}
w.subscribedChannels[0].Channel = "test"
if strings.EqualFold(w.subscribedChannels[0].Channel, w.channelsToSubscribe[0].Channel) {
t.Errorf("Slice has not been copies appropriately")
t.Errorf("Slice has not been copied appropriately")
}
}
// TestSliceCopyDoesntImpactBoth logic test
func TestGetSubscriptions(t *testing.T) {
w := Websocket{
subscribedChannels: []WebsocketChannelSubscription{
{
Channel: "hello3",
},
},
}
subs := w.GetSubscriptions()
subs[0].Channel = "noHELLO"
if strings.EqualFold(w.subscribedChannels[0].Channel, subs[0].Channel) {
t.Error("Subscriptions was not copied properly")
}
}
// TestSetCanUseAuthenticatedEndpoints logic test
func TestSetCanUseAuthenticatedEndpoints(t *testing.T) {
w := Websocket{}
result := w.CanUseAuthenticatedEndpoints()
ws := New()
result := ws.CanUseAuthenticatedEndpoints()
if result {
t.Error("expected `canUseAuthenticatedEndpoints` to be false")
}
w.SetCanUseAuthenticatedEndpoints(true)
result = w.CanUseAuthenticatedEndpoints()
ws.SetCanUseAuthenticatedEndpoints(true)
result = ws.CanUseAuthenticatedEndpoints()
if !result {
t.Error("expected `canUseAuthenticatedEndpoints` to be true")
}
}
func TestRemoveSubscribedChannels(t *testing.T) {
w := Websocket{
channelsToSubscribe: []WebsocketChannelSubscription{
{
Channel: "hello3",
},
},
}
w.RemoveSubscribedChannels([]WebsocketChannelSubscription{{Channel: "hello3"}})
if len(w.channelsToSubscribe) == 1 {
t.Error("Did not remove subscription")
}
}
const (
websocketTestURL = "wss://www.bitmex.com/realtime"
returnResponseURL = "wss://ws.kraken.com"
useProxyTests = false // Disabled by default. Freely available proxy servers that work all the time are difficult to find
proxyURL = "http://212.186.171.4:80" // Replace with a usable proxy server
)
var wc *WebsocketConnection
var dialer websocket.Dialer
type testStruct struct {
Error error
WC WebsocketConnection
}
type testRequest struct {
Event string `json:"event"`
RequestID int64 `json:"reqid,omitempty"`
Pairs []string `json:"pair"`
Subscription testRequestData `json:"subscription,omitempty"`
}
// testRequestData contains details on WS channel
type testRequestData struct {
Name string `json:"name,omitempty"`
Interval int64 `json:"interval,omitempty"`
Depth int64 `json:"depth,omitempty"`
}
type testResponse struct {
RequestID int64 `json:"reqid,omitempty"`
}
// TestMain setup test
func TestMain(m *testing.M) {
wc = &WebsocketConnection{
ExchangeName: "test",
URL: returnResponseURL,
ResponseMaxLimit: 7000000000,
ResponseCheckTimeout: 30000000,
}
os.Exit(m.Run())
}
// TestDial logic test
func TestDial(t *testing.T) {
var testCases = []testStruct{
{Error: nil, WC: WebsocketConnection{ExchangeName: "test1", Verbose: true, URL: websocketTestURL, RateLimit: 10, ResponseCheckTimeout: 30000000, ResponseMaxLimit: 7000000000}},
{Error: errors.New(" Error: malformed ws or wss URL"), WC: WebsocketConnection{ExchangeName: "test2", Verbose: true, URL: "", ResponseCheckTimeout: 30000000, ResponseMaxLimit: 7000000000}},
{Error: nil, WC: WebsocketConnection{ExchangeName: "test3", Verbose: true, URL: websocketTestURL, ProxyURL: proxyURL, ResponseCheckTimeout: 30000000, ResponseMaxLimit: 7000000000}},
}
for i := 0; i < len(testCases); i++ {
testData := &testCases[i]
t.Run(testData.WC.ExchangeName, func(t *testing.T) {
if testData.WC.ProxyURL != "" && !useProxyTests {
t.Skip("Proxy testing not enabled, skipping")
}
err := testData.WC.Dial(&dialer, http.Header{})
if err != nil {
if testData.Error != nil && err.Error() == testData.Error.Error() {
return
}
t.Fatal(err)
}
})
}
}
// TestSendMessage logic test
func TestSendMessage(t *testing.T) {
var testCases = []testStruct{
{Error: nil, WC: WebsocketConnection{ExchangeName: "test1", Verbose: true, URL: websocketTestURL, RateLimit: 10, ResponseCheckTimeout: 30000000, ResponseMaxLimit: 7000000000}},
{Error: errors.New(" Error: malformed ws or wss URL"), WC: WebsocketConnection{ExchangeName: "test2", Verbose: true, URL: "", ResponseCheckTimeout: 30000000, ResponseMaxLimit: 7000000000}},
{Error: nil, WC: WebsocketConnection{ExchangeName: "test3", Verbose: true, URL: websocketTestURL, ProxyURL: proxyURL, ResponseCheckTimeout: 30000000, ResponseMaxLimit: 7000000000}},
}
for i := 0; i < len(testCases); i++ {
testData := &testCases[i]
t.Run(testData.WC.ExchangeName, func(t *testing.T) {
if testData.WC.ProxyURL != "" && !useProxyTests {
t.Skip("Proxy testing not enabled, skipping")
}
err := testData.WC.Dial(&dialer, http.Header{})
if err != nil {
if testData.Error != nil && err.Error() == testData.Error.Error() {
return
}
t.Fatal(err)
}
err = testData.WC.SendMessage("ping")
if err != nil {
t.Error(err)
}
})
}
}
// TestSendMessageWithResponse logic test
func TestSendMessageWithResponse(t *testing.T) {
if wc.ProxyURL != "" && !useProxyTests {
t.Skip("Proxy testing not enabled, skipping")
}
err := wc.Dial(&dialer, http.Header{})
if err != nil {
t.Fatal(err)
}
go readMessages(wc, t)
request := testRequest{
Event: "subscribe",
Pairs: []string{currency.NewPairWithDelimiter("XBT", "USD", "/").String()},
Subscription: testRequestData{
Name: "ticker",
},
RequestID: wc.GenerateMessageID(false),
}
_, err = wc.SendMessageReturnResponse(request.RequestID, request)
if err != nil {
t.Error(err)
}
}
// TestParseBinaryResponse logic test
func TestParseBinaryResponse(t *testing.T) {
var b bytes.Buffer
w := gzip.NewWriter(&b)
_, err := w.Write([]byte("hello"))
if err != nil {
t.Error(err)
}
err = w.Close()
if err != nil {
t.Error(err)
}
var resp []byte
resp, err = wc.parseBinaryResponse(b.Bytes())
if err != nil {
t.Error(err)
}
if !strings.EqualFold(string(resp), "hello") {
t.Errorf("GZip conversion failed. Received: '%v', Expected: 'hello'", string(resp))
}
var b2 bytes.Buffer
w2, err2 := flate.NewWriter(&b2, 1)
if err2 != nil {
t.Error(err2)
}
_, err2 = w2.Write([]byte("hello"))
if err2 != nil {
t.Error(err)
}
err2 = w2.Close()
if err2 != nil {
t.Error(err)
}
resp2, err3 := wc.parseBinaryResponse(b2.Bytes())
if err3 != nil {
t.Error(err3)
}
if !strings.EqualFold(string(resp2), "hello") {
t.Errorf("GZip conversion failed. Received: '%v', Expected: 'hello'", string(resp2))
}
}
// TestAddResponseWithID logic test
func TestAddResponseWithID(t *testing.T) {
wc.IDResponses = nil
wc.AddResponseWithID(0, []byte("hi"))
wc.AddResponseWithID(1, []byte("hi"))
}
// readMessages helper func
func readMessages(wc *WebsocketConnection, t *testing.T) {
timer := time.NewTimer(20 * time.Second)
for {
select {
case <-timer.C:
return
default:
resp, err := wc.ReadMessage()
if err != nil {
t.Error(err)
return
}
var incoming testResponse
err = common.JSONDecode(resp.Raw, &incoming)
if err != nil {
t.Error(err)
return
}
if incoming.RequestID > 0 {
wc.AddResponseWithID(incoming.RequestID, resp.Raw)
return
}
}
}
}

View File

@@ -48,51 +48,40 @@ const (
WebsocketMessageCorrelationSupportedText = "WEBSOCKET MESSAGE CORRELATION SUPPORTED"
WebsocketSequenceNumberSupportedText = "WEBSOCKET SEQUENCE NUMBER SUPPORTED"
WebsocketDeadMansSwitchSupportedText = "WEBSOCKET DEAD MANS SWITCH SUPPORTED"
// WebsocketNotEnabled alerts of a disabled websocket
WebsocketNotEnabled = "exchange_websocket_not_enabled"
// WebsocketTrafficLimitTime defines a standard time for no traffic from the
// websocket connection
WebsocketTrafficLimitTime = 5 * time.Second
websocketRestablishConnection = time.Second
manageSubscriptionsDelay = 5 * time.Second
WebsocketNotEnabled = "exchange_websocket_not_enabled"
manageSubscriptionsDelay = 5 * time.Second
// connection monitor time delays and limits
connectionMonitorDelay = 2 * time.Second
// WebsocketStateTimeout defines a const for when a websocket connection
// times out, will be handled by the routine management system
WebsocketStateTimeout = "TIMEOUT"
)
// Websocket defines a return type for websocket connections via the interface
// wrapper for routine processing in routines.go
type Websocket struct {
proxyAddr string
defaultURL string
runningURL string
exchangeName string
enabled bool
init bool
connected bool
connecting bool
verbose bool
connector func() error
m sync.Mutex
subscriptionLock sync.Mutex
connectionMonitorRunning bool
reconnectionLimit int
noConnectionChecks int
reconnectionChecks int
noConnectionCheckLimit int
subscribedChannels []WebsocketChannelSubscription
channelsToSubscribe []WebsocketChannelSubscription
channelSubscriber func(channelToSubscribe WebsocketChannelSubscription) error
channelUnsubscriber func(channelToUnsubscribe WebsocketChannelSubscription) error
// Connected denotes a channel switch for diversion of request flow
Connected chan struct{}
// Disconnected denotes a channel switch for diversion of request flow
Disconnected chan struct{}
// DataHandler pipes websocket data to an exchange websocket data handler
DataHandler chan interface{}
// Functionality defines websocket stream capabilities
Functionality uint32
canUseAuthenticatedEndpoints bool
enabled bool
init bool
connected bool
connecting bool
trafficMonitorRunning bool
verbose bool
connectionMonitorRunning bool
trafficTimeout time.Duration
proxyAddr string
defaultURL string
runningURL string
exchangeName string
m sync.Mutex
subscriptionMutex sync.Mutex
connectionMutex sync.RWMutex
connector func() error
subscribedChannels []WebsocketChannelSubscription
channelsToSubscribe []WebsocketChannelSubscription
channelSubscriber func(channelToSubscribe WebsocketChannelSubscription) error
channelUnsubscriber func(channelToUnsubscribe WebsocketChannelSubscription) error
DataHandler chan interface{}
// ShutdownC is the main shutdown channel which controls all websocket go funcs
ShutdownC chan struct{}
// Orderbook is a local cache of orderbooks
@@ -102,9 +91,21 @@ type Websocket struct {
Wg sync.WaitGroup
// TrafficAlert monitors if there is a halt in traffic throughput
TrafficAlert chan struct{}
// Functionality defines websocket stream capabilities
Functionality uint32
canUseAuthenticatedEndpoints bool
// ReadMessageErrors will received all errors from ws.ReadMessage() and verify if its a disconnection
ReadMessageErrors chan error
}
type WebsocketSetup struct {
Enabled bool
Verbose bool
AuthenticatedWebsocketAPISupport bool
WebsocketTimeout time.Duration
DefaultURL string
ExchangeName string
RunningURL string
Connector func() error
Subscriber func(channelToSubscribe WebsocketChannelSubscription) error
UnSubscriber func(channelToUnsubscribe WebsocketChannelSubscription) error
}
// WebsocketChannelSubscription container for websocket subscriptions
@@ -187,16 +188,19 @@ type WebsocketPositionUpdated struct {
// WebsocketConnection contains all the data needed to send a message to a WS
type WebsocketConnection struct {
sync.Mutex
Verbose bool
RateLimit float64
ExchangeName string
URL string
ProxyURL string
Wg sync.WaitGroup
Connection *websocket.Conn
Shutdown chan struct{}
Verbose bool
connected bool
connectionMutex sync.RWMutex
RateLimit float64
ExchangeName string
URL string
ProxyURL string
Wg sync.WaitGroup
Connection *websocket.Conn
Shutdown chan struct{}
// These are the request IDs and the corresponding response JSON
IDResponses map[int64][]byte
ResponseCheckTimeout time.Duration
ResponseMaxLimit time.Duration
TrafficTimeout time.Duration
}

View File

@@ -201,7 +201,7 @@ func (w *WebsocketOrderbookLocal) updateByIDAndAction(orderbookUpdate *Websocket
// LoadSnapshot loads initial snapshot of ob data, overwrite allows full
// ob to be completely rewritten because the exchange is a doing a full
// update not an incremental one
func (w *WebsocketOrderbookLocal) LoadSnapshot(newOrderbook *orderbook.Base, overwrite bool) error {
func (w *WebsocketOrderbookLocal) LoadSnapshot(newOrderbook *orderbook.Base) error {
if len(newOrderbook.Asks) == 0 || len(newOrderbook.Bids) == 0 {
return fmt.Errorf("%v snapshot ask and bids are nil", w.exchangeName)
}
@@ -216,11 +216,8 @@ func (w *WebsocketOrderbookLocal) LoadSnapshot(newOrderbook *orderbook.Base, ove
if w.ob[newOrderbook.Pair][newOrderbook.AssetType] != nil &&
(len(w.ob[newOrderbook.Pair][newOrderbook.AssetType].Asks) > 0 ||
len(w.ob[newOrderbook.Pair][newOrderbook.AssetType].Bids) > 0) {
if overwrite {
w.ob[newOrderbook.Pair][newOrderbook.AssetType] = newOrderbook
return newOrderbook.Process()
}
return fmt.Errorf("%v snapshot instance already found", w.exchangeName)
w.ob[newOrderbook.Pair][newOrderbook.AssetType] = newOrderbook
return newOrderbook.Process()
}
w.ob[newOrderbook.Pair][newOrderbook.AssetType] = newOrderbook
return newOrderbook.Process()

View File

@@ -38,7 +38,7 @@ func createSnapshot() (obl *WebsocketOrderbookLocal, curr currency.Pair, asks, b
snapShot1.AssetType = asset.Spot
snapShot1.Pair = curr
obl = &WebsocketOrderbookLocal{}
err = obl.LoadSnapshot(&snapShot1, false)
err = obl.LoadSnapshot(&snapShot1)
return
}
@@ -382,8 +382,7 @@ func TestRunSnapshotWithNoData(t *testing.T) {
snapShot1.Pair = curr
snapShot1.ExchangeName = "test"
obl.exchangeName = "test"
err := obl.LoadSnapshot(&snapShot1,
false)
err := obl.LoadSnapshot(&snapShot1)
if err == nil {
t.Fatal("expected an error loading a snapshot")
}
@@ -392,8 +391,8 @@ func TestRunSnapshotWithNoData(t *testing.T) {
}
}
// TestLoadSnapshotWithOverride logic test
func TestLoadSnapshotWithOverride(t *testing.T) {
// TestLoadSnapshot logic test
func TestLoadSnapshot(t *testing.T) {
var obl WebsocketOrderbookLocal
var snapShot1 orderbook.Base
curr := currency.NewPairFromString("BTCUSD")
@@ -407,21 +406,13 @@ func TestLoadSnapshotWithOverride(t *testing.T) {
snapShot1.Bids = bids
snapShot1.AssetType = asset.Spot
snapShot1.Pair = curr
err := obl.LoadSnapshot(&snapShot1, false)
if err != nil {
t.Error(err)
}
err = obl.LoadSnapshot(&snapShot1, false)
if err == nil {
t.Error("expected error: 'snapshot instance already found'")
}
err = obl.LoadSnapshot(&snapShot1, true)
err := obl.LoadSnapshot(&snapShot1)
if err != nil {
t.Error(err)
}
}
// TestInsertWithIDs logic test
// TestFlushCache logic test
func TestFlushCache(t *testing.T) {
obl, curr, _, _, err := createSnapshot()
if err != nil {
@@ -473,7 +464,7 @@ func TestInsertingSnapShots(t *testing.T) {
snapShot1.Bids = bids
snapShot1.AssetType = asset.Spot
snapShot1.Pair = currency.NewPairFromString("BTCUSD")
err := obl.LoadSnapshot(&snapShot1, false)
err := obl.LoadSnapshot(&snapShot1)
if err != nil {
t.Fatal(err)
}
@@ -510,7 +501,7 @@ func TestInsertingSnapShots(t *testing.T) {
snapShot2.Bids = bids
snapShot2.AssetType = asset.Spot
snapShot2.Pair = currency.NewPairFromString("LTCUSD")
err = obl.LoadSnapshot(&snapShot2, false)
err = obl.LoadSnapshot(&snapShot2)
if err != nil {
t.Fatal(err)
}
@@ -547,7 +538,7 @@ func TestInsertingSnapShots(t *testing.T) {
snapShot3.Bids = bids
snapShot3.AssetType = "FUTURES"
snapShot3.Pair = currency.NewPairFromString("LTCUSD")
err = obl.LoadSnapshot(&snapShot3, false)
err = obl.LoadSnapshot(&snapShot3)
if err != nil {
t.Fatal(err)
}

View File

@@ -58,7 +58,7 @@ func (z *ZB) WsHandleData() {
default:
resp, err := z.WebsocketConn.ReadMessage()
if err != nil {
z.Websocket.DataHandler <- err
z.Websocket.ReadMessageErrors <- err
return
}
z.Websocket.TrafficAlert <- struct{}{}
@@ -143,8 +143,7 @@ func (z *ZB) WsHandleData() {
newOrderBook.AssetType = asset.Spot
newOrderBook.Pair = cPair
err = z.Websocket.Orderbook.LoadSnapshot(&newOrderBook,
true)
err = z.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
if err != nil {
z.Websocket.DataHandler <- err
continue

View File

@@ -117,15 +117,18 @@ func (z *ZB) Setup(exch *config.ExchangeConfig) error {
return err
}
err = z.Websocket.Setup(z.WsConnect,
z.Subscribe,
nil,
exch.Name,
exch.Features.Enabled.Websocket,
exch.Verbose,
zbWebsocketAPI,
exch.API.Endpoints.WebsocketURL,
exch.API.AuthenticatedWebsocketSupport)
err = z.Websocket.Setup(
&wshandler.WebsocketSetup{
Enabled: exch.Features.Enabled.Websocket,
Verbose: exch.Verbose,
AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport,
WebsocketTimeout: exch.WebsocketTrafficTimeout,
DefaultURL: zbWebsocketAPI,
ExchangeName: exch.Name,
RunningURL: exch.API.Endpoints.WebsocketURL,
Connector: z.WsConnect,
Subscriber: z.Subscribe,
})
if err != nil {
return err
}