mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-05-14 15:09:51 +00:00
* gateio: Add multi asset websocket support WIP. * meow * Add tests and shenanigans * integrate flushing and for enabling/disabling pairs from rpc shenanigans * some changes * linter: fixes strikes again. * Change name ConnectionAssociation -> ConnectionCandidate for better clarity on purpose. Change connections map to point to candidate to track subscriptions for future dynamic connections holder and drop struct ConnectionDetails. * Add subscription tests (state functional) * glorious:nits + proxy handling * Spelling * linter: fixerino * instead of nil, dont do nil. * clean up nils * cya nils * don't need to set URL or check if its running * stream match update * update tests * linter: fix * glorious: nits + handle context cancellations * stop ping handler routine leak * * Fix bug where reader routine on error that is not a disconnection error but websocket frame error or anything really makes the reader routine return and then connection never cycles and the buffer gets filled. * Handle reconnection via an errors.Is check which is simpler and in that scope allow for quick disconnect reconnect without waiting for connection cycle. * Dial now uses code from DialContext but just calls context.Background() * Don't allow reader to return on parse binary response error. Just output error and return a non nil response * Allow rollback on connect on any error across all connections * fix shadow jutsu * glorious/gk: nitters - adds in ws mock server * linter: fix * fix deadlock on connection as the previous channel had no reader and would hang connection reader for eternity. 
* glorious: whooops * gk: nits * Leak issue and edge case * Websocket: Add SendMessageReturnResponses * whooooooopsie * gk: nitssssss * Update exchanges/stream/stream_match.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * Update exchanges/stream/stream_match_test.go Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> * linter: appease the linter gods * gk: nits * gk: drain brain * started * more changes before merge match pr * gateio: still building out * gateio: finish spot * fix up tests in gateio * Add tests for stream package * rm unused field * glorious: nits * rn files, specifically set function names to asset and offload routing to websocket type. * linter: fix * Add futures websocket request support * gateio: integrate with IBOTExchange (cherry pick my nose) * linter: fix * glorious: nits * add counter and update gateio * fix collision issue * Update exchanges/stream/websocket.go Co-authored-by: Scott <gloriousCode@users.noreply.github.com> * glorious: nits * add tests * linter: fix * After merge * Add error connection info * upgrade to upstream merge * Fix edge case where it does not reconnect made by an already closed connection * stream coverage * glorious: nits * glorious: nits removed asset error handling in stream package * linter: fix * rm block * Add basic readme * fix asset enabled flush cycle for multi connection * spella: fix * linter: fix * Add glorious suggestions, fix some race thing * reinstate name before any routine gets spawned * stop on error in mock tests * glorious: nits * Set correct price * glorious: nits found in CI build * Add test for drain, bumped wait times as there seems to be something happening on macos CI builds, used context.WithTimeout because its instant. 
* mutex across shutdown and connect for protection * lint: fix * test time withoffset, reinstate stop * fix whoops * const trafficCheckInterval; rm testmain * y * fix lint * bump time check window * stream: fix intermittant test failures while testing routines and remove code that is not needed. * spells * cant do what I did * protect race due to routine. * update testURL * use mock websocket connection instead of test URL's * linter: fix * remove url because its throwing errors on CI builds * connections drop all the time, don't need to worry about not being able to echo back ws data as it can be easily reviewed _test file side. * remove another superfluous url thats not really set up for this * spawn overwatch routine when there is no errors, inline checker instead of waiting for a time period, add sleep inline with echo handler as this is really quick and wanted to ensure that latency is handing correctly * linter: fixerino uperino * fix ID bug, why I do this, I don't know. * glorious: panix * linter: things * whoops * dont need to make consecutive Unix() calls * websocket: fix potential panic on error and no responses and adding waitForResponses * rm json parser and handle in json package instead * in favour of json package unmarshalling * linter: fix * linter: fix again * * change field name OutboundRequestSignature to WrapperDefinedConnectionSignature for agnostic inbound and outbound connections. * change method name GetOutboundConnection to GetConnection for agnostic inbound and outbound connections. 
* drop outbound field map for improved performance just using a range and field check (less complex as well) * change field name connections to connectionToWrapper for better clarity * spells and magic and wands * glorious: nits * comparable check for signature * mv err var * glorious: nits and stuff * attempt to fix race * glorious: nits * gk: nits; engine log cleanup * gk: nits; OCD * gk: nits; move function change file names * gk: nits; 🚀 * gk: nits; convert variadic function and message inspection to interface and include a specific function for that handling so as to not need nil on every call * gk: nits; continued * gk: engine nits; rm loaded exchange * gk: nits; drop WebsocketLoginResponse * stream: Add match method EnsureMatchWithData * gk: nits; rn Inspect to IsFinal * gk: nits; rn to MessageFilter * linter: fix * gateio: update rate limit definitions (cherry-pick) * Add test and missing * Shared REST rate limit definitions with Websocket service, set lookup item to nil for systems that do not require rate limiting; add glorious nit * integrate rate limits for websocket trading spot * conform to match upstream changes * standardise names to upstream style * fix wrapper standards test when sending a auth request through a websocket connection * whoops * Update exchanges/gateio/gateio_types.go Co-authored-by: Scott <gloriousCode@users.noreply.github.com> * glorious: nits * linter: fix * linter: overload * whoops * spelling fixes on recent merge * glorious: nits * linter: fix? 
* glorious: nits * gk: assert errors touched * gk: unexport derive functions * gk: nitssssssss * fix test * gk: nitters v1 * gk: http status * gk/nits: Add getAssetFromFuturesPair * gk: nits single response when submitting * gk: new pair with delimiter in tests * gk: param update slice to slice of pointers * gk: add asset type in params, includes t.Context() for tests * linter: fix * linter: fix * fix merge whoopsie * glorious: nits * gk: nit * shift over to websocket package error * internal/exchange/websocket -> exchange/websocket * PEAK OCD! * appease the OCD gods * thrasher: nits --------- Co-authored-by: shazbert <ryan.oharareid@thrasher.io> Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com> Co-authored-by: Scott <gloriousCode@users.noreply.github.com>
345 lines
11 KiB
Go
package buffer

