diff --git a/exchanges/stream/buffer/buffer.go b/exchanges/stream/buffer/buffer.go index 473683a8..dd9c044e 100644 --- a/exchanges/stream/buffer/buffer.go +++ b/exchanges/stream/buffer/buffer.go @@ -60,7 +60,7 @@ func (w *Orderbook) Setup(exchangeConfig *config.Exchange, c *Config, dataHandle w.updateEntriesByID = c.UpdateEntriesByID w.exchangeName = exchangeConfig.Name w.dataHandler = dataHandler - w.ob = make(map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder) + w.ob = make(map[Key]*orderbookHolder) w.verbose = exchangeConfig.Verbose // set default publish period if missing @@ -91,9 +91,9 @@ func (w *Orderbook) Update(u *orderbook.Update) error { if err := w.validate(u); err != nil { return err } - w.m.Lock() - defer w.m.Unlock() - book, ok := w.ob[u.Pair.Base][u.Pair.Quote][u.Asset] + w.mtx.Lock() + defer w.mtx.Unlock() + book, ok := w.ob[Key{Base: u.Pair.Base.Item, Quote: u.Pair.Quote.Item, Asset: u.Asset}] if !ok { return fmt.Errorf("%w for Exchange %s CurrencyPair: %s AssetType: %s", errDepthNotFound, @@ -306,19 +306,9 @@ func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error { return err } - w.m.Lock() - defer w.m.Unlock() - m1, ok := w.ob[book.Pair.Base] - if !ok { - m1 = make(map[currency.Code]map[asset.Item]*orderbookHolder) - w.ob[book.Pair.Base] = m1 - } - m2, ok := m1[book.Pair.Quote] - if !ok { - m2 = make(map[asset.Item]*orderbookHolder) - m1[book.Pair.Quote] = m2 - } - holder, ok := m2[book.Asset] + w.mtx.Lock() + defer w.mtx.Unlock() + holder, ok := w.ob[Key{Base: book.Pair.Base.Item, Quote: book.Pair.Quote.Item, Asset: book.Asset}] if !ok { // Associate orderbook pointer with local exchange depth map var depth *orderbook.Depth @@ -333,16 +323,11 @@ func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error { if w.publishPeriod != 0 { ticker = time.NewTicker(w.publishPeriod) } - holder = &orderbookHolder{ - ob: depth, - buffer: &buffer, - ticker: ticker, - } - m2[book.Asset] = holder + holder = &orderbookHolder{ob: depth, buffer: &buffer, ticker: ticker} + w.ob[Key{Base: book.Pair.Base.Item, Quote: book.Pair.Quote.Item, Asset: book.Asset}] = holder } holder.updateID = book.LastUpdateID - holder.ob.LoadSnapshot(book.Bids, book.Asks, book.LastUpdateID, @@ -370,15 +355,11 @@ func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error { // GetOrderbook returns an orderbook copy as orderbook.Base func (w *Orderbook) GetOrderbook(p currency.Pair, a asset.Item) (*orderbook.Base, error) { - w.m.Lock() - defer w.m.Unlock() - book, ok := w.ob[p.Base][p.Quote][a] + w.mtx.Lock() + defer w.mtx.Unlock() + book, ok := w.ob[Key{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}] if !ok { - return nil, fmt.Errorf("%s %s %s %w", - w.exchangeName, - p, - a, - errDepthNotFound) + return nil, fmt.Errorf("%s %s %s %w", w.exchangeName, p, a, errDepthNotFound) } return book.ob.Retrieve() } @@ -386,16 +367,16 @@ func (w *Orderbook) GetOrderbook(p currency.Pair, a asset.Item) (*orderbook.Base // FlushBuffer flushes w.ob data to be garbage collected and refreshed when a // connection is lost and reconnected func (w *Orderbook) FlushBuffer() { - w.m.Lock() - w.ob = make(map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder) - w.m.Unlock() + w.mtx.Lock() + w.ob = make(map[Key]*orderbookHolder) + w.mtx.Unlock() } // FlushOrderbook flushes independent orderbook func (w *Orderbook) FlushOrderbook(p currency.Pair, a asset.Item) error { - w.m.Lock() - defer w.m.Unlock() - book, ok := w.ob[p.Base][p.Quote][a] + w.mtx.Lock() + defer w.mtx.Unlock() + book, ok := w.ob[Key{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}] if !ok { return fmt.Errorf("cannot flush orderbook %s %s %s %w", w.exchangeName, diff --git a/exchanges/stream/buffer/buffer_test.go b/exchanges/stream/buffer/buffer_test.go index b3009e04..582a51dc 100644 --- a/exchanges/stream/buffer/buffer_test.go +++ b/exchanges/stream/buffer/buffer_test.go @@ -43,7 +43,7 @@ func createSnapshot() (holder *Orderbook, asks, bids orderbook.Items, err error) PriceDuplication: true, } - newBook := make(map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder) + newBook := make(map[Key]*orderbookHolder) ch := make(chan interface{}) go func(<-chan interface{}) { // reader @@ -57,7 +57,7 @@ func createSnapshot() (holder *Orderbook, asks, bids orderbook.Items, err error) ob: newBook, } err = holder.LoadSnapshot(book) - return + return holder, asks, bids, err } func bidAskGenerator() []orderbook.Item { @@ -92,7 +92,7 @@ func BenchmarkUpdateBidsByPrice(b *testing.B) { UpdateTime: time.Now(), Asset: asset.Spot, } - holder := ob.ob[cp.Base][cp.Quote][asset.Spot] + holder := ob.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}] holder.updateByPrice(update) } } @@ -112,7 +112,7 @@ func BenchmarkUpdateAsksByPrice(b *testing.B) { UpdateTime: time.Now(), Asset: asset.Spot, } - holder := ob.ob[cp.Base][cp.Quote][asset.Spot] + holder := ob.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}] holder.updateByPrice(update) } } @@ -239,7 +239,7 @@ func TestUpdates(t *testing.T) { t.Error(err) } - book := holder.ob[cp.Base][cp.Quote][asset.Spot] + book := holder.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}] book.updateByPrice(&orderbook.Update{ Bids: itemArray[5], Asks: itemArray[5], @@ -295,7 +295,7 @@ func TestHittingTheBuffer(t *testing.T) { } } - book := holder.ob[cp.Base][cp.Quote][asset.Spot] + book := holder.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}] askLen, err := book.ob.GetAskLength() if !errors.Is(err, nil) { t.Fatalf("received: '%v' but expected: '%v'", err, nil) @@ -343,7 +343,7 @@ func TestInsertWithIDs(t *testing.T) { } } - book := holder.ob[cp.Base][cp.Quote][asset.Spot] + book := holder.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}] askLen, err := book.ob.GetAskLength() if !errors.Is(err, nil) { t.Fatalf("received: '%v' but expected: '%v'", err, nil) @@ -385,7 +385,7 @@ func TestSortIDs(t *testing.T) { t.Fatal(err) } } - book := holder.ob[cp.Base][cp.Quote][asset.Spot] + book := holder.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}] askLen, err := book.ob.GetAskLength() if !errors.Is(err, nil) { t.Fatalf("received: '%v' but expected: '%v'", err, nil) @@ -429,7 +429,7 @@ func TestOutOfOrderIDs(t *testing.T) { t.Fatal(err) } } - book := holder.ob[cp.Base][cp.Quote][asset.Spot] + book := holder.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}] cpy, err := book.ob.Retrieve() if !errors.Is(err, nil) { t.Fatalf("received: '%v' but expected: '%v'", err, nil) @@ -559,7 +559,7 @@ func TestRunUpdateWithoutAnyUpdates(t *testing.T) { func TestRunSnapshotWithNoData(t *testing.T) { t.Parallel() var obl Orderbook - obl.ob = make(map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder) + obl.ob = make(map[Key]*orderbookHolder) obl.dataHandler = make(chan interface{}, 1) var snapShot1 orderbook.Base snapShot1.Asset = asset.Spot @@ -577,7 +577,7 @@ func TestLoadSnapshot(t *testing.T) { t.Parallel() var obl Orderbook obl.dataHandler = make(chan interface{}, 100) - obl.ob = make(map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder) + obl.ob = make(map[Key]*orderbookHolder) var snapShot1 orderbook.Base snapShot1.Exchange = "SnapshotWithOverride" asks := []orderbook.Item{ @@ -602,11 +602,11 @@ func TestFlushBuffer(t *testing.T) { if err != nil { t.Fatal(err) } - if obl.ob[cp.Base][cp.Quote][asset.Spot] == nil { + if obl.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}] == nil { t.Error("expected ob to have ask entries") } obl.FlushBuffer() - if obl.ob[cp.Base][cp.Quote][asset.Spot] != nil { + if obl.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}] != nil { t.Error("expected ob be flushed") } } @@ -616,7 +616,7 @@ func TestInsertingSnapShots(t *testing.T) { t.Parallel() var holder Orderbook holder.dataHandler = make(chan interface{}, 100) - holder.ob = make(map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder) + holder.ob = make(map[Key]*orderbookHolder) var snapShot1 orderbook.Base snapShot1.Exchange = "WSORDERBOOKTEST1" asks := []orderbook.Item{ @@ -779,7 +779,7 @@ func TestGetOrderbook(t *testing.T) { if err != nil { t.Fatal(err) } - bufferOb := holder.ob[cp.Base][cp.Quote][asset.Spot] + bufferOb := holder.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}] b, err := bufferOb.ob.Retrieve() if !errors.Is(err, nil) { t.Fatalf("received: '%v' but expected: '%v'", err, nil) @@ -872,7 +872,7 @@ func TestEnsureMultipleUpdatesViaPrice(t *testing.T) { } asks := bidAskGenerator() - book := holder.ob[cp.Base][cp.Quote][asset.Spot] + book := holder.ob[Key{Base: cp.Base.Item, Quote: cp.Quote.Item, Asset: asset.Spot}] book.updateByPrice(&orderbook.Update{ Bids: asks, Asks: asks, diff --git a/exchanges/stream/buffer/buffer_types.go b/exchanges/stream/buffer/buffer_types.go index a2aba1a4..bc66df5c 100644 --- a/exchanges/stream/buffer/buffer_types.go +++ b/exchanges/stream/buffer/buffer_types.go @@ -30,7 +30,7 @@ type Config struct { // Orderbook defines a local cache of orderbooks for amending, appending // and deleting changes and updates the main store for a stream type Orderbook struct { - ob map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder + ob map[Key]*orderbookHolder obBufferLimit int bufferEnabled bool sortBuffer bool @@ -47,7 +47,12 @@ type Orderbook struct { checksum func(state *orderbook.Base, checksum uint32) error publishPeriod time.Duration - m sync.Mutex + + // TODO: sync.RWMutex. For the moment we process the orderbook in a single + // thread. In future when there are workers directly involved this can be + // can be improved with RW mechanics which will allow updates to occur at + // the same time on different books. + mtx sync.Mutex } // orderbookHolder defines a store of pending updates and a pointer to the @@ -62,3 +67,10 @@ type orderbookHolder struct { ticker *time.Ticker updateID int64 } + +// Key defines a unique orderbook key for a specific pair and asset +type Key struct { + Base *currency.Item + Quote *currency.Item + Asset asset.Item +} diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index a39c0f6f..7d9c9e4e 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -15,7 +15,7 @@ import ( ) const ( - defaultJobBuffer = 1000 + defaultJobBuffer = 5000 // defaultTrafficPeriod defines a period of pause for the traffic monitor, // as there are periods with large incoming traffic alerts which requires a // timer reset, this limits work on this routine to a more effective rate @@ -62,7 +62,7 @@ func SetupGlobalReporter(r Reporter) { func New() *Websocket { return &Websocket{ Init: true, - DataHandler: make(chan interface{}), + DataHandler: make(chan interface{}, defaultJobBuffer), ToRoutine: make(chan interface{}, defaultJobBuffer), TrafficAlert: make(chan struct{}), ReadMessageErrors: make(chan error),