btcmarkets: Add websocket orderbook checksum validation (#900)

* btcmarkets: add websocket checksum, fetch different book via REST

* Update exchanges/btcmarkets/btcmarkets_test.go

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>

* buffer: add explicit type for buffer related variables and comments, do all checks buffer side and load in setup as per glorious recom.

* buffer: fix tests add error

* buffer: test re-add code cov

* depth/stream/ws: fix tests, change field name to be more specific.

* buffer: rm unused field and small comment fixes

* btcm: remove redundant field

* glorious: nits

* buffer: fix commenting

Co-authored-by: Scott <gloriousCode@users.noreply.github.com>
This commit is contained in:
Ryan O'Hara-Reid
2022-03-21 16:19:58 +11:00
committed by GitHub
parent 09fa2f236a
commit 1669f1c626
20 changed files with 356 additions and 118 deletions

View File

@@ -17,6 +17,7 @@ const packageError = "websocket orderbook buffer error: %w"
var (
errExchangeConfigNil = errors.New("exchange config is nil")
errBufferConfigNil = errors.New("buffer config is nil")
errUnsetDataHandler = errors.New("datahandler unset")
errIssueBufferEnabledButNoLimit = errors.New("buffer enabled but no limit set")
errUpdateIsNil = errors.New("update is nil")
@@ -26,35 +27,43 @@ var (
)
// Setup sets private variables
func (w *Orderbook) Setup(cfg *config.Exchange, sortBuffer, sortBufferByUpdateIDs, updateEntriesByID bool, dataHandler chan interface{}) error {
if cfg == nil { // exchange config fields are checked in stream package
func (w *Orderbook) Setup(exchangeConfig *config.Exchange, c *Config, dataHandler chan<- interface{}) error {
if exchangeConfig == nil { // exchange config fields are checked in stream package
// prior to calling this, so further checks are not needed.
return fmt.Errorf(packageError, errExchangeConfigNil)
}
if c == nil {
return fmt.Errorf(packageError, errBufferConfigNil)
}
if dataHandler == nil {
return fmt.Errorf(packageError, errUnsetDataHandler)
}
if cfg.Orderbook.WebsocketBufferEnabled &&
cfg.Orderbook.WebsocketBufferLimit < 1 {
if exchangeConfig.Orderbook.WebsocketBufferEnabled &&
exchangeConfig.Orderbook.WebsocketBufferLimit < 1 {
return fmt.Errorf(packageError, errIssueBufferEnabledButNoLimit)
}
w.bufferEnabled = cfg.Orderbook.WebsocketBufferEnabled
w.obBufferLimit = cfg.Orderbook.WebsocketBufferLimit
w.sortBuffer = sortBuffer
w.sortBufferByUpdateIDs = sortBufferByUpdateIDs
w.updateEntriesByID = updateEntriesByID
w.exchangeName = cfg.Name
// NOTE: These variables are set by config.json under "orderbook" for each
// individual exchange.
w.bufferEnabled = exchangeConfig.Orderbook.WebsocketBufferEnabled
w.obBufferLimit = exchangeConfig.Orderbook.WebsocketBufferLimit
w.sortBuffer = c.SortBuffer
w.sortBufferByUpdateIDs = c.SortBufferByUpdateIDs
w.updateEntriesByID = c.UpdateEntriesByID
w.exchangeName = exchangeConfig.Name
w.dataHandler = dataHandler
w.ob = make(map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder)
w.verbose = cfg.Verbose
w.verbose = exchangeConfig.Verbose
// set default publish period if missing
orderbookPublishPeriod := config.DefaultOrderbookPublishPeriod
if cfg.Orderbook.PublishPeriod != nil {
orderbookPublishPeriod = *cfg.Orderbook.PublishPeriod
if exchangeConfig.Orderbook.PublishPeriod != nil {
orderbookPublishPeriod = *exchangeConfig.Orderbook.PublishPeriod
}
w.publishPeriod = orderbookPublishPeriod
w.updateIDProgression = c.UpdateIDProgression
w.checksum = c.Checksum
return nil
}
@@ -86,6 +95,18 @@ func (w *Orderbook) Update(u *Update) error {
u.Asset)
}
// out of order update ID can be skipped
if w.updateIDProgression && u.UpdateID <= book.updateID {
if w.verbose {
log.Warnf(log.WebsocketMgr,
"Exchange %s CurrencyPair: %s AssetType: %s out of order websocket update received",
w.exchangeName,
u.Pair,
u.Asset)
}
return nil
}
// Checks for when the rest protocol overwrites a streaming dominated book
// will stop updating book via incremental updates. This occurs because our
// sync manager (engine/sync.go) timer has elapsed for streaming. Usually
@@ -200,6 +221,13 @@ func (w *Orderbook) processObUpdate(o *orderbookHolder, u *Update) error {
return o.updateByIDAndAction(u)
}
o.updateByPrice(u)
if w.checksum != nil {
err := w.checksum(o.ob.Retrieve(), u.Checksum)
if err != nil {
return err
}
o.updateID = u.UpdateID
}
return nil
}
@@ -281,14 +309,15 @@ func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error {
m2[book.Asset] = holder
}
holder.updateID = book.LastUpdateID
// Checks if book can deploy to linked list
err := book.Verify()
if err != nil {
return err
}
holder.ob.LoadSnapshot(
book.Bids,
holder.ob.LoadSnapshot(book.Bids,
book.Asks,
book.LastUpdateID,
book.LastUpdated,

View File

@@ -392,6 +392,8 @@ func TestOutOfOrderIDs(t *testing.T) {
}
}
var errTest = errors.New("test error")
func TestOrderbookLastUpdateID(t *testing.T) {
holder, _, _, err := createSnapshot()
if err != nil {
@@ -402,6 +404,21 @@ func TestOrderbookLastUpdateID(t *testing.T) {
exp, itemArray[1][0].Price)
}
holder.checksum = func(state *orderbook.Base, checksum uint32) error { return errTest }
err = holder.Update(&Update{
Asks: []orderbook.Item{{Price: 999999}},
Pair: cp,
UpdateID: -1,
Asset: asset.Spot,
})
if !errors.Is(err, errTest) {
t.Fatalf("received: %v but expected: %v", err, errTest)
}
holder.checksum = func(state *orderbook.Base, checksum uint32) error { return nil }
holder.updateIDProgression = true
for i := range itemArray {
asks := itemArray[i]
err = holder.Update(&Update{
@@ -415,6 +432,18 @@ func TestOrderbookLastUpdateID(t *testing.T) {
}
}
// out of order
holder.verbose = true
err = holder.Update(&Update{
Asks: []orderbook.Item{{Price: 999999}},
Pair: cp,
UpdateID: 1,
Asset: asset.Spot,
})
if err != nil {
t.Fatal(err)
}
ob, err := holder.GetOrderbook(cp, asset.Spot)
if err != nil {
t.Fatal(err)
@@ -715,19 +744,25 @@ func TestGetOrderbook(t *testing.T) {
func TestSetup(t *testing.T) {
t.Parallel()
w := Orderbook{}
err := w.Setup(nil, false, false, false, nil)
err := w.Setup(nil, nil, nil)
if !errors.Is(err, errExchangeConfigNil) {
t.Fatalf("expected error %v but received %v", errExchangeConfigNil, err)
}
exchangeConfig := &config.Exchange{}
err = w.Setup(exchangeConfig, false, false, false, nil)
err = w.Setup(exchangeConfig, nil, nil)
if !errors.Is(err, errBufferConfigNil) {
t.Fatalf("expected error %v but received %v", errBufferConfigNil, err)
}
bufferConf := &Config{}
err = w.Setup(exchangeConfig, bufferConf, nil)
if !errors.Is(err, errUnsetDataHandler) {
t.Fatalf("expected error %v but received %v", errUnsetDataHandler, err)
}
exchangeConfig.Orderbook.WebsocketBufferEnabled = true
err = w.Setup(exchangeConfig, false, false, false, make(chan interface{}))
err = w.Setup(exchangeConfig, bufferConf, make(chan interface{}))
if !errors.Is(err, errIssueBufferEnabledButNoLimit) {
t.Fatalf("expected error %v but received %v", errIssueBufferEnabledButNoLimit, err)
}
@@ -735,7 +770,10 @@ func TestSetup(t *testing.T) {
exchangeConfig.Orderbook.WebsocketBufferLimit = 1337
exchangeConfig.Orderbook.WebsocketBufferEnabled = true
exchangeConfig.Name = "test"
err = w.Setup(exchangeConfig, true, true, true, make(chan interface{}))
bufferConf.SortBuffer = true
bufferConf.SortBufferByUpdateIDs = true
bufferConf.UpdateEntriesByID = true
err = w.Setup(exchangeConfig, bufferConf, make(chan interface{}))
if err != nil {
t.Fatal(err)
}
@@ -1010,7 +1048,7 @@ func TestUpdateByIDAndAction(t *testing.T) {
func TestFlushOrderbook(t *testing.T) {
t.Parallel()
w := &Orderbook{}
err := w.Setup(&config.Exchange{Name: "test"}, false, false, false, make(chan interface{}, 2))
err := w.Setup(&config.Exchange{Name: "test"}, &Config{}, make(chan interface{}, 2))
if err != nil {
t.Fatal(err)
}

View File

@@ -9,6 +9,24 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
)
// Config defines the configuration variables for the websocket buffer; snapshot
// and incremental update orderbook processing.
type Config struct {
// SortBuffer enables a websocket to sort incoming updates before processing.
SortBuffer bool
// SortBufferByUpdateIDs allows the sorting of the buffered updates by their
// corresponding update IDs.
SortBufferByUpdateIDs bool
// UpdateEntriesByID will match by IDs instead of price to perform the an
// action. e.g. update, delete, insert.
UpdateEntriesByID bool
// UpdateIDProgression requires that the new update ID be greater than the
// prior ID. This will skip processing and not error.
UpdateIDProgression bool
// Checksum is a package defined checksum calculation for updated books.
Checksum func(state *orderbook.Base, checksum uint32) error
}
// Orderbook defines a local cache of orderbooks for amending, appending
// and deleting changes and updates the main store for a stream
type Orderbook struct {
@@ -19,10 +37,17 @@ type Orderbook struct {
sortBufferByUpdateIDs bool // When timestamps aren't provided, an id can help sort
updateEntriesByID bool // Use the update IDs to match ob entries
exchangeName string
dataHandler chan interface{}
dataHandler chan<- interface{}
verbose bool
publishPeriod time.Duration
m sync.Mutex
// updateIDProgression requires that the new update ID be greater than the
// prior ID. This will skip processing and not error.
updateIDProgression bool
// checksum is a package defined checksum calculation for updated books.
checksum func(state *orderbook.Base, checksum uint32) error
publishPeriod time.Duration
m sync.Mutex
}
// orderbookHolder defines a store of pending updates and a pointer to the
@@ -34,7 +59,8 @@ type orderbookHolder struct {
// coinbasepro can have up too 100 updates per second introducing overhead.
// The sync agent only requires an alert every 15 seconds for a specific
// currency.
ticker *time.Ticker
ticker *time.Ticker
updateID int64
}
// Update stores orderbook updates and dictates what features to use when processing
@@ -46,7 +72,8 @@ type Update struct {
Bids []orderbook.Item
Asks []orderbook.Item
Pair currency.Pair
// Checksum defines the expected value when the books have been verified
Checksum uint32
// Determines if there is a max depth of orderbooks and after an append we
// should remove any items that are outside of this scope. Kraken is the
// only exchange utilising this field.