import (
	"errors"
	"fmt"
	"sort"

	"github.com/thrasher-corp/gocryptotrader/common/key"
	"github.com/thrasher-corp/gocryptotrader/config"
	"github.com/thrasher-corp/gocryptotrader/currency"
	"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
	"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
	"github.com/thrasher-corp/gocryptotrader/log"
)

const packageError = "websocket orderbook buffer error: %w"

var (
	errExchangeConfigNil            = errors.New("exchange config is nil")
	errBufferConfigNil              = errors.New("buffer config is nil")
	errUnsetDataHandler             = errors.New("datahandler unset")
	errIssueBufferEnabledButNoLimit = errors.New("buffer enabled but no limit set")
	errUpdateIsNil                  = errors.New("update is nil")
	errUpdateNoTargets              = errors.New("update bid/ask targets cannot be nil")
	errDepthNotFound                = errors.New("orderbook depth not found")
	errRESTOverwrite                = errors.New("orderbook has been overwritten by REST protocol")
	errInvalidAction                = errors.New("invalid action")
	errAmendFailure                 = errors.New("orderbook amend update failure")
	errDeleteFailure                = errors.New("orderbook delete update failure")
	errInsertFailure                = errors.New("orderbook insert update failure")
	errUpdateInsertFailure          = errors.New("orderbook update/insert update failure")
	errRESTTimerLapse               = errors.New("rest sync timer lapse with active websocket connection")
	errOrderbookFlushed             = errors.New("orderbook flushed")
)

// Setup sets private variables
func (w *Orderbook) Setup(exchangeConfig *config.Exchange, c *Config, dataHandler chan<- any) error {
	if exchangeConfig == nil { // exchange config fields are checked in websocket package prior to calling this, so further checks are not needed
		return fmt.Errorf(packageError, errExchangeConfigNil)
	}
	if c == nil {
		return fmt.Errorf(packageError, errBufferConfigNil)
	}
	if dataHandler == nil {
		return fmt.Errorf(packageError, errUnsetDataHandler)
	}
	if exchangeConfig.Orderbook.WebsocketBufferEnabled &&
		exchangeConfig.Orderbook.WebsocketBufferLimit < 1 {
		return fmt.Errorf(packageError, errIssueBufferEnabledButNoLimit)
	}

	// NOTE: These variables are set by config.json under "orderbook" for each individual exchange
	w.bufferEnabled = exchangeConfig.Orderbook.WebsocketBufferEnabled
	w.obBufferLimit = exchangeConfig.Orderbook.WebsocketBufferLimit

	w.sortBuffer = c.SortBuffer
	w.sortBufferByUpdateIDs = c.SortBufferByUpdateIDs
	w.updateEntriesByID = c.UpdateEntriesByID
	w.exchangeName = exchangeConfig.Name
	w.dataHandler = dataHandler
	w.ob = make(map[key.PairAsset]*orderbookHolder)
	w.verbose = exchangeConfig.Verbose
	w.updateIDProgression = c.UpdateIDProgression
	w.checksum = c.Checksum
	return nil
}

// validate validates update against setup values
func (w *Orderbook) validate(u *orderbook.Update) error {
	if u == nil {
		return fmt.Errorf(packageError, errUpdateIsNil)
	}
	if len(u.Bids) == 0 && len(u.Asks) == 0 {
		return fmt.Errorf(packageError, errUpdateNoTargets)
	}
	return nil
}

