Files
gocryptotrader/exchange/websocket/buffer/buffer.go
Ryan O'Hara-Reid c892f492a9 buffer/orderbook: shift orderbook update logic from buffer package to orderbook package (#1908)
* buffer/orderbook: shift orderbook update logic from buffer package to orderbook package

* Update exchanges/orderbook/depth.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* linter: fixes

* spelling: fix

* samboss: add in some todos

* sammy nit: add unlock on error

* sammy nits: rm ptr to slice field buffer in orderbookHolder

* sammy nits: Add more coverage bro

* sammy nits: even more coverage

* gk: nits on commentary

* gk: nits change sort.Slice to slices.SortFunc

* gk: fix commentary on buffer clearing

* gk: nits fin

* linter: fix

* Update exchange/websocket/buffer/buffer.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchange/websocket/buffer/buffer.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/orderbook/tranches.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/orderbook/orderbook.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchange/websocket/buffer/buffer_test.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchange/websocket/buffer/buffer_test.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/orderbook/incremental_updates.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* gk: refresh action types and names

* gk nits: consolidate error vars and naming

* gk nits: more name changes

* gk nits; buffer tests update

* gk nits: error var names change

* linter: FIX

* it gets inlined but there is an alloc

* rn field in TODO

* Update exchanges/binance/binance_websocket.go

Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>

* Update exchanges/binance/binance_websocket.go

Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>

* orderbook: shift verify/validate funcs to validate.go and rn Verify() -> Validate()

* orderbook: validate even in presence of checksum and allow cowboy mode

* buffer; fix test

* kraken: fix futures orderbook by reversing incoming bids

* okx: change default spread pair

* Update exchanges/orderbook/validate.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/orderbook/validate.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/orderbook/validate.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/orderbook/validate.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/orderbook/validate.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* gk: initial nits

* rn fields V(v)erifyorderbook to V(v)alidateOrderbook

* buffer/orderbook: nilguard in validate and change method receiver w -> o

---------

Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>
Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>
2025-06-18 16:19:58 +10:00

205 lines
6.6 KiB
Go

