mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-05-13 23:16:45 +00:00
stream/buffer: Reduces map lookups by using key struct (#1309)
* stream/buffer: Adds key map optimisation (cherry-pick) * stream/buffer: Add buffer to DataHandler intermediary. Add field InitialUpdate bool to toggle when first update is seen for initial sync. * whoops * buffer: Add difference benchmarks for reference * glorious: nits (reverting out of context changes) * RM unused error that will be used later * purge: benchmark --------- Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
This commit is contained in:
@@ -60,7 +60,7 @@ func (w *Orderbook) Setup(exchangeConfig *config.Exchange, c *Config, dataHandle
|
||||
w.updateEntriesByID = c.UpdateEntriesByID
|
||||
w.exchangeName = exchangeConfig.Name
|
||||
w.dataHandler = dataHandler
|
||||
w.ob = make(map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder)
|
||||
w.ob = make(map[Key]*orderbookHolder)
|
||||
w.verbose = exchangeConfig.Verbose
|
||||
|
||||
// set default publish period if missing
|
||||
@@ -91,9 +91,9 @@ func (w *Orderbook) Update(u *orderbook.Update) error {
|
||||
if err := w.validate(u); err != nil {
|
||||
return err
|
||||
}
|
||||
w.m.Lock()
|
||||
defer w.m.Unlock()
|
||||
book, ok := w.ob[u.Pair.Base][u.Pair.Quote][u.Asset]
|
||||
w.mtx.Lock()
|
||||
defer w.mtx.Unlock()
|
||||
book, ok := w.ob[Key{Base: u.Pair.Base.Item, Quote: u.Pair.Quote.Item, Asset: u.Asset}]
|
||||
if !ok {
|
||||
return fmt.Errorf("%w for Exchange %s CurrencyPair: %s AssetType: %s",
|
||||
errDepthNotFound,
|
||||
@@ -306,19 +306,9 @@ func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error {
|
||||
return err
|
||||
}
|
||||
|
||||
w.m.Lock()
|
||||
defer w.m.Unlock()
|
||||
m1, ok := w.ob[book.Pair.Base]
|
||||
if !ok {
|
||||
m1 = make(map[currency.Code]map[asset.Item]*orderbookHolder)
|
||||
w.ob[book.Pair.Base] = m1
|
||||
}
|
||||
m2, ok := m1[book.Pair.Quote]
|
||||
if !ok {
|
||||
m2 = make(map[asset.Item]*orderbookHolder)
|
||||
m1[book.Pair.Quote] = m2
|
||||
}
|
||||
holder, ok := m2[book.Asset]
|
||||
w.mtx.Lock()
|
||||
defer w.mtx.Unlock()
|
||||
holder, ok := w.ob[Key{Base: book.Pair.Base.Item, Quote: book.Pair.Quote.Item, Asset: book.Asset}]
|
||||
if !ok {
|
||||
// Associate orderbook pointer with local exchange depth map
|
||||
var depth *orderbook.Depth
|
||||
@@ -333,16 +323,11 @@ func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error {
|
||||
if w.publishPeriod != 0 {
|
||||
ticker = time.NewTicker(w.publishPeriod)
|
||||
}
|
||||
holder = &orderbookHolder{
|
||||
ob: depth,
|
||||
buffer: &buffer,
|
||||
ticker: ticker,
|
||||
}
|
||||
m2[book.Asset] = holder
|
||||
holder = &orderbookHolder{ob: depth, buffer: &buffer, ticker: ticker}
|
||||
w.ob[Key{Base: book.Pair.Base.Item, Quote: book.Pair.Quote.Item, Asset: book.Asset}] = holder
|
||||
}
|
||||
|
||||
holder.updateID = book.LastUpdateID
|
||||
|
||||
holder.ob.LoadSnapshot(book.Bids,
|
||||
book.Asks,
|
||||
book.LastUpdateID,
|
||||
@@ -370,15 +355,11 @@ func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error {
|
||||
|
||||
// GetOrderbook returns an orderbook copy as orderbook.Base
|
||||
func (w *Orderbook) GetOrderbook(p currency.Pair, a asset.Item) (*orderbook.Base, error) {
|
||||
w.m.Lock()
|
||||
defer w.m.Unlock()
|
||||
book, ok := w.ob[p.Base][p.Quote][a]
|
||||
w.mtx.Lock()
|
||||
defer w.mtx.Unlock()
|
||||
book, ok := w.ob[Key{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("%s %s %s %w",
|
||||
w.exchangeName,
|
||||
p,
|
||||
a,
|
||||
errDepthNotFound)
|
||||
return nil, fmt.Errorf("%s %s %s %w", w.exchangeName, p, a, errDepthNotFound)
|
||||
}
|
||||
return book.ob.Retrieve()
|
||||
}
|
||||
@@ -386,16 +367,16 @@ func (w *Orderbook) GetOrderbook(p currency.Pair, a asset.Item) (*orderbook.Base
|
||||
// FlushBuffer flushes w.ob data to be garbage collected and refreshed when a
|
||||
// connection is lost and reconnected
|
||||
func (w *Orderbook) FlushBuffer() {
|
||||
w.m.Lock()
|
||||
w.ob = make(map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder)
|
||||
w.m.Unlock()
|
||||
w.mtx.Lock()
|
||||
w.ob = make(map[Key]*orderbookHolder)
|
||||
w.mtx.Unlock()
|
||||
}
|
||||
|
||||
// FlushOrderbook flushes independent orderbook
|
||||
func (w *Orderbook) FlushOrderbook(p currency.Pair, a asset.Item) error {
|
||||
w.m.Lock()
|
||||
defer w.m.Unlock()
|
||||
book, ok := w.ob[p.Base][p.Quote][a]
|
||||
w.mtx.Lock()
|
||||
defer w.mtx.Unlock()
|
||||
book, ok := w.ob[Key{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}]
|
||||
if !ok {
|
||||
return fmt.Errorf("cannot flush orderbook %s %s %s %w",
|
||||
w.exchangeName,
|
||||
|
||||
@@ -43,7 +43,7 @@ func createSnapshot() (holder *Orderbook, asks, bids orderbook.Items, err error)
|
||||
PriceDuplication: true,
|
||||
}
|
||||
|
||||
newBook := make(map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder)
|
||||
newBook := make(map[Key]*orderbookHolder)
|
||||
|
||||
ch := make(chan interface{})
|
||||
go func(<-chan interface{}) { // reader
|
||||
@@ -57,7 +57,7 @@ func createSnapshot() (holder *Orderbook, asks, bids orderbook.Items, err error)
|
||||
ob: newBook,
|
||||
}
|
||||
err = holder.LoadSnapshot(book)
|
||||
return
|
||||
return holder, asks, bids, err
|
||||
}
|
||||
|
||||
func bidAskGenerator() []orderbook.Item {
|
||||
@@ -92,7 +92,7 @@ func BenchmarkUpdateBidsByPrice(b *testing.B) {
|
||||
UpdateTime: time.Now(),
|
||||
Asset: asset.Spot,
|
||||
}
|
||||
holder := ob.ob[cp.Base][cp.Quote][asset.Spot]
|
||||
holder := ob.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}]
|
||||
holder.updateByPrice(update)
|
||||
}
|
||||
}
|
||||
@@ -112,7 +112,7 @@ func BenchmarkUpdateAsksByPrice(b *testing.B) {
|
||||
UpdateTime: time.Now(),
|
||||
Asset: asset.Spot,
|
||||
}
|
||||
holder := ob.ob[cp.Base][cp.Quote][asset.Spot]
|
||||
holder := ob.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}]
|
||||
holder.updateByPrice(update)
|
||||
}
|
||||
}
|
||||
@@ -239,7 +239,7 @@ func TestUpdates(t *testing.T) {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
book := holder.ob[cp.Base][cp.Quote][asset.Spot]
|
||||
book := holder.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}]
|
||||
book.updateByPrice(&orderbook.Update{
|
||||
Bids: itemArray[5],
|
||||
Asks: itemArray[5],
|
||||
@@ -295,7 +295,7 @@ func TestHittingTheBuffer(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
book := holder.ob[cp.Base][cp.Quote][asset.Spot]
|
||||
book := holder.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}]
|
||||
askLen, err := book.ob.GetAskLength()
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
@@ -343,7 +343,7 @@ func TestInsertWithIDs(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
book := holder.ob[cp.Base][cp.Quote][asset.Spot]
|
||||
book := holder.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}]
|
||||
askLen, err := book.ob.GetAskLength()
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
@@ -385,7 +385,7 @@ func TestSortIDs(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
book := holder.ob[cp.Base][cp.Quote][asset.Spot]
|
||||
book := holder.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}]
|
||||
askLen, err := book.ob.GetAskLength()
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
@@ -429,7 +429,7 @@ func TestOutOfOrderIDs(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
book := holder.ob[cp.Base][cp.Quote][asset.Spot]
|
||||
book := holder.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}]
|
||||
cpy, err := book.ob.Retrieve()
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
@@ -559,7 +559,7 @@ func TestRunUpdateWithoutAnyUpdates(t *testing.T) {
|
||||
func TestRunSnapshotWithNoData(t *testing.T) {
|
||||
t.Parallel()
|
||||
var obl Orderbook
|
||||
obl.ob = make(map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder)
|
||||
obl.ob = make(map[Key]*orderbookHolder)
|
||||
obl.dataHandler = make(chan interface{}, 1)
|
||||
var snapShot1 orderbook.Base
|
||||
snapShot1.Asset = asset.Spot
|
||||
@@ -577,7 +577,7 @@ func TestLoadSnapshot(t *testing.T) {
|
||||
t.Parallel()
|
||||
var obl Orderbook
|
||||
obl.dataHandler = make(chan interface{}, 100)
|
||||
obl.ob = make(map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder)
|
||||
obl.ob = make(map[Key]*orderbookHolder)
|
||||
var snapShot1 orderbook.Base
|
||||
snapShot1.Exchange = "SnapshotWithOverride"
|
||||
asks := []orderbook.Item{
|
||||
@@ -602,11 +602,11 @@ func TestFlushBuffer(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if obl.ob[cp.Base][cp.Quote][asset.Spot] == nil {
|
||||
if obl.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}] == nil {
|
||||
t.Error("expected ob to have ask entries")
|
||||
}
|
||||
obl.FlushBuffer()
|
||||
if obl.ob[cp.Base][cp.Quote][asset.Spot] != nil {
|
||||
if obl.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}] != nil {
|
||||
t.Error("expected ob be flushed")
|
||||
}
|
||||
}
|
||||
@@ -616,7 +616,7 @@ func TestInsertingSnapShots(t *testing.T) {
|
||||
t.Parallel()
|
||||
var holder Orderbook
|
||||
holder.dataHandler = make(chan interface{}, 100)
|
||||
holder.ob = make(map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder)
|
||||
holder.ob = make(map[Key]*orderbookHolder)
|
||||
var snapShot1 orderbook.Base
|
||||
snapShot1.Exchange = "WSORDERBOOKTEST1"
|
||||
asks := []orderbook.Item{
|
||||
@@ -779,7 +779,7 @@ func TestGetOrderbook(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
bufferOb := holder.ob[cp.Base][cp.Quote][asset.Spot]
|
||||
bufferOb := holder.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}]
|
||||
b, err := bufferOb.ob.Retrieve()
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
@@ -872,7 +872,7 @@ func TestEnsureMultipleUpdatesViaPrice(t *testing.T) {
|
||||
}
|
||||
|
||||
asks := bidAskGenerator()
|
||||
book := holder.ob[cp.Base][cp.Quote][asset.Spot]
|
||||
book := holder.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}]
|
||||
book.updateByPrice(&orderbook.Update{
|
||||
Bids: asks,
|
||||
Asks: asks,
|
||||
|
||||
@@ -30,7 +30,7 @@ type Config struct {
|
||||
// Orderbook defines a local cache of orderbooks for amending, appending
|
||||
// and deleting changes and updates the main store for a stream
|
||||
type Orderbook struct {
|
||||
ob map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder
|
||||
ob map[Key]*orderbookHolder
|
||||
obBufferLimit int
|
||||
bufferEnabled bool
|
||||
sortBuffer bool
|
||||
@@ -47,7 +47,12 @@ type Orderbook struct {
|
||||
checksum func(state *orderbook.Base, checksum uint32) error
|
||||
|
||||
publishPeriod time.Duration
|
||||
m sync.Mutex
|
||||
|
||||
// TODO: sync.RWMutex. For the moment we process the orderbook in a single
|
||||
// thread. In future when there are workers directly involved this can be
|
||||
// can be improved with RW mechanics which will allow updates to occur at
|
||||
// the same time on different books.
|
||||
mtx sync.Mutex
|
||||
}
|
||||
|
||||
// orderbookHolder defines a store of pending updates and a pointer to the
|
||||
@@ -62,3 +67,10 @@ type orderbookHolder struct {
|
||||
ticker *time.Ticker
|
||||
updateID int64
|
||||
}
|
||||
|
||||
// Key defines a unique orderbook key for a specific pair and asset
|
||||
type Key struct {
|
||||
Base *currency.Item
|
||||
Quote *currency.Item
|
||||
Asset asset.Item
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
defaultJobBuffer = 1000
|
||||
defaultJobBuffer = 5000
|
||||
// defaultTrafficPeriod defines a period of pause for the traffic monitor,
|
||||
// as there are periods with large incoming traffic alerts which requires a
|
||||
// timer reset, this limits work on this routine to a more effective rate
|
||||
@@ -62,7 +62,7 @@ func SetupGlobalReporter(r Reporter) {
|
||||
func New() *Websocket {
|
||||
return &Websocket{
|
||||
Init: true,
|
||||
DataHandler: make(chan interface{}),
|
||||
DataHandler: make(chan interface{}, defaultJobBuffer),
|
||||
ToRoutine: make(chan interface{}, defaultJobBuffer),
|
||||
TrafficAlert: make(chan struct{}),
|
||||
ReadMessageErrors: make(chan error),
|
||||
|
||||
Reference in New Issue
Block a user