// Update applies an incremental update to the stored orderbook.Depth for the
// update's pair and asset. Depending on configuration, the update is either
// buffered and applied in batches, or applied immediately.
func (w *Orderbook) Update(u *orderbook.Update) error {
	if err := w.validate(u); err != nil {
		return err
	}
	w.mtx.Lock()
	defer w.mtx.Unlock()
	book, ok := w.ob[key.PairAsset{Base: u.Pair.Base.Item, Quote: u.Pair.Quote.Item, Asset: u.Asset}]
	if !ok {
		return fmt.Errorf("%w for Exchange %s CurrencyPair: %s AssetType: %s",
			errDepthNotFound,
			w.exchangeName,
			u.Pair,
			u.Asset)
	}

	// Out of order update IDs can be skipped.
	if w.updateIDProgression && u.UpdateID <= book.updateID {
		if w.verbose {
			log.Warnf(log.WebsocketMgr,
				"Exchange %s CurrencyPair: %s AssetType: %s out of order websocket update received",
				w.exchangeName,
				u.Pair,
				u.Asset)
		}
		return nil
	}

	// Check whether the REST protocol has overwritten a streaming dominated
	// book; if so, stop applying incremental updates to it. This occurs when
	// the sync manager (engine/sync.go) timer has elapsed for streaming,
	// usually because the book is highly illiquid.
	isREST, err := book.ob.IsRESTSnapshot()
	if err != nil {
		if !errors.Is(err, orderbook.ErrOrderbookInvalid) {
			return err
		}
		// A checksum or processing error has invalidated the book. Skip
		// applying any updates, including those that could be stored in the
		// websocket buffer, until a new snapshot comes through.
		if w.verbose {
			log.Warnf(log.WebsocketMgr,
				"Exchange %s CurrencyPair: %s AssetType: %s underlying book is invalid, cannot apply update.",
				w.exchangeName,
				u.Pair,
				u.Asset)
		}
		return nil
	}

	if isREST {
		if w.verbose {
			log.Warnf(log.WebsocketMgr,
				"%s for Exchange %s CurrencyPair: %s AssetType: %s consider extending synctimeoutwebsocket",
				errRESTOverwrite,
				w.exchangeName,
				u.Pair,
				u.Asset)
		}
		// Instance of illiquidity: this update signals that there is
		// websocket activity, so invalidate the book and request a new
		// snapshot. All further websocket updates are caught above by the
		// IsRESTSnapshot() call.
		return book.ob.Invalidate(errRESTTimerLapse)
	}

	if w.bufferEnabled {
		var processed bool
		processed, err = w.processBufferUpdate(book, u)
		if err != nil {
			return err
		}

		if !processed {
			return nil
		}
	} else {
		err = w.processObUpdate(book, u)
		if err != nil {
			return err
		}
	}

	// Publish all state changes, disregarding verbosity or sync requirements.
	book.ob.Publish()
	w.dataHandler <- book.ob
	return nil
}

// processBufferUpdate stores the update in the buffer. When the buffer reaches
// the capacity defined by w.obBufferLimit, it will sort and apply the updates.
func (w *Orderbook) processBufferUpdate(o *orderbookHolder, u *orderbook.Update) (bool, error) {
	*o.buffer = append(*o.buffer, *u)
	if len(*o.buffer) < w.obBufferLimit {
		return false, nil
	}

	if w.sortBuffer {
		// sort by update ID or last updated time to ensure each update is in order
		if w.sortBufferByUpdateIDs {
			sort.Slice(*o.buffer, func(i, j int) bool {
				return (*o.buffer)[i].UpdateID < (*o.buffer)[j].UpdateID
			})
		} else {
			sort.Slice(*o.buffer, func(i, j int) bool {
				return (*o.buffer)[i].UpdateTime.Before((*o.buffer)[j].UpdateTime)
			})
		}
	}
	for i := range *o.buffer {
		err := w.processObUpdate(o, &(*o.buffer)[i])
		if err != nil {
			return false, err
		}
	}
	// clear buffer of old updates
	*o.buffer = nil
	return true, nil
}

// processObUpdate processes updates either by its corresponding id or by price level
func (w *Orderbook) processObUpdate(o *orderbookHolder, u *orderbook.Update) error {
	// Both update methods require post processing to ensure the orderbook is in a valid state.
	if w.updateEntriesByID {
		if err := o.updateByIDAndAction(u); err != nil {
			return err
		}
	} else {
		if err := o.updateByPrice(u); err != nil {
			return err
		}
	}
	if w.checksum != nil {
		compare, err := o.ob.Retrieve()
		if err != nil {
			return err
		}
		err = w.checksum(compare, u.Checksum)
		if err != nil {
			return o.ob.Invalidate(err)
		}
		o.updateID = u.UpdateID
	} else if o.ob.VerifyOrderbook() {
		compare, err := o.ob.Retrieve()
		if err != nil {
			return err
		}
		err = compare.Verify()
		if err != nil {
			return o.ob.Invalidate(err)
		}
	}
	return nil
}