package buffer
import (
"cmp"
"errors"
"fmt"
"slices"
"github.com/thrasher-corp/gocryptotrader/common/key"
"github.com/thrasher-corp/gocryptotrader/config"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
)
const packageError = "websocket orderbook buffer error: %w"
var (
errExchangeConfigNil = errors.New("exchange config is nil")
errBufferConfigNil = errors.New("buffer config is nil")
errUnsetDataHandler = errors.New("datahandler unset")
errIssueBufferEnabledButNoLimit = errors.New("buffer enabled but no limit set")
errOrderbookFlushed = errors.New("orderbook flushed")
)
// Setup sets private variables
func (o *Orderbook) Setup(exchangeConfig *config.Exchange, c *Config, dataHandler chan<- any) error {
if exchangeConfig == nil { // exchange config fields are checked in websocket package prior to calling this, so further checks are not needed
return fmt.Errorf(packageError, errExchangeConfigNil)
}
if c == nil {
return fmt.Errorf(packageError, errBufferConfigNil)
}
if dataHandler == nil {
return fmt.Errorf(packageError, errUnsetDataHandler)
}
if exchangeConfig.Orderbook.WebsocketBufferEnabled &&
exchangeConfig.Orderbook.WebsocketBufferLimit < 1 {
return fmt.Errorf(packageError, errIssueBufferEnabledButNoLimit)
}
// NOTE: These variables are set by config.json under "orderbook" for each individual exchange
o.bufferEnabled = exchangeConfig.Orderbook.WebsocketBufferEnabled
o.obBufferLimit = exchangeConfig.Orderbook.WebsocketBufferLimit
o.sortBuffer = c.SortBuffer
o.sortBufferByUpdateIDs = c.SortBufferByUpdateIDs
o.exchangeName = exchangeConfig.Name
o.dataHandler = dataHandler
o.ob = make(map[key.PairAsset]*orderbookHolder)
o.verbose = exchangeConfig.Verbose
return nil
}
// LoadSnapshot loads initial snapshot of orderbook data from websocket
func (o *Orderbook) LoadSnapshot(book *orderbook.Book) error {
if err := book.Validate(); err != nil {
return err
}
o.m.RLock()
holder, ok := o.ob[key.PairAsset{Base: book.Pair.Base.Item, Quote: book.Pair.Quote.Item, Asset: book.Asset}]
o.m.RUnlock()
if !ok {
o.m.Lock()
// Associate orderbook pointer with local exchange depth map
depth, err := orderbook.DeployDepth(book.Exchange, book.Pair, book.Asset)
if err != nil {
o.m.Unlock()
return err
}
depth.AssignOptions(book)
holder = &orderbookHolder{ob: depth, buffer: make([]orderbook.Update, 0, o.obBufferLimit)}
o.ob[key.PairAsset{Base: book.Pair.Base.Item, Quote: book.Pair.Quote.Item, Asset: book.Asset}] = holder
o.m.Unlock()
}
book.RestSnapshot = false
if err := holder.ob.LoadSnapshot(book); err != nil {
return err
}
holder.ob.Publish()
o.dataHandler <- holder.ob
return nil
}
// Update updates a stored pointer to an orderbook.Depth struct containing bid and ask Tranches, this switches between
// the usage of a buffered update
func (o *Orderbook) Update(u *orderbook.Update) error {
o.m.RLock()
holder, ok := o.ob[key.PairAsset{Base: u.Pair.Base.Item, Quote: u.Pair.Quote.Item, Asset: u.Asset}]
o.m.RUnlock()
if !ok {
return fmt.Errorf("%w for Exchange %s CurrencyPair: %s AssetType: %s", orderbook.ErrDepthNotFound, o.exchangeName, u.Pair, u.Asset)
}
if o.bufferEnabled {
if processed, err := o.processBufferUpdate(holder, u); err != nil || !processed {
return err
}
} else {
if err := holder.ob.ProcessUpdate(u); err != nil {
return err
}
}
// Publish all state changes, disregarding verbosity or sync requirements.
holder.ob.Publish()
o.dataHandler <- holder.ob
return nil
}
// processBufferUpdate stores update into buffer, when buffer at capacity as
// defined by o.obBufferLimit it well then sort and apply updates.
func (o *Orderbook) processBufferUpdate(holder *orderbookHolder, u *orderbook.Update) (bool, error) {
holder.buffer = append(holder.buffer, *u)
if len(holder.buffer) < o.obBufferLimit {
return false, nil
}
if o.sortBuffer {
// sort by last updated to ensure each update is in order
if o.sortBufferByUpdateIDs {
slices.SortFunc(holder.buffer, func(a, b orderbook.Update) int {
return cmp.Compare(a.UpdateID, b.UpdateID)
})
} else {
slices.SortFunc(holder.buffer, func(a, b orderbook.Update) int {
return a.UpdateTime.Compare(b.UpdateTime)
})
}
}
// Always empty the buffer after processing, even if there's an error
defer func() { holder.buffer = holder.buffer[:0] }()
for i := range holder.buffer {
if err := holder.ob.ProcessUpdate(&holder.buffer[i]); err != nil {
return false, err
}
}
return true, nil
}
// GetOrderbook returns an orderbook copy as orderbook.Book
func (o *Orderbook) GetOrderbook(p currency.Pair, a asset.Item) (*orderbook.Book, error) {
if p.IsEmpty() {
return nil, currency.ErrCurrencyPairEmpty
}
if !a.IsValid() {
return nil, asset.ErrInvalidAsset
}
o.m.RLock()
holder, ok := o.ob[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}]
o.m.RUnlock()
if !ok {
return nil, fmt.Errorf("%s %w: %s.%s", o.exchangeName, orderbook.ErrDepthNotFound, a, p)
}
return holder.ob.Retrieve()
}
// LastUpdateID returns the last update ID of the orderbook
func (o *Orderbook) LastUpdateID(p currency.Pair, a asset.Item) (int64, error) {
if p.IsEmpty() {
return 0, currency.ErrCurrencyPairEmpty
}
if !a.IsValid() {
return 0, asset.ErrInvalidAsset
}
o.m.RLock()
book, ok := o.ob[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}]
o.m.RUnlock()
if !ok {
return 0, fmt.Errorf("%s %w: %s.%s", o.exchangeName, orderbook.ErrDepthNotFound, a, p)
}
return book.ob.LastUpdateID()
}
// FlushBuffer flushes individual orderbook buffers while keeping the orderbook lookups intact and ready for new updates
// when a connection is re-established.
func (o *Orderbook) FlushBuffer() {
o.m.Lock()
for _, holder := range o.ob {
holder.buffer = holder.buffer[:0]
}
o.m.Unlock()
}
// InvalidateOrderbook invalidates the orderbook so no trading can occur on potential corrupted data
// TODO: Add in reason for invalidation for debugging purposes.
func (o *Orderbook) InvalidateOrderbook(p currency.Pair, a asset.Item) error {
o.m.RLock()
holder, ok := o.ob[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}]
o.m.RUnlock()
if !ok {
return fmt.Errorf("cannot invalidate orderbook %s %s %s %w", o.exchangeName, p, a, orderbook.ErrDepthNotFound)
}
// Invalidate returns a formatted version of the error it's passed
// In this context we don't need that, since this method only returns an error if it cannot invalidate
_ = holder.ob.Invalidate(errOrderbookFlushed)
return nil
}