Kraken: fix websocket orderbook processing (#1279)

* kraken: fix websocket orderbook processing

* kraken: clean

* glorious: nits also add todo

* more: add required checksum to TODO for future, move variable to top

* glorious: get mad

* orderbook: fix retrieve issue

---------

Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
This commit is contained in:
Ryan O'Hara-Reid
2023-07-26 16:35:43 +10:00
committed by GitHub
parent 471f4f21c4
commit 4dd3008fd4
7 changed files with 140 additions and 74 deletions

View File

@@ -67,7 +67,6 @@ func (b *Bittrex) ProcessUpdateOB(pair currency.Pair, message *OrderbookUpdateMe
Asset: asset.Spot,
Pair: pair,
UpdateID: message.Sequence,
MaxDepth: orderbookDepth,
Bids: updateBids,
Asks: updateAsks,
})
@@ -115,11 +114,11 @@ func (b *Bittrex) SeedLocalOBCache(ctx context.Context, p currency.Pair) error {
if err != nil {
return err
}
return b.SeedLocalCacheWithOrderBook(p, sequence, ob)
return b.SeedLocalCacheWithOrderBook(p, sequence, ob, orderbookDepth)
}
// SeedLocalCacheWithOrderBook seeds the local orderbook cache
func (b *Bittrex) SeedLocalCacheWithOrderBook(p currency.Pair, sequence int64, orderbookNew *OrderbookData) error {
func (b *Bittrex) SeedLocalCacheWithOrderBook(p currency.Pair, sequence int64, orderbookNew *OrderbookData, maxDepth int) error {
newOrderBook := orderbook.Base{
Pair: p,
Asset: asset.Spot,
@@ -128,6 +127,7 @@ func (b *Bittrex) SeedLocalCacheWithOrderBook(p currency.Pair, sequence int64, o
VerifyOrderbook: b.CanVerifyOrderbook,
Bids: make(orderbook.Items, len(orderbookNew.Bid)),
Asks: make(orderbook.Items, len(orderbookNew.Ask)),
MaxDepth: maxDepth,
}
for i := range orderbookNew.Bid {

View File

@@ -2189,3 +2189,63 @@ func TestGetFuturesTrades(t *testing.T) {
t.Error(err)
}
}
var websocketXDGUSDOrderbookUpdates = []string{
`{"channelID":2304,"channelName":"book-10","event":"subscriptionStatus","pair":"XDG/USD","reqid":163845014,"status":"subscribed","subscription":{"depth":10,"name":"book"}}`,
`[2304,{"as":[["0.074602700","278.39626342","1690246067.832139"],["0.074611000","555.65134028","1690246086.243668"],["0.074613300","524.87121572","1690245901.574881"],["0.074624600","77.57180740","1690246060.668500"],["0.074632500","620.64648404","1690246010.904883"],["0.074698400","409.57419037","1690246041.269821"],["0.074700000","61067.71115772","1690246089.485595"],["0.074723200","4394.01869240","1690246087.557913"],["0.074725200","4229.57885125","1690246082.911452"],["0.074738400","212.25501214","1690246089.421559"]],"bs":[["0.074597400","53591.43163675","1690246089.451762"],["0.074596700","33594.18269213","1690246089.514152"],["0.074596600","53598.60351469","1690246089.340781"],["0.074594800","5358.57247081","1690246089.347962"],["0.074594200","30168.21074680","1690246089.345112"],["0.074590900","7089.69894583","1690246088.212880"],["0.074586700","46925.20182082","1690246089.074618"],["0.074577200","5500.00000000","1690246087.568856"],["0.074569600","8132.49888631","1690246086.841219"],["0.074562900","8413.11098009","1690246087.024863"]]},"book-10","XDG/USD"]`,
`[2304,{"a":[["0.074700000","0.00000000","1690246089.516119"],["0.074738500","125000.00000000","1690246063.352141","r"]],"c":"2219685759"},"book-10","XDG/USD"]`,
`[2304,{"a":[["0.074678800","33476.70673703","1690246089.570183"]],"c":"1897176819"},"book-10","XDG/USD"]`,
`[2304,{"b":[["0.074562900","0.00000000","1690246089.570206"],["0.074559600","4000.00000000","1690246086.478591","r"]],"c":"2498018751"},"book-10","XDG/USD"]`,
`[2304,{"b":[["0.074577300","125000.00000000","1690246089.577140"]],"c":"155006629"},"book-10","XDG/USD"]`,
`[2304,{"a":[["0.074678800","0.00000000","1690246089.584498"],["0.074738500","125000.00000000","1690246063.352141","r"]],"c":"3703147735"},"book-10","XDG/USD"]`,
`[2304,{"b":[["0.074597500","10000.00000000","1690246089.602477"]],"c":"2989534775"},"book-10","XDG/USD"]`,
`[2304,{"a":[["0.074738500","0.00000000","1690246089.608769"],["0.074750800","51369.02100000","1690246089.495500","r"]],"c":"1842075082"},"book-10","XDG/USD"]`,
`[2304,{"b":[["0.074583500","8413.11098009","1690246089.612144"]],"c":"710274752"},"book-10","XDG/USD"]`,
`[2304,{"b":[["0.074578500","9966.55841398","1690246089.634739"]],"c":"1646135532"},"book-10","XDG/USD"]`,
`[2304,{"a":[["0.074738400","0.00000000","1690246089.638648"],["0.074751500","80499.09450000","1690246086.679402","r"]],"c":"2509689626"},"book-10","XDG/USD"]`,
`[2304,{"a":[["0.074750700","290.96851266","1690246089.638754"]],"c":"3981738175"},"book-10","XDG/USD"]`,
`[2304,{"a":[["0.074720000","61067.71115772","1690246089.662102"]],"c":"1591820326"},"book-10","XDG/USD"]`,
`[2304,{"a":[["0.074602700","0.00000000","1690246089.670911"],["0.074750800","51369.02100000","1690246089.495500","r"]],"c":"3838272404"},"book-10","XDG/USD"]`,
`[2304,{"a":[["0.074611000","0.00000000","1690246089.680343"],["0.074758500","159144.39750000","1690246035.158327","r"]],"c":"4241552383"},"book-10","XDG/USD"] `,
}
var websocketLUNAEUROrderbookUpdates = []string{
`{"channelID":9536,"channelName":"book-10","event":"subscriptionStatus","pair":"LUNA/EUR","reqid":106845459,"status":"subscribed","subscription":{"depth":10,"name":"book"}}`,
`[9536,{"as":[["0.000074650000","147354.32016076","1690249755.076929"],["0.000074710000","5084881.40000000","1690250711.359411"],["0.000074760000","9700502.70476704","1690250743.279490"],["0.000074990000","2933380.23886300","1690249596.627969"],["0.000075000000","433333.33333333","1690245575.626780"],["0.000075020000","152914.84493416","1690243661.232520"],["0.000075070000","146529.90542161","1690249048.358424"],["0.000075250000","737072.85720004","1690211553.549248"],["0.000075400000","670061.64567140","1690250769.261196"],["0.000075460000","980226.63603417","1690250769.627523"]],"bs":[["0.000074590000","71029.87806720","1690250763.012724"],["0.000074580000","15935576.86404000","1690250763.012710"],["0.000074520000","33758611.79634000","1690250718.290955"],["0.000074350000","3156650.58590277","1690250766.499648"],["0.000074340000","301727260.79999999","1690250766.490238"],["0.000074320000","64611496.53837000","1690250742.680258"],["0.000074310000","104228596.60000000","1690250744.679121"],["0.000074300000","40366046.10582000","1690250762.685914"],["0.000074200000","3690216.57320475","1690250645.311465"],["0.000074060000","1337170.52532521","1690250742.012527"]]},"book-10","LUNA/EUR"]`,
`[9536,{"b":[["0.000074060000","0.00000000","1690250770.616604"],["0.000074050000","16742421.17790510","1690250710.867730","r"]],"c":"418307145"},"book-10","LUNA/EUR"]`,
}
var websocketGSTEUROrderbookUpdates = []string{
`{"channelID":8912,"channelName":"book-10","event":"subscriptionStatus","pair":"GST/EUR","reqid":157734759,"status":"subscribed","subscription":{"depth":10,"name":"book"}}`,
`[8912,{"as":[["0.01300","850.00000000","1690230914.230506"],["0.01400","323483.99590510","1690256356.615823"],["0.01500","100287.34442717","1690219133.193345"],["0.01600","67995.78441017","1690118389.451216"],["0.01700","41776.38397740","1689676303.381189"],["0.01800","11785.76177777","1688631951.812452"],["0.01900","23700.00000000","1686935422.319042"],["0.02000","3941.17000000","1689415829.176481"],["0.02100","16598.69173066","1689420942.541943"],["0.02200","17572.51572836","1689851425.907427"]],"bs":[["0.01200","14220.66466572","1690256540.842831"],["0.01100","160223.61546438","1690256401.072463"],["0.01000","63083.48958963","1690256604.037673"],["0.00900","6750.00000000","1690252470.633938"],["0.00800","213059.49706376","1690256360.386301"],["0.00700","1000.00000000","1689869458.464975"],["0.00600","4000.00000000","1690221333.528698"],["0.00100","245000.00000000","1690051368.753455"]]},"book-10","GST/EUR"]`,
`[8912,{"b":[["0.01000","60583.48958963","1690256620.206768"],["0.01000","63083.48958963","1690256620.206783"]],"c":"69619317"},"book-10","GST/EUR"]`,
}
func TestWsOrderbookMax10Depth(t *testing.T) {
t.Parallel()
for x := range websocketXDGUSDOrderbookUpdates {
err := k.wsHandleData([]byte(websocketXDGUSDOrderbookUpdates[x]))
if err != nil {
t.Fatal(err)
}
}
for x := range websocketLUNAEUROrderbookUpdates {
err := k.wsHandleData([]byte(websocketLUNAEUROrderbookUpdates[x]))
// TODO: Known issue with LUNA pairs and big number float precision
// storage and checksum calc. Might need to store raw strings as fields
// in the orderbook.Item struct.
// Required checksum: 7465000014735432016076747100005084881400000007476000097005027047670474990000293338023886300750000004333333333333375020000152914844934167507000014652990542161752500007370728572000475400000670061645671407546000098022663603417745900007102987806720745800001593557686404000745200003375861179634000743500003156650585902777434000030172726079999999743200006461149653837000743100001042285966000000074300000403660461058200074200000369021657320475740500001674242117790510
if err != nil && x != len(websocketLUNAEUROrderbookUpdates)-1 {
t.Fatal(err)
}
}
// This has less than 10 bids and still needs a checksum calc.
for x := range websocketGSTEUROrderbookUpdates {
err := k.wsHandleData([]byte(websocketGSTEUROrderbookUpdates[x]))
if err != nil {
t.Fatal(err)
}
}
}

View File

@@ -562,6 +562,7 @@ type WebsocketChannelData struct {
Subscription string
Pair currency.Pair
ChannelID *int64
MaxDepth int
}
// WsTokenResponse holds the WS auth token

View File

@@ -58,10 +58,11 @@ const (
// orderbookMutex Ensures if two entries arrive at once, only one can be
// processed at a time
var (
subscriptionChannelPair []WebsocketChannelData
authToken string
pingRequest = WebsocketBaseEventRequest{Event: stream.Ping}
m sync.Mutex
subscriptionChannelPair []WebsocketChannelData
authToken string
pingRequest = WebsocketBaseEventRequest{Event: stream.Ping}
m sync.Mutex
errNoWebsocketOrderbookData = errors.New("no websocket orderbook data")
)
// Channels require a topic and a currency
@@ -629,8 +630,8 @@ func (k *Kraken) wsProcessOpenOrders(ownOrders interface{}) error {
func (k *Kraken) addNewSubscriptionChannelData(response *wsSubscription) {
// We change the / to - to maintain compatibility with REST/config
var pair, fPair currency.Pair
var err error
if response.Pair != "" {
var err error
pair, err = currency.NewPairFromString(response.Pair)
if err != nil {
log.Errorf(log.ExchangeSys, "%s exchange error: %s", k.Name, err)
@@ -642,12 +643,21 @@ func (k *Kraken) addNewSubscriptionChannelData(response *wsSubscription) {
return
}
}
maxDepth := 0
if splits := strings.Split(response.ChannelName, "-"); len(splits) > 1 {
maxDepth, err = strconv.Atoi(splits[1])
if err != nil {
log.Errorf(log.ExchangeSys, "%s exchange error: %s", k.Name, err)
}
}
m.Lock()
defer m.Unlock()
subscriptionChannelPair = append(subscriptionChannelPair, WebsocketChannelData{
Subscription: response.Subscription.Name,
Pair: fPair,
ChannelID: response.ChannelID,
MaxDepth: maxDepth,
})
}
@@ -810,45 +820,49 @@ func (k *Kraken) wsProcessTrades(channelData *WebsocketChannelData, data []inter
// wsProcessOrderBook determines if the orderbook data is partial or update
// Then sends to appropriate fun
func (k *Kraken) wsProcessOrderBook(channelData *WebsocketChannelData, data map[string]interface{}) error {
askSnapshot, askSnapshotExists := data["as"].([]interface{})
bidSnapshot, bidSnapshotExists := data["bs"].([]interface{})
if askSnapshotExists || bidSnapshotExists {
err := k.wsProcessOrderBookPartial(channelData, askSnapshot, bidSnapshot)
if err != nil {
return err
}
} else {
askData, asksExist := data["a"].([]interface{})
bidData, bidsExist := data["b"].([]interface{})
// NOTE: Updates are a priority so check if it's an update first as we don't
// need multiple map lookups to check for snapshot.
askData, asksExist := data["a"].([]interface{})
bidData, bidsExist := data["b"].([]interface{})
if asksExist || bidsExist {
checksum, ok := data["c"].(string)
if !ok {
return fmt.Errorf("could not process orderbook update checksum not found")
}
if asksExist || bidsExist {
k.wsRequestMtx.Lock()
defer k.wsRequestMtx.Unlock()
err := k.wsProcessOrderBookUpdate(channelData, askData, bidData, checksum)
if err != nil {
go func(resub *stream.ChannelSubscription) {
// This was locking the main websocket reader routine and a
// backlog occurred. So put this into it's own go routine.
errResub := k.Websocket.ResubscribeToChannel(resub)
if errResub != nil {
log.Errorf(log.WebsocketMgr,
"resubscription failure for %v: %v",
resub,
errResub)
}
}(&stream.ChannelSubscription{
Channel: krakenWsOrderbook,
Currency: channelData.Pair,
Asset: asset.Spot,
})
return err
}
k.wsRequestMtx.Lock()
defer k.wsRequestMtx.Unlock()
err := k.wsProcessOrderBookUpdate(channelData, askData, bidData, checksum)
if err != nil {
outbound := channelData.Pair // Format required "XBT/USD"
outbound.Delimiter = "/"
go func(resub *stream.ChannelSubscription) {
// This was locking the main websocket reader routine and a
// backlog occurred. So put this into it's own go routine.
errResub := k.Websocket.ResubscribeToChannel(resub)
if errResub != nil {
log.Errorf(log.WebsocketMgr,
"resubscription failure for %v: %v",
resub,
errResub)
}
}(&stream.ChannelSubscription{
Channel: krakenWsOrderbook,
Currency: outbound,
Asset: asset.Spot,
})
return err
}
return nil
}
return nil
askSnapshot, askSnapshotExists := data["as"].([]interface{})
bidSnapshot, bidSnapshotExists := data["bs"].([]interface{})
if !askSnapshotExists && !bidSnapshotExists {
return fmt.Errorf("%w for %v %v", errNoWebsocketOrderbookData, channelData.Pair, asset.Spot)
}
return k.wsProcessOrderBookPartial(channelData, askSnapshot, bidSnapshot)
}
// wsProcessOrderBookPartial creates a new orderbook entry for a given currency pair
@@ -859,6 +873,7 @@ func (k *Kraken) wsProcessOrderBookPartial(channelData *WebsocketChannelData, as
VerifyOrderbook: k.CanVerifyOrderbook,
Bids: make(orderbook.Items, len(bidData)),
Asks: make(orderbook.Items, len(askData)),
MaxDepth: channelData.MaxDepth,
}
// Kraken ob data is timestamped per price, GCT orderbook data is
// timestamped per entry using the highest last update time, we can attempt
@@ -933,11 +948,10 @@ func (k *Kraken) wsProcessOrderBookPartial(channelData *WebsocketChannelData, as
// wsProcessOrderBookUpdate updates an orderbook entry for a given currency pair
func (k *Kraken) wsProcessOrderBookUpdate(channelData *WebsocketChannelData, askData, bidData []interface{}, checksum string) error {
update := orderbook.Update{
Asset: asset.Spot,
Pair: channelData.Pair,
MaxDepth: krakenWsOrderbookDepth,
Bids: make([]orderbook.Item, len(bidData)),
Asks: make([]orderbook.Item, len(askData)),
Asset: asset.Spot,
Pair: channelData.Pair,
Bids: make([]orderbook.Item, len(bidData)),
Asks: make([]orderbook.Item, len(askData)),
}
// Calculating checksum requires incoming decimal place checks for both
@@ -1072,6 +1086,7 @@ func (k *Kraken) wsProcessOrderBookUpdate(channelData *WebsocketChannelData, ask
}
}
update.UpdateTime = highestLastUpdate
err := k.Websocket.Orderbook.Update(&update)
if err != nil {
return err
@@ -1094,12 +1109,6 @@ func (k *Kraken) wsProcessOrderBookUpdate(channelData *WebsocketChannelData, ask
}
func validateCRC32(b *orderbook.Base, token uint32, decPrice, decAmount int) error {
if len(b.Asks) < 10 || len(b.Bids) < 10 {
return fmt.Errorf("%s %s insufficient bid and asks to calculate checksum",
b.Pair,
b.Asset)
}
if decPrice == 0 || decAmount == 0 {
return fmt.Errorf("%s %s trailing decimal count not calculated",
b.Pair,
@@ -1107,14 +1116,14 @@ func validateCRC32(b *orderbook.Base, token uint32, decPrice, decAmount int) err
}
var checkStr strings.Builder
for i := 0; i < 10; i++ {
for i := 0; i < 10 && i < len(b.Asks); i++ {
priceStr := trim(strconv.FormatFloat(b.Asks[i].Price, 'f', decPrice, 64))
checkStr.WriteString(priceStr)
amountStr := trim(strconv.FormatFloat(b.Asks[i].Amount, 'f', decAmount, 64))
checkStr.WriteString(amountStr)
}
for i := 0; i < 10; i++ {
for i := 0; i < 10 && i < len(b.Bids); i++ {
priceStr := trim(strconv.FormatFloat(b.Bids[i].Price, 'f', decPrice, 64))
checkStr.WriteString(priceStr)
amountStr := trim(strconv.FormatFloat(b.Bids[i].Amount, 'f', decAmount, 64))
@@ -1232,11 +1241,6 @@ channels:
}
for j := range *s {
if len((*s)[j].Channels) >= 20 {
// Batch outgoing subscriptions as there are limitations on the
// orderbook snapshots
continue
}
(*s)[j].Pairs = append((*s)[j].Pairs, channelsToSubscribe[i].Currency.String())
(*s)[j].Channels = append((*s)[j].Channels, channelsToSubscribe[i])
continue channels
@@ -1251,7 +1255,7 @@ channels:
},
}
if channelsToSubscribe[i].Channel == "book" {
outbound.Subscription.Depth = 1000
outbound.Subscription.Depth = krakenWsOrderbookDepth
}
if !channelsToSubscribe[i].Currency.IsEmpty() {
outbound.Pairs = []string{channelsToSubscribe[i].Currency.String()}
@@ -1265,7 +1269,7 @@ channels:
}
var errs error
for subType, subs := range subscriptions {
for _, subs := range subscriptions {
for i := range *subs {
if common.StringDataContains(authenticatedChannels, (*subs)[i].Subscription.Name) {
_, err := k.Websocket.AuthConn.SendMessageReturnResponse((*subs)[i].RequestID, (*subs)[i])
@@ -1276,13 +1280,6 @@ channels:
k.Websocket.AddSuccessfulSubscriptions((*subs)[i].Channels...)
continue
}
if subType == "book" {
// There is an undocumented subscription limit that is present
// on websocket orderbooks, to subscribe to the channel while
// actually receiving the snapshots a rudimentary sleep is
// imposed and requests are batched in allotments of 20 items.
time.Sleep(time.Second)
}
_, err := k.Websocket.Conn.SendMessageReturnResponse((*subs)[i].RequestID, (*subs)[i])
if err != nil {
errs = common.AppendError(errs, err)

View File

@@ -83,6 +83,7 @@ func (d *Depth) Retrieve() (*Base, error) {
PriceDuplication: d.priceDuplication,
IsFundingRate: d.isFundingRate,
VerifyOrderbook: d.VerifyOrderbook,
MaxDepth: d.maxDepth,
}, nil
}
@@ -138,10 +139,10 @@ func (d *Depth) UpdateBidAskByPrice(update *Update) {
tn := getNow()
d.m.Lock()
if len(update.Bids) != 0 {
d.bids.updateInsertByPrice(update.Bids, d.stack, update.MaxDepth, tn)
d.bids.updateInsertByPrice(update.Bids, d.stack, d.options.maxDepth, tn)
}
if len(update.Asks) != 0 {
d.asks.updateInsertByPrice(update.Asks, d.stack, update.MaxDepth, tn)
d.asks.updateInsertByPrice(update.Asks, d.stack, d.options.maxDepth, tn)
}
d.updateAndAlert(update)
d.m.Unlock()
@@ -241,6 +242,7 @@ func (d *Depth) AssignOptions(b *Base) {
VerifyOrderbook: b.VerifyOrderbook,
restSnapshot: b.RestSnapshot,
idAligned: b.IDAlignment,
maxDepth: b.MaxDepth,
}
d.m.Unlock()
}

View File

@@ -97,6 +97,7 @@ func TestRetrieve(t *testing.T) {
VerifyOrderbook: true,
restSnapshot: true,
idAligned: true,
maxDepth: 10,
}
// If we add anymore options to the options struct later this will complain
@@ -122,6 +123,10 @@ func TestRetrieve(t *testing.T) {
if len(ob.Bids) != 1 {
t.Errorf("expected len %v, but received %v", 1, len(ob.Bids))
}
if ob.MaxDepth != 10 {
t.Errorf("expected max depth %v, but received %v", 10, ob.MaxDepth)
}
}
func TestTotalAmounts(t *testing.T) {

View File

@@ -95,6 +95,10 @@ type Base struct {
RestSnapshot bool
// Checks if the orderbook needs ID alignment as well as price alignment
IDAlignment bool
// Determines if there is a max depth of orderbooks and after an append we
// should remove any items that are outside of this scope. Bittrex and
// Kraken utilise this field.
MaxDepth int
}
type byOBPrice []Item
@@ -114,6 +118,7 @@ type options struct {
VerifyOrderbook bool
restSnapshot bool
idAligned bool
maxDepth int
}
// Action defines a set of differing states required to implement an incoming
@@ -143,10 +148,6 @@ type Update struct {
Pair currency.Pair
// Checksum defines the expected value when the books have been verified
Checksum uint32
// Determines if there is a max depth of orderbooks and after an append we
// should remove any items that are outside of this scope. Kraken is the
// only exchange utilising this field.
MaxDepth int
}
// Movement defines orderbook traversal details from either hitting the bids or