From 6613c56738a09c0e23f115daae9020ccdf294cd8 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Mon, 10 Feb 2025 16:15:33 +1100 Subject: [PATCH] stream/buffer: remove time.ticker publish limiter for websocket orderbook updates (#1681) * stream/buffer: remove publish period for for signalling change through data handler * Add config upgrade and downgrade * linter: fix * GK: nits * Update config/versions/v3.go Co-authored-by: Adrian Gallagher * Update config/versions/v3.go Co-authored-by: Adrian Gallagher * Update config/versions/v3.go Co-authored-by: Adrian Gallagher * Update config/versions/v3.go Co-authored-by: Adrian Gallagher * Update config/versions/v3.go Co-authored-by: Adrian Gallagher * config: fix up commited suggestion --------- Co-authored-by: Ryan O'Hara-Reid Co-authored-by: Adrian Gallagher --- config/config.go | 8 ----- config/config_types.go | 4 --- config/versions/v3.go | 39 +++++++++++++++++++++++ config/versions/v3_test.go | 42 +++++++++++++++++++++++++ config_example.json | 9 ++---- exchanges/stream/buffer/buffer.go | 33 +------------------ exchanges/stream/buffer/buffer_types.go | 13 ++------ testdata/configtest.json | 42 +++++++++---------------- 8 files changed, 101 insertions(+), 89 deletions(-) create mode 100644 config/versions/v3.go create mode 100644 config/versions/v3_test.go diff --git a/config/config.go b/config/config.go index 9f61ed14..081c2a25 100644 --- a/config/config.go +++ b/config/config.go @@ -1002,14 +1002,6 @@ func (c *Config) CheckExchangeConfigValues() error { defaultWebsocketOrderbookBufferLimit) e.Orderbook.WebsocketBufferLimit = defaultWebsocketOrderbookBufferLimit } - if e.Orderbook.PublishPeriod == nil || e.Orderbook.PublishPeriod.Nanoseconds() < 0 { - log.Warnf(log.ConfigMgr, - "Exchange %s Websocket orderbook publish period value not set, defaulting to %v.", - e.Name, - DefaultOrderbookPublishPeriod) - publishPeriod := DefaultOrderbookPublishPeriod - e.Orderbook.PublishPeriod = &publishPeriod - } err := c.CheckPairConsistency(e.Name) if err != nil { log.Errorf(log.ConfigMgr, diff --git a/config/config_types.go b/config/config_types.go index f1f8cd84..b18e80dc 100644 --- a/config/config_types.go +++ b/config/config_types.go @@ -38,7 +38,6 @@ const ( defaultDataHistoryMonitorCheckTimer = time.Minute defaultCurrencyStateManagerDelay = time.Minute defaultMaxJobsPerCycle = 5 - DefaultOrderbookPublishPeriod = time.Second * 10 // DefaultSyncerWorkers limits the number of sync workers DefaultSyncerWorkers = 15 // DefaultSyncerTimeoutREST the default time to switch from REST to websocket protocols without a response @@ -368,7 +367,4 @@ type Orderbook struct { VerificationBypass bool `json:"verificationBypass"` WebsocketBufferLimit int `json:"websocketBufferLimit"` WebsocketBufferEnabled bool `json:"websocketBufferEnabled"` - // PublishPeriod here is a pointer because we want to distinguish - // between zeroed out and missing. - PublishPeriod *time.Duration `json:"publishPeriod"` } diff --git a/config/versions/v3.go b/config/versions/v3.go new file mode 100644 index 00000000..b7773c9a --- /dev/null +++ b/config/versions/v3.go @@ -0,0 +1,39 @@ +package versions + +import ( + "context" + "encoding/json" + "time" + + "github.com/buger/jsonparser" +) + +// Version3 is an ExchangeVersion to remove the publishPeriod from the exchange's orderbook config +type Version3 struct{} + +func init() { + Manager.registerVersion(3, &Version3{}) +} + +// Exchanges returns all exchanges: "*" +func (v *Version3) Exchanges() []string { return []string{"*"} } + +// UpgradeExchange will remove the publishPeriod from the exchange's orderbook config +func (v *Version3) UpgradeExchange(_ context.Context, e []byte) ([]byte, error) { + e = jsonparser.Delete(e, "orderbook", "publishPeriod") + return e, nil +} + +const defaultOrderbookPublishPeriod = time.Second * 10 + +// DowngradeExchange will downgrade the exchange's config by setting the default orderbook publish period +func (v *Version3) DowngradeExchange(_ context.Context, e []byte) ([]byte, error) { + if _, _, _, err := jsonparser.Get(e, "orderbook"); err != nil { + return e, nil //nolint:nilerr // No error, just return the original config + } + out, err := json.Marshal(defaultOrderbookPublishPeriod) + if err != nil { + return e, err + } + return jsonparser.Set(e, out, "orderbook", "publishPeriod") +} diff --git a/config/versions/v3_test.go b/config/versions/v3_test.go new file mode 100644 index 00000000..e816effc --- /dev/null +++ b/config/versions/v3_test.go @@ -0,0 +1,42 @@ +package versions + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestVersion3UpgradeExchange(t *testing.T) { + t.Parallel() + + got, err := (&Version3{}).UpgradeExchange(context.Background(), nil) + require.NoError(t, err) + require.Nil(t, got) + + payload := []byte(`{"orderbook": {"verificationBypass": false,"websocketBufferLimit": 5,"websocketBufferEnabled": false,"publishPeriod": 10000000000}}`) + expected := []byte(`{"orderbook": {"verificationBypass": false,"websocketBufferLimit": 5,"websocketBufferEnabled": false}}`) + got, err = (&Version3{}).UpgradeExchange(context.Background(), payload) + require.NoError(t, err) + require.Equal(t, expected, got) +} + +func TestVersion3DowngradeExchange(t *testing.T) { + t.Parallel() + + got, err := (&Version3{}).DowngradeExchange(context.Background(), nil) + require.NoError(t, err) + require.Nil(t, got) + + payload := []byte(`{"orderbook": {"verificationBypass": false,"websocketBufferLimit": 5,"websocketBufferEnabled": false}}`) + expected := []byte(`{"orderbook": {"verificationBypass": false,"websocketBufferLimit": 5,"websocketBufferEnabled": false,"publishPeriod":10000000000}}`) + got, err = (&Version3{}).DowngradeExchange(context.Background(), payload) + require.NoError(t, err) + require.Equal(t, expected, got) +} + +func TestVersion3Exchanges(t *testing.T) { + t.Parallel() + assert := require.New(t) + assert.Equal([]string{"*"}, (&Version3{}).Exchanges()) +} diff --git a/config_example.json b/config_example.json index 42c52c09..f0cd15f5 100644 --- a/config_example.json +++ b/config_example.json @@ -574,8 +574,7 @@ "orderbook": { "verificationBypass": false, "websocketBufferLimit": 5, - "websocketBufferEnabled": false, - "publishPeriod": 10000000000 + "websocketBufferEnabled": false } }, { @@ -812,8 +811,7 @@ "orderbook": { "verificationBypass": false, "websocketBufferLimit": 5, - "websocketBufferEnabled": false, - "publishPeriod": 10000000000 + "websocketBufferEnabled": false } }, { @@ -2224,8 +2222,7 @@ "orderbook": { "verificationBypass": false, "websocketBufferLimit": 5, - "websocketBufferEnabled": false, - "publishPeriod": 10000000000 + "websocketBufferEnabled": false } }, { diff --git a/exchanges/stream/buffer/buffer.go b/exchanges/stream/buffer/buffer.go index 8294c3fb..e49e793d 100644 --- a/exchanges/stream/buffer/buffer.go +++ b/exchanges/stream/buffer/buffer.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "sort" - "time" "github.com/thrasher-corp/gocryptotrader/common/key" "github.com/thrasher-corp/gocryptotrader/config" @@ -63,13 +62,6 @@ func (w *Orderbook) Setup(exchangeConfig *config.Exchange, c *Config, dataHandle w.dataHandler = dataHandler w.ob = make(map[key.PairAsset]*orderbookHolder) w.verbose = exchangeConfig.Verbose - - // set default publish period if missing - orderbookPublishPeriod := config.DefaultOrderbookPublishPeriod - if exchangeConfig.Orderbook.PublishPeriod != nil { - orderbookPublishPeriod = *exchangeConfig.Orderbook.PublishPeriod - } - w.publishPeriod = orderbookPublishPeriod w.updateIDProgression = c.UpdateIDProgression w.checksum = c.Checksum return nil @@ -172,25 +164,6 @@ func (w *Orderbook) Update(u *orderbook.Update) error { // Publish all state changes, disregarding verbosity or sync requirements. book.ob.Publish() - - if book.ticker != nil { - select { - case <-book.ticker.C: - // Send update to engine websocket manager to update engine - // sync manager to reset websocket orderbook sync timeout. This will - // stop the fall over to REST protocol fetching of orderbook data. - default: - if !w.verbose { - // We do not need to send an update to the sync manager within - // this time window unless verbose is turned on. - return nil - } - } - } - - // A nil ticker means that a zero publish period has been set and the entire - // websocket updates will be sent to the engine websocket manager for - // display purposes. Same as being verbose. w.dataHandler <- book.ob return nil } @@ -320,11 +293,7 @@ func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error { depth.AssignOptions(book) buffer := make([]orderbook.Update, w.obBufferLimit) - var ticker *time.Ticker - if w.publishPeriod != 0 { - ticker = time.NewTicker(w.publishPeriod) - } - holder = &orderbookHolder{ob: depth, buffer: &buffer, ticker: ticker} + holder = &orderbookHolder{ob: depth, buffer: &buffer} w.ob[key.PairAsset{Base: book.Pair.Base.Item, Quote: book.Pair.Quote.Item, Asset: book.Asset}] = holder } diff --git a/exchanges/stream/buffer/buffer_types.go b/exchanges/stream/buffer/buffer_types.go index 9383e361..3451e413 100644 --- a/exchanges/stream/buffer/buffer_types.go +++ b/exchanges/stream/buffer/buffer_types.go @@ -2,7 +2,6 @@ package buffer import ( "sync" - "time" "github.com/thrasher-corp/gocryptotrader/common/key" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" @@ -44,9 +43,6 @@ type Orderbook struct { updateIDProgression bool // checksum is a package defined checksum calculation for updated books. checksum func(state *orderbook.Base, checksum uint32) error - - publishPeriod time.Duration - // TODO: sync.RWMutex. For the moment we process the orderbook in a single // thread. In future when there are workers directly involved this can be // can be improved with RW mechanics which will allow updates to occur at @@ -57,12 +53,7 @@ type Orderbook struct { // orderbookHolder defines a store of pending updates and a pointer to the // orderbook depth type orderbookHolder struct { - ob *orderbook.Depth - buffer *[]orderbook.Update - // Reduces the amount of outbound alerts to the data handler for example - // coinbasepro can have up too 100 updates per second introducing overhead. - // The sync agent only requires an alert every 15 seconds for a specific - // currency. - ticker *time.Ticker + ob *orderbook.Depth + buffer *[]orderbook.Update updateID int64 } diff --git a/testdata/configtest.json b/testdata/configtest.json index 2d4ad2fd..2d7a7624 100644 --- a/testdata/configtest.json +++ b/testdata/configtest.json @@ -318,8 +318,7 @@ "orderbook": { "verificationBypass": false, "websocketBufferLimit": 5, - "websocketBufferEnabled": false, - "publishPeriod": 10000000000 + "websocketBufferEnabled": false } }, { @@ -399,8 +398,7 @@ "orderbook": { "verificationBypass": false, "websocketBufferLimit": 5, - "websocketBufferEnabled": false, - "publishPeriod": 10000000000 + "websocketBufferEnabled": false } }, { @@ -480,8 +478,7 @@ "orderbook": { "verificationBypass": false, "websocketBufferLimit": 5, - "websocketBufferEnabled": false, - "publishPeriod": 10000000000 + "websocketBufferEnabled": false } }, { @@ -564,8 +561,7 @@ "orderbook": { "verificationBypass": false, "websocketBufferLimit": 5, - "websocketBufferEnabled": false, - "publishPeriod": 10000000000 + "websocketBufferEnabled": false } }, { @@ -681,8 +677,7 @@ "orderbook": { "verificationBypass": false, "websocketBufferLimit": 5, - "websocketBufferEnabled": false, - "publishPeriod": 10000000000 + "websocketBufferEnabled": false } }, { @@ -844,8 +839,7 @@ "orderbook": { "verificationBypass": false, "websocketBufferLimit": 5, - "websocketBufferEnabled": false, - "publishPeriod": 10000000000 + "websocketBufferEnabled": false } }, { @@ -946,8 +940,7 @@ "orderbook": { "verificationBypass": false, "websocketBufferLimit": 5, - "websocketBufferEnabled": false, - "publishPeriod": 10000000000 + "websocketBufferEnabled": false } }, { @@ -1152,8 +1145,7 @@ "orderbook": { "verificationBypass": false, "websocketBufferLimit": 5, - "websocketBufferEnabled": false, - "publishPeriod": 10000000000 + "websocketBufferEnabled": false } }, { @@ -1638,8 +1630,7 @@ "orderbook": { "verificationBypass": false, "websocketBufferLimit": 5, - "websocketBufferEnabled": false, - "publishPeriod": 10000000000 + "websocketBufferEnabled": false } }, { @@ -1716,8 +1707,7 @@ "orderbook": { "verificationBypass": false, "websocketBufferLimit": 5, - "websocketBufferEnabled": false, - "publishPeriod": 10000000000 + "websocketBufferEnabled": false } }, { @@ -1797,8 +1787,7 @@ "orderbook": { "verificationBypass": false, "websocketBufferLimit": 5, - "websocketBufferEnabled": false, - "publishPeriod": 10000000000 + "websocketBufferEnabled": false } }, { @@ -1903,8 +1892,7 @@ "orderbook": { "verificationBypass": false, "websocketBufferLimit": 5, - "websocketBufferEnabled": false, - "publishPeriod": 10000000000 + "websocketBufferEnabled": false } }, { @@ -2000,8 +1988,7 @@ "orderbook": { "verificationBypass": false, "websocketBufferLimit": 5, - "websocketBufferEnabled": false, - "publishPeriod": 10000000000 + "websocketBufferEnabled": false } }, { @@ -2303,8 +2290,7 @@ "orderbook": { "verificationBypass": false, "websocketBufferLimit": 5, - "websocketBufferEnabled": false, - "publishPeriod": 10000000000 + "websocketBufferEnabled": false } }, {