engine: Adds shutdown method to exchange manager and unload all exchanges when engine is stopped (#1112)

* engine: shutdown and unload exchange when engine is stopped

* linter: fixes

* engine/exchMan: add nil check

* engine/exchanges: add shutdown method to exchanges, rm len check lock not needed, expanded code coverage, address some nits

* exchMan: report all failed shutdowns across exchanges, implement timer and monitoring routines.

* exchMan: improve shutdown sequence and aloc.

* further improvement

* exchman: log from warn to error

* websockconnection: Suppress error return when closure is caused by library

* linter: fix

* fix racies

* add note on why not parallel tests

* glorious: nits

* spelling kween

* thrasher: nits

* engine: change print of setting using reflection, I keep forgetting to implement this so program around forgetfulness

* engine/exchange_management: remove wait group and just rely on intermediary lock

* glorious: nits

* Update common/common.go

Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>

* Update main.go

Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>

---------

Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>
This commit is contained in:
Ryan O'Hara-Reid
2023-04-05 13:07:35 +10:00
committed by GitHub
parent 4a50a72e4a
commit d23898e63a
35 changed files with 803 additions and 356 deletions

View File

@@ -100,9 +100,10 @@ func (bi *Binanceus) KeepAuthKeyAlive() {
// ClosUserDataStream closes the User data stream and remove the listen key when closing the websocket.
defer func() {
er := bi.CloseUserDataStream(context.Background())
log.Errorf(log.WebsocketMgr,
"%s closing user data stream error %v",
bi.Name, er)
if er != nil {
log.Errorf(log.WebsocketMgr, "%s closing user data stream error %v",
bi.Name, er)
}
}()
// Looping in 30 Minutes and updating the listenKey
ticks := time.NewTicker(time.Minute * 30)

View File

@@ -1578,3 +1578,15 @@ func (b *Base) GetKlineExtendedRequest(pair currency.Pair, a asset.Item, interva
return &kline.ExtendedRequest{Request: r, RangeHolder: dates}, nil
}
// Shutdown closes active websocket connections if available and then cleans up
// a REST requester instance.
func (b *Base) Shutdown() error {
if b.Websocket != nil {
err := b.Websocket.Shutdown()
if err != nil && !errors.Is(err, stream.ErrNotConnected) {
return err
}
}
return b.Requester.Shutdown()
}

View File

@@ -27,6 +27,7 @@ type IBotExchange interface {
Setup(exch *config.Exchange) error
Start(ctx context.Context, wg *sync.WaitGroup) error
SetDefaults()
Shutdown() error
GetName() string
SetEnabled(bool)
FetchTicker(ctx context.Context, p currency.Pair, a asset.Item) (*ticker.Price, error)

View File

@@ -26,6 +26,8 @@ const (
var (
// ErrSubscriptionFailure defines an error when a subscription fails
ErrSubscriptionFailure = errors.New("subscription failure")
// ErrNotConnected defines an error when websocket is not connected
ErrNotConnected = errors.New("websocket is not connected")
errAlreadyRunning = errors.New("connection monitor is already running")
errExchangeConfigIsNil = errors.New("exchange config is nil")
@@ -423,10 +425,12 @@ func (w *Websocket) Shutdown() error {
defer w.m.Unlock()
if !w.IsConnected() {
return fmt.Errorf("%v websocket: cannot shutdown a disconnected websocket",
w.exchangeName)
return fmt.Errorf("%v websocket: cannot shutdown %w",
w.exchangeName,
ErrNotConnected)
}
// TODO: Interrupt connection and or close connection when it is re-established.
if w.IsConnecting() {
return fmt.Errorf("%v websocket: cannot shutdown, in the process of reconnection",
w.exchangeName)

View File

@@ -198,12 +198,13 @@ func (w *WebsocketConnection) SetupPingHandler(handler PingHandler) {
}()
}
func (w *WebsocketConnection) setConnectedStatus(b bool) {
// setConnectedStatus sets connection status if changed it will return true.
// TODO: Swap out these atomic switches and opt for sync.RWMutex.
func (w *WebsocketConnection) setConnectedStatus(b bool) bool {
if b {
atomic.StoreInt32(&w.connected, 1)
return
return atomic.SwapInt32(&w.connected, 1) == 0
}
atomic.StoreInt32(&w.connected, 0)
return atomic.SwapInt32(&w.connected, 0) == 1
}
// IsConnected exposes websocket connection status
@@ -216,16 +217,22 @@ func (w *WebsocketConnection) ReadMessage() Response {
mType, resp, err := w.Connection.ReadMessage()
if err != nil {
if isDisconnectionError(err) {
w.setConnectedStatus(false)
select {
case w.readMessageErrors <- err:
default:
// bypass if there is no receiver, as this stops it returning
// when shutdown is called.
log.Warnf(log.WebsocketMgr,
"%s failed to relay error: %v",
w.ExchangeName,
err)
if w.setConnectedStatus(false) {
// NOTE: When w.setConnectedStatus() returns true the underlying
// state was changed and this infers that the connection was
// externally closed and an error is reported else Shutdown()
// method on WebsocketConnection type has been called and can
// be skipped.
select {
case w.readMessageErrors <- err:
default:
// bypass if there is no receiver, as this stops it returning
// when shutdown is called.
log.Warnf(log.WebsocketMgr,
"%s failed to relay error: %v",
w.ExchangeName,
err)
}
}
}
return Response{}
@@ -315,6 +322,7 @@ func (w *WebsocketConnection) Shutdown() error {
if w == nil || w.Connection == nil {
return nil
}
w.setConnectedStatus(false)
return w.Connection.UnderlyingConn().Close()
}