Files
gocryptotrader/engine/syncer.go
Adrian Gallagher 467d8d91a2 Coinbene: Add swap endpoints and code cleanup/bugfixes (#403)
* Start Coinbene SWAP implementation

* Flesh out more API endpoints

* Code cleanup

* Add more endpoints, bug fixes and order validation checks

* More endpoints, tests and bugfixes

* Remove omitempty for enabled/available pairs
2019-12-18 17:36:07 +11:00

610 lines
18 KiB
Go

package engine
import (
"errors"
"strings"
"sync/atomic"
"time"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
log "github.com/thrasher-corp/gocryptotrader/logger"
)
// const holds the sync item types
const (
SyncItemTicker = iota
SyncItemOrderbook
SyncItemTrade
DefaultSyncerWorkers = 15
DefaultSyncerTimeout = time.Second * 15
)
var (
createdCounter = 0
removedCounter = 0
)
// NewCurrencyPairSyncer starts a new CurrencyPairSyncer
func NewCurrencyPairSyncer(c CurrencyPairSyncerConfig) (*ExchangeCurrencyPairSyncer, error) {
if !c.SyncOrderbook && !c.SyncTicker && !c.SyncTrades {
return nil, errors.New("no sync items enabled")
}
if c.NumWorkers <= 0 {
c.NumWorkers = DefaultSyncerWorkers
}
if c.SyncTimeout <= time.Duration(0) {
c.SyncTimeout = DefaultSyncerTimeout
}
s := ExchangeCurrencyPairSyncer{
Cfg: CurrencyPairSyncerConfig{
SyncTicker: c.SyncTicker,
SyncOrderbook: c.SyncOrderbook,
SyncTrades: c.SyncTrades,
SyncContinuously: c.SyncContinuously,
SyncTimeout: c.SyncTimeout,
NumWorkers: c.NumWorkers,
},
}
s.tickerBatchLastRequested = make(map[string]time.Time)
log.Debugf(log.SyncMgr,
"Exchange currency pair syncer config: continuous: %v ticker: %v"+
" orderbook: %v trades: %v workers: %v verbose: %v timeout: %v\n",
s.Cfg.SyncContinuously, s.Cfg.SyncTicker, s.Cfg.SyncOrderbook,
s.Cfg.SyncTrades, s.Cfg.NumWorkers, s.Cfg.Verbose, s.Cfg.SyncTimeout)
return &s, nil
}
func (e *ExchangeCurrencyPairSyncer) get(exchangeName string, p currency.Pair, a asset.Item) (*CurrencyPairSyncAgent, error) {
e.mux.Lock()
defer e.mux.Unlock()
for x := range e.CurrencyPairs {
if e.CurrencyPairs[x].Exchange == exchangeName &&
e.CurrencyPairs[x].Pair.Equal(p) &&
e.CurrencyPairs[x].AssetType == a {
return &e.CurrencyPairs[x], nil
}
}
return nil, errors.New("exchange currency pair syncer not found")
}
func (e *ExchangeCurrencyPairSyncer) exists(exchangeName string, p currency.Pair, a asset.Item) bool {
e.mux.Lock()
defer e.mux.Unlock()
for x := range e.CurrencyPairs {
if e.CurrencyPairs[x].Exchange == exchangeName &&
e.CurrencyPairs[x].Pair.Equal(p) &&
e.CurrencyPairs[x].AssetType == a {
return true
}
}
return false
}
func (e *ExchangeCurrencyPairSyncer) add(c *CurrencyPairSyncAgent) {
e.mux.Lock()
defer e.mux.Unlock()
if e.Cfg.SyncTicker {
if e.Cfg.Verbose {
log.Debugf(log.SyncMgr,
"%s: Added ticker sync item %v: using websocket: %v using REST: %v\n",
c.Exchange, FormatCurrency(c.Pair).String(), c.Ticker.IsUsingWebsocket,
c.Ticker.IsUsingREST)
}
if atomic.LoadInt32(&e.initSyncCompleted) != 1 {
e.initSyncWG.Add(1)
createdCounter++
}
}
if e.Cfg.SyncOrderbook {
if e.Cfg.Verbose {
log.Debugf(log.SyncMgr,
"%s: Added orderbook sync item %v: using websocket: %v using REST: %v\n",
c.Exchange, FormatCurrency(c.Pair).String(), c.Orderbook.IsUsingWebsocket,
c.Orderbook.IsUsingREST)
}
if atomic.LoadInt32(&e.initSyncCompleted) != 1 {
e.initSyncWG.Add(1)
createdCounter++
}
}
if e.Cfg.SyncTrades {
if e.Cfg.Verbose {
log.Debugf(log.SyncMgr,
"%s: Added trade sync item %v: using websocket: %v using REST: %v\n",
c.Exchange, FormatCurrency(c.Pair).String(), c.Trade.IsUsingWebsocket,
c.Trade.IsUsingREST)
}
if atomic.LoadInt32(&e.initSyncCompleted) != 1 {
e.initSyncWG.Add(1)
createdCounter++
}
}
c.Created = time.Now()
e.CurrencyPairs = append(e.CurrencyPairs, *c)
}
func (e *ExchangeCurrencyPairSyncer) remove(c *CurrencyPairSyncAgent) {
e.mux.Lock()
defer e.mux.Unlock()
for x := range e.CurrencyPairs {
if e.CurrencyPairs[x].Exchange == c.Exchange &&
e.CurrencyPairs[x].Pair.Equal(c.Pair) &&
e.CurrencyPairs[x].AssetType == c.AssetType {
e.CurrencyPairs = append(e.CurrencyPairs[:x], e.CurrencyPairs[x+1:]...)
return
}
}
}
func (e *ExchangeCurrencyPairSyncer) isProcessing(exchangeName string, p currency.Pair, a asset.Item, syncType int) bool {
e.mux.Lock()
defer e.mux.Unlock()
for x := range e.CurrencyPairs {
if e.CurrencyPairs[x].Exchange == exchangeName &&
e.CurrencyPairs[x].Pair.Equal(p) &&
e.CurrencyPairs[x].AssetType == a {
switch syncType {
case SyncItemTicker:
return e.CurrencyPairs[x].Ticker.IsProcessing
case SyncItemOrderbook:
return e.CurrencyPairs[x].Orderbook.IsProcessing
case SyncItemTrade:
return e.CurrencyPairs[x].Trade.IsProcessing
}
}
}
return false
}
func (e *ExchangeCurrencyPairSyncer) setProcessing(exchangeName string, p currency.Pair, a asset.Item, syncType int, processing bool) {
e.mux.Lock()
defer e.mux.Unlock()
for x := range e.CurrencyPairs {
if e.CurrencyPairs[x].Exchange == exchangeName &&
e.CurrencyPairs[x].Pair.Equal(p) &&
e.CurrencyPairs[x].AssetType == a {
switch syncType {
case SyncItemTicker:
e.CurrencyPairs[x].Ticker.IsProcessing = processing
case SyncItemOrderbook:
e.CurrencyPairs[x].Orderbook.IsProcessing = processing
case SyncItemTrade:
e.CurrencyPairs[x].Trade.IsProcessing = processing
}
}
}
}
func (e *ExchangeCurrencyPairSyncer) update(exchangeName string, p currency.Pair, a asset.Item, syncType int, err error) {
if atomic.LoadInt32(&e.initSyncStarted) != 1 {
return
}
switch syncType {
case SyncItemOrderbook, SyncItemTrade, SyncItemTicker:
if !e.Cfg.SyncOrderbook && syncType == SyncItemOrderbook {
return
}
if !e.Cfg.SyncTicker && syncType == SyncItemTicker {
return
}
if !e.Cfg.SyncTrades && syncType == SyncItemTrade {
return
}
default:
log.Warnf(log.SyncMgr, "ExchangeCurrencyPairSyncer: unknown sync item %v\n", syncType)
return
}
e.mux.Lock()
defer e.mux.Unlock()
for x := range e.CurrencyPairs {
if e.CurrencyPairs[x].Exchange == exchangeName &&
e.CurrencyPairs[x].Pair.Equal(p) &&
e.CurrencyPairs[x].AssetType == a {
switch syncType {
case SyncItemTicker:
origHadData := e.CurrencyPairs[x].Ticker.HaveData
e.CurrencyPairs[x].Ticker.LastUpdated = time.Now()
if err != nil {
e.CurrencyPairs[x].Ticker.NumErrors++
}
e.CurrencyPairs[x].Ticker.HaveData = true
e.CurrencyPairs[x].Ticker.IsProcessing = false
if atomic.LoadInt32(&e.initSyncCompleted) != 1 && !origHadData {
removedCounter++
log.Debugf(log.SyncMgr, "%s ticker sync complete %v [%d/%d].\n",
exchangeName, FormatCurrency(p).String(), removedCounter, createdCounter)
e.initSyncWG.Done()
}
case SyncItemOrderbook:
origHadData := e.CurrencyPairs[x].Orderbook.HaveData
e.CurrencyPairs[x].Orderbook.LastUpdated = time.Now()
if err != nil {
e.CurrencyPairs[x].Orderbook.NumErrors++
}
e.CurrencyPairs[x].Orderbook.HaveData = true
e.CurrencyPairs[x].Orderbook.IsProcessing = false
if atomic.LoadInt32(&e.initSyncCompleted) != 1 && !origHadData {
removedCounter++
log.Debugf(log.SyncMgr, "%s orderbook sync complete %v [%d/%d].\n",
exchangeName, FormatCurrency(p).String(), removedCounter, createdCounter)
e.initSyncWG.Done()
}
case SyncItemTrade:
origHadData := e.CurrencyPairs[x].Trade.HaveData
e.CurrencyPairs[x].Trade.LastUpdated = time.Now()
if err != nil {
e.CurrencyPairs[x].Trade.NumErrors++
}
e.CurrencyPairs[x].Trade.HaveData = true
e.CurrencyPairs[x].Trade.IsProcessing = false
if atomic.LoadInt32(&e.initSyncCompleted) != 1 && !origHadData {
removedCounter++
log.Debugf(log.SyncMgr, "%s trade sync complete %v [%d/%d].\n",
exchangeName, FormatCurrency(p).String(), removedCounter, createdCounter)
e.initSyncWG.Done()
}
}
}
}
}
func (e *ExchangeCurrencyPairSyncer) worker() {
cleanup := func() {
log.Debugln(log.SyncMgr, "Exchange CurrencyPairSyncer worker shutting down.")
}
defer cleanup()
for atomic.LoadInt32(&e.shutdown) != 1 {
for x := range Bot.Exchanges {
if !Bot.Exchanges[x].IsEnabled() {
continue
}
exchangeName := Bot.Exchanges[x].GetName()
assetTypes := Bot.Exchanges[x].GetAssetTypes()
supportsREST := Bot.Exchanges[x].SupportsREST()
supportsRESTTickerBatching := Bot.Exchanges[x].SupportsRESTTickerBatchUpdates()
var usingREST bool
var usingWebsocket bool
var switchedToRest bool
if Bot.Exchanges[x].SupportsWebsocket() && Bot.Exchanges[x].IsWebsocketEnabled() {
ws, err := Bot.Exchanges[x].GetWebsocket()
if err != nil {
log.Errorf(log.SyncMgr, "%s unable to get websocket pointer. Err: %s\n",
exchangeName, err)
usingREST = true
}
if ws.IsConnected() {
usingWebsocket = true
} else {
usingREST = true
}
} else if supportsREST {
usingREST = true
}
for y := range assetTypes {
enabledPairs := Bot.Exchanges[x].GetEnabledPairs(assetTypes[y])
for i := range enabledPairs {
if atomic.LoadInt32(&e.shutdown) == 1 {
return
}
if !e.exists(exchangeName, enabledPairs[i], assetTypes[y]) {
c := CurrencyPairSyncAgent{
AssetType: assetTypes[y],
Exchange: exchangeName,
Pair: enabledPairs[i],
}
if e.Cfg.SyncTicker {
c.Ticker = SyncBase{
IsUsingREST: usingREST,
IsUsingWebsocket: usingWebsocket,
}
}
if e.Cfg.SyncOrderbook {
c.Orderbook = SyncBase{
IsUsingREST: usingREST,
IsUsingWebsocket: usingWebsocket,
}
}
if e.Cfg.SyncTrades {
c.Trade = SyncBase{
IsUsingREST: usingREST,
IsUsingWebsocket: usingWebsocket,
}
}
e.add(&c)
}
c, err := e.get(exchangeName, enabledPairs[i], assetTypes[y])
if err != nil {
log.Errorf(log.SyncMgr, "failed to get item. Err: %s\n", err)
continue
}
if switchedToRest && usingWebsocket {
log.Infof(log.SyncMgr,
"%s %s: Websocket re-enabled, switching from rest to websocket\n",
c.Exchange, FormatCurrency(enabledPairs[i]).String())
switchedToRest = false
}
if e.Cfg.SyncTicker {
if !e.isProcessing(exchangeName, c.Pair, c.AssetType, SyncItemTicker) {
if c.Ticker.LastUpdated.IsZero() || time.Since(c.Ticker.LastUpdated) > e.Cfg.SyncTimeout {
if c.Ticker.IsUsingWebsocket {
if time.Since(c.Created) < e.Cfg.SyncTimeout {
continue
}
if supportsREST {
e.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemTicker, true)
c.Ticker.IsUsingWebsocket = false
c.Ticker.IsUsingREST = true
log.Warnf(log.SyncMgr,
"%s %s %s: No ticker update after %s, switching from websocket to rest\n",
c.Exchange,
FormatCurrency(enabledPairs[i]).String(),
strings.ToUpper(c.AssetType.String()),
e.Cfg.SyncTimeout,
)
switchedToRest = true
e.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemTicker, false)
}
}
if c.Ticker.IsUsingREST {
e.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemTicker, true)
var result *ticker.Price
var err error
if supportsRESTTickerBatching {
e.mux.Lock()
batchLastDone, ok := e.tickerBatchLastRequested[exchangeName]
if !ok {
e.tickerBatchLastRequested[exchangeName] = time.Time{}
}
e.mux.Unlock()
if batchLastDone.IsZero() || time.Since(batchLastDone) > e.Cfg.SyncTimeout {
e.mux.Lock()
if e.Cfg.Verbose {
log.Debugf(log.SyncMgr, "%s Init'ing REST ticker batching\n", exchangeName)
}
result, err = Bot.Exchanges[x].UpdateTicker(c.Pair, c.AssetType)
e.tickerBatchLastRequested[exchangeName] = time.Now()
e.mux.Unlock()
} else {
if e.Cfg.Verbose {
log.Debugf(log.SyncMgr, "%s Using recent batching cache\n", exchangeName)
}
result, err = Bot.Exchanges[x].FetchTicker(c.Pair, c.AssetType)
}
} else {
result, err = Bot.Exchanges[x].UpdateTicker(c.Pair, c.AssetType)
}
printTickerSummary(result, c.Pair, c.AssetType, exchangeName, err)
if err == nil {
//nolint:gocritic Bot.CommsRelayer.StageTickerData(exchangeName, c.AssetType, result)
if Bot.Config.RemoteControl.WebsocketRPC.Enabled {
relayWebsocketEvent(result, "ticker_update", c.AssetType.String(), exchangeName)
}
}
e.update(c.Exchange, c.Pair, c.AssetType, SyncItemTicker, err)
}
} else {
time.Sleep(time.Millisecond * 50)
}
}
}
if e.Cfg.SyncOrderbook {
if !e.isProcessing(exchangeName, c.Pair, c.AssetType, SyncItemOrderbook) {
if c.Orderbook.LastUpdated.IsZero() || time.Since(c.Orderbook.LastUpdated) > e.Cfg.SyncTimeout {
if c.Orderbook.IsUsingWebsocket {
if time.Since(c.Created) < e.Cfg.SyncTimeout {
continue
}
if supportsREST {
e.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemOrderbook, true)
c.Orderbook.IsUsingWebsocket = false
c.Orderbook.IsUsingREST = true
log.Warnf(log.SyncMgr,
"%s %s %s: No orderbook update after %s, switching from websocket to rest\n",
c.Exchange,
FormatCurrency(c.Pair).String(),
strings.ToUpper(c.AssetType.String()),
e.Cfg.SyncTimeout,
)
switchedToRest = true
e.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemOrderbook, false)
}
}
e.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemOrderbook, true)
result, err := Bot.Exchanges[x].UpdateOrderbook(c.Pair, c.AssetType)
printOrderbookSummary(result, c.Pair, c.AssetType, exchangeName, err)
if err == nil {
//nolint:gocritic Bot.CommsRelayer.StageOrderbookData(exchangeName, c.AssetType, result)
if Bot.Config.RemoteControl.WebsocketRPC.Enabled {
relayWebsocketEvent(result, "orderbook_update", c.AssetType.String(), exchangeName)
}
}
e.update(c.Exchange, c.Pair, c.AssetType, SyncItemOrderbook, err)
} else {
time.Sleep(time.Millisecond * 50)
}
}
if e.Cfg.SyncTrades {
if !e.isProcessing(exchangeName, c.Pair, c.AssetType, SyncItemTrade) {
if c.Trade.LastUpdated.IsZero() || time.Since(c.Trade.LastUpdated) > e.Cfg.SyncTimeout {
e.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemTrade, true)
e.update(c.Exchange, c.Pair, c.AssetType, SyncItemTrade, nil)
}
}
}
}
}
}
}
}
}
// Start starts an exchange currency pair syncer
func (e *ExchangeCurrencyPairSyncer) Start() {
log.Debugln(log.SyncMgr, "Exchange CurrencyPairSyncer started.")
for x := range Bot.Exchanges {
if !Bot.Exchanges[x].IsEnabled() {
continue
}
exchangeName := Bot.Exchanges[x].GetName()
supportsWebsocket := Bot.Exchanges[x].SupportsWebsocket()
assetTypes := Bot.Exchanges[x].GetAssetTypes()
supportsREST := Bot.Exchanges[x].SupportsREST()
if !supportsREST && !supportsWebsocket {
log.Warnf(log.SyncMgr,
"Loaded exchange %s does not support REST or Websocket.\n",
exchangeName)
continue
}
var usingWebsocket bool
var usingREST bool
if supportsWebsocket && Bot.Exchanges[x].IsWebsocketEnabled() {
ws, err := Bot.Exchanges[x].GetWebsocket()
if err != nil {
log.Errorf(log.SyncMgr, "%s failed to get websocket. Err: %s\n",
exchangeName, err)
usingREST = true
}
if !ws.IsConnected() && !ws.IsConnecting() {
go WebsocketDataHandler(ws)
err = ws.Connect()
if err != nil {
log.Errorf(log.SyncMgr, "%s websocket failed to connect. Err: %s\n",
exchangeName, err)
usingREST = true
} else {
usingWebsocket = true
}
} else {
usingWebsocket = true
}
} else if supportsREST {
usingREST = true
}
for y := range assetTypes {
enabledPairs := Bot.Exchanges[x].GetEnabledPairs(assetTypes[y])
for i := range enabledPairs {
if e.exists(exchangeName, enabledPairs[i], assetTypes[y]) {
continue
}
c := CurrencyPairSyncAgent{
AssetType: assetTypes[y],
Exchange: exchangeName,
Pair: enabledPairs[i],
}
if e.Cfg.SyncTicker {
c.Ticker = SyncBase{
IsUsingREST: usingREST,
IsUsingWebsocket: usingWebsocket,
}
}
if e.Cfg.SyncOrderbook {
c.Orderbook = SyncBase{
IsUsingREST: usingREST,
IsUsingWebsocket: usingWebsocket,
}
}
if e.Cfg.SyncTrades {
c.Trade = SyncBase{
IsUsingREST: usingREST,
IsUsingWebsocket: usingWebsocket,
}
}
e.add(&c)
}
}
}
if atomic.CompareAndSwapInt32(&e.initSyncStarted, 0, 1) {
log.Debugf(log.SyncMgr,
"Exchange CurrencyPairSyncer initial sync started. %d items to process.\n",
createdCounter)
e.initSyncStartTime = time.Now()
}
go func() {
e.initSyncWG.Wait()
if atomic.CompareAndSwapInt32(&e.initSyncCompleted, 0, 1) {
log.Debugf(log.SyncMgr, "Exchange CurrencyPairSyncer initial sync is complete.\n")
completedTime := time.Now()
log.Debugf(log.SyncMgr, "Exchange CurrencyPairSyncer initial sync took %v [%v sync items].\n",
completedTime.Sub(e.initSyncStartTime), createdCounter)
if !e.Cfg.SyncContinuously {
log.Debugln(log.SyncMgr, "Exchange CurrencyPairSyncer stopping.")
e.Stop()
return
}
}
}()
if atomic.LoadInt32(&e.initSyncCompleted) == 1 && !e.Cfg.SyncContinuously {
return
}
for i := 0; i < e.Cfg.NumWorkers; i++ {
go e.worker()
}
}
// Stop shuts down the exchange currency pair syncer
func (e *ExchangeCurrencyPairSyncer) Stop() {
stopped := atomic.CompareAndSwapInt32(&e.shutdown, 0, 1)
if stopped {
log.Debugln(log.SyncMgr, "Exchange CurrencyPairSyncer stopped.")
}
}