diff --git a/exchanges/binance/binance_wrapper.go b/exchanges/binance/binance_wrapper.go index c17dde40..aed5e228 100644 --- a/exchanges/binance/binance_wrapper.go +++ b/exchanges/binance/binance_wrapper.go @@ -23,6 +23,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/protocol" "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" + "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" @@ -237,9 +238,11 @@ func (b *Binance) Setup(exch *config.Exchange) error { Unsubscriber: b.Unsubscribe, GenerateSubscriptions: b.GenerateSubscriptions, Features: &b.Features.Supports.WebsocketCapabilities, - SortBuffer: true, - SortBufferByUpdateIDs: true, - TradeFeed: b.Features.Enabled.TradeFeed, + OrderbookBufferConfig: buffer.Config{ + SortBuffer: true, + SortBufferByUpdateIDs: true, + }, + TradeFeed: b.Features.Enabled.TradeFeed, }) if err != nil { return err diff --git a/exchanges/bitfinex/bitfinex_wrapper.go b/exchanges/bitfinex/bitfinex_wrapper.go index 769b3f15..5db6935c 100644 --- a/exchanges/bitfinex/bitfinex_wrapper.go +++ b/exchanges/bitfinex/bitfinex_wrapper.go @@ -24,6 +24,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/protocol" "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" + "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" @@ -214,7 +215,9 @@ func (b *Bitfinex) Setup(exch *config.Exchange) error { Unsubscriber: b.Unsubscribe, GenerateSubscriptions: b.GenerateDefaultSubscriptions, Features: &b.Features.Supports.WebsocketCapabilities, - UpdateEntriesByID: true, + OrderbookBufferConfig: buffer.Config{ + UpdateEntriesByID: true, + }, }) if err != nil { return err diff --git a/exchanges/bitmex/bitmex_wrapper.go b/exchanges/bitmex/bitmex_wrapper.go index 011ba4d1..26a78293 100644 --- a/exchanges/bitmex/bitmex_wrapper.go +++ b/exchanges/bitmex/bitmex_wrapper.go @@ -24,6 +24,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/protocol" "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" + "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" @@ -173,7 +174,9 @@ func (b *Bitmex) Setup(exch *config.Exchange) error { Unsubscriber: b.Unsubscribe, GenerateSubscriptions: b.GenerateDefaultSubscriptions, Features: &b.Features.Supports.WebsocketCapabilities, - UpdateEntriesByID: true, + OrderbookBufferConfig: buffer.Config{ + UpdateEntriesByID: true, + }, }) if err != nil { return err diff --git a/exchanges/bittrex/bittrex_wrapper.go b/exchanges/bittrex/bittrex_wrapper.go index f1d97718..f0fe590a 100644 --- a/exchanges/bittrex/bittrex_wrapper.go +++ b/exchanges/bittrex/bittrex_wrapper.go @@ -22,6 +22,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/protocol" "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" + "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" @@ -173,12 +174,10 @@ func (b *Bittrex) Setup(exch *config.Exchange) error { Unsubscriber: b.Unsubscribe, // Unsubscriber function outlined above. GenerateSubscriptions: b.GenerateDefaultSubscriptions, // GenerateDefaultSubscriptions function outlined above. Features: &b.Features.Supports.WebsocketCapabilities, // Defines the capabilities of the websocket outlined in supported features struct. This allows the websocket connection to be flushed appropriately if we have a pair/asset enable/disable change. This is outlined below. - - // Orderbook buffer specific variables for processing orderbook updates via websocket feed. - // Other orderbook buffer vars: - // UpdateEntriesByID bool - SortBuffer: true, - SortBufferByUpdateIDs: true, + OrderbookBufferConfig: buffer.Config{ + SortBuffer: true, + SortBufferByUpdateIDs: true, + }, }) if err != nil { return err diff --git a/exchanges/btcmarkets/btcmarkets.go b/exchanges/btcmarkets/btcmarkets.go index 4aa5f409..affe115e 100644 --- a/exchanges/btcmarkets/btcmarkets.go +++ b/exchanges/btcmarkets/btcmarkets.go @@ -111,7 +111,11 @@ func (b *BTCMarkets) GetTrades(ctx context.Context, marketID string, before, aft &trades) } -// GetOrderbook returns current orderbook +// GetOrderbook returns current orderbook. +// levels are: +// 0 - Returns the top bids and ask orders only. +// 1 - Returns top 50 bids and asks. +// 2 - Returns full orderbook. WARNING: This is cached every 10 seconds. func (b *BTCMarkets) GetOrderbook(ctx context.Context, marketID string, level int64) (Orderbook, error) { var orderbook Orderbook var temp tempOrderbook diff --git a/exchanges/btcmarkets/btcmarkets_test.go b/exchanges/btcmarkets/btcmarkets_test.go index 2903933d..80521ad7 100644 --- a/exchanges/btcmarkets/btcmarkets_test.go +++ b/exchanges/btcmarkets/btcmarkets_test.go @@ -16,6 +16,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/kline" "github.com/thrasher-corp/gocryptotrader/exchanges/order" + "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" "github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues" ) @@ -610,7 +611,8 @@ func TestWsOrderbookUpdate(t *testing.T) { "snapshotId": 1578512844045000, "bids": [ ["99.81", "1.2", 1 ], ["95.8", "0", 0 ]], "asks": [ ["100", "3.2", 2 ] ], - "messageType": "orderbookUpdate" + "messageType": "orderbookUpdate", + "checksum": "2513007604" }`) err = b.wsHandleData(pressXToJSON) if err != nil { @@ -845,3 +847,55 @@ func TestGetHistoricTrades(t *testing.T) { t.Error(err) } } + +func TestChecksum(t *testing.T) { + b := &orderbook.Base{ + Asks: []orderbook.Item{ + {Price: 0.3965, Amount: 44149.815}, + {Price: 0.3967, Amount: 16000.0}, + }, + Bids: []orderbook.Item{ + {Price: 0.396, Amount: 51.0}, + {Price: 0.396, Amount: 25.0}, + {Price: 0.3958, Amount: 18570.0}, + }, + } + + expecting := 3802968298 + err := checksum(b, uint32(expecting)) + if err != nil { + t.Fatal(err) + } + err = checksum(b, uint32(1223123)) + if !errors.Is(err, errChecksumFailure) { + t.Errorf("received '%v', expected '%v'", err, errChecksumFailure) + } +} + +func TestTrim(t *testing.T) { + testCases := []struct { + Value float64 + Expected string + }{ + {Value: 0.1234, Expected: "1234"}, + {Value: 0.00001234, Expected: "1234"}, + {Value: 32.00001234, Expected: "3200001234"}, + {Value: 0, Expected: ""}, + {Value: 0.0, Expected: ""}, + {Value: 1.0, Expected: "1"}, + {Value: 0.3965, Expected: "3965"}, + {Value: 16000.0, Expected: "16000"}, + {Value: 0.0019, Expected: "19"}, + {Value: 1.01, Expected: "101"}, + } + + for x := range testCases { + tt := testCases[x] + t.Run("", func(t *testing.T) { + received := trim(tt.Value) + if received != tt.Expected { + t.Fatalf("received: %v but expected: %v", received, tt.Expected) + } + }) + } +} diff --git a/exchanges/btcmarkets/btcmarkets_types.go b/exchanges/btcmarkets/btcmarkets_types.go index 614ab30f..e3df79fd 100644 --- a/exchanges/btcmarkets/btcmarkets_types.go +++ b/exchanges/btcmarkets/btcmarkets_types.go @@ -2,6 +2,9 @@ package btcmarkets import ( "time" + + "github.com/thrasher-corp/gocryptotrader/currency" + "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" ) // Market holds a tradable market instrument @@ -377,12 +380,14 @@ type WsTrade struct { // WsOrderbook message received for orderbook data type WsOrderbook struct { - Currency string `json:"marketId"` - Timestamp time.Time `json:"timestamp"` - Bids [][]interface{} `json:"bids"` - Asks [][]interface{} `json:"asks"` - MessageType string `json:"messageType"` - Snapshot bool `json:"snapshot"` + Currency currency.Pair `json:"marketId"` + Snapshot bool `json:"snapshot"` + Timestamp time.Time `json:"timestamp"` + SnapshotID int64 `json:"snapshotId"` + Bids WebsocketOrderbook `json:"bids"` + Asks WebsocketOrderbook `json:"asks"` + Checksum uint32 `json:"checksum,string"` + MessageType string `json:"messageType"` } // WsFundTransfer stores fund transfer data for websocket @@ -429,3 +434,7 @@ type WsError struct { // CandleResponse holds OHLCV data for exchange type CandleResponse [][6]string + +// WebsocketOrderbook defines a specific websocket orderbook type to directly +// unmarshal json. +type WebsocketOrderbook orderbook.Items diff --git a/exchanges/btcmarkets/btcmarkets_websocket.go b/exchanges/btcmarkets/btcmarkets_websocket.go index 29b137bd..b7058601 100644 --- a/exchanges/btcmarkets/btcmarkets_websocket.go +++ b/exchanges/btcmarkets/btcmarkets_websocket.go @@ -5,8 +5,10 @@ import ( "encoding/json" "errors" "fmt" + "hash/crc32" "net/http" "strconv" + "strings" "time" "github.com/gorilla/websocket" @@ -27,6 +29,11 @@ const ( btcMarketsWSURL = "wss://socket.btcmarkets.net/v2" ) +var ( + errTypeAssertionFailure = errors.New("type assertion failure") + errChecksumFailure = errors.New("crc32 checksum failure") +) + // WsConnect connects to a websocket feed func (b *BTCMarkets) WsConnect() error { if !b.Websocket.IsEnabled() || !b.IsEnabled() { @@ -62,6 +69,51 @@ func (b *BTCMarkets) wsReadData() { } } +// UnmarshalJSON implements the unmarshaler interface. +func (w *WebsocketOrderbook) UnmarshalJSON(data []byte) error { + resp := make([][3]interface{}, len(data)) + err := json.Unmarshal(data, &resp) + if err != nil { + return err + } + + *w = WebsocketOrderbook(make(orderbook.Items, len(resp))) + for x := range resp { + sPrice, ok := resp[x][0].(string) + if !ok { + return fmt.Errorf("price string %w", errTypeAssertionFailure) + } + var price float64 + price, err = strconv.ParseFloat(sPrice, 64) + if err != nil { + return err + } + + sAmount, ok := resp[x][1].(string) + if !ok { + return fmt.Errorf("amount string %w", errTypeAssertionFailure) + } + + var amount float64 + amount, err = strconv.ParseFloat(sAmount, 64) + if err != nil { + return err + } + + count, ok := resp[x][2].(float64) + if !ok { + return fmt.Errorf("count float64 %w", errTypeAssertionFailure) + } + + (*w)[x] = orderbook.Item{ + Amount: amount, + Price: price, + OrderCount: int64(count), + } + } + return nil +} + func (b *BTCMarkets) wsHandleData(respRaw []byte) error { var wsResponse WsMessageType err := json.Unmarshal(respRaw, &wsResponse) @@ -80,51 +132,13 @@ func (b *BTCMarkets) wsHandleData(respRaw []byte) error { return err } - p, err := currency.NewPairFromString(ob.Currency) - if err != nil { - return err - } - - var bids, asks orderbook.Items - for x := range ob.Bids { - var price, amount float64 - price, err = strconv.ParseFloat(ob.Bids[x][0].(string), 64) - if err != nil { - return err - } - amount, err = strconv.ParseFloat(ob.Bids[x][1].(string), 64) - if err != nil { - return err - } - bids = append(bids, orderbook.Item{ - Amount: amount, - Price: price, - OrderCount: int64(ob.Bids[x][2].(float64)), - }) - } - for x := range ob.Asks { - var price, amount float64 - price, err = strconv.ParseFloat(ob.Asks[x][0].(string), 64) - if err != nil { - return err - } - amount, err = strconv.ParseFloat(ob.Asks[x][1].(string), 64) - if err != nil { - return err - } - asks = append(asks, orderbook.Item{ - Amount: amount, - Price: price, - OrderCount: int64(ob.Asks[x][2].(float64)), - }) - } if ob.Snapshot { - bids.SortBids() // Alignment completely out, sort is needed. err = b.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{ - Pair: p, - Bids: bids, - Asks: asks, + Pair: ob.Currency, + Bids: orderbook.Items(ob.Bids), + Asks: orderbook.Items(ob.Asks), LastUpdated: ob.Timestamp, + LastUpdateID: ob.SnapshotID, Asset: asset.Spot, Exchange: b.Name, VerifyOrderbook: b.CanVerifyOrderbook, @@ -132,13 +146,14 @@ func (b *BTCMarkets) wsHandleData(respRaw []byte) error { } else { err = b.Websocket.Orderbook.Update(&buffer.Update{ UpdateTime: ob.Timestamp, + UpdateID: ob.SnapshotID, Asset: asset.Spot, - Bids: bids, - Asks: asks, - Pair: p, + Bids: orderbook.Items(ob.Bids), + Asks: orderbook.Items(ob.Asks), + Pair: ob.Currency, + Checksum: ob.Checksum, }) } - if err != nil { return err } @@ -380,3 +395,41 @@ func (b *BTCMarkets) Subscribe(channelsToSubscribe []stream.ChannelSubscription) b.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe...) return nil } + +// checksum provides assurance on current in memory liquidity +func checksum(ob *orderbook.Base, checksum uint32) error { + check := crc32.ChecksumIEEE([]byte(concat(ob.Bids) + concat(ob.Asks))) + if check != checksum { + return fmt.Errorf("%s %s %s ID: %v expected: %v but received: %v %w", + ob.Exchange, + ob.Pair, + ob.Asset, + ob.LastUpdateID, + checksum, + check, + errChecksumFailure) + } + return nil +} + +// concat concatenates price and amounts together for checksum processing +func concat(liquidity orderbook.Items) string { + length := 10 + if len(liquidity) < 10 { + length = len(liquidity) + } + var c string + for x := 0; x < length; x++ { + c += trim(liquidity[x].Price) + trim(liquidity[x].Amount) + } + return c +} + +// trim turns value into string, removes the decimal point and all the leading +// zeros. +func trim(value float64) string { + valstr := strconv.FormatFloat(value, 'f', -1, 64) + valstr = strings.ReplaceAll(valstr, ".", "") + valstr = strings.TrimLeft(valstr, "0") + return valstr +} diff --git a/exchanges/btcmarkets/btcmarkets_wrapper.go b/exchanges/btcmarkets/btcmarkets_wrapper.go index b78476cd..1246957a 100644 --- a/exchanges/btcmarkets/btcmarkets_wrapper.go +++ b/exchanges/btcmarkets/btcmarkets_wrapper.go @@ -23,6 +23,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/protocol" "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" + "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" @@ -167,7 +168,11 @@ func (b *BTCMarkets) Setup(exch *config.Exchange) error { Subscriber: b.Subscribe, GenerateSubscriptions: b.generateDefaultSubscriptions, Features: &b.Features.Supports.WebsocketCapabilities, - SortBuffer: true, + OrderbookBufferConfig: buffer.Config{ + SortBuffer: true, + UpdateIDProgression: true, + Checksum: checksum, + }, }) if err != nil { return err @@ -386,7 +391,9 @@ func (b *BTCMarkets) UpdateOrderbook(ctx context.Context, p currency.Pair, asset return book, err } - tempResp, err := b.GetOrderbook(ctx, fpair.String(), 2) + // Retrieve level one book which is the top 50 ask and bids, this is not + // cached. + tempResp, err := b.GetOrderbook(ctx, fpair.String(), 1) if err != nil { return book, err } diff --git a/exchanges/coinbasepro/coinbasepro_wrapper.go b/exchanges/coinbasepro/coinbasepro_wrapper.go index e67f3ee4..d74ba0b7 100644 --- a/exchanges/coinbasepro/coinbasepro_wrapper.go +++ b/exchanges/coinbasepro/coinbasepro_wrapper.go @@ -23,6 +23,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/protocol" "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" + "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" @@ -180,7 +181,9 @@ func (c *CoinbasePro) Setup(exch *config.Exchange) error { Unsubscriber: c.Unsubscribe, GenerateSubscriptions: c.GenerateDefaultSubscriptions, Features: &c.Features.Supports.WebsocketCapabilities, - SortBuffer: true, + OrderbookBufferConfig: buffer.Config{ + SortBuffer: true, + }, }) if err != nil { return err diff --git a/exchanges/coinut/coinut_wrapper.go b/exchanges/coinut/coinut_wrapper.go index f4a1db7d..39e666bc 100644 --- a/exchanges/coinut/coinut_wrapper.go +++ b/exchanges/coinut/coinut_wrapper.go @@ -24,6 +24,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/protocol" "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" + "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" @@ -162,8 +163,10 @@ func (c *COINUT) Setup(exch *config.Exchange) error { Unsubscriber: c.Unsubscribe, GenerateSubscriptions: c.GenerateDefaultSubscriptions, Features: &c.Features.Supports.WebsocketCapabilities, - SortBuffer: true, - SortBufferByUpdateIDs: true, + OrderbookBufferConfig: buffer.Config{ + SortBuffer: true, + SortBufferByUpdateIDs: true, + }, }) if err != nil { return err diff --git a/exchanges/hitbtc/hitbtc_wrapper.go b/exchanges/hitbtc/hitbtc_wrapper.go index cd329c26..d181282c 100644 --- a/exchanges/hitbtc/hitbtc_wrapper.go +++ b/exchanges/hitbtc/hitbtc_wrapper.go @@ -23,6 +23,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/protocol" "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" + "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" @@ -178,8 +179,10 @@ func (h *HitBTC) Setup(exch *config.Exchange) error { Unsubscriber: h.Unsubscribe, GenerateSubscriptions: h.GenerateDefaultSubscriptions, Features: &h.Features.Supports.WebsocketCapabilities, - SortBuffer: true, - SortBufferByUpdateIDs: true, + OrderbookBufferConfig: buffer.Config{ + SortBuffer: true, + SortBufferByUpdateIDs: true, + }, }) if err != nil { return err diff --git a/exchanges/kraken/kraken_wrapper.go b/exchanges/kraken/kraken_wrapper.go index 00229f58..dcffed5e 100644 --- a/exchanges/kraken/kraken_wrapper.go +++ b/exchanges/kraken/kraken_wrapper.go @@ -24,6 +24,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/protocol" "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" + "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" @@ -225,7 +226,7 @@ func (k *Kraken) Setup(exch *config.Exchange) error { Unsubscriber: k.Unsubscribe, GenerateSubscriptions: k.GenerateDefaultSubscriptions, Features: &k.Features.Supports.WebsocketCapabilities, - SortBuffer: true, + OrderbookBufferConfig: buffer.Config{SortBuffer: true}, }) if err != nil { return err diff --git a/exchanges/orderbook/depth_test.go b/exchanges/orderbook/depth_test.go index c69b5d6c..1c2e8371 100644 --- a/exchanges/orderbook/depth_test.go +++ b/exchanges/orderbook/depth_test.go @@ -142,11 +142,15 @@ func TestFlush(t *testing.T) { func TestUpdateBidAskByPrice(t *testing.T) { d := newDepth(id) d.LoadSnapshot(Items{{Price: 1337, Amount: 1, ID: 1}}, Items{{Price: 1337, Amount: 10, ID: 2}}, 0, time.Time{}, false) - d.UpdateBidAskByPrice(Items{{Price: 1337, Amount: 2, ID: 1}}, Items{{Price: 1337, Amount: 2, ID: 2}}, 0, 0, time.Time{}) + + // empty + d.UpdateBidAskByPrice(nil, nil, 0, 1, time.Time{}) + + d.UpdateBidAskByPrice(Items{{Price: 1337, Amount: 2, ID: 1}}, Items{{Price: 1337, Amount: 2, ID: 2}}, 0, 1, time.Time{}) if d.Retrieve().Asks[0].Amount != 2 || d.Retrieve().Bids[0].Amount != 2 { t.Fatal("orderbook amounts not updated correctly") } - d.UpdateBidAskByPrice(Items{{Price: 1337, Amount: 0, ID: 1}}, Items{{Price: 1337, Amount: 0, ID: 2}}, 0, 0, time.Time{}) + d.UpdateBidAskByPrice(Items{{Price: 1337, Amount: 0, ID: 1}}, Items{{Price: 1337, Amount: 0, ID: 2}}, 0, 2, time.Time{}) if d.GetAskLength() != 0 || d.GetBidLength() != 0 { t.Fatal("orderbook amounts not updated correctly") } diff --git a/exchanges/poloniex/poloniex_wrapper.go b/exchanges/poloniex/poloniex_wrapper.go index bfb509ca..3a37aab7 100644 --- a/exchanges/poloniex/poloniex_wrapper.go +++ b/exchanges/poloniex/poloniex_wrapper.go @@ -23,6 +23,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/protocol" "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" + "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" @@ -182,8 +183,10 @@ func (p *Poloniex) Setup(exch *config.Exchange) error { Unsubscriber: p.Unsubscribe, GenerateSubscriptions: p.GenerateDefaultSubscriptions, Features: &p.Features.Supports.WebsocketCapabilities, - SortBuffer: true, - SortBufferByUpdateIDs: true, + OrderbookBufferConfig: buffer.Config{ + SortBuffer: true, + SortBufferByUpdateIDs: true, + }, }) if err != nil { return err diff --git a/exchanges/stream/buffer/buffer.go b/exchanges/stream/buffer/buffer.go index 02fc1c95..4aafde7b 100644 --- a/exchanges/stream/buffer/buffer.go +++ b/exchanges/stream/buffer/buffer.go @@ -17,6 +17,7 @@ const packageError = "websocket orderbook buffer error: %w" var ( errExchangeConfigNil = errors.New("exchange config is nil") + errBufferConfigNil = errors.New("buffer config is nil") errUnsetDataHandler = errors.New("datahandler unset") errIssueBufferEnabledButNoLimit = errors.New("buffer enabled but no limit set") errUpdateIsNil = errors.New("update is nil") @@ -26,35 +27,43 @@ var ( ) // Setup sets private variables -func (w *Orderbook) Setup(cfg *config.Exchange, sortBuffer, sortBufferByUpdateIDs, updateEntriesByID bool, dataHandler chan interface{}) error { - if cfg == nil { // exchange config fields are checked in stream package +func (w *Orderbook) Setup(exchangeConfig *config.Exchange, c *Config, dataHandler chan<- interface{}) error { + if exchangeConfig == nil { // exchange config fields are checked in stream package // prior to calling this, so further checks are not needed. return fmt.Errorf(packageError, errExchangeConfigNil) } + if c == nil { + return fmt.Errorf(packageError, errBufferConfigNil) + } if dataHandler == nil { return fmt.Errorf(packageError, errUnsetDataHandler) } - if cfg.Orderbook.WebsocketBufferEnabled && - cfg.Orderbook.WebsocketBufferLimit < 1 { + if exchangeConfig.Orderbook.WebsocketBufferEnabled && + exchangeConfig.Orderbook.WebsocketBufferLimit < 1 { return fmt.Errorf(packageError, errIssueBufferEnabledButNoLimit) } - w.bufferEnabled = cfg.Orderbook.WebsocketBufferEnabled - w.obBufferLimit = cfg.Orderbook.WebsocketBufferLimit - w.sortBuffer = sortBuffer - w.sortBufferByUpdateIDs = sortBufferByUpdateIDs - w.updateEntriesByID = updateEntriesByID - w.exchangeName = cfg.Name + // NOTE: These variables are set by config.json under "orderbook" for each + // individual exchange. + w.bufferEnabled = exchangeConfig.Orderbook.WebsocketBufferEnabled + w.obBufferLimit = exchangeConfig.Orderbook.WebsocketBufferLimit + + w.sortBuffer = c.SortBuffer + w.sortBufferByUpdateIDs = c.SortBufferByUpdateIDs + w.updateEntriesByID = c.UpdateEntriesByID + w.exchangeName = exchangeConfig.Name w.dataHandler = dataHandler w.ob = make(map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder) - w.verbose = cfg.Verbose + w.verbose = exchangeConfig.Verbose // set default publish period if missing orderbookPublishPeriod := config.DefaultOrderbookPublishPeriod - if cfg.Orderbook.PublishPeriod != nil { - orderbookPublishPeriod = *cfg.Orderbook.PublishPeriod + if exchangeConfig.Orderbook.PublishPeriod != nil { + orderbookPublishPeriod = *exchangeConfig.Orderbook.PublishPeriod } w.publishPeriod = orderbookPublishPeriod + w.updateIDProgression = c.UpdateIDProgression + w.checksum = c.Checksum return nil } @@ -86,6 +95,18 @@ func (w *Orderbook) Update(u *Update) error { u.Asset) } + // out of order update ID can be skipped + if w.updateIDProgression && u.UpdateID <= book.updateID { + if w.verbose { + log.Warnf(log.WebsocketMgr, + "Exchange %s CurrencyPair: %s AssetType: %s out of order websocket update received", + w.exchangeName, + u.Pair, + u.Asset) + } + return nil + } + // Checks for when the rest protocol overwrites a streaming dominated book // will stop updating book via incremental updates. This occurs because our // sync manager (engine/sync.go) timer has elapsed for streaming. Usually @@ -200,6 +221,13 @@ func (w *Orderbook) processObUpdate(o *orderbookHolder, u *Update) error { return o.updateByIDAndAction(u) } o.updateByPrice(u) + if w.checksum != nil { + err := w.checksum(o.ob.Retrieve(), u.Checksum) + if err != nil { + return err + } + o.updateID = u.UpdateID + } return nil } @@ -281,14 +309,15 @@ func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error { m2[book.Asset] = holder } + holder.updateID = book.LastUpdateID + // Checks if book can deploy to linked list err := book.Verify() if err != nil { return err } - holder.ob.LoadSnapshot( - book.Bids, + holder.ob.LoadSnapshot(book.Bids, book.Asks, book.LastUpdateID, book.LastUpdated, diff --git a/exchanges/stream/buffer/buffer_test.go b/exchanges/stream/buffer/buffer_test.go index 2963c403..868104e7 100644 --- a/exchanges/stream/buffer/buffer_test.go +++ b/exchanges/stream/buffer/buffer_test.go @@ -392,6 +392,8 @@ func TestOutOfOrderIDs(t *testing.T) { } } +var errTest = errors.New("test error") + func TestOrderbookLastUpdateID(t *testing.T) { holder, _, _, err := createSnapshot() if err != nil { @@ -402,6 +404,21 @@ func TestOrderbookLastUpdateID(t *testing.T) { exp, itemArray[1][0].Price) } + holder.checksum = func(state *orderbook.Base, checksum uint32) error { return errTest } + + err = holder.Update(&Update{ + Asks: []orderbook.Item{{Price: 999999}}, + Pair: cp, + UpdateID: -1, + Asset: asset.Spot, + }) + if !errors.Is(err, errTest) { + t.Fatalf("received: %v but expected: %v", err, errTest) + } + + holder.checksum = func(state *orderbook.Base, checksum uint32) error { return nil } + holder.updateIDProgression = true + for i := range itemArray { asks := itemArray[i] err = holder.Update(&Update{ @@ -415,6 +432,18 @@ func TestOrderbookLastUpdateID(t *testing.T) { } } + // out of order + holder.verbose = true + err = holder.Update(&Update{ + Asks: []orderbook.Item{{Price: 999999}}, + Pair: cp, + UpdateID: 1, + Asset: asset.Spot, + }) + if err != nil { + t.Fatal(err) + } + ob, err := holder.GetOrderbook(cp, asset.Spot) if err != nil { t.Fatal(err) @@ -715,19 +744,25 @@ func TestGetOrderbook(t *testing.T) { func TestSetup(t *testing.T) { t.Parallel() w := Orderbook{} - err := w.Setup(nil, false, false, false, nil) + err := w.Setup(nil, nil, nil) if !errors.Is(err, errExchangeConfigNil) { t.Fatalf("expected error %v but received %v", errExchangeConfigNil, err) } exchangeConfig := &config.Exchange{} - err = w.Setup(exchangeConfig, false, false, false, nil) + err = w.Setup(exchangeConfig, nil, nil) + if !errors.Is(err, errBufferConfigNil) { + t.Fatalf("expected error %v but received %v", errBufferConfigNil, err) + } + + bufferConf := &Config{} + err = w.Setup(exchangeConfig, bufferConf, nil) if !errors.Is(err, errUnsetDataHandler) { t.Fatalf("expected error %v but received %v", errUnsetDataHandler, err) } exchangeConfig.Orderbook.WebsocketBufferEnabled = true - err = w.Setup(exchangeConfig, false, false, false, make(chan interface{})) + err = w.Setup(exchangeConfig, bufferConf, make(chan interface{})) if !errors.Is(err, errIssueBufferEnabledButNoLimit) { t.Fatalf("expected error %v but received %v", errIssueBufferEnabledButNoLimit, err) } @@ -735,7 +770,10 @@ func TestSetup(t *testing.T) { exchangeConfig.Orderbook.WebsocketBufferLimit = 1337 exchangeConfig.Orderbook.WebsocketBufferEnabled = true exchangeConfig.Name = "test" - err = w.Setup(exchangeConfig, true, true, true, make(chan interface{})) + bufferConf.SortBuffer = true + bufferConf.SortBufferByUpdateIDs = true + bufferConf.UpdateEntriesByID = true + err = w.Setup(exchangeConfig, bufferConf, make(chan interface{})) if err != nil { t.Fatal(err) } @@ -1010,7 +1048,7 @@ func TestUpdateByIDAndAction(t *testing.T) { func TestFlushOrderbook(t *testing.T) { t.Parallel() w := &Orderbook{} - err := w.Setup(&config.Exchange{Name: "test"}, false, false, false, make(chan interface{}, 2)) + err := w.Setup(&config.Exchange{Name: "test"}, &Config{}, make(chan interface{}, 2)) if err != nil { t.Fatal(err) } diff --git a/exchanges/stream/buffer/buffer_types.go b/exchanges/stream/buffer/buffer_types.go index c9afe11c..fcd19e1c 100644 --- a/exchanges/stream/buffer/buffer_types.go +++ b/exchanges/stream/buffer/buffer_types.go @@ -9,6 +9,24 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" ) +// Config defines the configuration variables for the websocket buffer; snapshot +// and incremental update orderbook processing. +type Config struct { + // SortBuffer enables a websocket to sort incoming updates before processing. + SortBuffer bool + // SortBufferByUpdateIDs allows the sorting of the buffered updates by their + // corresponding update IDs. + SortBufferByUpdateIDs bool + // UpdateEntriesByID will match by IDs instead of price to perform the an + // action. e.g. update, delete, insert. + UpdateEntriesByID bool + // UpdateIDProgression requires that the new update ID be greater than the + // prior ID. This will skip processing and not error. + UpdateIDProgression bool + // Checksum is a package defined checksum calculation for updated books. + Checksum func(state *orderbook.Base, checksum uint32) error +} + // Orderbook defines a local cache of orderbooks for amending, appending // and deleting changes and updates the main store for a stream type Orderbook struct { @@ -19,10 +37,17 @@ type Orderbook struct { sortBufferByUpdateIDs bool // When timestamps aren't provided, an id can help sort updateEntriesByID bool // Use the update IDs to match ob entries exchangeName string - dataHandler chan interface{} + dataHandler chan<- interface{} verbose bool - publishPeriod time.Duration - m sync.Mutex + + // updateIDProgression requires that the new update ID be greater than the + // prior ID. This will skip processing and not error. + updateIDProgression bool + // checksum is a package defined checksum calculation for updated books. + checksum func(state *orderbook.Base, checksum uint32) error + + publishPeriod time.Duration + m sync.Mutex } // orderbookHolder defines a store of pending updates and a pointer to the @@ -34,7 +59,8 @@ type orderbookHolder struct { // coinbasepro can have up too 100 updates per second introducing overhead. // The sync agent only requires an alert every 15 seconds for a specific // currency. - ticker *time.Ticker + ticker *time.Ticker + updateID int64 } // Update stores orderbook updates and dictates what features to use when processing @@ -46,7 +72,8 @@ type Update struct { Bids []orderbook.Item Asks []orderbook.Item Pair currency.Pair - + // Checksum defines the expected value when the books have been verified + Checksum uint32 // Determines if there is a max depth of orderbooks and after an append we // should remove any items that are outside of this scope. Kraken is the // only exchange utilising this field. diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index 7d27c86c..0d413f72 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -148,21 +148,12 @@ func (w *Websocket) Setup(s *WebsocketSetup) error { w.Wg = new(sync.WaitGroup) w.SetCanUseAuthenticatedEndpoints(s.ExchangeConfig.API.AuthenticatedWebsocketSupport) - if err := w.Orderbook.Setup(s.ExchangeConfig, - s.SortBuffer, - s.SortBufferByUpdateIDs, - s.UpdateEntriesByID, - w.DataHandler); err != nil { + if err := w.Orderbook.Setup(s.ExchangeConfig, &s.OrderbookBufferConfig, w.DataHandler); err != nil { return err } - w.Trade.Setup(w.exchangeName, - s.TradeFeed, - w.DataHandler) - - w.Fills.Setup(s.FillsFeed, - w.DataHandler) - + w.Trade.Setup(w.exchangeName, s.TradeFeed, w.DataHandler) + w.Fills.Setup(s.FillsFeed, w.DataHandler) return nil } diff --git a/exchanges/stream/websocket_types.go b/exchanges/stream/websocket_types.go index 3c39ec24..08ad2d2c 100644 --- a/exchanges/stream/websocket_types.go +++ b/exchanges/stream/websocket_types.go @@ -106,11 +106,12 @@ type WebsocketSetup struct { GenerateSubscriptions func() ([]ChannelSubscription, error) Features *protocol.Features ConnectionMonitorDelay time.Duration + // Local orderbook buffer config values - SortBuffer bool - SortBufferByUpdateIDs bool - UpdateEntriesByID bool - TradeFeed bool + OrderbookBufferConfig buffer.Config + + TradeFeed bool + // Fill data config values FillsFeed bool }