// updateByPrice amends amount if match occurs by price, deletes if amount is
// zero or less and inserts if not found.
func (o *orderbookHolder) updateByPrice(updts *orderbook.Update) error {
	return o.ob.UpdateBidAskByPrice(updts)
}

// updateByIDAndAction executes the update's action against the orderbook,
// matching entries by ID instead of by price.
func (o *orderbookHolder) updateByIDAndAction(updts *orderbook.Update) error {
	switch updts.Action {
	case orderbook.Amend:
		err := o.ob.UpdateBidAskByID(updts)
		if err != nil {
			return fmt.Errorf("%w %w", errAmendFailure, err)
		}
	case orderbook.Delete:
		// edge case for Bitfinex as their streaming endpoint duplicates deletes
		bypassErr := o.ob.GetName() == "Bitfinex" && o.ob.IsFundingRate()
		err := o.ob.DeleteBidAskByID(updts, bypassErr)
		if err != nil {
			return fmt.Errorf("%w %w", errDeleteFailure, err)
		}
	case orderbook.Insert:
		err := o.ob.InsertBidAskByID(updts)
		if err != nil {
			return fmt.Errorf("%w %w", errInsertFailure, err)
		}
	case orderbook.UpdateInsert:
		err := o.ob.UpdateInsertByID(updts)
		if err != nil {
			return fmt.Errorf("%w %w", errUpdateInsertFailure, err)
		}
	default:
		return fmt.Errorf("%w [%d]", errInvalidAction, updts.Action)
	}
	return nil
}

// LoadSnapshot loads initial snapshot of orderbook data from websocket
func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error {
	// Checks if book can deploy to depth
	err := book.Verify()
	if err != nil {
		return err
	}

	w.mtx.Lock()
	defer w.mtx.Unlock()
	holder, ok := w.ob[key.PairAsset{Base: book.Pair.Base.Item, Quote: book.Pair.Quote.Item, Asset: book.Asset}]
	if !ok {
		// Associate orderbook pointer with local exchange depth map
		var depth *orderbook.Depth
		depth, err = orderbook.DeployDepth(book.Exchange, book.Pair, book.Asset)
		if err != nil {
			return err
		}
		depth.AssignOptions(book)
		// Zero length with capacity w.obBufferLimit: processBufferUpdate appends
		// to this slice, so a non-zero length would leave zero-value updates at
		// the front and trip the buffer limit on the first update.
		buffer := make([]orderbook.Update, 0, w.obBufferLimit)

		holder = &orderbookHolder{ob: depth, buffer: &buffer}
		w.ob[key.PairAsset{Base: book.Pair.Base.Item, Quote: book.Pair.Quote.Item, Asset: book.Asset}] = holder
	}

	holder.updateID = book.LastUpdateID

	err = holder.ob.LoadSnapshot(book.Bids, book.Asks, book.LastUpdateID, book.LastUpdated, book.UpdatePushedAt, false)
	if err != nil {
		return err
	}

	holder.ob.Publish()
	w.dataHandler <- holder.ob
	return nil
}

// GetOrderbook returns an orderbook copy as orderbook.Base
func (w *Orderbook) GetOrderbook(p currency.Pair, a asset.Item) (*orderbook.Base, error) {
	w.mtx.Lock()
	defer w.mtx.Unlock()
	book, ok := w.ob[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}]
	if !ok {
		return nil, fmt.Errorf("%s %s %s %w", w.exchangeName, p, a, errDepthNotFound)
	}
	return book.ob.Retrieve()
}

// FlushBuffer replaces w.ob with an empty map so the old orderbook holders can
// be garbage collected and refreshed when a connection is lost and reconnected
func (w *Orderbook) FlushBuffer() {
	w.mtx.Lock()
	w.ob = make(map[key.PairAsset]*orderbookHolder)
	w.mtx.Unlock()
}

// FlushOrderbook invalidates the individual orderbook for the given pair and asset
func (w *Orderbook) FlushOrderbook(p currency.Pair, a asset.Item) error {
	w.mtx.Lock()
	defer w.mtx.Unlock()
	book, ok := w.ob[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}]
	if !ok {
		return fmt.Errorf("cannot flush orderbook %s %s %s %w",
			w.exchangeName,
			p,
			a,
			errDepthNotFound)
	}
	// error not needed in this return
	_ = book.ob.Invalidate(errOrderbookFlushed)
	return nil
}