exchange: binance orderbook fix (#599)

* port orderbook binance management from draft singular asset (spot) processing add additional updates to buffer management

* integrate port

* shifted burden of proof to exchange and remove repairing techniques that obfuscate issues and could caause artifacts

* WIP

* Update exchanges, update tests, update configuration so we can default off on buffer util.

* Add buffer enabled switching to all exchanges and some that are missing, default to off.

* lbtc set not aggregate books

* Addr linter issues

* EOD wip

* optimization and bug fix pass

* clean before test and benchmarking

* add testing/benchmarks to sorting/reversing functions, dropped pointer to slice as we aren't changing slice len or cap

* Add tests and removed ptr for main book as we just ammend amount

* addr exchange test issues

* ci issues

* addr glorious issues

* Addr MCB nits, fixed funding rate book for bitfinex and fixed potential panic on nil book return

* addr linter issues

* updated mistakes

* Fix more tests

* revert bypass

* Addr mcb nits

* fix zero price bug caused by exchange. Filted out bid result rather then unsubscribing. Updated orderbook to L2 so there is no aggregation.

* Allow for zero bid and ask books to be loaded and warn if found.

* remove authentication subscription conflicts as they do not have a channel ID return

* WIP - Batching outbound requests for kraken as they do not give you the partial if you subscribe to do many things.

* finalised outbound request for kraken

* filter zero value due to invalid returned data from exchange, add in max subscription amount and increased outbound batch limit

* expand to max allowed book length & fix issue where they were sending a zero length ask side when we sent a depth of zero

* Updated function comments and added in more realistic book sizing for sort cases

* change map ordering

* amalgamate maps in buffer

* Rm ln

* fix kraken linter issues

* add in buffer initialisation

* increase timout by 30seconds

* Coinbene: Add websocket orderbook length check.

* Engine: Improve switch statement for orderbook summary dissplay.

* Binance: Added tests, remove deadlock

* Exchanges: Change orderbook field -> IsFundingRate

* Orderbook Buffer: Added method to orderbookHolder

* Kraken: removed superfluous integer for sleep

* Bitmex: fixed error return

* cmd/gctcli: force 8 decimal place usage for orderbook streaming

* Kraken: Add checksum and fix bug where we were dropping returned data which was causing artifacts

* Kraken: As per orderbook documentation added in maxdepth field to update to filter depth that goes beyond current scope

* Bitfinex: Tracking down bug on margin-funding, added sequence and checksum validation websocket config on connect (WIP)

* Bitfinex: Complete implementation of checksum

* Bitfinex: Fix funding book insertion and checksum - Dropped updates and deleting items not on book are continuously occuring from stream

* Bitfinex: Fix linter issues

* Bitfinex: Fix even more linter issues.

* Bitmex: Populate orderbook base identification fields to be passed back when error occurrs

* OkGroup: Populate orderbook base identification fields to be passed back when error occurrs

* BTSE: Change string check to 'connect success' to capture multiple user successful strings

* Bitfinex: Updated handling of funding tickers

* Bitfinex: Fix undocumented alignment bug for funding rates

* Bitfinex: Updated error return with more information

* Bitfinex: Change REST fetching to Raw book to keep it in line with websocket implementation. Fix woopsy.

* Localbitcoins: Had to impose a rate limiter to stop errors, fixed return for easier error identification.

* Exchanges: Update failing tests

* LocalBitcoins: Addr nit and bumped time by 1 second for fetching books

* Kraken: Dynamically scale precision based on str return for checksum calculations

* Kraken: Add pair and asset type to validateCRC32 error reponse

* BTSE: Filter out zero amount orderbook price levels in websocket return

* Exchanges: Update orderbook functions to return orderbook base to differentiate errors.

* BTSE: Fix spelling

* Bitmex: Fix error return string

* BTSE: Add orderbook filtering function

* Coinbene: Change wording

* BTSE: Add test for filtering

* Binance: Addr nits, added in variables for buffers and worker amounts and fixed error log messages

* GolangCI: Remove excess 0

* Binance: Reduces double ups on asset and pair in errors

* Binance: Fix error checking
This commit is contained in:
Ryan O'Hara-Reid
2021-01-04 17:19:55 +11:00
committed by GitHub
parent 59013ea076
commit eb0571cc9b
80 changed files with 11139 additions and 1408 deletions

View File

@@ -10,8 +10,31 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
)
const packageError = "websocket orderbook buffer error: %w"
var (
errUnsetExchangeName = errors.New("exchange name unset")
errUnsetDataHandler = errors.New("datahandler unset")
errIssueBufferEnabledButNoLimit = errors.New("buffer enabled but no limit set")
errUpdateIsNil = errors.New("update is nil")
errUpdateNoTargets = errors.New("update bid/ask targets cannot be nil")
)
// Setup sets private variables
func (w *Orderbook) Setup(obBufferLimit int, bufferEnabled, sortBuffer, sortBufferByUpdateIDs, updateEntriesByID bool, exchangeName string, dataHandler chan interface{}) {
func (w *Orderbook) Setup(obBufferLimit int,
bufferEnabled,
sortBuffer,
sortBufferByUpdateIDs,
updateEntriesByID bool, exchangeName string, dataHandler chan interface{}) error {
if exchangeName == "" {
return fmt.Errorf(packageError, errUnsetExchangeName)
}
if dataHandler == nil {
return fmt.Errorf(packageError, errUnsetDataHandler)
}
if bufferEnabled && obBufferLimit < 1 {
return fmt.Errorf(packageError, errIssueBufferEnabledButNoLimit)
}
w.obBufferLimit = obBufferLimit
w.bufferEnabled = bufferEnabled
w.sortBuffer = sortBuffer
@@ -19,6 +42,19 @@ func (w *Orderbook) Setup(obBufferLimit int, bufferEnabled, sortBuffer, sortBuff
w.updateEntriesByID = updateEntriesByID
w.exchangeName = exchangeName
w.dataHandler = dataHandler
w.ob = make(map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder)
return nil
}
// validate validates update against setup values
func (w *Orderbook) validate(u *Update) error {
if u == nil {
return fmt.Errorf(packageError, errUpdateIsNil)
}
if len(u.Bids) == 0 && len(u.Asks) == 0 {
return fmt.Errorf(packageError, errUpdateNoTargets)
}
return nil
}
// Update updates a local buffer using bid targets and ask targets then updates
@@ -27,13 +63,12 @@ func (w *Orderbook) Setup(obBufferLimit int, bufferEnabled, sortBuffer, sortBuff
// Price target not found; append of price target
// Price target found; amend volume of price target
func (w *Orderbook) Update(u *Update) error {
if (u.Bids == nil && u.Asks == nil) || (len(u.Bids) == 0 && len(u.Asks) == 0) {
return fmt.Errorf("%v cannot have bids and ask targets both nil",
w.exchangeName)
if err := w.validate(u); err != nil {
return err
}
w.m.Lock()
defer w.m.Unlock()
obLookup, ok := w.ob[u.Pair][u.Asset]
obLookup, ok := w.ob[u.Pair.Base][u.Pair.Quote][u.Asset]
if !ok {
return fmt.Errorf("ob.Base could not be found for Exchange %s CurrencyPair: %s AssetType: %s",
w.exchangeName,
@@ -42,246 +77,345 @@ func (w *Orderbook) Update(u *Update) error {
}
if w.bufferEnabled {
overBufferLimit := w.processBufferUpdate(obLookup, u)
if !overBufferLimit {
processed, err := w.processBufferUpdate(obLookup, u)
if err != nil {
return err
}
if !processed {
return nil
}
} else {
w.processObUpdate(obLookup, u)
err := w.processObUpdate(obLookup, u)
if err != nil {
return err
}
}
err := obLookup.Process()
err := obLookup.ob.Process()
if err != nil {
return err
}
if w.bufferEnabled {
// Reset the buffer
w.buffer[u.Pair][u.Asset] = nil
}
// Process in data handler
w.dataHandler <- obLookup
select {
case w.dataHandler <- obLookup.ob:
default:
}
return nil
}
func (w *Orderbook) processBufferUpdate(o *orderbook.Base, u *Update) bool {
if w.buffer == nil {
w.buffer = make(map[currency.Pair]map[asset.Item][]*Update)
}
if w.buffer[u.Pair] == nil {
w.buffer[u.Pair] = make(map[asset.Item][]*Update)
}
bufferLookup := w.buffer[u.Pair][u.Asset]
if len(bufferLookup) <= w.obBufferLimit {
bufferLookup = append(bufferLookup, u)
if len(bufferLookup) < w.obBufferLimit {
w.buffer[u.Pair][u.Asset] = bufferLookup
return false
}
// processBufferUpdate stores update into buffer, when buffer at capacity as
// defined by w.obBufferLimit it well then sort and apply updates.
func (w *Orderbook) processBufferUpdate(o *orderbookHolder, u *Update) (bool, error) {
*o.buffer = append(*o.buffer, *u)
if len(*o.buffer) < w.obBufferLimit {
return false, nil
}
if w.sortBuffer {
// sort by last updated to ensure each update is in order
if w.sortBufferByUpdateIDs {
sort.Slice(bufferLookup, func(i, j int) bool {
return bufferLookup[i].UpdateID < bufferLookup[j].UpdateID
sort.Slice(*o.buffer, func(i, j int) bool {
return (*o.buffer)[i].UpdateID < (*o.buffer)[j].UpdateID
})
} else {
sort.Slice(bufferLookup, func(i, j int) bool {
return bufferLookup[i].UpdateTime.Before(bufferLookup[j].UpdateTime)
sort.Slice(*o.buffer, func(i, j int) bool {
return (*o.buffer)[i].UpdateTime.Before((*o.buffer)[j].UpdateTime)
})
}
}
for i := range bufferLookup {
w.processObUpdate(o, bufferLookup[i])
for i := range *o.buffer {
err := w.processObUpdate(o, &(*o.buffer)[i])
if err != nil {
return false, err
}
}
w.buffer[u.Pair][u.Asset] = bufferLookup
return true
// clear buffer of old updates
*o.buffer = nil
return true, nil
}
func (w *Orderbook) processObUpdate(o *orderbook.Base, u *Update) {
o.LastUpdateID = u.UpdateID
// processObUpdate processes updates either by its corresponding id or by
// price level
func (w *Orderbook) processObUpdate(o *orderbookHolder, u *Update) error {
o.ob.LastUpdateID = u.UpdateID
if w.updateEntriesByID {
w.updateByIDAndAction(o, u)
} else {
w.updateAsksByPrice(o, u)
w.updateBidsByPrice(o, u)
return o.updateByIDAndAction(u)
}
return o.updateByPrice(u)
}
func (w *Orderbook) updateAsksByPrice(o *orderbook.Base, u *Update) {
updates:
for j := range u.Asks {
for k := range o.Asks {
if o.Asks[k].Price == u.Asks[j].Price {
if u.Asks[j].Amount <= 0 {
o.Asks = append(o.Asks[:k], o.Asks[k+1:]...)
continue updates
// updateByPrice ammends amount if match occurs by price, deletes if amount is
// zero or less and inserts if not found.
func (o *orderbookHolder) updateByPrice(updts *Update) error {
askUpdates:
for j := range updts.Asks {
for target := range o.ob.Asks {
if o.ob.Asks[target].Price == updts.Asks[j].Price {
if updts.Asks[j].Amount == 0 {
o.ob.Asks = append(o.ob.Asks[:target], o.ob.Asks[target+1:]...)
continue askUpdates
}
o.Asks[k].Amount = u.Asks[j].Amount
continue updates
o.ob.Asks[target].Amount = updts.Asks[j].Amount
continue askUpdates
}
}
if u.Asks[j].Amount == 0 {
if updts.Asks[j].Amount <= 0 {
continue
}
o.Asks = append(o.Asks, u.Asks[j])
insertAsk(updts.Asks[j], &o.ob.Asks)
if updts.MaxDepth != 0 && len(o.ob.Asks) > updts.MaxDepth {
o.ob.Asks = o.ob.Asks[:updts.MaxDepth]
}
}
sort.Slice(o.Asks, func(i, j int) bool {
return o.Asks[i].Price < o.Asks[j].Price
})
}
func (w *Orderbook) updateBidsByPrice(o *orderbook.Base, u *Update) {
updates:
for j := range u.Bids {
for k := range o.Bids {
if o.Bids[k].Price == u.Bids[j].Price {
if u.Bids[j].Amount <= 0 {
o.Bids = append(o.Bids[:k], o.Bids[k+1:]...)
continue updates
bidUpdates:
for j := range updts.Bids {
for target := range o.ob.Bids {
if o.ob.Bids[target].Price == updts.Bids[j].Price {
if updts.Bids[j].Amount == 0 {
o.ob.Bids = append(o.ob.Bids[:target], o.ob.Bids[target+1:]...)
continue bidUpdates
}
o.Bids[k].Amount = u.Bids[j].Amount
continue updates
o.ob.Bids[target].Amount = updts.Bids[j].Amount
continue bidUpdates
}
}
if u.Bids[j].Amount == 0 {
if updts.Bids[j].Amount <= 0 {
continue
}
o.Bids = append(o.Bids, u.Bids[j])
insertBid(updts.Bids[j], &o.ob.Bids)
if updts.MaxDepth != 0 && len(o.ob.Bids) > updts.MaxDepth {
o.ob.Bids = o.ob.Bids[:updts.MaxDepth]
}
}
sort.Slice(o.Bids, func(i, j int) bool {
return o.Bids[i].Price > o.Bids[j].Price
})
return nil
}
// updateByIDAndAction will receive an action to execute against the orderbook
// it will then match by IDs instead of price to perform the action
func (w *Orderbook) updateByIDAndAction(o *orderbook.Base, u *Update) {
switch u.Action {
case "update":
for x := range u.Bids {
for y := range o.Bids {
if o.Bids[y].ID == u.Bids[x].ID {
o.Bids[y].Amount = u.Bids[x].Amount
break
}
}
func (o *orderbookHolder) updateByIDAndAction(updts *Update) (err error) {
switch updts.Action {
case Amend:
err = applyUpdates(updts.Bids, o.ob.Bids)
if err != nil {
return err
}
for x := range u.Asks {
for y := range o.Asks {
if o.Asks[y].ID == u.Asks[x].ID {
o.Asks[y].Amount = u.Asks[x].Amount
break
}
}
err = applyUpdates(updts.Asks, o.ob.Asks)
if err != nil {
return err
}
case "delete":
for x := range u.Bids {
for y := 0; y < len(o.Bids); y++ {
if o.Bids[y].ID == u.Bids[x].ID {
o.Bids = append(o.Bids[:y], o.Bids[y+1:]...)
break
}
}
case Delete:
// edge case for Bitfinex as their streaming endpoint duplicates deletes
bypassErr := o.ob.ExchangeName == "Bitfinex" && o.ob.IsFundingRate
err = deleteUpdates(updts.Bids, &o.ob.Bids, bypassErr)
if err != nil {
return fmt.Errorf("%s %s %v", o.ob.AssetType, o.ob.Pair, err)
}
for x := range u.Asks {
for y := 0; y < len(o.Asks); y++ {
if o.Asks[y].ID == u.Asks[x].ID {
o.Asks = append(o.Asks[:y], o.Asks[y+1:]...)
break
}
}
err = deleteUpdates(updts.Asks, &o.ob.Asks, bypassErr)
if err != nil {
return fmt.Errorf("%s %s %v", o.ob.AssetType, o.ob.Pair, err)
}
case "insert":
o.Bids = append(o.Bids, u.Bids...)
sort.Slice(o.Bids, func(i, j int) bool {
return o.Bids[i].Price > o.Bids[j].Price
})
o.Asks = append(o.Asks, u.Asks...)
sort.Slice(o.Asks, func(i, j int) bool {
return o.Asks[i].Price < o.Asks[j].Price
})
case "update/insert":
case Insert:
insertUpdatesBid(updts.Bids, &o.ob.Bids)
insertUpdatesAsk(updts.Asks, &o.ob.Asks)
case UpdateInsert:
updateBids:
for x := range u.Bids {
for y := range o.Bids {
if o.Bids[y].ID == u.Bids[x].ID {
o.Bids[y].Amount = u.Bids[x].Amount
for x := range updts.Bids {
for target := range o.ob.Bids { // First iteration finds ID matches
if o.ob.Bids[target].ID == updts.Bids[x].ID {
if o.ob.Bids[target].Price != updts.Bids[x].Price {
// Price change occurred so correct bid alignment is
// needed - delete instance and insert into correct
// price level
o.ob.Bids = append(o.ob.Bids[:target], o.ob.Bids[target+1:]...)
break
}
o.ob.Bids[target].Amount = updts.Bids[x].Amount
continue updateBids
}
}
o.Bids = append(o.Bids, u.Bids[x])
insertBid(updts.Bids[x], &o.ob.Bids)
}
updateAsks:
for x := range u.Asks {
for y := range o.Asks {
if o.Asks[y].ID == u.Asks[x].ID {
o.Asks[y].Amount = u.Asks[x].Amount
for x := range updts.Asks {
for target := range o.ob.Asks {
if o.ob.Asks[target].ID == updts.Asks[x].ID {
if o.ob.Asks[target].Price != updts.Asks[x].Price {
// Price change occurred so correct ask alignment is
// needed - delete instance and insert into correct
// price level
o.ob.Asks = append(o.ob.Asks[:target], o.ob.Asks[target+1:]...)
break
}
o.ob.Asks[target].Amount = updts.Asks[x].Amount
continue updateAsks
}
}
o.Asks = append(o.Asks, u.Asks[x])
insertAsk(updts.Asks[x], &o.ob.Asks)
}
default:
return fmt.Errorf("invalid action [%s]", updts.Action)
}
return nil
}
// applyUpdates amends amount by ID and returns an error if not found
func applyUpdates(updts, book []orderbook.Item) error {
updates:
for x := range updts {
for y := range book {
if book[y].ID == updts[x].ID {
book[y].Amount = updts[x].Amount
continue updates
}
}
return fmt.Errorf("update cannot be applied id: %d not found",
updts[x].ID)
}
return nil
}
// deleteUpdates removes updates from orderbook and returns an error if not
// found
func deleteUpdates(updt []orderbook.Item, book *[]orderbook.Item, bypassErr bool) error {
updates:
for x := range updt {
for y := range *book {
if (*book)[y].ID == updt[x].ID {
*book = append((*book)[:y], (*book)[y+1:]...) // nolint:gocritic
continue updates
}
}
// bypassErr is for expected duplication from endpoint.
if !bypassErr {
return fmt.Errorf("update cannot be deleted id: %d not found",
updt[x].ID)
}
}
return nil
}
func insertAsk(updt orderbook.Item, book *[]orderbook.Item) {
for target := range *book {
if updt.Price < (*book)[target].Price {
insertItem(updt, book, target)
return
}
}
*book = append(*book, updt)
}
func insertBid(updt orderbook.Item, book *[]orderbook.Item) {
for target := range *book {
if updt.Price > (*book)[target].Price {
insertItem(updt, book, target)
return
}
}
*book = append(*book, updt)
}
// insertUpdatesBid inserts on **correctly aligned** book at price level
func insertUpdatesBid(updt []orderbook.Item, book *[]orderbook.Item) {
updates:
for x := range updt {
for target := range *book {
if updt[x].Price > (*book)[target].Price {
insertItem(updt[x], book, target)
continue updates
}
}
*book = append(*book, updt[x])
}
}
// LoadSnapshot loads initial snapshot of ob data, overwrite allows full
// ob to be completely rewritten because the exchange is a doing a full
// update not an incremental one
func (w *Orderbook) LoadSnapshot(newOrderbook *orderbook.Base) error {
if len(newOrderbook.Asks) == 0 || len(newOrderbook.Bids) == 0 {
return fmt.Errorf("%v snapshot ask and bids are nil", w.exchangeName)
// insertUpdatesBid inserts on **correctly aligned** book at price level
func insertUpdatesAsk(updt []orderbook.Item, book *[]orderbook.Item) {
updates:
for x := range updt {
for target := range *book {
if updt[x].Price < (*book)[target].Price {
insertItem(updt[x], book, target)
continue updates
}
}
*book = append(*book, updt[x])
}
}
if newOrderbook.Pair.IsEmpty() {
return errors.New("websocket orderbook pair unset")
}
if newOrderbook.AssetType.String() == "" {
return errors.New("websocket orderbook asset type unset")
}
if newOrderbook.ExchangeName == "" {
return errors.New("websocket orderbook exchange name unset")
}
// insertItem inserts item in slice by target element this is an optimization
// to reduce the need for sorting algorithms
func insertItem(update orderbook.Item, book *[]orderbook.Item, target int) {
// TODO: extend slice by incoming update length before this gets hit
*book = append(*book, orderbook.Item{})
copy((*book)[target+1:], (*book)[target:])
(*book)[target] = update
}
// LoadSnapshot loads initial snapshot of ob data from websocket
func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error {
w.m.Lock()
defer w.m.Unlock()
if w.ob == nil {
w.ob = make(map[currency.Pair]map[asset.Item]*orderbook.Base)
}
if w.ob[newOrderbook.Pair] == nil {
w.ob[newOrderbook.Pair] = make(map[asset.Item]*orderbook.Base)
}
w.ob[newOrderbook.Pair][newOrderbook.AssetType] = newOrderbook
err := newOrderbook.Process()
err := book.Process()
if err != nil {
return err
}
w.dataHandler <- newOrderbook
m1, ok := w.ob[book.Pair.Base]
if !ok {
m1 = make(map[currency.Code]map[asset.Item]*orderbookHolder)
w.ob[book.Pair.Base] = m1
}
m2, ok := m1[book.Pair.Quote]
if !ok {
m2 = make(map[asset.Item]*orderbookHolder)
m1[book.Pair.Quote] = m2
}
m3, ok := m2[book.AssetType]
if !ok {
m3 = &orderbookHolder{ob: book, buffer: &[]Update{}}
m2[book.AssetType] = m3
} else {
m3.ob.Bids = book.Bids
m3.ob.Asks = book.Asks
}
w.dataHandler <- book
return nil
}
// GetOrderbook use sparingly. Modifying anything here will ruin hash
// calculation and cause problems
// GetOrderbook returns orderbook stored in current buffer
func (w *Orderbook) GetOrderbook(p currency.Pair, a asset.Item) *orderbook.Base {
w.m.Lock()
ob := w.ob[p][a]
w.m.Unlock()
return ob
defer w.m.Unlock()
ptr, ok := w.ob[p.Base][p.Quote][a]
if !ok {
return nil
}
cpy := *ptr.ob
cpy.Asks = append(cpy.Asks[:0:0], cpy.Asks...)
cpy.Bids = append(cpy.Bids[:0:0], cpy.Bids...)
return &cpy
}
// FlushBuffer flushes w.ob data to be garbage collected and refreshed when a
// connection is lost and reconnected
func (w *Orderbook) FlushBuffer() {
w.m.Lock()
w.ob = nil
w.buffer = nil
w.ob = make(map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder)
w.m.Unlock()
}
// FlushOrderbook flushes independent orderbook
func (w *Orderbook) FlushOrderbook(p currency.Pair, a asset.Item) error {
w.m.Lock()
defer w.m.Unlock()
book, ok := w.ob[p.Base][p.Quote][a]
if !ok {
return fmt.Errorf("orderbook not associated with pair: [%s] and asset [%s]", p, a)
}
book.ob.Bids = nil
book.ob.Asks = nil
return nil
}