diff --git a/exchanges/okx/okx_test.go b/exchanges/okx/okx_test.go index 202e7ec7..1301a2ce 100644 --- a/exchanges/okx/okx_test.go +++ b/exchanges/okx/okx_test.go @@ -4470,13 +4470,13 @@ func TestWsProcessOrderbook5(t *testing.T) { require.NoError(t, err) required := currency.NewPairWithDelimiter("OKB", "USDT", "-") - got, err := orderbook.Get("okx", required, asset.Spot) + got, err := orderbook.Get(ok.Name, required, asset.Spot) require.NoError(t, err) require.Len(t, got.Asks, 5) require.Len(t, got.Bids, 5) // Book replicated to margin - got, err = orderbook.Get("okx", required, asset.Margin) + got, err = orderbook.Get(ok.Name, required, asset.Margin) require.NoError(t, err) require.Len(t, got.Asks, 5) assert.Len(t, got.Bids, 5) diff --git a/exchanges/orderbook/depth.go b/exchanges/orderbook/depth.go index fa59e45c..2eb07acf 100644 --- a/exchanges/orderbook/depth.go +++ b/exchanges/orderbook/depth.go @@ -41,7 +41,7 @@ type Depth struct { alert.Notice mux *dispatch.Mux - _ID uuid.UUID + id uuid.UUID options @@ -53,12 +53,12 @@ type Depth struct { // NewDepth returns a new orderbook depth func NewDepth(id uuid.UUID) *Depth { - return &Depth{_ID: id, mux: service.Mux} + return &Depth{id: id, mux: s.signalMux} } // Publish alerts any subscribed routines using a dispatch mux func (d *Depth) Publish() { - if err := d.mux.Publish(Outbound(d), d._ID); err != nil { + if err := d.mux.Publish(Outbound(d), d.id); err != nil { log.Errorf(log.ExchangeSys, "Cannot publish orderbook update to mux %v", err) } } diff --git a/exchanges/orderbook/orderbook.go b/exchanges/orderbook/orderbook.go index 02759416..8b8149b2 100644 --- a/exchanges/orderbook/orderbook.go +++ b/exchanges/orderbook/orderbook.go @@ -3,7 +3,6 @@ package orderbook import ( "fmt" "sort" - "strings" "time" "github.com/thrasher-corp/gocryptotrader/common/key" @@ -15,71 +14,71 @@ import ( // Get checks and returns the orderbook given an exchange name and currency pair func Get(exchange string, p currency.Pair, a asset.Item) (*Base, error) { - return service.Retrieve(exchange, p, a) + return s.Retrieve(exchange, p, a) } // GetDepth returns a Depth pointer allowing the caller to stream orderbook changes func GetDepth(exchange string, p currency.Pair, a asset.Item) (*Depth, error) { - return service.GetDepth(exchange, p, a) + return s.GetDepth(exchange, p, a) } // DeployDepth sets a depth struct and returns a depth pointer. This allows for // the loading of a new orderbook snapshot and incremental updates via the // streaming package. func DeployDepth(exchange string, p currency.Pair, a asset.Item) (*Depth, error) { - return service.DeployDepth(exchange, p, a) + return s.DeployDepth(exchange, p, a) } // SubscribeToExchangeOrderbooks returns a pipe to an exchange feed func SubscribeToExchangeOrderbooks(exchange string) (dispatch.Pipe, error) { - service.mu.Lock() - defer service.mu.Unlock() - exch, ok := service.books[strings.ToLower(exchange)] + s.m.RLock() + defer s.m.RUnlock() + id, ok := s.exchangeRouters[exchange] if !ok { return dispatch.Pipe{}, fmt.Errorf("%w for %s exchange", ErrOrderbookNotFound, exchange) } - return service.Mux.Subscribe(exch.ID) + return s.signalMux.Subscribe(id) } // Update stores orderbook data -func (s *Service) Update(b *Base) error { - name := strings.ToLower(b.Exchange) - mapKey := key.PairAsset{ - Base: b.Pair.Base.Item, - Quote: b.Pair.Quote.Item, - Asset: b.Asset, - } - - s.mu.Lock() - m1, ok := s.books[name] +func (s *store) Update(b *Base) error { + s.m.RLock() + book, ok := s.orderbooks[key.ExchangePairAsset{Exchange: b.Exchange, Base: b.Pair.Base.Item, Quote: b.Pair.Quote.Item, Asset: b.Asset}] + s.m.RUnlock() if !ok { - id, err := s.Mux.GetID() + var err error + book, err = s.track(b) if err != nil { - s.mu.Unlock() return err } - m1 = Exchange{ - m: make(map[key.PairAsset]*Depth), - ID: id, - } - s.books[name] = m1 } - book, ok := m1.m[mapKey] - if !ok { - book = NewDepth(m1.ID) - book.AssignOptions(b) - m1.m[mapKey] = book - } - err := book.LoadSnapshot(b.Bids, b.Asks, b.LastUpdateID, b.LastUpdated, b.UpdatePushedAt, true) - s.mu.Unlock() - if err != nil { + if err := book.Depth.LoadSnapshot(b.Bids, b.Asks, b.LastUpdateID, b.LastUpdated, b.UpdatePushedAt, true); err != nil { return err } - return s.Mux.Publish(book, m1.ID) + return s.signalMux.Publish(book.Depth, book.RouterID) +} + +func (s *store) track(b *Base) (book, error) { + s.m.Lock() + defer s.m.Unlock() + id, ok := s.exchangeRouters[b.Exchange] + if !ok { + exchangeID, err := s.signalMux.GetID() + if err != nil { + return book{}, err + } + id = exchangeID + s.exchangeRouters[b.Exchange] = id + } + depth := NewDepth(id) + depth.AssignOptions(b) + ob := book{RouterID: id, Depth: depth} + s.orderbooks[key.ExchangePairAsset{Exchange: b.Exchange, Base: b.Pair.Base.Item, Quote: b.Pair.Quote.Item, Asset: b.Asset}] = ob + return ob, nil } // DeployDepth used for subsystem deployment creates a depth item in the struct then returns a ptr to that Depth item -func (s *Service) DeployDepth(exchange string, p currency.Pair, a asset.Item) (*Depth, error) { +func (s *store) DeployDepth(exchange string, p currency.Pair, a asset.Item) (*Depth, error) { if exchange == "" { return nil, errExchangeNameUnset } @@ -89,89 +88,49 @@ func (s *Service) DeployDepth(exchange string, p currency.Pair, a asset.Item) (* if !a.IsValid() { return nil, errAssetTypeNotSet } - mapKey := key.PairAsset{ - Base: p.Base.Item, - Quote: p.Quote.Item, - Asset: a, - } - s.mu.Lock() - defer s.mu.Unlock() - m1, ok := s.books[strings.ToLower(exchange)] + s.m.RLock() + ob, ok := s.orderbooks[key.ExchangePairAsset{Exchange: exchange, Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}] + s.m.RUnlock() + var err error if !ok { - id, err := s.Mux.GetID() - if err != nil { - return nil, err - } - m1 = Exchange{ - m: make(map[key.PairAsset]*Depth), - ID: id, - } - s.books[strings.ToLower(exchange)] = m1 + ob, err = s.track(&Base{Exchange: exchange, Pair: p, Asset: a}) } - book, ok := m1.m[mapKey] - if ok { - // Maybe in future we should return an error here and be more strict. - return book, nil - } - book = NewDepth(m1.ID) - book.exchange = exchange - book.pair = p - book.asset = a - m1.m[mapKey] = book - return book, nil + return ob.Depth, err } // GetDepth returns the actual depth struct for potential subsystems and strategies to interact with -func (s *Service) GetDepth(exchange string, p currency.Pair, a asset.Item) (*Depth, error) { - s.mu.Lock() - defer s.mu.Unlock() - m1, ok := s.books[strings.ToLower(exchange)] +func (s *store) GetDepth(exchange string, p currency.Pair, a asset.Item) (*Depth, error) { + s.m.RLock() + ob, ok := s.orderbooks[key.ExchangePairAsset{Exchange: exchange, Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}] + s.m.RUnlock() if !ok { - return nil, fmt.Errorf("%w for %s exchange", ErrOrderbookNotFound, exchange) + return nil, fmt.Errorf("%w for %s %s %s", ErrOrderbookNotFound, exchange, p, a) } - - book, ok := m1.m[key.PairAsset{ - Base: p.Base.Item, - Quote: p.Quote.Item, - Asset: a, - }] - if !ok { - return nil, fmt.Errorf("%w associated with base currency %s", ErrOrderbookNotFound, p.Quote) - } - return book, nil + return ob.Depth, nil } // Retrieve gets orderbook depth data from the stored tranches and returns the // base equivalent copy -func (s *Service) Retrieve(exchange string, p currency.Pair, a asset.Item) (*Base, error) { +func (s *store) Retrieve(exchange string, p currency.Pair, a asset.Item) (*Base, error) { if p.IsEmpty() { return nil, currency.ErrCurrencyPairEmpty } if !a.IsValid() { return nil, fmt.Errorf("%w %v", asset.ErrNotSupported, a) } - - s.mu.Lock() - defer s.mu.Unlock() - m1, ok := s.books[strings.ToLower(exchange)] + s.m.RLock() + ob, ok := s.orderbooks[key.ExchangePairAsset{Exchange: exchange, Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}] + s.m.RUnlock() if !ok { - return nil, fmt.Errorf("%w for %s exchange", ErrOrderbookNotFound, exchange) + return nil, fmt.Errorf("%w for %s %s %s", ErrOrderbookNotFound, exchange, p, a) } - book, ok := m1.m[key.PairAsset{ - Base: p.Base.Item, - Quote: p.Quote.Item, - Asset: a, - }] - if !ok { - return nil, fmt.Errorf("%w associated with currency %s %s", ErrOrderbookNotFound, p, a) - } - return book.Retrieve() + return ob.Depth.Retrieve() } // GetDepth returns the concrete book allowing the caller to stream orderbook changes func (b *Base) GetDepth() (*Depth, error) { - return service.GetDepth(b.Exchange, b.Pair, b.Asset) + return s.GetDepth(b.Exchange, b.Pair, b.Asset) } // TotalBidsAmount returns the total amount of bids and the total orderbook @@ -303,14 +262,14 @@ func (b *Base) Process() error { return errAssetTypeNotSet } - if b.LastUpdated.IsZero() { + if b.LastUpdated.IsZero() { // TODO: Enforce setting this on all exchanges b.LastUpdated = time.Now() } if err := b.Verify(); err != nil { return err } - return service.Update(b) + return s.Update(b) } // Reverse reverses the order of orderbook items; some bid/asks are diff --git a/exchanges/orderbook/orderbook_test.go b/exchanges/orderbook/orderbook_test.go index c8f7bf3e..c2831d2b 100644 --- a/exchanges/orderbook/orderbook_test.go +++ b/exchanges/orderbook/orderbook_test.go @@ -696,3 +696,21 @@ func TestCheckAlignment(t *testing.T) { err = checkAlignment(itemWithFunding, true, true, false, true, dsc, "Binance") require.NoError(t, err) } + +// 5572401 210.9 ns/op 0 B/op 0 allocs/op (current) +// 3748009 312.7 ns/op 32 B/op 1 allocs/op (previous) +func BenchmarkProcess(b *testing.B) { + base := &Base{ + Pair: currency.NewBTCUSD(), + Asks: make(Tranches, 100), + Bids: make(Tranches, 100), + Exchange: "BenchmarkProcessOrderbook", + Asset: asset.Spot, + } + + for b.Loop() { + if err := base.Process(); err != nil { + b.Fatal(err) + } + } +} diff --git a/exchanges/orderbook/orderbook_types.go b/exchanges/orderbook/orderbook_types.go index 47231c04..f55df966 100644 --- a/exchanges/orderbook/orderbook_types.go +++ b/exchanges/orderbook/orderbook_types.go @@ -38,23 +38,23 @@ var ( errChecksumStringNotSet = errors.New("checksum string not set") ) -var service = Service{ - books: make(map[string]Exchange), - Mux: dispatch.GetNewMux(nil), +var s = store{ + orderbooks: make(map[key.ExchangePairAsset]book), + exchangeRouters: make(map[string]uuid.UUID), + signalMux: dispatch.GetNewMux(nil), } -// Service provides a store for difference exchange orderbooks -type Service struct { - books map[string]Exchange - *dispatch.Mux - mu sync.Mutex +type book struct { + RouterID uuid.UUID + Depth *Depth } -// Exchange defines a holder for the exchange specific depth items with a -// specific ID associated with that exchange -type Exchange struct { - m map[key.PairAsset]*Depth - ID uuid.UUID +// store provides a centralised store for orderbooks +type store struct { + orderbooks map[key.ExchangePairAsset]book + exchangeRouters map[string]uuid.UUID + signalMux *dispatch.Mux + m sync.RWMutex } // Tranche defines a segmented portions of an order or options book