diff --git a/engine/syncer.go b/engine/syncer.go index dc625dca..7bedca47 100644 --- a/engine/syncer.go +++ b/engine/syncer.go @@ -375,7 +375,7 @@ func (e *ExchangeCurrencyPairSyncer) worker() { continue } if switchedToRest && usingWebsocket { - log.Infof(log.SyncMgr, + log.Warnf(log.SyncMgr, "%s %s: Websocket re-enabled, switching from rest to websocket\n", c.Exchange, FormatCurrency(enabledPairs[i]).String()) switchedToRest = false diff --git a/exchanges/binance/binance_websocket.go b/exchanges/binance/binance_websocket.go index 151e77f9..db2e4078 100644 --- a/exchanges/binance/binance_websocket.go +++ b/exchanges/binance/binance_websocket.go @@ -31,7 +31,7 @@ var listenKey string var ( // maxWSUpdateBuffer defines max websocket updates to apply when an // orderbook is initially fetched - maxWSUpdateBuffer = 100 + maxWSUpdateBuffer = 150 // maxWSOrderbookJobs defines max websocket orderbook jobs in queue to fetch // an orderbook snapshot via REST maxWSOrderbookJobs = 2000 @@ -47,6 +47,7 @@ func (b *Binance) WsConnect() error { } var dialer websocket.Dialer + dialer.HandshakeTimeout = b.Config.HTTPTimeout var err error if b.Websocket.CanUseAuthenticatedEndpoints() { listenKey, err = b.GetWsAuthStreamKey() @@ -84,28 +85,9 @@ func (b *Binance) WsConnect() error { Delay: pingDelay, }) - go b.orderBookProcess() - return nil -} - -// orderBookProcess prepare orderbook handling -func (b *Binance) orderBookProcess() { - enabledPairs, err := b.GetEnabledPairs(asset.Spot) - if err != nil { - log.Errorf(log.ExchangeSys, "%s orderBookProcess, GetEnabledPairs error: %s", b.Name, err) - return - } - - for i := range enabledPairs { - err = b.SeedLocalCache(enabledPairs[i]) - if err != nil { - log.Errorf(log.ExchangeSys, "%s orderBookProcess, SeedLocalCache error: %s", b.Name, err) - return - } - } - go b.wsReadData() b.setupOrderbookManager() + return nil } func (b *Binance) setupOrderbookManager() { @@ -400,8 +382,11 @@ func (b *Binance) wsHandleData(respRaw []byte) error { err) } - err = b.UpdateLocalBuffer(&depth) + init, err := b.UpdateLocalBuffer(&depth) if err != nil { + if init { + return nil + } return fmt.Errorf("%v - UpdateLocalCache error: %s", b.Name, err) @@ -472,51 +457,39 @@ func (b *Binance) SeedLocalCacheWithBook(p currency.Pair, orderbookNew *OrderBoo } // UpdateLocalBuffer updates and returns the most recent iteration of the orderbook -func (b *Binance) UpdateLocalBuffer(wsdp *WebsocketDepthStream) error { +func (b *Binance) UpdateLocalBuffer(wsdp *WebsocketDepthStream) (bool, error) { enabledPairs, err := b.GetEnabledPairs(asset.Spot) if err != nil { - return err + return false, err } format, err := b.GetPairFormat(asset.Spot, true) if err != nil { - return err + return false, err } currencyPair, err := currency.NewPairFromFormattedPairs(wsdp.Pair, enabledPairs, format) if err != nil { - return err + return false, err } err = b.obm.stageWsUpdate(wsdp, currencyPair, asset.Spot) if err != nil { - return err + init, err2 := b.obm.checkIsInitialSync(currencyPair) + if err2 != nil { + return false, err2 + } + return init, err } err = b.applyBufferUpdate(currencyPair) if err != nil { - cleanupErr := b.Websocket.Orderbook.FlushOrderbook(currencyPair, asset.Spot) - if cleanupErr != nil { - log.Errorf(log.WebsocketMgr, - "%s flushing websocket error: %v", - b.Name, - cleanupErr) - } - - cleanupErr = b.obm.cleanup(currencyPair) - if cleanupErr != nil { - log.Errorf(log.WebsocketMgr, - "%s cleanup websocket orderbook error: %v", - b.Name, - cleanupErr) - } - - return err + b.flushAndCleanup(currencyPair) } - return nil + return false, err } // GenerateSubscriptions generates the default subscription set @@ -645,7 +618,7 @@ func (b *Binance) applyBufferUpdate(pair currency.Pair) error { } recent := b.Websocket.Orderbook.GetOrderbook(pair, asset.Spot) - if recent == nil { + if recent == nil || (recent.Asks == nil && recent.Bids == nil) { return b.obm.fetchBookViaREST(pair) } @@ -691,24 +664,29 @@ func (b *Binance) processJob(p currency.Pair) error { // new update to initiate this. err = b.applyBufferUpdate(p) if err != nil { - errClean := b.Websocket.Orderbook.FlushOrderbook(p, asset.Spot) - if errClean != nil { - log.Errorf(log.WebsocketMgr, - "%s flushing websocket error: %v", - b.Name, - errClean) - } - errClean = b.obm.cleanup(p) - if errClean != nil { - log.Errorf(log.WebsocketMgr, "%s cleanup websocket error: %v", - b.Name, - errClean) - } + b.flushAndCleanup(p) return err } return nil } +// flushAndCleanup flushes orderbook and clean local cache +func (b *Binance) flushAndCleanup(p currency.Pair) { + errClean := b.Websocket.Orderbook.FlushOrderbook(p, asset.Spot) + if errClean != nil { + log.Errorf(log.WebsocketMgr, + "%s flushing websocket error: %v", + b.Name, + errClean) + } + errClean = b.obm.cleanup(p) + if errClean != nil { + log.Errorf(log.WebsocketMgr, "%s cleanup websocket error: %v", + b.Name, + errClean) + } +} + // stageWsUpdate stages websocket update to roll through updates that need to // be applied to a fetched orderbook via REST. func (o *orderbookManager) stageWsUpdate(u *WebsocketDepthStream, pair currency.Pair, a asset.Item) error { @@ -743,6 +721,8 @@ func (o *orderbookManager) stageWsUpdate(u *WebsocketDepthStream, pair currency. case state.buffer <- u: return nil default: + <-state.buffer // pop one element + state.buffer <- u // to shift buffer on fail return fmt.Errorf("channel blockage for %s, asset %s and connection", pair, a) } @@ -801,6 +781,21 @@ func (o *orderbookManager) completeInitialSync(pair currency.Pair) error { return nil } +// checkIsInitialSync checks status if the book is Initial Sync being via the REST +// protocol. +func (o *orderbookManager) checkIsInitialSync(pair currency.Pair) (bool, error) { + o.Lock() + defer o.Unlock() + state, ok := o.state[pair.Base][pair.Quote][asset.Spot] + if !ok { + return false, + fmt.Errorf("checkIsInitialSync of orderbook cannot match currency pair %s asset type %s", + pair, + asset.Spot) + } + return state.initialSync, nil +} + // fetchBookViaREST pushes a job of fetching the orderbook via the REST protocol // to get an initial full book that we can apply our buffered updates too. func (o *orderbookManager) fetchBookViaREST(pair currency.Pair) error { @@ -871,7 +866,7 @@ func (u *update) validate(updt *WebsocketDepthStream, recent *orderbook.Base) (b if u.initialSync { // The first processed event should have U <= lastUpdateId+1 AND // u >= lastUpdateId+1. - if updt.FirstUpdateID > id && updt.LastUpdateID < id { + if updt.FirstUpdateID > id || updt.LastUpdateID < id { return false, fmt.Errorf("initial websocket orderbook sync failure for pair %s and asset %s", recent.Pair, asset.Spot) diff --git a/exchanges/exchange.go b/exchanges/exchange.go index 9d55549d..8241bf72 100644 --- a/exchanges/exchange.go +++ b/exchanges/exchange.go @@ -558,11 +558,11 @@ func (e *Base) SetupDefaults(exch *config.ExchangeConfig) error { if exch.HTTPTimeout <= time.Duration(0) { exch.HTTPTimeout = DefaultHTTPTimeout - } else { - err := e.SetHTTPClientTimeout(exch.HTTPTimeout) - if err != nil { - return err - } + } + + err := e.SetHTTPClientTimeout(exch.HTTPTimeout) + if err != nil { + return err } if exch.CurrencyPairs == nil { @@ -573,7 +573,7 @@ func (e *Base) SetupDefaults(exch *config.ExchangeConfig) error { e.SetHTTPClientUserAgent(exch.HTTPUserAgent) e.SetCurrencyPairFormat() - err := e.SetConfigPairs() + err = e.SetConfigPairs() if err != nil { return err } diff --git a/exchanges/request/request.go b/exchanges/request/request.go index ba250548..295b6ff2 100644 --- a/exchanges/request/request.go +++ b/exchanges/request/request.go @@ -174,7 +174,7 @@ func (r *Requester) doRequest(req *http.Request, p *Item) error { delay = after } - if d, ok := req.Context().Deadline(); ok && d.After(time.Now().Add(delay)) { + if d, ok := req.Context().Deadline(); ok && d.After(time.Now()) && time.Now().Add(delay).After(d) { if err != nil { return fmt.Errorf("request.go error - deadline would be exceeded by retry, err: %v", err) } diff --git a/exchanges/stream/buffer/buffer.go b/exchanges/stream/buffer/buffer.go index 14a1cae7..d275ee6c 100644 --- a/exchanges/stream/buffer/buffer.go +++ b/exchanges/stream/buffer/buffer.go @@ -378,6 +378,7 @@ func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error { m3 = &orderbookHolder{ob: book, buffer: &[]Update{}} m2[book.AssetType] = m3 } else { + m3.ob.LastUpdateID = book.LastUpdateID m3.ob.Bids = book.Bids m3.ob.Asks = book.Asks }