futures: Implement GetLatestFundingRates across exchanges (#1339)

* adds funding rate implementations and improvements

* merge fixes x1

* lint

* kucoin funding rates func make

* migrate sync-manager to keys

* some kucoin work

* adds some kucoin wrapper funcs

* ehhh, todo

* kucoin position

* start of orders

* adds the kucoin tests yay

* multiplier

* nits, EWS includes order limits

* NotYetImplemented, IsPerp improvements, cleaning

* lint, test fix, huobi time

* fixes issues, improves testing

* fixes linters I WRECKED

* local lint but remote lint, lint, lint, lint

* fixes err

* skip CI

* lint

* Supported rates, binance endpoints

* fixes weird mocktest problems

* no, CZ is invalid

* fixes some new EWS test errors
This commit is contained in:
Scott
2023-11-03 10:01:32 +10:00
committed by GitHub
parent f9437dbd08
commit 70690d9a04
78 changed files with 4088 additions and 527 deletions

View File

@@ -36,7 +36,7 @@ type Engine struct {
apiServer *apiServerManager
CommunicationsManager *CommunicationManager
connectionManager *connectionManager
currencyPairSyncer *syncManager
currencyPairSyncer *SyncManager
DatabaseManager *DatabaseConnectionManager
DepositAddressManager *DepositAddressManager
eventManager *eventManager
@@ -513,7 +513,7 @@ func (bot *Engine) Start() error {
bot.Settings.SyncWorkersCount != config.DefaultSyncerWorkers {
cfg.NumWorkers = bot.Settings.SyncWorkersCount
}
if s, err := setupSyncManager(
if s, err := SetupSyncManager(
&cfg,
bot.ExchangeManager,
&bot.Config.RemoteControl,

View File

@@ -225,7 +225,7 @@ func (bot *Engine) SetSubsystem(subSystemName string, enable bool) error {
bot.Settings.SyncWorkersCount != config.DefaultSyncerWorkers {
cfg.NumWorkers = bot.Settings.SyncWorkersCount
}
bot.currencyPairSyncer, err = setupSyncManager(
bot.currencyPairSyncer, err = SetupSyncManager(
&cfg,
bot.ExchangeManager,
&bot.Config.RemoteControl,

View File

@@ -9,13 +9,11 @@ import (
"crypto/x509/pkix"
"encoding/pem"
"errors"
"fmt"
"math/big"
"net"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
@@ -1378,33 +1376,23 @@ func TestNewExchangeByNameWithDefaults(t *testing.T) {
if !errors.Is(err, ErrExchangeNotFound) {
t.Fatalf("received: '%v' but expected: '%v'", err, ErrExchangeNotFound)
}
ch := make(chan error, len(exchange.Exchanges))
wg := sync.WaitGroup{}
for x := range exchange.Exchanges {
wg.Add(1)
go func(x int) {
defer wg.Done()
exch, err := NewExchangeByNameWithDefaults(context.Background(), exchange.Exchanges[x])
name := exchange.Exchanges[x]
t.Run(name, func(t *testing.T) {
t.Parallel()
if isCITest() && common.StringDataContains(blockedCIExchanges, name) {
t.Skipf("skipping %s due to CI test restrictions", name)
}
if common.StringDataContains(unsupportedDefaultConfigExchanges, name) {
t.Skipf("skipping %s unsupported", name)
}
exch, err := NewExchangeByNameWithDefaults(context.Background(), name)
if err != nil {
ch <- err
return
t.Error(err)
}
if !strings.EqualFold(exch.GetName(), exchange.Exchanges[x]) {
ch <- fmt.Errorf("received: '%v' but expected: '%v'", exch.GetName(), exchange.Exchanges[x])
if !strings.EqualFold(exch.GetName(), name) {
t.Errorf("received: '%v' but expected: '%v'", exch.GetName(), name)
}
}(x)
}
wg.Wait()
outta:
for {
select {
case err := <-ch:
t.Error(err)
default:
break outta
}
})
}
}

View File

@@ -807,7 +807,7 @@ func (m *OrderManager) processFuturesPositions(exch exchange.IBotExchange, posit
if !isPerp {
return nil
}
frp, err := exch.GetFundingRates(context.TODO(), &fundingrate.RatesRequest{
frp, err := exch.GetHistoricalFundingRates(context.TODO(), &fundingrate.HistoricalRatesRequest{
Asset: position.Asset,
Pair: position.Pair,
StartDate: position.Orders[0].Date,

View File

@@ -4463,8 +4463,8 @@ func (s *RPCServer) GetFuturesPositionsSummary(ctx context.Context, r *gctrpc.Ge
if !stats.AverageOpenPrice.IsZero() {
positionStats.AverageOpenPrice = stats.AverageOpenPrice.String()
}
if !stats.PositionPNL.IsZero() {
positionStats.RecentPnl = stats.PositionPNL.String()
if !stats.UnrealisedPNL.IsZero() {
positionStats.RecentPnl = stats.UnrealisedPNL.String()
}
if !stats.MaintenanceMarginFraction.IsZero() {
positionStats.MarginFraction = stats.MaintenanceMarginFraction.String()
@@ -4701,7 +4701,7 @@ func (s *RPCServer) GetFundingRates(ctx context.Context, r *gctrpc.GetFundingRat
return nil, fmt.Errorf("%w %v", errPairNotEnabled, cp)
}
funding, err := exch.GetFundingRates(ctx, &fundingrate.RatesRequest{
funding, err := exch.GetHistoricalFundingRates(ctx, &fundingrate.HistoricalRatesRequest{
Asset: a,
Pair: cp,
StartDate: start,
@@ -4799,7 +4799,7 @@ func (s *RPCServer) GetLatestFundingRate(ctx context.Context, r *gctrpc.GetLates
return nil, fmt.Errorf("%w %v", errPairNotEnabled, cp)
}
funding, err := exch.GetLatestFundingRate(ctx, &fundingrate.LatestRateRequest{
fundingRates, err := exch.GetLatestFundingRates(ctx, &fundingrate.LatestRateRequest{
Asset: a,
Pair: cp,
IncludePredictedRate: r.IncludePredicted,
@@ -4807,27 +4807,30 @@ func (s *RPCServer) GetLatestFundingRate(ctx context.Context, r *gctrpc.GetLates
if err != nil {
return nil, err
}
if len(fundingRates) != 1 {
return nil, fmt.Errorf("expected 1 funding rate, received %v", len(fundingRates))
}
var response gctrpc.GetLatestFundingRateResponse
fundingData := &gctrpc.FundingData{
Exchange: r.Exchange,
Asset: r.Asset,
Pair: &gctrpc.CurrencyPair{
Delimiter: funding.Pair.Delimiter,
Base: funding.Pair.Base.String(),
Quote: funding.Pair.Quote.String(),
Delimiter: fundingRates[0].Pair.Delimiter,
Base: fundingRates[0].Pair.Base.String(),
Quote: fundingRates[0].Pair.Quote.String(),
},
LatestRate: &gctrpc.FundingRate{
Date: funding.LatestRate.Time.Format(common.SimpleTimeFormatWithTimezone),
Rate: funding.LatestRate.Rate.String(),
Date: fundingRates[0].LatestRate.Time.Format(common.SimpleTimeFormatWithTimezone),
Rate: fundingRates[0].LatestRate.Rate.String(),
},
}
if !funding.TimeOfNextRate.IsZero() {
fundingData.TimeOfNextRate = funding.TimeOfNextRate.Format(common.SimpleTimeFormatWithTimezone)
if !fundingRates[0].TimeOfNextRate.IsZero() {
fundingData.TimeOfNextRate = fundingRates[0].TimeOfNextRate.Format(common.SimpleTimeFormatWithTimezone)
}
if r.IncludePredicted {
fundingData.UpcomingRate = &gctrpc.FundingRate{
Date: funding.PredictedUpcomingRate.Time.Format(common.SimpleTimeFormatWithTimezone),
Rate: funding.PredictedUpcomingRate.Rate.String(),
Date: fundingRates[0].PredictedUpcomingRate.Time.Format(common.SimpleTimeFormatWithTimezone),
Rate: fundingRates[0].PredictedUpcomingRate.Rate.String(),
}
}
response.Rate = fundingData

View File

@@ -70,7 +70,7 @@ func (f fExchange) GetFuturesPositionSummary(context.Context, *futures.PositionS
MarkPrice: leet,
CurrentSize: leet,
AverageOpenPrice: leet,
PositionPNL: leet,
UnrealisedPNL: leet,
MaintenanceMarginFraction: leet,
FreeCollateral: leet,
TotalCollateral: leet,
@@ -141,29 +141,31 @@ func (f fExchange) GetFuturesPositionOrders(_ context.Context, req *futures.Posi
return resp, nil
}
func (f fExchange) GetLatestFundingRate(_ context.Context, request *fundingrate.LatestRateRequest) (*fundingrate.LatestRateResponse, error) {
func (f fExchange) GetLatestFundingRates(_ context.Context, request *fundingrate.LatestRateRequest) ([]fundingrate.LatestRateResponse, error) {
leet := decimal.NewFromInt(1337)
return &fundingrate.LatestRateResponse{
Exchange: f.GetName(),
Asset: request.Asset,
Pair: request.Pair,
LatestRate: fundingrate.Rate{
Time: time.Now(),
Rate: leet,
Payment: leet,
return []fundingrate.LatestRateResponse{
{
Exchange: f.GetName(),
Asset: request.Asset,
Pair: request.Pair,
LatestRate: fundingrate.Rate{
Time: time.Now(),
Rate: leet,
Payment: leet,
},
PredictedUpcomingRate: fundingrate.Rate{
Time: time.Now(),
Rate: leet,
Payment: leet,
},
TimeOfNextRate: time.Now(),
},
PredictedUpcomingRate: fundingrate.Rate{
Time: time.Now(),
Rate: leet,
Payment: leet,
},
TimeOfNextRate: time.Now(),
}, nil
}
func (f fExchange) GetFundingRates(_ context.Context, request *fundingrate.RatesRequest) (*fundingrate.Rates, error) {
func (f fExchange) GetHistoricalFundingRates(_ context.Context, request *fundingrate.HistoricalRatesRequest) (*fundingrate.HistoricalRates, error) {
leet := decimal.NewFromInt(1337)
return &fundingrate.Rates{
return &fundingrate.HistoricalRates{
Exchange: f.GetName(),
Asset: request.Asset,
Pair: request.Pair,

View File

@@ -11,6 +11,7 @@ import (
"time"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/common/key"
"github.com/thrasher-corp/gocryptotrader/config"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
@@ -39,8 +40,8 @@ var (
errCouldNotSyncNewData = errors.New("could not sync new data")
)
// setupSyncManager starts a new CurrencyPairSyncer
func setupSyncManager(c *config.SyncManagerConfig, exchangeManager iExchangeManager, remoteConfig *config.RemoteControlConfig, websocketRoutineManagerEnabled bool) (*syncManager, error) {
// SetupSyncManager creates a new CurrencyPairSyncer
func SetupSyncManager(c *config.SyncManagerConfig, exchangeManager iExchangeManager, remoteConfig *config.RemoteControlConfig, websocketRoutineManagerEnabled bool) (*SyncManager, error) {
if c == nil {
return nil, fmt.Errorf("%T %w", c, common.ErrNilPointer)
}
@@ -79,15 +80,15 @@ func setupSyncManager(c *config.SyncManagerConfig, exchangeManager iExchangeMana
return nil, fmt.Errorf("%T %w", c.PairFormatDisplay, common.ErrNilPointer)
}
s := &syncManager{
s := &SyncManager{
config: *c,
remoteConfig: remoteConfig,
exchangeManager: exchangeManager,
websocketRoutineManagerEnabled: websocketRoutineManagerEnabled,
fiatDisplayCurrency: c.FiatDisplayCurrency,
format: *c.PairFormatDisplay,
tickerBatchLastRequested: make(map[string]time.Time),
currencyPairs: make(map[currencyPairKey]*currencyPairSyncAgent),
tickerBatchLastRequested: make(map[key.ExchangeAsset]time.Time),
currencyPairs: make(map[key.ExchangePairAsset]*currencyPairSyncAgent),
}
log.Debugf(log.SyncMgr,
@@ -102,12 +103,12 @@ func setupSyncManager(c *config.SyncManagerConfig, exchangeManager iExchangeMana
}
// IsRunning safely checks whether the subsystem is running
func (m *syncManager) IsRunning() bool {
func (m *SyncManager) IsRunning() bool {
return m != nil && atomic.LoadInt32(&m.started) == 1
}
// Start runs the subsystem
func (m *syncManager) Start() error {
func (m *SyncManager) Start() error {
if m == nil {
return fmt.Errorf("exchange CurrencyPairSyncer %w", ErrNilSubsystem)
}
@@ -175,10 +176,11 @@ func (m *syncManager) Start() error {
continue
}
for i := range enabledPairs {
k := currencyPairKey{
AssetType: assetTypes[y],
Exchange: exchangeName,
Pair: enabledPairs[i].Format(currency.PairFormat{Uppercase: true}),
k := key.ExchangePairAsset{
Asset: assetTypes[y],
Exchange: exchangeName,
Base: enabledPairs[i].Base.Item,
Quote: enabledPairs[i].Quote.Item,
}
if e := m.get(k); e != nil {
continue
@@ -235,7 +237,7 @@ func (m *syncManager) Start() error {
}
// Stop shuts down the exchange currency pair syncer
func (m *syncManager) Stop() error {
func (m *SyncManager) Stop() error {
if m == nil {
return fmt.Errorf("exchange CurrencyPairSyncer %w", ErrNilSubsystem)
}
@@ -248,22 +250,23 @@ func (m *syncManager) Stop() error {
return nil
}
func (m *syncManager) get(k currencyPairKey) *currencyPairSyncAgent {
func (m *SyncManager) get(k key.ExchangePairAsset) *currencyPairSyncAgent {
m.mux.Lock()
defer m.mux.Unlock()
return m.currencyPairs[k]
}
func newCurrencyPairSyncAgent(k currencyPairKey) *currencyPairSyncAgent {
func newCurrencyPairSyncAgent(k key.ExchangePairAsset) *currencyPairSyncAgent {
return &currencyPairSyncAgent{
currencyPairKey: k,
Created: time.Now(),
locks: make([]sync.Mutex, SyncItemTrade+1),
trackers: make([]*syncBase, SyncItemTrade+1),
Key: k,
Pair: currency.NewPair(k.Base.Currency(), k.Quote.Currency()),
Created: time.Now(),
locks: make([]sync.Mutex, SyncItemTrade+1),
trackers: make([]*syncBase, SyncItemTrade+1),
}
}
func (m *syncManager) add(k currencyPairKey, s syncBase) *currencyPairSyncAgent {
func (m *SyncManager) add(k key.ExchangePairAsset, s syncBase) *currencyPairSyncAgent {
m.mux.Lock()
defer m.mux.Unlock()
@@ -288,7 +291,9 @@ func (m *syncManager) add(k currencyPairKey, s syncBase) *currencyPairSyncAgent
if m.config.Verbose {
log.Debugf(log.SyncMgr,
"%s: Added ticker sync item %v: using websocket: %v using REST: %v",
c.Exchange, m.FormatCurrency(c.Pair).String(), c.trackers[SyncItemTicker].IsUsingWebsocket,
c.Key.Exchange,
m.FormatCurrency(c.Pair),
c.trackers[SyncItemTicker].IsUsingWebsocket,
c.trackers[SyncItemTicker].IsUsingREST)
}
if atomic.LoadInt32(&m.initSyncCompleted) != 1 {
@@ -301,7 +306,9 @@ func (m *syncManager) add(k currencyPairKey, s syncBase) *currencyPairSyncAgent
if m.config.Verbose {
log.Debugf(log.SyncMgr,
"%s: Added orderbook sync item %v: using websocket: %v using REST: %v",
c.Exchange, m.FormatCurrency(c.Pair).String(), c.trackers[SyncItemOrderbook].IsUsingWebsocket,
k.Exchange,
m.FormatCurrency(c.Pair),
c.trackers[SyncItemOrderbook].IsUsingWebsocket,
c.trackers[SyncItemOrderbook].IsUsingREST)
}
if atomic.LoadInt32(&m.initSyncCompleted) != 1 {
@@ -314,7 +321,9 @@ func (m *syncManager) add(k currencyPairKey, s syncBase) *currencyPairSyncAgent
if m.config.Verbose {
log.Debugf(log.SyncMgr,
"%s: Added trade sync item %v: using websocket: %v using REST: %v",
c.Exchange, m.FormatCurrency(c.Pair).String(), c.trackers[SyncItemTrade].IsUsingWebsocket,
k.Exchange,
m.FormatCurrency(c.Pair),
c.trackers[SyncItemTrade].IsUsingWebsocket,
c.trackers[SyncItemTrade].IsUsingREST)
}
if atomic.LoadInt32(&m.initSyncCompleted) != 1 {
@@ -324,7 +333,7 @@ func (m *syncManager) add(k currencyPairKey, s syncBase) *currencyPairSyncAgent
}
if m.currencyPairs == nil {
m.currencyPairs = make(map[currencyPairKey]*currencyPairSyncAgent)
m.currencyPairs = make(map[key.ExchangePairAsset]*currencyPairSyncAgent)
}
m.currencyPairs[k] = c
@@ -332,9 +341,9 @@ func (m *syncManager) add(k currencyPairKey, s syncBase) *currencyPairSyncAgent
return c
}
// WebsocketUpdate notifies the syncManager to change the last updated time for a exchange asset pair
// WebsocketUpdate notifies the SyncManager to change the last updated time for a exchange asset pair
// And set IsUsingWebsocket to true. It should be used externally only from websocket updaters
func (m *syncManager) WebsocketUpdate(exchangeName string, p currency.Pair, a asset.Item, syncType syncItemType, err error) error {
func (m *SyncManager) WebsocketUpdate(exchangeName string, p currency.Pair, a asset.Item, syncType syncItemType, err error) error {
if m == nil {
return fmt.Errorf("exchange CurrencyPairSyncer %w", ErrNilSubsystem)
}
@@ -362,15 +371,22 @@ func (m *syncManager) WebsocketUpdate(exchangeName string, p currency.Pair, a as
return fmt.Errorf("%v %w", syncType, errUnknownSyncItem)
}
k := currencyPairKey{
AssetType: a,
Exchange: exchangeName,
Pair: p.Format(currency.PairFormat{Uppercase: true}),
k := key.ExchangePairAsset{
Asset: a,
Exchange: exchangeName,
Base: p.Base.Item,
Quote: p.Quote.Item,
}
c, exists := m.currencyPairs[k]
if !exists {
return fmt.Errorf("%w for %s %s %s %s", errCouldNotSyncNewData, k.Exchange, k.Pair, k.AssetType, syncType)
return fmt.Errorf("%w for %s %s %s %s %s",
errCouldNotSyncNewData,
k.Exchange,
k.Base,
k.Quote,
k.Asset,
syncType)
}
c.locks[syncType].Lock()
@@ -387,9 +403,9 @@ func (m *syncManager) WebsocketUpdate(exchangeName string, p currency.Pair, a as
if m.config.LogSwitchProtocolEvents {
log.Warnf(log.SyncMgr,
"%s %s %s: %s Websocket re-enabled, switching from rest to websocket",
c.Exchange,
k.Exchange,
m.FormatCurrency(c.Pair),
strings.ToUpper(c.AssetType.String()),
strings.ToUpper(k.Asset.String()),
syncType,
)
}
@@ -398,8 +414,8 @@ func (m *syncManager) WebsocketUpdate(exchangeName string, p currency.Pair, a as
return m.update(c, syncType, err)
}
// update notifies the syncManager to change the last updated time for a exchange asset pair
func (m *syncManager) update(c *currencyPairSyncAgent, syncType syncItemType, err error) error {
// update notifies the SyncManager to change the last updated time for a exchange asset pair
func (m *SyncManager) update(c *currencyPairSyncAgent, syncType syncItemType, err error) error {
if syncType < SyncItemTicker || syncType > SyncItemTrade {
return fmt.Errorf("%v %w", syncType, errUnknownSyncItem)
}
@@ -416,7 +432,7 @@ func (m *syncManager) update(c *currencyPairSyncAgent, syncType syncItemType, er
removedCounter++
if m.config.LogInitialSyncEvents {
log.Debugf(log.SyncMgr, "%s %s sync complete %v [%d/%d].",
c.Exchange,
c.Key.Exchange,
syncType,
m.FormatCurrency(c.Pair),
removedCounter,
@@ -428,7 +444,7 @@ func (m *syncManager) update(c *currencyPairSyncAgent, syncType syncItemType, er
return nil
}
func (m *syncManager) worker() {
func (m *SyncManager) worker() {
cleanup := func() {
log.Debugln(log.SyncMgr,
"Exchange CurrencyPairSyncer worker shutting down.")
@@ -492,10 +508,11 @@ func (m *syncManager) worker() {
return
}
k := currencyPairKey{
AssetType: assetTypes[y],
Exchange: exchangeName,
Pair: enabledPairs[i].Format(currency.PairFormat{Uppercase: true}),
k := key.ExchangePairAsset{
Asset: assetTypes[y],
Exchange: exchangeName,
Base: enabledPairs[i].Base.Item,
Quote: enabledPairs[i].Quote.Item,
}
c := m.get(k)
if c == nil {
@@ -521,7 +538,7 @@ func (m *syncManager) worker() {
}
}
func (m *syncManager) syncTicker(c *currencyPairSyncAgent, e exchange.IBotExchange) {
func (m *SyncManager) syncTicker(c *currencyPairSyncAgent, e exchange.IBotExchange) {
if !c.locks[SyncItemTicker].TryLock() {
return
}
@@ -541,9 +558,9 @@ func (m *syncManager) syncTicker(c *currencyPairSyncAgent, e exchange.IBotExchan
if m.config.LogSwitchProtocolEvents {
log.Warnf(log.SyncMgr,
"%s %s %s: No ticker update after %s, switching from websocket to rest",
c.Exchange,
c.Key.Exchange,
m.FormatCurrency(c.Pair),
strings.ToUpper(c.AssetType.String()),
strings.ToUpper(c.Key.Asset.String()),
m.config.TimeoutWebsocket,
)
}
@@ -555,9 +572,15 @@ func (m *syncManager) syncTicker(c *currencyPairSyncAgent, e exchange.IBotExchan
if e.SupportsRESTTickerBatchUpdates() {
m.mux.Lock()
batchLastDone, ok := m.tickerBatchLastRequested[e.GetName()]
batchLastDone, ok := m.tickerBatchLastRequested[key.ExchangeAsset{
Exchange: c.Key.Exchange,
Asset: c.Key.Asset,
}]
if !ok {
m.tickerBatchLastRequested[exchangeName] = time.Time{}
m.tickerBatchLastRequested[key.ExchangeAsset{
Exchange: c.Key.Exchange,
Asset: c.Key.Asset,
}] = time.Time{}
}
m.mux.Unlock()
@@ -566,11 +589,14 @@ func (m *syncManager) syncTicker(c *currencyPairSyncAgent, e exchange.IBotExchan
if m.config.Verbose {
log.Debugf(log.SyncMgr, "Initialising %s REST ticker batching", exchangeName)
}
err = e.UpdateTickers(context.TODO(), c.AssetType)
err = e.UpdateTickers(context.TODO(), c.Key.Asset)
if err == nil {
result, err = e.FetchTicker(context.TODO(), c.Pair, c.AssetType)
result, err = e.FetchTicker(context.TODO(), c.Pair, c.Key.Asset)
}
m.tickerBatchLastRequested[exchangeName] = time.Now()
m.tickerBatchLastRequested[key.ExchangeAsset{
Exchange: c.Key.Exchange,
Asset: c.Key.Asset,
}] = time.Now()
m.mux.Unlock()
} else {
if m.config.Verbose {
@@ -578,17 +604,17 @@ func (m *syncManager) syncTicker(c *currencyPairSyncAgent, e exchange.IBotExchan
}
result, err = e.FetchTicker(context.TODO(),
c.Pair,
c.AssetType)
c.Key.Asset)
}
} else {
result, err = e.UpdateTicker(context.TODO(),
c.Pair,
c.AssetType)
c.Key.Asset)
}
m.PrintTickerSummary(result, "REST", err)
if err == nil {
if m.remoteConfig.WebsocketRPC.Enabled {
relayWebsocketEvent(result, "ticker_update", c.AssetType.String(), exchangeName)
relayWebsocketEvent(result, "ticker_update", c.Key.Asset.String(), exchangeName)
}
}
updateErr := m.update(c, SyncItemTicker, err)
@@ -598,7 +624,7 @@ func (m *syncManager) syncTicker(c *currencyPairSyncAgent, e exchange.IBotExchan
}
}
func (m *syncManager) syncOrderbook(c *currencyPairSyncAgent, e exchange.IBotExchange) {
func (m *SyncManager) syncOrderbook(c *currencyPairSyncAgent, e exchange.IBotExchange) {
if !c.locks[SyncItemOrderbook].TryLock() {
return
}
@@ -622,9 +648,9 @@ func (m *syncManager) syncOrderbook(c *currencyPairSyncAgent, e exchange.IBotExc
if m.config.LogSwitchProtocolEvents {
log.Warnf(log.SyncMgr,
"%s %s %s: No orderbook update after %s, switching from websocket to rest",
c.Exchange,
m.FormatCurrency(c.Pair).String(),
strings.ToUpper(c.AssetType.String()),
c.Key.Exchange,
m.FormatCurrency(c.Pair),
strings.ToUpper(c.Key.Asset.String()),
m.config.TimeoutWebsocket,
)
}
@@ -633,11 +659,11 @@ func (m *syncManager) syncOrderbook(c *currencyPairSyncAgent, e exchange.IBotExc
if s.IsUsingREST && time.Since(s.LastUpdated) > m.config.TimeoutREST {
result, err := e.UpdateOrderbook(context.TODO(),
c.Pair,
c.AssetType)
c.Key.Asset)
m.PrintOrderbookSummary(result, "REST", err)
if err == nil {
if m.remoteConfig.WebsocketRPC.Enabled {
relayWebsocketEvent(result, "orderbook_update", c.AssetType.String(), e.GetName())
relayWebsocketEvent(result, "orderbook_update", c.Key.Asset.String(), e.GetName())
}
}
updateErr := m.update(c, SyncItemOrderbook, err)
@@ -647,7 +673,7 @@ func (m *syncManager) syncOrderbook(c *currencyPairSyncAgent, e exchange.IBotExc
}
}
func (m *syncManager) syncTrades(c *currencyPairSyncAgent) {
func (m *SyncManager) syncTrades(c *currencyPairSyncAgent) {
if !c.locks[SyncItemTrade].TryLock() {
return
}
@@ -703,7 +729,7 @@ func printConvertCurrencyFormat(origPrice float64, origCurrency, displayCurrency
}
// PrintTickerSummary outputs the ticker results
func (m *syncManager) PrintTickerSummary(result *ticker.Price, protocol string, err error) {
func (m *SyncManager) PrintTickerSummary(result *ticker.Price, protocol string, err error) {
if m == nil || atomic.LoadInt32(&m.started) == 0 {
return
}
@@ -777,11 +803,11 @@ func (m *syncManager) PrintTickerSummary(result *ticker.Price, protocol string,
// FormatCurrency is a method that formats and returns a currency pair
// based on the user currency display preferences
func (m *syncManager) FormatCurrency(p currency.Pair) currency.Pair {
func (m *SyncManager) FormatCurrency(cp currency.Pair) string {
if m == nil || atomic.LoadInt32(&m.started) == 0 {
return p
return ""
}
return p.Format(m.format)
return m.format.Format(cp)
}
const (
@@ -789,7 +815,7 @@ const (
)
// PrintOrderbookSummary outputs orderbook results
func (m *syncManager) PrintOrderbookSummary(result *orderbook.Base, protocol string, err error) {
func (m *SyncManager) PrintOrderbookSummary(result *orderbook.Base, protocol string, err error) {
if m == nil || atomic.LoadInt32(&m.started) == 0 {
return
}
@@ -863,7 +889,7 @@ func (m *syncManager) PrintOrderbookSummary(result *orderbook.Base, protocol str
// WaitForInitialSync allows for a routine to wait for an initial sync to be
// completed without exposing the underlying type. This needs to be called in a
// separate routine.
func (m *syncManager) WaitForInitialSync() error {
func (m *SyncManager) WaitForInitialSync() error {
if m == nil {
return fmt.Errorf("sync manager %w", ErrNilSubsystem)
}

View File

@@ -6,6 +6,7 @@ import (
"testing"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/common/key"
"github.com/thrasher-corp/gocryptotrader/config"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
@@ -15,42 +16,42 @@ import (
func TestSetupSyncManager(t *testing.T) {
t.Parallel()
_, err := setupSyncManager(nil, nil, nil, false)
_, err := SetupSyncManager(nil, nil, nil, false)
if !errors.Is(err, common.ErrNilPointer) {
t.Errorf("error '%v', expected '%v'", err, common.ErrNilPointer)
}
_, err = setupSyncManager(&config.SyncManagerConfig{}, nil, nil, false)
_, err = SetupSyncManager(&config.SyncManagerConfig{}, nil, nil, false)
if !errors.Is(err, errNoSyncItemsEnabled) {
t.Errorf("error '%v', expected '%v'", err, errNoSyncItemsEnabled)
}
_, err = setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true}, nil, nil, false)
_, err = SetupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true}, nil, nil, false)
if !errors.Is(err, errNilExchangeManager) {
t.Errorf("error '%v', expected '%v'", err, errNilExchangeManager)
}
_, err = setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true}, &ExchangeManager{}, nil, false)
_, err = SetupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true}, &ExchangeManager{}, nil, false)
if !errors.Is(err, errNilConfig) {
t.Errorf("error '%v', expected '%v'", err, errNilConfig)
}
_, err = setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
_, err = SetupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
if !errors.Is(err, currency.ErrCurrencyCodeEmpty) {
t.Errorf("error '%v', expected '%v'", err, currency.ErrCurrencyCodeEmpty)
}
_, err = setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, FiatDisplayCurrency: currency.BTC}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
_, err = SetupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, FiatDisplayCurrency: currency.BTC}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
if !errors.Is(err, currency.ErrFiatDisplayCurrencyIsNotFiat) {
t.Errorf("error '%v', expected '%v'", err, currency.ErrFiatDisplayCurrencyIsNotFiat)
}
_, err = setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, FiatDisplayCurrency: currency.USD}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
_, err = SetupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, FiatDisplayCurrency: currency.USD}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
if !errors.Is(err, common.ErrNilPointer) {
t.Errorf("error '%v', expected '%v'", err, common.ErrNilPointer)
}
m, err := setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
m, err := SetupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
@@ -61,7 +62,7 @@ func TestSetupSyncManager(t *testing.T) {
func TestSyncManagerStart(t *testing.T) {
t.Parallel()
m, err := setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
m, err := SetupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, &ExchangeManager{}, &config.RemoteControlConfig{}, true)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
@@ -96,7 +97,7 @@ func TestSyncManagerStart(t *testing.T) {
func TestSyncManagerStop(t *testing.T) {
t.Parallel()
var m *syncManager
var m *SyncManager
err := m.Stop()
if !errors.Is(err, ErrNilSubsystem) {
t.Errorf("error '%v', expected '%v'", err, ErrNilSubsystem)
@@ -112,7 +113,7 @@ func TestSyncManagerStop(t *testing.T) {
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}
m, err = setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, SynchronizeContinuously: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, em, &config.RemoteControlConfig{}, false)
m, err = SetupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, SynchronizeContinuously: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, em, &config.RemoteControlConfig{}, false)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
@@ -150,7 +151,7 @@ func TestPrintConvertCurrencyFormat(t *testing.T) {
func TestPrintTickerSummary(t *testing.T) {
t.Parallel()
var m *syncManager
var m *SyncManager
m.PrintTickerSummary(&ticker.Price{}, "REST", nil)
em := NewExchangeManager()
@@ -163,7 +164,7 @@ func TestPrintTickerSummary(t *testing.T) {
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}
m, err = setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, SynchronizeContinuously: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, em, &config.RemoteControlConfig{}, false)
m, err = SetupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, SynchronizeContinuously: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, em, &config.RemoteControlConfig{}, false)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
@@ -192,7 +193,7 @@ func TestPrintTickerSummary(t *testing.T) {
func TestPrintOrderbookSummary(t *testing.T) {
t.Parallel()
var m *syncManager
var m *SyncManager
m.PrintOrderbookSummary(nil, "REST", nil)
em := NewExchangeManager()
@@ -205,7 +206,7 @@ func TestPrintOrderbookSummary(t *testing.T) {
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}
m, err = setupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, SynchronizeContinuously: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, em, &config.RemoteControlConfig{}, false)
m, err = SetupSyncManager(&config.SyncManagerConfig{SynchronizeTrades: true, SynchronizeContinuously: true, FiatDisplayCurrency: currency.USD, PairFormatDisplay: &currency.EMPTYFORMAT}, em, &config.RemoteControlConfig{}, false)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
@@ -242,13 +243,13 @@ func TestRelayWebsocketEvent(t *testing.T) {
}
func TestWaitForInitialSync(t *testing.T) {
var m *syncManager
var m *SyncManager
err := m.WaitForInitialSync()
if !errors.Is(err, ErrNilSubsystem) {
t.Fatalf("received %v, but expected: %v", err, ErrNilSubsystem)
}
m = &syncManager{}
m = &SyncManager{}
err = m.WaitForInitialSync()
if !errors.Is(err, ErrSubSystemNotStarted) {
t.Fatalf("received %v, but expected: %v", err, ErrSubSystemNotStarted)
@@ -263,13 +264,13 @@ func TestWaitForInitialSync(t *testing.T) {
func TestSyncManagerWebsocketUpdate(t *testing.T) {
t.Parallel()
var m *syncManager
var m *SyncManager
err := m.WebsocketUpdate("", currency.EMPTYPAIR, 1, 47, nil)
if !errors.Is(err, ErrNilSubsystem) {
t.Fatalf("received %v, but expected: %v", err, ErrNilSubsystem)
}
m = &syncManager{}
m = &SyncManager{}
err = m.WebsocketUpdate("", currency.EMPTYPAIR, 1, 47, nil)
if !errors.Is(err, ErrSubSystemNotStarted) {
t.Fatalf("received %v, but expected: %v", err, ErrSubSystemNotStarted)
@@ -314,9 +315,8 @@ func TestSyncManagerWebsocketUpdate(t *testing.T) {
t.Fatalf("received %v, but expected: %v", err, errCouldNotSyncNewData)
}
m.add(currencyPairKey{
AssetType: asset.Spot,
Pair: currency.EMPTYPAIR.Format(currency.PairFormat{Uppercase: true}),
m.add(key.ExchangePairAsset{
Asset: asset.Spot,
}, syncBase{})
m.initSyncWG.Add(3)
// orderbook match

View File

@@ -4,9 +4,9 @@ import (
"sync"
"time"
"github.com/thrasher-corp/gocryptotrader/common/key"
"github.com/thrasher-corp/gocryptotrader/config"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
)
// syncBase stores information
@@ -18,23 +18,17 @@ type syncBase struct {
NumErrors int
}
// currencyPairKey is the map key for the sync agents
type currencyPairKey struct {
Exchange string
AssetType asset.Item
Pair currency.Pair
}
// currencyPairSyncAgent stores the sync agent info
type currencyPairSyncAgent struct {
currencyPairKey
Key key.ExchangePairAsset
Pair currency.Pair
Created time.Time
trackers []*syncBase
locks []sync.Mutex
}
// syncManager stores the exchange currency pair syncer object
type syncManager struct {
// SyncManager stores the exchange currency pair syncer object
type SyncManager struct {
initSyncCompleted int32
initSyncStarted int32
started int32
@@ -47,8 +41,8 @@ type syncManager struct {
initSyncWG sync.WaitGroup
inService sync.WaitGroup
currencyPairs map[currencyPairKey]*currencyPairSyncAgent
tickerBatchLastRequested map[string]time.Time
currencyPairs map[key.ExchangePairAsset]*currencyPairSyncAgent
tickerBatchLastRequested map[key.ExchangeAsset]time.Time
remoteConfig *config.RemoteControlConfig
config config.SyncManagerConfig

View File

@@ -242,6 +242,10 @@ func (m *WebsocketRoutineManager) websocketDataHandler(exchName string, data int
}
m.syncer.PrintTickerSummary(&d[x], "websocket", err)
}
case order.Detail,
ticker.Price,
orderbook.Depth:
return errUseAPointer
case stream.KlineData:
if m.verbose {
log.Infof(log.WebsocketMgr, "%s websocket %s %s kline updated %+v",

View File

@@ -26,17 +26,17 @@ func TestWebsocketRoutineManagerSetup(t *testing.T) {
if !errors.Is(err, errNilCurrencyPairSyncer) {
t.Errorf("error '%v', expected '%v'", err, errNilCurrencyPairSyncer)
}
_, err = setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &syncManager{}, nil, false)
_, err = setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &SyncManager{}, nil, false)
if !errors.Is(err, errNilCurrencyConfig) {
t.Errorf("error '%v', expected '%v'", err, errNilCurrencyConfig)
}
_, err = setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &syncManager{}, &currency.Config{}, true)
_, err = setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &SyncManager{}, &currency.Config{}, true)
if !errors.Is(err, errNilCurrencyPairFormat) {
t.Errorf("error '%v', expected '%v'", err, errNilCurrencyPairFormat)
}
m, err := setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &syncManager{}, &currency.Config{CurrencyPairFormat: &currency.PairFormat{}}, false)
m, err := setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &SyncManager{}, &currency.Config{CurrencyPairFormat: &currency.PairFormat{}}, false)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
@@ -55,7 +55,7 @@ func TestWebsocketRoutineManagerStart(t *testing.T) {
Uppercase: false,
Delimiter: "-",
}}
m, err = setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &syncManager{}, cfg, true)
m, err = setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &SyncManager{}, cfg, true)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
@@ -75,7 +75,7 @@ func TestWebsocketRoutineManagerIsRunning(t *testing.T) {
t.Error("expected false")
}
m, err := setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &syncManager{}, &currency.Config{CurrencyPairFormat: &currency.PairFormat{}}, false)
m, err := setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &SyncManager{}, &currency.Config{CurrencyPairFormat: &currency.PairFormat{}}, false)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
@@ -102,7 +102,7 @@ func TestWebsocketRoutineManagerStop(t *testing.T) {
t.Errorf("error '%v', expected '%v'", err, ErrNilSubsystem)
}
m, err = setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &syncManager{}, &currency.Config{CurrencyPairFormat: &currency.PairFormat{}}, false)
m, err = setupWebsocketRoutineManager(NewExchangeManager(), &OrderManager{}, &SyncManager{}, &currency.Config{CurrencyPairFormat: &currency.PairFormat{}}, false)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}
@@ -146,7 +146,7 @@ func TestWebsocketRoutineManagerHandleData(t *testing.T) {
Uppercase: false,
Delimiter: "-",
}}
m, err := setupWebsocketRoutineManager(em, om, &syncManager{}, cfg, true)
m, err := setupWebsocketRoutineManager(em, om, &SyncManager{}, cfg, true)
if !errors.Is(err, nil) {
t.Errorf("error '%v', expected '%v'", err, nil)
}

View File

@@ -14,6 +14,7 @@ var (
errNilWebsocketDataHandlerFunction = errors.New("websocket data handler function is nil")
errNilWebsocket = errors.New("websocket is nil")
errRoutineManagerNotStarted = errors.New("websocket routine manager not started")
errUseAPointer = errors.New("could not process, pass to websocket routine manager as a pointer")
)
const (