Files
gocryptotrader/exchanges/orderbook/incremental_updates.go
Ryan O'Hara-Reid c892f492a9 buffer/orderbook: shift orderbook update logic from buffer package to orderbook package (#1908)
* buffer/orderbook: shift orderbook update logic from buffer package to orderbook package

* Update exchanges/orderbook/depth.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* linter: fixes

* spelling: fix

* samboss: add in some todos

* sammy nit: add unlock on error

* sammy nits: rm ptr to slice field buffer in orderbookHolder

* sammy nits: Add more coverage bro

* sammy nits: even more coverage

* gk: nits on commentary

* gk: nits change sort.Slice to slices.SortFunc

* gk: fix commentary on buffer clearing

* gk: nits fin

* linter: fix

* Update exchange/websocket/buffer/buffer.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchange/websocket/buffer/buffer.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/orderbook/tranches.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/orderbook/orderbook.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchange/websocket/buffer/buffer_test.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchange/websocket/buffer/buffer_test.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/orderbook/incremental_updates.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* gk: refresh action types and names

* gk nits: consolidate error vars and naming

* gk nits: more name changes

* gk nits; buffer tests update

* gk nits: error var names change

* linter: FIX

* it gets inlined but there is an alloc

* rn field in TODO

* Update exchanges/binance/binance_websocket.go

Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>

* Update exchanges/binance/binance_websocket.go

Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>

* orderbook: shift verify/validate funcs to validate.go and rn Verify() -> Validate()

* orderbook: validate even in presence of checksum and allow cowboy mode

* buffer; fix test

* kraken: fix futures orderbook by reversing incoming bids

* okx: change default spread pair

* Update exchanges/orderbook/validate.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/orderbook/validate.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/orderbook/validate.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/orderbook/validate.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* Update exchanges/orderbook/validate.go

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>

* gk: initial nits

* rn fields V(v)erifyorderbook to V(v)alidateOrderbook

* buffer/orderbook: nilguard in validate and change method receiver w -> o

---------

Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>
Co-authored-by: Adrian Gallagher <adrian.gallagher@thrasher.io>
2025-06-18 16:19:58 +10:00

250 lines
7.4 KiB
Go

