Websocket: Various refactors and test improvements (#1466)

* Websocket: Remove IsInit and simplify SetProxyAddress

IsInit was basically the same as IsConnected.
Any time Connect was called both would be set to true.
Any time we had a disconnect they'd both be set to false
Shutdown() incorrectly didn't setInit(false)

SetProxyAddress simplified to only reconnect a connected Websocket.
Any other state means it hasn't been Connected, or it's about to
reconnect anyway.
There's no handling for IsConnecting previously, either, so I've wrapped
that behind the main mutex.

* Websocket: Expand and Assertify tests

* Websocket: Simplify state transistions

* Websocket: Simplify Connecting/Connected state

* Websocket: Tests and errors for websocket

* Websocket: Make WebsocketNotEnabled a real error

This allows for testing and avoids the repetition.
If each returned error is a error.New() you can never use errors.Is()

* Websocket: Add more testable errors

* Websocket: Improve GenerateMessageID test

Testing just the last id doesn't feel very robust

* Websocket: Protect Setup() from races

* Websocket: Use atomics instead of mutex

This was spurred by looking at the setState call in trafficMonitor and
the effect on blocking and efficiency.
With the new atomic types in Go 1.19, and the small types in use here,
atomics should be safe for our usage. bools should be truly atomic,
and uint32 is atomic when the accepted value range is less than one byte/uint8 since
that can be written atomicly by concurrent processors.
Maybe that's not even a factor any more, however we don't even have to worry enough to check.

* Websocket: Fix and simplify traffic monitor

trafficMonitor had a check throttle at the end of the for loop to stop it just gobbling the (blocking) trafficAlert channel non-stop.
That makes sense, except that nothing is sent to the trafficAlert channel if there's no listener.
So that means that it's out by one second on the trafficAlert, because any traffic received during the pause is doesn't try to send a traffic alert.

The unstopped timer is deliberately leaked for later GC when shutdown.
It won't delay/block anything, and it's a trivial memory leak during an infrequent event.

Deliberately Choosing to recreate the timer each time instead of using Stop, drain and reset

* Websocket: Split traficMonitor test on behaviours

* Websocket: Remove trafficMonitor connected status

trafficMonitor does not need to set the connection to be connected.
Connect() does that. Anything after that should result in a full
shutdown and restart. It can't and shouldn't become connected
unexpectedly, and this is most likely a race anyway.

Also dropped trafficCheckInterval to 100ms to mitigate races of traffic
alerts being buffered for too long.

* Websocket: Set disconnected earlier in Shutdown

This caused a possible race where state is still connected, but we start
to trigger interested actors via ShutdownC and Wait.
They may check state and then call Shutdown again, such as
trafficMonitor

* Websocket: Wait 5s for slow tests to pass traffic draining

Keep getting failures upstream on test rigs.
Think they can be very contended, so this pushes the boundary right out
to 5s
This commit is contained in:
Gareth Kirwan
2024-02-23 08:39:25 +01:00
committed by GitHub
parent 40193bb8c3
commit 52c6b3bf0b
66 changed files with 574 additions and 862 deletions

View File

@@ -16,47 +16,43 @@ import (
)
const (
defaultJobBuffer = 5000
// defaultTrafficPeriod defines a period of pause for the traffic monitor,
// as there are periods with large incoming traffic alerts which requires a
// timer reset, this limits work on this routine to a more effective rate
// of check.
defaultTrafficPeriod = time.Second
jobBuffer = 5000
)
// Public websocket errors
var (
// ErrSubscriptionNotFound defines an error when a subscription is not found
ErrSubscriptionNotFound = errors.New("subscription not found")
// ErrSubscribedAlready defines an error when a channel is already subscribed
ErrSubscribedAlready = errors.New("duplicate subscription")
// ErrSubscriptionFailure defines an error when a subscription fails
ErrSubscriptionFailure = errors.New("subscription failure")
// ErrSubscriptionNotSupported defines an error when a subscription channel is not supported by an exchange
ErrWebsocketNotEnabled = errors.New("websocket not enabled")
ErrSubscriptionNotFound = errors.New("subscription not found")
ErrSubscribedAlready = errors.New("duplicate subscription")
ErrSubscriptionFailure = errors.New("subscription failure")
ErrSubscriptionNotSupported = errors.New("subscription channel not supported ")
// ErrUnsubscribeFailure defines an error when a unsubscribe fails
ErrUnsubscribeFailure = errors.New("unsubscribe failure")
// ErrChannelInStateAlready defines an error when a subscription channel is already in a new state
ErrChannelInStateAlready = errors.New("channel already in state")
// ErrAlreadyDisabled is returned when you double-disable the websocket
ErrAlreadyDisabled = errors.New("websocket already disabled")
// ErrNotConnected defines an error when websocket is not connected
ErrNotConnected = errors.New("websocket is not connected")
ErrUnsubscribeFailure = errors.New("unsubscribe failure")
ErrChannelInStateAlready = errors.New("channel already in state")
ErrAlreadyDisabled = errors.New("websocket already disabled")
ErrNotConnected = errors.New("websocket is not connected")
)
// Private websocket errors
var (
errAlreadyRunning = errors.New("connection monitor is already running")
errExchangeConfigIsNil = errors.New("exchange config is nil")
errExchangeConfigEmpty = errors.New("exchange config is empty")
errWebsocketIsNil = errors.New("websocket is nil")
errWebsocketSetupIsNil = errors.New("websocket setup is nil")
errWebsocketAlreadyInitialised = errors.New("websocket already initialised")
errWebsocketAlreadyEnabled = errors.New("websocket already enabled")
errWebsocketFeaturesIsUnset = errors.New("websocket features is unset")
errConfigFeaturesIsNil = errors.New("exchange config features is nil")
errDefaultURLIsEmpty = errors.New("default url is empty")
errRunningURLIsEmpty = errors.New("running url cannot be empty")
errInvalidWebsocketURL = errors.New("invalid websocket url")
errExchangeConfigNameUnset = errors.New("exchange config name unset")
errExchangeConfigNameEmpty = errors.New("exchange config name empty")
errInvalidTrafficTimeout = errors.New("invalid traffic timeout")
errTrafficAlertNil = errors.New("traffic alert is nil")
errWebsocketSubscriberUnset = errors.New("websocket subscriber function needs to be set")
errWebsocketUnsubscriberUnset = errors.New("websocket unsubscriber functionality allowed but unsubscriber function not set")
errWebsocketConnectorUnset = errors.New("websocket connector function not set")
errReadMessageErrorsNil = errors.New("read message errors is nil")
errWebsocketSubscriptionsGeneratorUnset = errors.New("websocket subscriptions generator function needs to be set")
errClosedConnection = errors.New("use of closed network connection")
errSubscriptionsExceedsLimit = errors.New("subscriptions exceeds limit")
@@ -64,9 +60,18 @@ var (
errNoSubscriptionsSupplied = errors.New("no subscriptions supplied")
errChannelAlreadySubscribed = errors.New("channel already subscribed")
errInvalidChannelState = errors.New("invalid Channel state")
errSameProxyAddress = errors.New("cannot set proxy address to the same address")
errNoConnectFunc = errors.New("websocket connect func not set")
errAlreadyConnected = errors.New("websocket already connected")
errCannotShutdown = errors.New("websocket cannot shutdown")
errAlreadyReconnecting = errors.New("websocket in the process of reconnection")
errConnSetup = errors.New("error in connection setup")
)
var globalReporter Reporter
var (
globalReporter Reporter
trafficCheckInterval = 100 * time.Millisecond
)
// SetupGlobalReporter sets a reporter interface to be used
// for all exchange requests
@@ -74,13 +79,12 @@ func SetupGlobalReporter(r Reporter) {
globalReporter = r
}
// New initialises the websocket struct
func New() *Websocket {
// NewWebsocket initialises the websocket struct
func NewWebsocket() *Websocket {
return &Websocket{
Init: true,
DataHandler: make(chan interface{}, defaultJobBuffer),
ToRoutine: make(chan interface{}, defaultJobBuffer),
TrafficAlert: make(chan struct{}),
DataHandler: make(chan interface{}, jobBuffer),
ToRoutine: make(chan interface{}, jobBuffer),
TrafficAlert: make(chan struct{}, 1),
ReadMessageErrors: make(chan error),
Subscribe: make(chan []subscription.Subscription),
Unsubscribe: make(chan []subscription.Subscription),
@@ -98,7 +102,10 @@ func (w *Websocket) Setup(s *WebsocketSetup) error {
return errWebsocketSetupIsNil
}
if !w.Init {
w.m.Lock()
defer w.m.Unlock()
if w.IsInitialised() {
return fmt.Errorf("%s %w", w.exchangeName, errWebsocketAlreadyInitialised)
}
@@ -107,7 +114,7 @@ func (w *Websocket) Setup(s *WebsocketSetup) error {
}
if s.ExchangeConfig.Name == "" {
return errExchangeConfigNameUnset
return errExchangeConfigNameEmpty
}
w.exchangeName = s.ExchangeConfig.Name
w.verbose = s.ExchangeConfig.Verbose
@@ -120,7 +127,7 @@ func (w *Websocket) Setup(s *WebsocketSetup) error {
if s.ExchangeConfig.Features == nil {
return fmt.Errorf("%s %w", w.exchangeName, errConfigFeaturesIsNil)
}
w.enabled = s.ExchangeConfig.Features.Enabled.Websocket
w.setEnabled(s.ExchangeConfig.Features.Enabled.Websocket)
if s.Connector == nil {
return fmt.Errorf("%s %w", w.exchangeName, errWebsocketConnectorUnset)
@@ -188,28 +195,30 @@ func (w *Websocket) Setup(s *WebsocketSetup) error {
return fmt.Errorf("%s %w", w.exchangeName, errInvalidMaxSubscriptions)
}
w.MaxSubscriptionsPerConnection = s.MaxWebsocketSubscriptionsPerConnection
w.setState(disconnected)
return nil
}
// SetupNewConnection sets up an auth or unauth streaming connection
func (w *Websocket) SetupNewConnection(c ConnectionSetup) error {
if w == nil {
return errors.New("setting up new connection error: websocket is nil")
return fmt.Errorf("%w: %w", errConnSetup, errWebsocketIsNil)
}
if c == (ConnectionSetup{}) {
return errors.New("setting up new connection error: websocket connection configuration empty")
return fmt.Errorf("%w: %w", errConnSetup, errExchangeConfigEmpty)
}
if w.exchangeName == "" {
return errors.New("setting up new connection error: exchange name not set, please call setup first")
return fmt.Errorf("%w: %w", errConnSetup, errExchangeConfigNameEmpty)
}
if w.TrafficAlert == nil {
return errors.New("setting up new connection error: traffic alert is nil, please call setup first")
return fmt.Errorf("%w: %w", errConnSetup, errTrafficAlertNil)
}
if w.ReadMessageErrors == nil {
return errors.New("setting up new connection error: read message errors is nil, please call setup first")
return fmt.Errorf("%w: %w", errConnSetup, errReadMessageErrorsNil)
}
connectionURL := w.GetWebsocketURL()
@@ -253,21 +262,19 @@ func (w *Websocket) SetupNewConnection(c ConnectionSetup) error {
// function
func (w *Websocket) Connect() error {
if w.connector == nil {
return errors.New("websocket connect function not set, cannot continue")
return errNoConnectFunc
}
w.m.Lock()
defer w.m.Unlock()
if !w.IsEnabled() {
return errors.New(WebsocketNotEnabled)
return ErrWebsocketNotEnabled
}
if w.IsConnecting() {
return fmt.Errorf("%v Websocket already attempting to connect",
w.exchangeName)
return fmt.Errorf("%v %w", w.exchangeName, errAlreadyReconnecting)
}
if w.IsConnected() {
return fmt.Errorf("%v Websocket already connected",
w.exchangeName)
return fmt.Errorf("%v %w", w.exchangeName, errAlreadyConnected)
}
w.subscriptionMutex.Lock()
@@ -276,25 +283,19 @@ func (w *Websocket) Connect() error {
w.dataMonitor()
w.trafficMonitor()
w.setConnectingStatus(true)
w.setState(connecting)
err := w.connector()
if err != nil {
w.setConnectingStatus(false)
return fmt.Errorf("%v Error connecting %s",
w.exchangeName, err)
w.setState(disconnected)
return fmt.Errorf("%v Error connecting %w", w.exchangeName, err)
}
w.setConnectedStatus(true)
w.setConnectingStatus(false)
w.setInit(true)
w.setState(connected)
if !w.IsConnectionMonitorRunning() {
err = w.connectionMonitor()
if err != nil {
log.Errorf(log.WebsocketMgr,
"%s cannot start websocket connection monitor %v",
w.GetName(),
err)
log.Errorf(log.WebsocketMgr, "%s cannot start websocket connection monitor %v", w.GetName(), err)
}
}
@@ -317,9 +318,10 @@ func (w *Websocket) Connect() error {
}
// Disable disables the exchange websocket protocol
// Note that connectionMonitor will be responsible for shutting down the websocket after disabling
func (w *Websocket) Disable() error {
if !w.IsEnabled() {
return fmt.Errorf("%w for exchange '%s'", ErrAlreadyDisabled, w.exchangeName)
return fmt.Errorf("%s %w", w.exchangeName, ErrAlreadyDisabled)
}
w.setEnabled(false)
@@ -329,8 +331,7 @@ func (w *Websocket) Disable() error {
// Enable enables the exchange websocket protocol
func (w *Websocket) Enable() error {
if w.IsConnected() || w.IsEnabled() {
return fmt.Errorf("websocket is already enabled for exchange %s",
w.exchangeName)
return fmt.Errorf("%s %w", w.exchangeName, errWebsocketAlreadyEnabled)
}
w.setEnabled(true)
@@ -369,9 +370,7 @@ func (w *Websocket) dataMonitor() {
case <-w.ShutdownC:
return
default:
log.Warnf(log.WebsocketMgr,
"%s exchange backlog in websocket processing detected",
w.exchangeName)
log.Warnf(log.WebsocketMgr, "%s exchange backlog in websocket processing detected", w.exchangeName)
select {
case w.ToRoutine <- d:
case <-w.ShutdownC:
@@ -388,34 +387,25 @@ func (w *Websocket) connectionMonitor() error {
if w.checkAndSetMonitorRunning() {
return errAlreadyRunning
}
w.fieldMutex.RLock()
delay := w.connectionMonitorDelay
w.fieldMutex.RUnlock()
go func() {
timer := time.NewTimer(delay)
for {
if w.verbose {
log.Debugf(log.WebsocketMgr,
"%v websocket: running connection monitor cycle\n",
w.exchangeName)
log.Debugf(log.WebsocketMgr, "%v websocket: running connection monitor cycle", w.exchangeName)
}
if !w.IsEnabled() {
if w.verbose {
log.Debugf(log.WebsocketMgr,
"%v websocket: connectionMonitor - websocket disabled, shutting down\n",
w.exchangeName)
log.Debugf(log.WebsocketMgr, "%v websocket: connectionMonitor - websocket disabled, shutting down", w.exchangeName)
}
if w.IsConnected() {
err := w.Shutdown()
if err != nil {
if err := w.Shutdown(); err != nil {
log.Errorln(log.WebsocketMgr, err)
}
}
if w.verbose {
log.Debugf(log.WebsocketMgr,
"%v websocket: connection monitor exiting\n",
w.exchangeName)
log.Debugf(log.WebsocketMgr, "%v websocket: connection monitor exiting", w.exchangeName)
}
timer.Stop()
w.setConnectionMonitorRunning(false)
@@ -424,11 +414,8 @@ func (w *Websocket) connectionMonitor() error {
select {
case err := <-w.ReadMessageErrors:
if IsDisconnectionError(err) {
w.setInit(false)
log.Warnf(log.WebsocketMgr,
"%v websocket has been disconnected. Reason: %v",
w.exchangeName, err)
w.setConnectedStatus(false)
log.Warnf(log.WebsocketMgr, "%v websocket has been disconnected. Reason: %v", w.exchangeName, err)
w.setState(disconnected)
}
w.DataHandler <- err
@@ -459,21 +446,16 @@ func (w *Websocket) Shutdown() error {
defer w.m.Unlock()
if !w.IsConnected() {
return fmt.Errorf("%v websocket: cannot shutdown %w",
w.exchangeName,
ErrNotConnected)
return fmt.Errorf("%v %w: %w", w.exchangeName, errCannotShutdown, ErrNotConnected)
}
// TODO: Interrupt connection and or close connection when it is re-established.
if w.IsConnecting() {
return fmt.Errorf("%v websocket: cannot shutdown, in the process of reconnection",
w.exchangeName)
return fmt.Errorf("%v %w: %w ", w.exchangeName, errCannotShutdown, errAlreadyReconnecting)
}
if w.verbose {
log.Debugf(log.WebsocketMgr,
"%v websocket: shutting down websocket\n",
w.exchangeName)
log.Debugf(log.WebsocketMgr, "%v websocket: shutting down websocket", w.exchangeName)
}
defer w.Orderbook.FlushBuffer()
@@ -495,15 +477,13 @@ func (w *Websocket) Shutdown() error {
w.subscriptions = subscriptionMap{}
w.subscriptionMutex.Unlock()
w.setState(disconnected)
close(w.ShutdownC)
w.Wg.Wait()
w.ShutdownC = make(chan struct{})
w.setConnectedStatus(false)
w.setConnectingStatus(false)
if w.verbose {
log.Debugf(log.WebsocketMgr,
"%v websocket: completed websocket shutdown\n",
w.exchangeName)
log.Debugf(log.WebsocketMgr, "%v websocket: completed websocket shutdown", w.exchangeName)
}
return nil
}
@@ -511,11 +491,11 @@ func (w *Websocket) Shutdown() error {
// FlushChannels flushes channel subscriptions when there is a pair/asset change
func (w *Websocket) FlushChannels() error {
if !w.IsEnabled() {
return fmt.Errorf("%s websocket: service not enabled", w.exchangeName)
return fmt.Errorf("%s %w", w.exchangeName, ErrWebsocketNotEnabled)
}
if !w.IsConnected() {
return fmt.Errorf("%s websocket: service not connected", w.exchangeName)
return fmt.Errorf("%s %w", w.exchangeName, ErrNotConnected)
}
if w.features.Subscribe {
@@ -565,9 +545,9 @@ func (w *Websocket) FlushChannels() error {
return w.Connect()
}
// trafficMonitor uses a timer of WebsocketTrafficLimitTime and once it expires,
// it will reconnect if the TrafficAlert channel has not received any data. The
// trafficTimer will reset on each traffic alert
// trafficMonitor waits trafficCheckInterval before checking for a trafficAlert
// 1 slot buffer means that connection will only write to trafficAlert once per trafficCheckInterval to avoid read/write flood in high traffic
// Otherwise we Shutdown the connection after trafficTimeout, unless it's connecting. connectionMonitor is responsible for Connecting again
func (w *Websocket) trafficMonitor() {
if w.IsTrafficMonitorRunning() {
return
@@ -576,183 +556,121 @@ func (w *Websocket) trafficMonitor() {
w.Wg.Add(1)
go func() {
var trafficTimer = time.NewTimer(w.trafficTimeout)
pause := make(chan struct{})
t := time.NewTimer(w.trafficTimeout)
for {
select {
case <-w.ShutdownC:
if w.verbose {
log.Debugf(log.WebsocketMgr,
"%v websocket: trafficMonitor shutdown message received\n",
w.exchangeName)
log.Debugf(log.WebsocketMgr, "%v websocket: trafficMonitor shutdown message received", w.exchangeName)
}
trafficTimer.Stop()
t.Stop()
w.setTrafficMonitorRunning(false)
w.Wg.Done()
return
case <-w.TrafficAlert:
if !trafficTimer.Stop() {
select {
case <-trafficTimer.C:
default:
case <-time.After(trafficCheckInterval):
select {
case <-w.TrafficAlert:
if !t.Stop() {
<-t.C
}
t.Reset(w.trafficTimeout)
default:
}
case <-t.C:
checkAgain := w.IsConnecting()
select {
case <-w.TrafficAlert:
checkAgain = true
default:
}
if checkAgain {
t.Reset(w.trafficTimeout)
break
}
w.setConnectedStatus(true)
trafficTimer.Reset(w.trafficTimeout)
case <-trafficTimer.C: // Falls through when timer runs out
if w.verbose {
log.Warnf(log.WebsocketMgr,
"%v websocket: has not received a traffic alert in %v. Reconnecting",
w.exchangeName,
w.trafficTimeout)
log.Warnf(log.WebsocketMgr, "%v websocket: has not received a traffic alert in %v. Reconnecting", w.exchangeName, w.trafficTimeout)
}
trafficTimer.Stop()
w.setTrafficMonitorRunning(false)
w.Wg.Done() // without this the w.Shutdown() call below will deadlock
if !w.IsConnecting() && w.IsConnected() {
w.setTrafficMonitorRunning(false) // Cannot defer lest Connect is called after Shutdown but before deferred call
w.Wg.Done() // Without this the w.Shutdown() call below will deadlock
if w.IsConnected() {
err := w.Shutdown()
if err != nil {
log.Errorf(log.WebsocketMgr,
"%v websocket: trafficMonitor shutdown err: %s",
w.exchangeName, err)
log.Errorf(log.WebsocketMgr, "%v websocket: trafficMonitor shutdown err: %s", w.exchangeName, err)
}
}
return
}
if w.IsConnected() {
// Routine pausing mechanism
go func(p chan<- struct{}) {
time.Sleep(defaultTrafficPeriod)
select {
case p <- struct{}{}:
default:
}
}(pause)
select {
case <-w.ShutdownC:
trafficTimer.Stop()
w.setTrafficMonitorRunning(false)
w.Wg.Done()
return
case <-pause:
}
}
}
}()
}
func (w *Websocket) setConnectedStatus(b bool) {
w.fieldMutex.Lock()
w.connected = b
w.fieldMutex.Unlock()
func (w *Websocket) setState(s uint32) {
w.state.Store(s)
}
// IsConnected returns status of connection
// IsInitialised returns whether the websocket has been Setup() already
func (w *Websocket) IsInitialised() bool {
return w.state.Load() != uninitialised
}
// IsConnected returns whether the websocket is connected
func (w *Websocket) IsConnected() bool {
w.fieldMutex.RLock()
defer w.fieldMutex.RUnlock()
return w.connected
return w.state.Load() == connected
}
func (w *Websocket) setConnectingStatus(b bool) {
w.fieldMutex.Lock()
w.connecting = b
w.fieldMutex.Unlock()
}
// IsConnecting returns status of connecting
// IsConnecting returns whether the websocket is connecting
func (w *Websocket) IsConnecting() bool {
w.fieldMutex.RLock()
defer w.fieldMutex.RUnlock()
return w.connecting
return w.state.Load() == connecting
}
func (w *Websocket) setEnabled(b bool) {
w.fieldMutex.Lock()
w.enabled = b
w.fieldMutex.Unlock()
w.enabled.Store(b)
}
// IsEnabled returns status of enabled
// IsEnabled returns whether the websocket is enabled
func (w *Websocket) IsEnabled() bool {
w.fieldMutex.RLock()
defer w.fieldMutex.RUnlock()
return w.enabled
}
func (w *Websocket) setInit(b bool) {
w.fieldMutex.Lock()
w.Init = b
w.fieldMutex.Unlock()
}
// IsInit returns status of init
func (w *Websocket) IsInit() bool {
w.fieldMutex.RLock()
defer w.fieldMutex.RUnlock()
return w.Init
return w.enabled.Load()
}
func (w *Websocket) setTrafficMonitorRunning(b bool) {
w.fieldMutex.Lock()
w.trafficMonitorRunning = b
w.fieldMutex.Unlock()
w.trafficMonitorRunning.Store(b)
}
// IsTrafficMonitorRunning returns status of the traffic monitor
func (w *Websocket) IsTrafficMonitorRunning() bool {
w.fieldMutex.RLock()
defer w.fieldMutex.RUnlock()
return w.trafficMonitorRunning
return w.trafficMonitorRunning.Load()
}
func (w *Websocket) checkAndSetMonitorRunning() (alreadyRunning bool) {
w.fieldMutex.Lock()
defer w.fieldMutex.Unlock()
if w.connectionMonitorRunning {
return true
}
w.connectionMonitorRunning = true
return false
return !w.connectionMonitorRunning.CompareAndSwap(false, true)
}
func (w *Websocket) setConnectionMonitorRunning(b bool) {
w.fieldMutex.Lock()
w.connectionMonitorRunning = b
w.fieldMutex.Unlock()
w.connectionMonitorRunning.Store(b)
}
// IsConnectionMonitorRunning returns status of connection monitor
func (w *Websocket) IsConnectionMonitorRunning() bool {
w.fieldMutex.RLock()
defer w.fieldMutex.RUnlock()
return w.connectionMonitorRunning
return w.connectionMonitorRunning.Load()
}
func (w *Websocket) setDataMonitorRunning(b bool) {
w.fieldMutex.Lock()
w.dataMonitorRunning = b
w.fieldMutex.Unlock()
w.dataMonitorRunning.Store(b)
}
// IsDataMonitorRunning returns status of data monitor
func (w *Websocket) IsDataMonitorRunning() bool {
w.fieldMutex.RLock()
defer w.fieldMutex.RUnlock()
return w.dataMonitorRunning
return w.dataMonitorRunning.Load()
}
// CanUseAuthenticatedWebsocketForWrapper Handles a common check to
// verify whether a wrapper can use an authenticated websocket endpoint
func (w *Websocket) CanUseAuthenticatedWebsocketForWrapper() bool {
if w.IsConnected() && w.CanUseAuthenticatedEndpoints() {
return true
} else if w.IsConnected() && !w.CanUseAuthenticatedEndpoints() {
log.Infof(log.WebsocketMgr,
WebsocketNotAuthenticatedUsingRest,
w.exchangeName)
if w.IsConnected() {
if w.CanUseAuthenticatedEndpoints() {
return true
}
log.Infof(log.WebsocketMgr, WebsocketNotAuthenticatedUsingRest, w.exchangeName)
}
return false
}
@@ -820,28 +738,22 @@ func (w *Websocket) GetWebsocketURL() string {
// SetProxyAddress sets websocket proxy address
func (w *Websocket) SetProxyAddress(proxyAddr string) error {
w.m.Lock()
if proxyAddr != "" {
_, err := url.ParseRequestURI(proxyAddr)
if err != nil {
return fmt.Errorf("%v websocket: cannot set proxy address error '%v'",
w.exchangeName,
err)
if _, err := url.ParseRequestURI(proxyAddr); err != nil {
w.m.Unlock()
return fmt.Errorf("%v websocket: cannot set proxy address: %w", w.exchangeName, err)
}
if w.proxyAddr == proxyAddr {
return fmt.Errorf("%v websocket: cannot set proxy address to the same address '%v'",
w.exchangeName,
w.proxyAddr)
w.m.Unlock()
return fmt.Errorf("%v websocket: %w '%v'", w.exchangeName, errSameProxyAddress, w.proxyAddr)
}
log.Debugf(log.ExchangeSys,
"%s websocket: setting websocket proxy: %s\n",
w.exchangeName,
proxyAddr)
log.Debugf(log.ExchangeSys, "%s websocket: setting websocket proxy: %s", w.exchangeName, proxyAddr)
} else {
log.Debugf(log.ExchangeSys,
"%s websocket: removing websocket proxy\n",
w.exchangeName)
log.Debugf(log.ExchangeSys, "%s websocket: removing websocket proxy", w.exchangeName)
}
if w.Conn != nil {
@@ -852,15 +764,17 @@ func (w *Websocket) SetProxyAddress(proxyAddr string) error {
}
w.proxyAddr = proxyAddr
if w.IsInit() && w.IsEnabled() {
if w.IsConnected() {
err := w.Shutdown()
if err != nil {
return err
}
if w.IsConnected() {
w.m.Unlock()
if err := w.Shutdown(); err != nil {
return err
}
return w.Connect()
}
w.m.Unlock()
return nil
}
@@ -1035,20 +949,14 @@ func (w *Websocket) GetSubscriptions() []subscription.Subscription {
return subs
}
// SetCanUseAuthenticatedEndpoints sets canUseAuthenticatedEndpoints val in
// a thread safe manner
func (w *Websocket) SetCanUseAuthenticatedEndpoints(val bool) {
w.fieldMutex.Lock()
defer w.fieldMutex.Unlock()
w.canUseAuthenticatedEndpoints = val
// SetCanUseAuthenticatedEndpoints sets canUseAuthenticatedEndpoints val in a thread safe manner
func (w *Websocket) SetCanUseAuthenticatedEndpoints(b bool) {
w.canUseAuthenticatedEndpoints.Store(b)
}
// CanUseAuthenticatedEndpoints gets canUseAuthenticatedEndpoints val in
// a thread safe manner
// CanUseAuthenticatedEndpoints gets canUseAuthenticatedEndpoints val in a thread safe manner
func (w *Websocket) CanUseAuthenticatedEndpoints() bool {
w.fieldMutex.RLock()
defer w.fieldMutex.RUnlock()
return w.canUseAuthenticatedEndpoints
return w.canUseAuthenticatedEndpoints.Load()
}
// IsDisconnectionError Determines if the error sent over chan ReadMessageErrors is a disconnection error

View File

@@ -50,9 +50,7 @@ func (w *WebsocketConnection) SendMessageReturnResponse(signature, request inter
return payload, nil
case <-timer.C:
timer.Stop()
return nil, fmt.Errorf("%s websocket connection: timeout waiting for response with signature: %v",
w.ExchangeName,
signature)
return nil, fmt.Errorf("%s websocket connection: timeout waiting for response with signature: %v", w.ExchangeName, signature)
}
}
@@ -72,25 +70,14 @@ func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header
w.Connection, conStatus, err = dialer.Dial(w.URL, headers)
if err != nil {
if conStatus != nil {
return fmt.Errorf("%s websocket connection: %v %v %v Error: %v",
w.ExchangeName,
w.URL,
conStatus,
conStatus.StatusCode,
err)
return fmt.Errorf("%s websocket connection: %v %v %v Error: %w", w.ExchangeName, w.URL, conStatus, conStatus.StatusCode, err)
}
return fmt.Errorf("%s websocket connection: %v Error: %v",
w.ExchangeName,
w.URL,
err)
return fmt.Errorf("%s websocket connection: %v Error: %w", w.ExchangeName, w.URL, err)
}
defer conStatus.Body.Close()
if w.Verbose {
log.Infof(log.WebsocketMgr,
"%v Websocket connected to %s\n",
w.ExchangeName,
w.URL)
log.Infof(log.WebsocketMgr, "%v Websocket connected to %s\n", w.ExchangeName, w.URL)
}
select {
case w.Traffic <- struct{}{}:
@@ -240,7 +227,7 @@ func (w *WebsocketConnection) ReadMessage() Response {
select {
case w.Traffic <- struct{}{}:
default: // causes contention, just bypass if there is no receiver.
default: // Non-Blocking write ensures 1 buffered signal per trafficCheckInterval to avoid flooding
}
var standardMessage []byte
@@ -285,7 +272,7 @@ func (w *WebsocketConnection) parseBinaryResponse(resp []byte) ([]byte, error) {
return standardMessage, reader.Close()
}
// GenerateMessageID Creates a messageID to checkout
// GenerateMessageID Creates a random message ID
func (w *WebsocketConnection) GenerateMessageID(highPrec bool) int64 {
var min int64 = 1e8
var max int64 = 2e8

File diff suppressed because it is too large Load Diff

View File

@@ -2,6 +2,7 @@ package stream
import (
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
@@ -15,8 +16,6 @@ import (
// Websocket functionality list and state consts
const (
// WebsocketNotEnabled alerts of a disabled websocket
WebsocketNotEnabled = "exchange_websocket_not_enabled"
WebsocketNotAuthenticatedUsingRest = "%v - Websocket not authenticated, using REST\n"
Ping = "ping"
Pong = "pong"
@@ -25,18 +24,23 @@ const (
type subscriptionMap map[any]*subscription.Subscription
const (
uninitialised uint32 = iota
disconnected
connecting
connected
)
// Websocket defines a return type for websocket connections via the interface
// wrapper for routine processing
type Websocket struct {
canUseAuthenticatedEndpoints bool
enabled bool
Init bool
connected bool
connecting bool
canUseAuthenticatedEndpoints atomic.Bool
enabled atomic.Bool
state atomic.Uint32
verbose bool
connectionMonitorRunning bool
trafficMonitorRunning bool
dataMonitorRunning bool
connectionMonitorRunning atomic.Bool
trafficMonitorRunning atomic.Bool
dataMonitorRunning atomic.Bool
trafficTimeout time.Duration
connectionMonitorDelay time.Duration
proxyAddr string
@@ -46,7 +50,6 @@ type Websocket struct {
runningURLAuth string
exchangeName string
m sync.Mutex
fieldMutex sync.RWMutex
connector func() error
subscriptionMutex sync.RWMutex