SyncManager: Optimise and fixes (#1229)

* SyncManager: Optimise and fixes

This is a fairly invasive change which addresses the amount of work the
sync manager does each cycle and the cycle intervals.
We switch to using discrete locks for each type of work on each pair,
so each worker can take a discrete chunk of work safely.
For performance and simplicity we now use a map for the currencyPairs.

* fix reporting when a websocket is reconnected
* fix not switching REST off after websocket available again
* fix race condition in isProcessing flag

This PR still could go further by avoiding cycling through everything
each time, and by pushing some one-time work for adding enabled pairs
down to a later stage off the hot path.
This was the smallest chunk of refactoring I felt could address
everything without changing too much.

Significant manual testing done with a variety of Timeouts to test for
edgecases and handling.

* SyncManager: Fix ticker/orderbook tracker linked

* SyncManager: Fix sync complete logging in update

* SyncManager: Fix pair format breaking sync key

Kraken seems to always switch to XBT_USDT format, but websockets still
pass around XBTUSDT format. Just to be safe this just removes the
delimiter to avoid any such issues

* SyncManager: Remove unused error

* SyncManager: Remove unused IsProcessing flag

* SyncManager: Fix Update test add() pair format

We had to unify pair format inside sync manager, so test needs to do the
same
This commit is contained in:
Gareth Kirwan
2023-07-12 01:17:39 +01:00
committed by GitHub
parent b403e12d3e
commit 54f745e943
4 changed files with 356 additions and 381 deletions

View File

@@ -77,7 +77,7 @@ type iCurrencyPairSyncer interface {
IsRunning() bool
PrintTickerSummary(*ticker.Price, string, error)
PrintOrderbookSummary(*orderbook.Base, string, error)
Update(string, currency.Pair, asset.Item, int, error) error
Update(string, currency.Pair, asset.Item, syncItemType, error) error
}
// iDatabaseConnectionManager defines a limited scoped databaseConnectionManager

View File

@@ -6,12 +6,14 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/config"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/stats"
@@ -19,9 +21,11 @@ import (
"github.com/thrasher-corp/gocryptotrader/log"
)
type syncItemType int
// const holds the sync item types
const (
SyncItemTicker = iota
SyncItemTicker syncItemType = iota
SyncItemOrderbook
SyncItemTrade
SyncManagerName = "exchange_syncer"
@@ -38,7 +42,6 @@ var (
DefaultSyncerTimeoutWebsocket = time.Minute
errNoSyncItemsEnabled = errors.New("no sync items enabled")
errUnknownSyncItem = errors.New("unknown sync item")
errSyncPairNotFound = errors.New("exchange currency pair syncer not found")
errCouldNotSyncNewData = errors.New("could not sync new data")
)
@@ -90,6 +93,7 @@ func setupSyncManager(c *SyncManagerConfig, exchangeManager iExchangeManager, re
fiatDisplayCurrency: c.FiatDisplayCurrency,
format: *c.PairFormatDisplay,
tickerBatchLastRequested: make(map[string]time.Time),
currencyPairs: make(map[currencyPairKey]*currencyPairSyncAgent),
}
log.Debugf(log.SyncMgr,
@@ -116,6 +120,7 @@ func (m *syncManager) Start() error {
if !atomic.CompareAndSwapInt32(&m.started, 0, 1) {
return ErrSubSystemAlreadyStarted
}
m.shutdown = make(chan bool)
m.initSyncWG.Add(1)
m.inService.Done()
log.Debugln(log.SyncMgr, "Exchange CurrencyPairSyncer started.")
@@ -171,30 +176,21 @@ func (m *syncManager) Start() error {
continue
}
for i := range enabledPairs {
if m.exists(exchangeName, enabledPairs[i], assetTypes[y]) {
k := currencyPairKey{
AssetType: assetTypes[y],
Exchange: exchangeName,
Pair: enabledPairs[i].Format(currency.PairFormat{Uppercase: true}),
}
if e := m.get(k); e != nil {
continue
}
c := &currencyPairSyncAgent{
AssetType: assetTypes[y],
Exchange: exchangeName,
Pair: enabledPairs[i],
}
sBase := syncBase{
IsUsingREST: usingREST || !wsAssetSupported,
IsUsingWebsocket: usingWebsocket && wsAssetSupported,
}
if m.config.SynchronizeTicker {
c.Ticker = sBase
}
if m.config.SynchronizeOrderbook {
c.Orderbook = sBase
}
if m.config.SynchronizeTrades {
c.Trade = sBase
}
m.add(c)
m.add(k, sBase)
}
}
}
@@ -244,50 +240,54 @@ func (m *syncManager) Stop() error {
if !atomic.CompareAndSwapInt32(&m.started, 1, 0) {
return fmt.Errorf("exchange CurrencyPairSyncer %w", ErrSubSystemNotStarted)
}
close(m.shutdown)
m.inService.Add(1)
log.Debugln(log.SyncMgr, "Exchange CurrencyPairSyncer stopped.")
return nil
}
func (m *syncManager) get(exchangeName string, p currency.Pair, a asset.Item) (*currencyPairSyncAgent, error) {
func (m *syncManager) get(k currencyPairKey) *currencyPairSyncAgent {
m.mux.Lock()
defer m.mux.Unlock()
for x := range m.currencyPairs {
if m.currencyPairs[x].Exchange == exchangeName &&
m.currencyPairs[x].Pair.Equal(p) &&
m.currencyPairs[x].AssetType == a {
return &m.currencyPairs[x], nil
}
}
return nil, fmt.Errorf("%v %v %v %w", exchangeName, a, p, errSyncPairNotFound)
return m.currencyPairs[k]
}
func (m *syncManager) exists(exchangeName string, p currency.Pair, a asset.Item) bool {
m.mux.Lock()
defer m.mux.Unlock()
for x := range m.currencyPairs {
if m.currencyPairs[x].Exchange == exchangeName &&
m.currencyPairs[x].Pair.Equal(p) &&
m.currencyPairs[x].AssetType == a {
return true
}
func newCurrencyPairSyncAgent(k currencyPairKey) *currencyPairSyncAgent {
return &currencyPairSyncAgent{
currencyPairKey: k,
Created: time.Now(),
locks: make([]sync.Mutex, SyncItemTrade+1),
trackers: make([]*syncBase, SyncItemTrade+1),
}
return false
}
func (m *syncManager) add(c *currencyPairSyncAgent) {
func (m *syncManager) add(k currencyPairKey, s syncBase) *currencyPairSyncAgent {
m.mux.Lock()
defer m.mux.Unlock()
c := newCurrencyPairSyncAgent(k)
if m.config.SynchronizeTicker {
s := s
c.trackers[SyncItemTicker] = &s
}
if m.config.SynchronizeOrderbook {
s := s
c.trackers[SyncItemOrderbook] = &s
}
if m.config.SynchronizeTrades {
s := s
c.trackers[SyncItemTrade] = &s
}
if m.config.SynchronizeTicker {
if m.config.Verbose {
log.Debugf(log.SyncMgr,
"%s: Added ticker sync item %v: using websocket: %v using REST: %v",
c.Exchange, m.FormatCurrency(c.Pair).String(), c.Ticker.IsUsingWebsocket,
c.Ticker.IsUsingREST)
c.Exchange, m.FormatCurrency(c.Pair).String(), c.trackers[SyncItemTicker].IsUsingWebsocket,
c.trackers[SyncItemTicker].IsUsingREST)
}
if atomic.LoadInt32(&m.initSyncCompleted) != 1 {
m.initSyncWG.Add(1)
@@ -299,8 +299,8 @@ func (m *syncManager) add(c *currencyPairSyncAgent) {
if m.config.Verbose {
log.Debugf(log.SyncMgr,
"%s: Added orderbook sync item %v: using websocket: %v using REST: %v",
c.Exchange, m.FormatCurrency(c.Pair).String(), c.Orderbook.IsUsingWebsocket,
c.Orderbook.IsUsingREST)
c.Exchange, m.FormatCurrency(c.Pair).String(), c.trackers[SyncItemOrderbook].IsUsingWebsocket,
c.trackers[SyncItemOrderbook].IsUsingREST)
}
if atomic.LoadInt32(&m.initSyncCompleted) != 1 {
m.initSyncWG.Add(1)
@@ -312,8 +312,8 @@ func (m *syncManager) add(c *currencyPairSyncAgent) {
if m.config.Verbose {
log.Debugf(log.SyncMgr,
"%s: Added trade sync item %v: using websocket: %v using REST: %v",
c.Exchange, m.FormatCurrency(c.Pair).String(), c.Trade.IsUsingWebsocket,
c.Trade.IsUsingREST)
c.Exchange, m.FormatCurrency(c.Pair).String(), c.trackers[SyncItemTrade].IsUsingWebsocket,
c.trackers[SyncItemTrade].IsUsingREST)
}
if atomic.LoadInt32(&m.initSyncCompleted) != 1 {
m.initSyncWG.Add(1)
@@ -321,61 +321,24 @@ func (m *syncManager) add(c *currencyPairSyncAgent) {
}
}
c.Created = time.Now()
m.currencyPairs = append(m.currencyPairs, *c)
}
func (m *syncManager) isProcessing(exchangeName string, p currency.Pair, a asset.Item, syncType int) bool {
m.mux.Lock()
defer m.mux.Unlock()
for x := range m.currencyPairs {
if m.currencyPairs[x].Exchange == exchangeName &&
m.currencyPairs[x].Pair.Equal(p) &&
m.currencyPairs[x].AssetType == a {
switch syncType {
case SyncItemTicker:
return m.currencyPairs[x].Ticker.IsProcessing
case SyncItemOrderbook:
return m.currencyPairs[x].Orderbook.IsProcessing
case SyncItemTrade:
return m.currencyPairs[x].Trade.IsProcessing
}
}
if m.currencyPairs == nil {
m.currencyPairs = make(map[currencyPairKey]*currencyPairSyncAgent)
}
return false
}
m.currencyPairs[k] = c
func (m *syncManager) setProcessing(exchangeName string, p currency.Pair, a asset.Item, syncType int, processing bool) {
m.mux.Lock()
defer m.mux.Unlock()
for x := range m.currencyPairs {
if m.currencyPairs[x].Exchange == exchangeName &&
m.currencyPairs[x].Pair.Equal(p) &&
m.currencyPairs[x].AssetType == a {
switch syncType {
case SyncItemTicker:
m.currencyPairs[x].Ticker.IsProcessing = processing
case SyncItemOrderbook:
m.currencyPairs[x].Orderbook.IsProcessing = processing
case SyncItemTrade:
m.currencyPairs[x].Trade.IsProcessing = processing
}
}
}
return c
}
// Update notifies the syncManager to change the last updated time for a exchange asset pair
func (m *syncManager) Update(exchangeName string, p currency.Pair, a asset.Item, syncType int, err error) error {
// And set IsUsingWebsocket to true. It should be used externally only from websocket updaters
func (m *syncManager) Update(exchangeName string, p currency.Pair, a asset.Item, syncType syncItemType, err error) error {
if m == nil {
return fmt.Errorf("exchange CurrencyPairSyncer %w", ErrNilSubsystem)
}
if atomic.LoadInt32(&m.started) == 0 {
return fmt.Errorf("exchange CurrencyPairSyncer %w", ErrSubSystemNotStarted)
}
if atomic.LoadInt32(&m.initSyncStarted) != 1 {
return nil
}
@@ -397,72 +360,66 @@ func (m *syncManager) Update(exchangeName string, p currency.Pair, a asset.Item,
return fmt.Errorf("%v %w", syncType, errUnknownSyncItem)
}
m.mux.Lock()
defer m.mux.Unlock()
for x := range m.currencyPairs {
if m.currencyPairs[x].Exchange == exchangeName &&
m.currencyPairs[x].Pair.Equal(p) &&
m.currencyPairs[x].AssetType == a {
switch syncType {
case SyncItemTicker:
origHadData := m.currencyPairs[x].Ticker.HaveData
m.currencyPairs[x].Ticker.LastUpdated = time.Now()
if err != nil {
m.currencyPairs[x].Ticker.NumErrors++
}
m.currencyPairs[x].Ticker.HaveData = true
m.currencyPairs[x].Ticker.IsProcessing = false
if atomic.LoadInt32(&m.initSyncCompleted) != 1 && !origHadData {
removedCounter++
log.Debugf(log.SyncMgr, "%s ticker sync complete %v [%d/%d].",
exchangeName,
m.FormatCurrency(p).String(),
removedCounter,
createdCounter)
m.initSyncWG.Done()
}
return nil
case SyncItemOrderbook:
origHadData := m.currencyPairs[x].Orderbook.HaveData
m.currencyPairs[x].Orderbook.LastUpdated = time.Now()
if err != nil {
m.currencyPairs[x].Orderbook.NumErrors++
}
m.currencyPairs[x].Orderbook.HaveData = true
m.currencyPairs[x].Orderbook.IsProcessing = false
if atomic.LoadInt32(&m.initSyncCompleted) != 1 && !origHadData {
removedCounter++
log.Debugf(log.SyncMgr, "%s orderbook sync complete %v [%d/%d].",
exchangeName,
m.FormatCurrency(p).String(),
removedCounter,
createdCounter)
m.initSyncWG.Done()
}
return nil
case SyncItemTrade:
origHadData := m.currencyPairs[x].Trade.HaveData
m.currencyPairs[x].Trade.LastUpdated = time.Now()
if err != nil {
m.currencyPairs[x].Trade.NumErrors++
}
m.currencyPairs[x].Trade.HaveData = true
m.currencyPairs[x].Trade.IsProcessing = false
if atomic.LoadInt32(&m.initSyncCompleted) != 1 && !origHadData {
removedCounter++
log.Debugf(log.SyncMgr, "%s trade sync complete %v [%d/%d].",
exchangeName,
m.FormatCurrency(p).String(),
removedCounter,
createdCounter)
m.initSyncWG.Done()
}
return nil
}
}
k := currencyPairKey{
AssetType: a,
Exchange: exchangeName,
Pair: p.Format(currency.PairFormat{Uppercase: true}),
}
return fmt.Errorf("%w for %s %s %s", errCouldNotSyncNewData, exchangeName, p, a)
c, exists := m.currencyPairs[k]
if !exists {
return fmt.Errorf("%w for %s %s %s %s", errCouldNotSyncNewData, k.Exchange, k.Pair, k.AssetType, syncType)
}
c.locks[syncType].Lock()
defer c.locks[syncType].Unlock()
if c.trackers[syncType] == nil {
c.trackers[syncType] = &syncBase{}
}
s := c.trackers[syncType]
if !s.IsUsingWebsocket {
s.IsUsingWebsocket = true
s.IsUsingREST = false
log.Warnf(log.SyncMgr,
"%s %s %s: %s Websocket re-enabled, switching from rest to websocket",
c.Exchange,
m.FormatCurrency(c.Pair),
strings.ToUpper(c.AssetType.String()),
syncType,
)
}
return m.update(c, syncType, err)
}
// update notifies the syncManager to change the last updated time for a exchange asset pair
func (m *syncManager) update(c *currencyPairSyncAgent, syncType syncItemType, err error) error {
if syncType < SyncItemTicker || syncType > SyncItemTrade {
return fmt.Errorf("%v %w", syncType, errUnknownSyncItem)
}
s := c.trackers[syncType]
origHadData := s.HaveData
s.LastUpdated = time.Now()
if err != nil {
s.NumErrors++
}
s.HaveData = true
if atomic.LoadInt32(&m.initSyncCompleted) != 1 && !origHadData {
removedCounter++
log.Debugf(log.SyncMgr, "%s %s sync complete %v [%d/%d].",
c.Exchange,
syncType,
m.FormatCurrency(c.Pair),
removedCounter,
createdCounter)
m.initSyncWG.Done()
}
return nil
}
func (m *syncManager) worker() {
@@ -472,226 +429,84 @@ func (m *syncManager) worker() {
}
defer cleanup()
for atomic.LoadInt32(&m.started) != 0 {
exchanges, err := m.exchangeManager.GetExchanges()
if err != nil {
log.Errorf(log.SyncMgr, "Sync manager cannot get exchanges: %v", err)
}
for x := range exchanges {
exchangeName := exchanges[x].GetName()
supportsREST := exchanges[x].SupportsREST()
supportsRESTTickerBatching := exchanges[x].SupportsRESTTickerBatchUpdates()
var usingREST bool
var usingWebsocket bool
var switchedToRest bool
if exchanges[x].SupportsWebsocket() && exchanges[x].IsWebsocketEnabled() {
ws, err := exchanges[x].GetWebsocket()
if err != nil {
log.Errorf(log.SyncMgr,
"%s unable to get websocket pointer. Err: %s",
exchangeName,
err)
usingREST = true
}
interval := greatestCommonDivisor(m.config.TimeoutWebsocket, m.config.TimeoutREST)
if interval > time.Second {
interval = time.Second
}
t := time.NewTicker(interval)
if ws.IsConnected() {
usingWebsocket = true
} else {
usingREST = true
}
} else if supportsREST {
usingREST = true
for {
select {
case <-m.shutdown:
return
case <-t.C:
exchanges, err := m.exchangeManager.GetExchanges()
if err != nil {
log.Errorf(log.SyncMgr, "Sync manager cannot get exchanges: %v", err)
continue
}
assetTypes := exchanges[x].GetAssetTypes(true)
for y := range assetTypes {
wsAssetSupported := exchanges[x].IsAssetWebsocketSupported(assetTypes[y])
enabledPairs, err := exchanges[x].GetEnabledPairs(assetTypes[y])
if err != nil {
log.Errorf(log.SyncMgr,
"%s failed to get enabled pairs. Err: %s",
exchangeName,
err)
continue
}
for i := range enabledPairs {
if atomic.LoadInt32(&m.started) == 0 {
return
for _, e := range exchanges {
exchangeName := e.GetName()
supportsREST := e.SupportsREST()
// TODO: These vars are only used for enabling new pairs, deriving them every cycle is sub-optimal
var usingREST bool
var usingWebsocket bool
if e.SupportsWebsocket() && e.IsWebsocketEnabled() {
ws, err := e.GetWebsocket()
if err != nil {
log.Errorf(log.SyncMgr,
"%s unable to get websocket pointer. Err: %s",
exchangeName,
err)
usingREST = true
}
c, err := m.get(exchangeName, enabledPairs[i], assetTypes[y])
if err != nil {
if err == errSyncPairNotFound {
c = &currencyPairSyncAgent{
AssetType: assetTypes[y],
Exchange: exchangeName,
Pair: enabledPairs[i],
}
if ws.IsConnected() {
usingWebsocket = true
} else {
usingREST = true
}
} else if supportsREST {
usingREST = true
}
sBase := syncBase{
assetTypes := e.GetAssetTypes(true)
for y := range assetTypes {
wsAssetSupported := e.IsAssetWebsocketSupported(assetTypes[y])
enabledPairs, err := e.GetEnabledPairs(assetTypes[y])
if err != nil {
log.Errorf(log.SyncMgr,
"%s failed to get enabled pairs. Err: %s",
e.GetName(),
err)
continue
}
for i := range enabledPairs {
if atomic.LoadInt32(&m.started) == 0 {
return
}
k := currencyPairKey{
AssetType: assetTypes[y],
Exchange: exchangeName,
Pair: enabledPairs[i].Format(currency.PairFormat{Uppercase: true}),
}
c := m.get(k)
if c == nil {
c = m.add(k, syncBase{
IsUsingREST: usingREST || !wsAssetSupported,
IsUsingWebsocket: usingWebsocket && wsAssetSupported,
}
if m.config.SynchronizeTicker {
c.Ticker = sBase
}
if m.config.SynchronizeOrderbook {
c.Orderbook = sBase
}
if m.config.SynchronizeTrades {
c.Trade = sBase
}
m.add(c)
} else {
log.Errorln(log.SyncMgr, err)
continue
}
}
if switchedToRest && usingWebsocket {
log.Warnf(log.SyncMgr,
"%s %s: Websocket re-enabled, switching from rest to websocket",
c.Exchange, m.FormatCurrency(enabledPairs[i]).String())
switchedToRest = false
}
if m.config.SynchronizeOrderbook {
if !m.isProcessing(exchangeName, c.Pair, c.AssetType, SyncItemOrderbook) {
if c.Orderbook.LastUpdated.IsZero() ||
(time.Since(c.Orderbook.LastUpdated) > m.config.TimeoutREST && c.Orderbook.IsUsingREST) ||
(time.Since(c.Orderbook.LastUpdated) > m.config.TimeoutWebsocket && c.Orderbook.IsUsingWebsocket) {
if c.Orderbook.IsUsingWebsocket {
if time.Since(c.Created) < m.config.TimeoutWebsocket {
continue
}
if supportsREST {
m.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemOrderbook, true)
c.Orderbook.IsUsingWebsocket = false
c.Orderbook.IsUsingREST = true
log.Warnf(log.SyncMgr,
"%s %s %s: No orderbook update after %s, switching from websocket to rest",
c.Exchange,
m.FormatCurrency(c.Pair).String(),
strings.ToUpper(c.AssetType.String()),
m.config.TimeoutWebsocket,
)
switchedToRest = true
m.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemOrderbook, false)
}
}
m.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemOrderbook, true)
result, err := exchanges[x].UpdateOrderbook(context.TODO(),
c.Pair,
c.AssetType)
m.PrintOrderbookSummary(result, "REST", err)
if err == nil {
if m.remoteConfig.WebsocketRPC.Enabled {
relayWebsocketEvent(result, "orderbook_update", c.AssetType.String(), exchangeName)
}
}
updateErr := m.Update(c.Exchange, c.Pair, c.AssetType, SyncItemOrderbook, err)
if updateErr != nil {
log.Errorln(log.SyncMgr, updateErr)
}
} else {
time.Sleep(time.Millisecond * 50)
}
})
}
if m.config.SynchronizeTicker {
if !m.isProcessing(exchangeName, c.Pair, c.AssetType, SyncItemTicker) {
if c.Ticker.LastUpdated.IsZero() ||
(time.Since(c.Ticker.LastUpdated) > m.config.TimeoutREST && c.Ticker.IsUsingREST) ||
(time.Since(c.Ticker.LastUpdated) > m.config.TimeoutWebsocket && c.Ticker.IsUsingWebsocket) {
if c.Ticker.IsUsingWebsocket {
if time.Since(c.Created) < m.config.TimeoutWebsocket {
continue
}
if supportsREST {
m.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemTicker, true)
c.Ticker.IsUsingWebsocket = false
c.Ticker.IsUsingREST = true
log.Warnf(log.SyncMgr,
"%s %s %s: No ticker update after %s, switching from websocket to rest",
c.Exchange,
m.FormatCurrency(enabledPairs[i]).String(),
strings.ToUpper(c.AssetType.String()),
m.config.TimeoutWebsocket,
)
switchedToRest = true
m.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemTicker, false)
}
}
if c.Ticker.IsUsingREST {
m.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemTicker, true)
var result *ticker.Price
var err error
if supportsRESTTickerBatching {
m.mux.Lock()
batchLastDone, ok := m.tickerBatchLastRequested[exchangeName]
if !ok {
m.tickerBatchLastRequested[exchangeName] = time.Time{}
}
m.mux.Unlock()
if batchLastDone.IsZero() || time.Since(batchLastDone) > m.config.TimeoutREST {
m.mux.Lock()
if m.config.Verbose {
log.Debugf(log.SyncMgr, "Initialising %s REST ticker batching", exchangeName)
}
err = exchanges[x].UpdateTickers(context.TODO(), c.AssetType)
if err == nil {
result, err = exchanges[x].FetchTicker(context.TODO(), c.Pair, c.AssetType)
}
m.tickerBatchLastRequested[exchangeName] = time.Now()
m.mux.Unlock()
} else {
if m.config.Verbose {
log.Debugf(log.SyncMgr, "%s Using recent batching cache", exchangeName)
}
result, err = exchanges[x].FetchTicker(context.TODO(),
c.Pair,
c.AssetType)
}
} else {
result, err = exchanges[x].UpdateTicker(context.TODO(),
c.Pair,
c.AssetType)
}
m.PrintTickerSummary(result, "REST", err)
if err == nil {
if m.remoteConfig.WebsocketRPC.Enabled {
relayWebsocketEvent(result, "ticker_update", c.AssetType.String(), exchangeName)
}
}
updateErr := m.Update(c.Exchange, c.Pair, c.AssetType, SyncItemTicker, err)
if updateErr != nil {
log.Errorln(log.SyncMgr, updateErr)
}
}
} else {
time.Sleep(time.Millisecond * 50)
}
}
m.syncTicker(c, e)
}
if m.config.SynchronizeOrderbook {
m.syncOrderbook(c, e)
}
if m.config.SynchronizeTrades {
if !m.isProcessing(exchangeName, c.Pair, c.AssetType, SyncItemTrade) {
if c.Trade.LastUpdated.IsZero() || time.Since(c.Trade.LastUpdated) > m.config.TimeoutREST {
m.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemTrade, true)
err := m.Update(c.Exchange, c.Pair, c.AssetType, SyncItemTrade, nil)
if err != nil {
log.Errorln(log.SyncMgr, err)
}
}
}
m.syncTrades(c)
}
}
}
@@ -700,6 +515,136 @@ func (m *syncManager) worker() {
}
}
func (m *syncManager) syncTicker(c *currencyPairSyncAgent, e exchange.IBotExchange) {
if !c.locks[SyncItemTicker].TryLock() {
return
}
defer c.locks[SyncItemTicker].Unlock()
exchangeName := e.GetName()
s := c.trackers[SyncItemTicker]
if s.IsUsingWebsocket &&
e.SupportsREST() &&
time.Since(s.LastUpdated) > m.config.TimeoutWebsocket &&
time.Since(c.Created) > m.config.TimeoutWebsocket {
// Downgrade to REST
s.IsUsingWebsocket = false
s.IsUsingREST = true
log.Warnf(log.SyncMgr,
"%s %s %s: No ticker update after %s, switching from websocket to rest",
c.Exchange,
m.FormatCurrency(c.Pair),
strings.ToUpper(c.AssetType.String()),
m.config.TimeoutWebsocket,
)
}
if s.IsUsingREST && time.Since(s.LastUpdated) > m.config.TimeoutREST {
var result *ticker.Price
var err error
if e.SupportsRESTTickerBatchUpdates() {
m.mux.Lock()
batchLastDone, ok := m.tickerBatchLastRequested[e.GetName()]
if !ok {
m.tickerBatchLastRequested[exchangeName] = time.Time{}
}
m.mux.Unlock()
if batchLastDone.IsZero() || time.Since(batchLastDone) > m.config.TimeoutREST {
m.mux.Lock()
if m.config.Verbose {
log.Debugf(log.SyncMgr, "Initialising %s REST ticker batching", exchangeName)
}
err = e.UpdateTickers(context.TODO(), c.AssetType)
if err == nil {
result, err = e.FetchTicker(context.TODO(), c.Pair, c.AssetType)
}
m.tickerBatchLastRequested[exchangeName] = time.Now()
m.mux.Unlock()
} else {
if m.config.Verbose {
log.Debugf(log.SyncMgr, "%s Using recent batching cache", exchangeName)
}
result, err = e.FetchTicker(context.TODO(),
c.Pair,
c.AssetType)
}
} else {
result, err = e.UpdateTicker(context.TODO(),
c.Pair,
c.AssetType)
}
m.PrintTickerSummary(result, "REST", err)
if err == nil {
if m.remoteConfig.WebsocketRPC.Enabled {
relayWebsocketEvent(result, "ticker_update", c.AssetType.String(), exchangeName)
}
}
updateErr := m.update(c, SyncItemTicker, err)
if updateErr != nil {
log.Errorln(log.SyncMgr, updateErr)
}
}
}
func (m *syncManager) syncOrderbook(c *currencyPairSyncAgent, e exchange.IBotExchange) {
if !c.locks[SyncItemOrderbook].TryLock() {
return
}
defer c.locks[SyncItemOrderbook].Unlock()
s := c.trackers[SyncItemOrderbook]
if s.IsUsingWebsocket &&
e.SupportsREST() &&
time.Since(s.LastUpdated) > m.config.TimeoutWebsocket &&
time.Since(c.Created) > m.config.TimeoutWebsocket {
// Downgrade to REST
s.IsUsingWebsocket = false
s.IsUsingREST = true
log.Warnf(log.SyncMgr,
"%s %s %s: No orderbook update after %s, switching from websocket to rest",
c.Exchange,
m.FormatCurrency(c.Pair).String(),
strings.ToUpper(c.AssetType.String()),
m.config.TimeoutWebsocket,
)
}
if s.IsUsingREST && time.Since(s.LastUpdated) > m.config.TimeoutREST {
result, err := e.UpdateOrderbook(context.TODO(),
c.Pair,
c.AssetType)
m.PrintOrderbookSummary(result, "REST", err)
if err == nil {
if m.remoteConfig.WebsocketRPC.Enabled {
relayWebsocketEvent(result, "orderbook_update", c.AssetType.String(), e.GetName())
}
}
updateErr := m.update(c, SyncItemOrderbook, err)
if updateErr != nil {
log.Errorln(log.SyncMgr, updateErr)
}
}
}
func (m *syncManager) syncTrades(c *currencyPairSyncAgent) {
if !c.locks[SyncItemTrade].TryLock() {
return
}
defer c.locks[SyncItemTrade].Unlock()
if time.Since(c.trackers[SyncItemTrade].LastUpdated) > m.config.TimeoutREST {
err := m.update(c, SyncItemTrade, nil)
if err != nil {
log.Errorln(log.SyncMgr, err)
}
}
}
func printCurrencyFormat(price float64, displayCurrency currency.Code) string {
displaySymbol, err := currency.GetSymbolByCurrencyName(displayCurrency)
if err != nil {
@@ -918,3 +863,25 @@ func relayWebsocketEvent(result interface{}, event, assetType, exchangeName stri
event, err)
}
}
func greatestCommonDivisor(a, b time.Duration) time.Duration {
for b != 0 {
t := b
b = a % b
a = t
}
return a
}
func (s syncItemType) String() string {
switch s {
case SyncItemTicker:
return "Ticker"
case SyncItemOrderbook:
return "Orderbook"
case SyncItemTrade:
return "Trade"
default:
return fmt.Sprintf("Invalid syncItemType: %d", s)
}
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/config"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
)
@@ -283,52 +284,55 @@ func TestSyncManagerUpdate(t *testing.T) {
m.initSyncStarted = 1
// orderbook not enabled
err = m.Update("", currency.EMPTYPAIR, 1, 1, nil)
err = m.Update("", currency.EMPTYPAIR, asset.Spot, SyncItemOrderbook, nil)
if !errors.Is(err, nil) {
t.Fatalf("received %v, but expected: %v", err, nil)
}
m.config.SynchronizeOrderbook = true
// ticker not enabled
err = m.Update("", currency.EMPTYPAIR, 1, 0, nil)
err = m.Update("", currency.EMPTYPAIR, asset.Spot, SyncItemTicker, nil)
if !errors.Is(err, nil) {
t.Fatalf("received %v, but expected: %v", err, nil)
}
m.config.SynchronizeTicker = true
// trades not enabled
err = m.Update("", currency.EMPTYPAIR, 1, 2, nil)
err = m.Update("", currency.EMPTYPAIR, asset.Spot, SyncItemTrade, nil)
if !errors.Is(err, nil) {
t.Fatalf("received %v, but expected: %v", err, nil)
}
m.config.SynchronizeTrades = true
err = m.Update("", currency.EMPTYPAIR, 1, 1336, nil)
err = m.Update("", currency.EMPTYPAIR, asset.Spot, 1336, nil)
if !errors.Is(err, errUnknownSyncItem) {
t.Fatalf("received %v, but expected: %v", err, errUnknownSyncItem)
}
err = m.Update("", currency.EMPTYPAIR, 1, 1, nil)
err = m.Update("", currency.EMPTYPAIR, asset.Spot, SyncItemOrderbook, nil)
if !errors.Is(err, errCouldNotSyncNewData) {
t.Fatalf("received %v, but expected: %v", err, errCouldNotSyncNewData)
}
m.currencyPairs = append(m.currencyPairs, currencyPairSyncAgent{AssetType: 1})
m.add(currencyPairKey{
AssetType: asset.Spot,
Pair: currency.EMPTYPAIR.Format(currency.PairFormat{Uppercase: true}),
}, syncBase{})
m.initSyncWG.Add(3)
// orderbook match
err = m.Update("", currency.EMPTYPAIR, 1, 1, errors.New("test"))
err = m.Update("", currency.EMPTYPAIR, asset.Spot, SyncItemOrderbook, errors.New("test"))
if !errors.Is(err, nil) {
t.Fatalf("received %v, but expected: %v", err, nil)
}
// ticker match
err = m.Update("", currency.EMPTYPAIR, 1, 0, errors.New("test"))
err = m.Update("", currency.EMPTYPAIR, asset.Spot, SyncItemTicker, errors.New("test"))
if !errors.Is(err, nil) {
t.Fatalf("received %v, but expected: %v", err, nil)
}
// trades match
err = m.Update("", currency.EMPTYPAIR, 1, 2, errors.New("test"))
err = m.Update("", currency.EMPTYPAIR, asset.Spot, SyncItemTrade, errors.New("test"))
if !errors.Is(err, nil) {
t.Fatalf("received %v, but expected: %v", err, nil)
}

View File

@@ -13,21 +13,24 @@ import (
type syncBase struct {
IsUsingWebsocket bool
IsUsingREST bool
IsProcessing bool
LastUpdated time.Time
HaveData bool
NumErrors int
}
// currencyPairSyncAgent stores the sync agent info
type currencyPairSyncAgent struct {
Created time.Time
// currencyPairKey is the map key for the sync agents
type currencyPairKey struct {
Exchange string
AssetType asset.Item
Pair currency.Pair
Ticker syncBase
Orderbook syncBase
Trade syncBase
}
// currencyPairSyncAgent stores the sync agent info
type currencyPairSyncAgent struct {
currencyPairKey
Created time.Time
trackers []*syncBase
locks []sync.Mutex
}
// SyncManagerConfig stores the currency pair synchronization manager config
@@ -49,6 +52,7 @@ type syncManager struct {
initSyncCompleted int32
initSyncStarted int32
started int32
shutdown chan bool
format currency.PairFormat
initSyncStartTime time.Time
fiatDisplayCurrency currency.Code
@@ -57,7 +61,7 @@ type syncManager struct {
initSyncWG sync.WaitGroup
inService sync.WaitGroup
currencyPairs []currencyPairSyncAgent
currencyPairs map[currencyPairKey]*currencyPairSyncAgent
tickerBatchLastRequested map[string]time.Time
remoteConfig *config.RemoteControlConfig