package orderbook
import (
"errors"
"fmt"
"time"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
)
// ActionType defines the behaviour of an orderbook update
type ActionType uint8
// ActionType constants for use with ProcessUpdate
const (
UnknownAction ActionType = iota
InsertAction
UpdateOrInsertAction
UpdateAction
DeleteAction
)
// Public error vars
var (
ErrDepthNotFound = errors.New("orderbook depth not found")
ErrEmptyUpdate = errors.New("update contains no bids or asks")
)
var (
errInvalidAction = errors.New("invalid action")
errUpdateFailed = errors.New("orderbook update failed")
errDeleteFailed = errors.New("orderbook update delete failed")
errRESTSnapshot = errors.New("cannot update REST protocol loaded snapshot")
errChecksumMismatch = errors.New("checksum mismatch")
errChecksumGeneratorUnset = errors.New("checksum generator unset")
)
// Update holds changes that are to be applied to a stored orderbook
type Update struct {
UpdateID int64
UpdateTime time.Time
LastPushed time.Time
Asset asset.Item
Bids Levels
Asks Levels
Pair currency.Pair
// ExpectedChecksum defines the expected value when the books have been verified
ExpectedChecksum uint32
// GenerateChecksum is a function that will be called to generate a checksum from the stored orderbook post update
GenerateChecksum func(snapshot *Book) uint32
// AllowEmpty, when true, permits loading an empty order book update to set an UpdateID without including actual data
AllowEmpty bool
// Action defines the action to be performed on the orderbook e.g. amend, delete, insert, update/insert
// Orderbook IDs are used to identify the orderbook level to be updated, deleted or inserted
Action ActionType
SkipOutOfOrderLastUpdateID bool
}
// ProcessUpdate applies updates to the orderbook depth, on error it will invalidate the orderbook and return the
// error, this is to ensure the orderbook is always in a valid state.
func (d *Depth) ProcessUpdate(u *Update) error {
if len(u.Bids) == 0 && len(u.Asks) == 0 && !u.AllowEmpty {
return d.Invalidate(ErrEmptyUpdate)
}
// TODO: Enforce LastPushed set to determine server latency
d.m.Lock()
defer d.m.Unlock()
if d.validationError != nil {
return d.validationError
}
// This will process out of order updates but will not error on them.
// TODO: Error on out of order updates; this is intentionally kept as is from the buffer package.
// Add update.UpdateTime time check to ensure that the update is newer than the last update,
// this should screen zero values as well.
if u.SkipOutOfOrderLastUpdateID && d.lastUpdateID >= u.UpdateID {
return nil
}
if d.options.restSnapshot {
return d.invalidate(errRESTSnapshot)
}
if u.Action != UnknownAction {
if err := d.update(u); err != nil {
return d.invalidate(err)
}
} else {
if err := d.updateBidAskByPrice(u); err != nil {
return d.invalidate(err)
}
}
if !d.validateOrderbook {
return nil
}
if u.ExpectedChecksum != 0 {
if u.GenerateChecksum == nil {
return d.invalidate(errChecksumGeneratorUnset)
}
if checksum := u.GenerateChecksum(d.snapshot()); checksum != u.ExpectedChecksum {
return d.invalidate(fmt.Errorf("%s %s %s %w: expected '%d', got '%d'", d.exchange, d.pair, d.asset, errChecksumMismatch, u.ExpectedChecksum, checksum))
}
}
if err := validate(d.snapshot()); err != nil {
return d.invalidate(err)
}
return nil
}
func (d *Depth) snapshot() *Book {
return &Book{
Bids: d.bidLevels.Levels,
Asks: d.askLevels.Levels,
Exchange: d.options.exchange,
Pair: d.pair,
Asset: d.asset,
IsFundingRate: d.options.isFundingRate,
PriceDuplication: d.options.priceDuplication,
IDAlignment: d.options.idAligned,
ChecksumStringRequired: d.options.checksumStringRequired,
}
}
// update will receive an action to execute against the orderbook it will then match by IDs instead of
// price to perform the action
func (d *Depth) update(u *Update) error {
switch u.Action {
case UpdateAction:
if err := d.updateBidAskByID(u); err != nil {
return fmt.Errorf("%w for %q: %w", errUpdateFailed, u.Action, err)
}
case DeleteAction:
// edge case for Bitfinex as their streaming endpoint duplicates deletes
bypassErr := d.options.exchange == "Bitfinex" && d.options.isFundingRate // TODO: Confirm this is still correct
if err := d.delete(u, bypassErr); err != nil {
return fmt.Errorf("%w for %q: %w", errDeleteFailed, u.Action, err)
}
case InsertAction:
if err := d.insert(u); err != nil {
return fmt.Errorf("%w for %q: %w", errUpdateFailed, u.Action, err)
}
case UpdateOrInsertAction:
if err := d.updateOrInsert(u); err != nil {
return fmt.Errorf("%w for %q: %w", errUpdateFailed, u.Action, err)
}
default:
return fmt.Errorf("%w [%s]", errInvalidAction, u.Action)
}
return nil
}
// updateBidAskByPrice updates the bid and ask spread and enforces Depth.options.maxDepth
func (d *Depth) updateBidAskByPrice(update *Update) error {
if update.UpdateTime.IsZero() {
return fmt.Errorf("%s %s %s %w", d.exchange, d.pair, d.asset, ErrLastUpdatedNotSet)
}
d.bidLevels.updateInsertByPrice(update.Bids, d.options.maxDepth)
d.askLevels.updateInsertByPrice(update.Asks, d.options.maxDepth)
d.updateAndAlert(update)
return nil
}
// updateBidAskByID amends details by ID
func (d *Depth) updateBidAskByID(update *Update) error {
if update.UpdateTime.IsZero() {
return fmt.Errorf("%s %s %s %w", d.exchange, d.pair, d.asset, ErrLastUpdatedNotSet)
}
if err := d.bidLevels.updateByID(update.Bids); err != nil {
return err
}
if err := d.askLevels.updateByID(update.Asks); err != nil {
return err
}
d.updateAndAlert(update)
return nil
}
// delete deletes a price level by ID
func (d *Depth) delete(update *Update, bypassErr bool) error {
if update.UpdateTime.IsZero() {
return fmt.Errorf("%s %s %s %w", d.exchange, d.pair, d.asset, ErrLastUpdatedNotSet)
}
if err := d.bidLevels.deleteByID(update.Bids, bypassErr); err != nil {
return err
}
if err := d.askLevels.deleteByID(update.Asks, bypassErr); err != nil {
return err
}
d.updateAndAlert(update)
return nil
}
// insert inserts new updates
func (d *Depth) insert(update *Update) error {
if update.UpdateTime.IsZero() {
return fmt.Errorf("%s %s %s %w", d.exchange, d.pair, d.asset, ErrLastUpdatedNotSet)
}
if err := d.bidLevels.insertUpdates(update.Bids); err != nil {
return err
}
if err := d.askLevels.insertUpdates(update.Asks); err != nil {
return err
}
d.updateAndAlert(update)
return nil
}
// updateOrInsert updates or inserts by ID at current price level.
func (d *Depth) updateOrInsert(update *Update) error {
if update.UpdateTime.IsZero() {
return fmt.Errorf("%s %s %s %w", d.exchange, d.pair, d.asset, ErrLastUpdatedNotSet)
}
if err := d.bidLevels.updateInsertByID(update.Bids); err != nil {
return err
}
if err := d.askLevels.updateInsertByID(update.Asks); err != nil {
return err
}
d.updateAndAlert(update)
return nil
}
// String returns a string representation of the ActionType
func (a ActionType) String() string {
switch a {
case UnknownAction:
return "Unknown"
case InsertAction:
return "Insert"
case UpdateOrInsertAction:
return "UpdateOrInsert"
case UpdateAction:
return "Update"
case DeleteAction:
return "Delete"
default:
return fmt.Sprintf("Unknown(%d)", a)
}
}