From 32b43387fd283573d8f982e54fbc0a6fc55e2f79 Mon Sep 17 00:00:00 2001 From: Scott Date: Fri, 12 Apr 2019 09:21:37 +1000 Subject: [PATCH] Kraken websocket orderbook buffer (#276) * Adds a orderbook buffer to prevent out of order issues with WS orderbooks. Internalises latest orderbook for comparisons sake as getorderbookex is unreliable due to REST updates * Adds a basic rate limiter for WS requests. Updates buffer to support multiple channels * Uses earliest buffer 'lastupdate' to compate to existing orderbook's 'lastupdate' instead of last * Adds WS test for buffer use * Adds out of order test * Fixes blocking bid updates, fixes issue where orderbook processing occured in wrong area, adds detailed verbose debugging, uses pointer for channelData use * Updates test to adapt to buffer limits, reduces kraken buffer to 3 * Change websocket connection check in tests. Change error handling. Implement requestID in data responses for WS. Change test to prevent default subscriptions from preventing test execution with extra data. Updates orderbook tests to call correct functions to prevent resubscriptions. Removes resubscribe on subscription status failure * Fixes linting issues * Fixes error handling --- exchanges/kraken/kraken_test.go | 155 ++++++++++++-- exchanges/kraken/kraken_types.go | 5 +- exchanges/kraken/kraken_websocket.go | 310 +++++++++++++++++++++------ 3 files changed, 382 insertions(+), 88 deletions(-) diff --git a/exchanges/kraken/kraken_test.go b/exchanges/kraken/kraken_test.go index 64a57e5e..97973b7e 100644 --- a/exchanges/kraken/kraken_test.go +++ b/exchanges/kraken/kraken_test.go @@ -1,8 +1,11 @@ package kraken import ( + "fmt" + "strings" "testing" + "github.com/thrasher-/gocryptotrader/common" "github.com/thrasher-/gocryptotrader/config" "github.com/thrasher-/gocryptotrader/currency" exchange "github.com/thrasher-/gocryptotrader/exchanges" @@ -36,6 +39,7 @@ func TestSetup(t *testing.T) { krakenConfig.APISecret = apiSecret krakenConfig.ClientID = clientID krakenConfig.WebsocketURL = k.WebsocketURL + subscribeToDefaultChannels = false k.Setup(&krakenConfig) } @@ -638,6 +642,113 @@ func TestWithdrawCancel(t *testing.T) { // ---------------------------- Websocket tests ----------------------------------------- +// TestOrderbookBufferReset websocket test +func TestOrderbookBufferReset(t *testing.T) { + if k.Name == "" { + k.SetDefaults() + TestSetup(t) + } + if !k.Websocket.IsEnabled() { + t.Skip("Websocket not enabled, skipping") + } + if k.WebsocketConn == nil { + k.Websocket.Connect() + } + var obUpdates []string + obpartial := `[0,{"as":[["5541.30000","2.50700000","0"]],"bs":[["5541.20000","1.52900000","0"]]}]` + for i := 1; i < orderbookBufferLimit+2; i++ { + obUpdates = append(obUpdates, fmt.Sprintf(`[0,{"a":[["5541.30000","2.50700000","%v"]],"b":[["5541.30000","1.00000000","%v"]]}]`, i, i)) + } + k.Websocket.DataHandler = make(chan interface{}, 10) + var dataResponse WebsocketDataResponse + err := common.JSONDecode([]byte(obpartial), &dataResponse) + if err != nil { + t.Errorf("Could not parse, %v", err) + } + obData := dataResponse[1].(map[string]interface{}) + channelData := WebsocketChannelData{ + ChannelID: 0, + Subscription: "orderbook", + Pair: currency.NewPairWithDelimiter("XBT", "USD", "/"), + } + + k.wsProcessOrderBookPartial( + &channelData, + obData, + ) + + for i := 0; i < len(obUpdates); i++ { + err = common.JSONDecode([]byte(obUpdates[i]), &dataResponse) + if err != nil { + t.Errorf("Could not parse, %v", err) + } + obData = dataResponse[1].(map[string]interface{}) + if i < len(obUpdates)-1 { + k.wsProcessOrderBookBuffer(&channelData, obData) + } else if i == len(obUpdates)-1 { + k.wsProcessOrderBookUpdate(&channelData) + k.wsProcessOrderBookBuffer(&channelData, obData) + if len(orderbookBuffer[channelData.ChannelID]) != 1 { + t.Error("Buffer should have 1 entry after being reset") + } + } + } +} + +// TestOrderbookBufferReset websocket test +func TestOrderBookOutOfOrder(t *testing.T) { + if k.Name == "" { + k.SetDefaults() + TestSetup(t) + } + if !k.Websocket.IsEnabled() { + t.Skip("Websocket not enabled, skipping") + } + if k.WebsocketConn == nil { + k.Websocket.Connect() + } + obpartial := `[0,{"as":[["5541.30000","2.50700000","0"]],"bs":[["5541.20000","1.52900000","5"]]}]` + obupdate1 := `[0,{"a":[["5541.30000","0.00000000","1"]],"b":[["5541.30000","0.00000000","3"]]}]` + obupdate2 := `[0,{"a":[["5541.30000","2.50700000","2"]],"b":[["5541.30000","0.00000000","1"]]}]` + + k.Websocket.DataHandler = make(chan interface{}, 10) + var dataResponse WebsocketDataResponse + err := common.JSONDecode([]byte(obpartial), &dataResponse) + if err != nil { + t.Errorf("Could not parse, %v", err) + } + obData := dataResponse[1].(map[string]interface{}) + channelData := WebsocketChannelData{ + ChannelID: 0, + Subscription: "orderbook", + Pair: currency.NewPairWithDelimiter("XBT", "USD", "/"), + } + + k.wsProcessOrderBookPartial( + &channelData, + obData, + ) + + err = common.JSONDecode([]byte(obupdate1), &dataResponse) + if err != nil { + t.Errorf("Could not parse, %v", err) + } + obData = dataResponse[1].(map[string]interface{}) + k.wsProcessOrderBookBuffer(&channelData, obData) + + err = common.JSONDecode([]byte(obupdate2), &dataResponse) + if err != nil { + t.Errorf("Could not parse, %v", err) + } + obData = dataResponse[1].(map[string]interface{}) + k.wsProcessOrderBookBuffer(&channelData, obData) + + err = k.wsProcessOrderBookUpdate(&channelData) + if !strings.Contains(err.Error(), "orderbook update out of order") { + t.Error("Expected out of order orderbook error") + } +} + // TestSubscribeToChannel websocket test func TestSubscribeToChannel(t *testing.T) { if k.Name == "" { @@ -647,12 +758,11 @@ func TestSubscribeToChannel(t *testing.T) { if !k.Websocket.IsEnabled() { t.Skip("Websocket not enabled, skipping") } - if !k.Websocket.IsConnected() { + if k.WebsocketConn == nil { k.Websocket.Connect() } - <-k.Websocket.TrafficAlert - err := k.WsSubscribeToChannel("ticker", []string{"XBT/USD"}, 1) + err := k.WsSubscribeToChannel("ticker", []string{"XTZ/USD"}, 1) if err != nil { t.Error(err) } @@ -667,7 +777,7 @@ func TestSubscribeToNonExistentChannel(t *testing.T) { if !k.Websocket.IsEnabled() { t.Skip("Websocket not enabled, skipping") } - if !k.Websocket.IsConnected() { + if k.WebsocketConn == nil { k.Websocket.Connect() } err := k.WsSubscribeToChannel("ticker", []string{"pewdiepie"}, 1) @@ -696,14 +806,14 @@ func TestSubscribeUnsubscribeToChannel(t *testing.T) { if !k.Websocket.IsEnabled() { t.Skip("Websocket not enabled, skipping") } - if !k.Websocket.IsConnected() { + if k.WebsocketConn == nil { k.Websocket.Connect() } - err := k.WsSubscribeToChannel("ticker", []string{"XBT/USD"}, 1) + err := k.WsSubscribeToChannel("ticker", []string{"XRP/JPY"}, 1) if err != nil { t.Error(err) } - err = k.WsUnsubscribeToChannel("ticker", []string{"XBT/USD"}, 2) + err = k.WsUnsubscribeToChannel("ticker", []string{"XRP/JPY"}, 2) if err != nil { t.Error(err) } @@ -718,18 +828,19 @@ func TestUnsubscribeWithoutSubscription(t *testing.T) { if !k.Websocket.IsEnabled() { t.Skip("Websocket not enabled, skipping") } - if !k.Websocket.IsConnected() { + if k.WebsocketConn == nil { k.Websocket.Connect() } - err := k.WsUnsubscribeToChannel("ticker", []string{"XBT/USD"}, 3) + err := k.WsUnsubscribeToChannel("ticker", []string{"QTUM/EUR"}, 3) if err != nil { t.Error(err) } unsubscriptionError := false - for i := 0; i < 7; i++ { + for i := 0; i < 5; i++ { response := <-k.Websocket.DataHandler + t.Log(response) if err, ok := response.(error); ok && err != nil { - if err.Error() == "Subscription Not Found" { + if err.Error() == "requestID: '3'. Error: Subscription Not Found" { unsubscriptionError = true break } @@ -749,18 +860,18 @@ func TestUnsubscribeWithChannelID(t *testing.T) { if !k.Websocket.IsEnabled() { t.Skip("Websocket not enabled, skipping") } - if !k.Websocket.IsConnected() { + if k.WebsocketConn == nil { k.Websocket.Connect() } - err := k.WsUnsubscribeToChannelByChannelID(3) + err := k.WsUnsubscribeToChannelByChannelID(100) if err != nil { t.Error(err) } unsubscriptionError := false - for i := 0; i < 7; i++ { + for i := 0; i < 5; i++ { response := <-k.Websocket.DataHandler if err, ok := response.(error); ok && err != nil { - if err.Error() == "Subscription Not Found" { + if err.Error() == "Not subscribed to the requested channelID" { unsubscriptionError = true break } @@ -771,8 +882,8 @@ func TestUnsubscribeWithChannelID(t *testing.T) { } } -// TestUnsubscribeFromNonExistentChennel websocket test -func TestUnsubscribeFromNonExistentChennel(t *testing.T) { +// TestUnsubscribeFromNonExistentChannel websocket test +func TestUnsubscribeFromNonExistentChannel(t *testing.T) { if k.Name == "" { k.SetDefaults() TestSetup(t) @@ -780,7 +891,7 @@ func TestUnsubscribeFromNonExistentChennel(t *testing.T) { if !k.Websocket.IsEnabled() { t.Skip("Websocket not enabled, skipping") } - if !k.Websocket.IsConnected() { + if k.WebsocketConn == nil { k.Websocket.Connect() } err := k.WsUnsubscribeToChannel("ticker", []string{"tseries"}, 0) @@ -788,11 +899,13 @@ func TestUnsubscribeFromNonExistentChennel(t *testing.T) { t.Error(err) } unsubscriptionError := false - for i := 0; i < 7; i++ { + for i := 0; i < 5; i++ { response := <-k.Websocket.DataHandler if err, ok := response.(error); ok && err != nil { - unsubscriptionError = true - break + if err.Error() == "Currency pair not in ISO 4217-A3 format tseries" { + unsubscriptionError = true + break + } } } if !unsubscriptionError { diff --git a/exchanges/kraken/kraken_types.go b/exchanges/kraken/kraken_types.go index f38c7cd1..eb62d7fd 100644 --- a/exchanges/kraken/kraken_types.go +++ b/exchanges/kraken/kraken_types.go @@ -415,6 +415,7 @@ type WebsocketEventResponse struct { Event string `json:"event"` Status string `json:"status"` Pair currency.Pair `json:"pair,omitempty"` + RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message. Subscription WebsocketSubscriptionResponseData `json:"subscription,omitempty"` WebsocketSubscriptionEventResponse WebsocketStatusResponse @@ -422,7 +423,7 @@ type WebsocketEventResponse struct { } type WebsocketSubscriptionEventResponse struct { - ChannelID float64 `json:"channelID"` + ChannelID int64 `json:"channelID"` } type WebsocketSubscriptionResponseData struct { @@ -444,5 +445,5 @@ type WebsocketErrorResponse struct { type WebsocketChannelData struct { Subscription string Pair currency.Pair - ChannelID float64 + ChannelID int64 } diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index ce03aa24..facd125a 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -9,6 +9,7 @@ import ( "math" "net/http" "net/url" + "sort" "strconv" "strings" "sync" @@ -44,20 +45,33 @@ const ( krakenWsSpread = "spread" krakenWsOrderbook = "book" // Only supported asset type - krakenWsAssetType = "SPOT" + krakenWsAssetType = "SPOT" + orderbookBufferLimit = 3 ) // orderbookMutex Ensures if two entries arrive at once, only one can be processed at a time var orderbookMutex sync.Mutex var subscriptionChannelPair []WebsocketChannelData +// krakenOrderBooks TODO THIS IS A TEMPORARY SOLUTION UNTIL ENGINE BRANCH IS MERGED +// WS orderbook data can only rely on WS orderbook data +// Currently REST and WS runs simultaneously, dirtying the data +var krakenOrderBooks map[int64]orderbook.Base + +// orderbookBuffer Stores orderbook updates per channel +var orderbookBuffer map[int64][]orderbook.Base +var subscribeToDefaultChannels = true + // writeToWebsocket sends a message to the websocket endpoint func (k *Kraken) writeToWebsocket(message []byte) error { k.mu.Lock() defer k.mu.Unlock() if k.Verbose { - log.Debugf("Sending message to WS: %v", string(message)) + log.Debugf("Sending message to WS: %v", + string(message)) } + // Really basic WS rate limit + time.Sleep(30 * time.Millisecond) return k.WebsocketConn.WriteMessage(websocket.TextMessage, message) } @@ -79,7 +93,8 @@ func (k *Kraken) WsConnect() error { var err error if k.Verbose { - log.Debugf("Attempting to connect to %v", k.Websocket.GetWebsocketURL()) + log.Debugf("Attempting to connect to %v", + k.Websocket.GetWebsocketURL()) } k.WebsocketConn, _, err = dialer.Dial(k.Websocket.GetWebsocketURL(), http.Header{}) @@ -89,11 +104,14 @@ func (k *Kraken) WsConnect() error { err) } if k.Verbose { - log.Debugf("Successful connection to %v", k.Websocket.GetWebsocketURL()) + log.Debugf("Successful connection to %v", + k.Websocket.GetWebsocketURL()) } go k.WsHandleData() go k.wsPingHandler() - k.WsSubscribeToDefaults() + if subscribeToDefaultChannels { + k.WsSubscribeToDefaults() + } return nil } @@ -133,7 +151,9 @@ func (k *Kraken) WsReadData() (exchange.WebsocketResponse, error) { } } if k.Verbose { - log.Debugf("%v Websocket message received: %v", k.Name, string(standardMessage)) + log.Debugf("%v Websocket message received: %v", + k.Name, + string(standardMessage)) } return exchange.WebsocketResponse{Raw: standardMessage}, nil @@ -152,7 +172,8 @@ func (k *Kraken) wsPingHandler() { case <-ticker.C: pingEvent := fmt.Sprintf("{\"event\":\"%v\"}", krakenWsPing) if k.Verbose { - log.Debugf("%v sending ping", k.GetName()) + log.Debugf("%v sending ping", + k.GetName()) } err := k.writeToWebsocket([]byte(pingEvent)) if err != nil { @@ -207,7 +228,7 @@ func (k *Kraken) WsHandleData() { // WsHandleDataResponse classifies the WS response and sends to appropriate handler func (k *Kraken) WsHandleDataResponse(response WebsocketDataResponse) { - channelID := response[0].(float64) + channelID := int64(response[0].(float64)) channelData := getSubscriptionChannelData(channelID) switch channelData.Subscription { case krakenWsTicker: @@ -215,31 +236,31 @@ func (k *Kraken) WsHandleDataResponse(response WebsocketDataResponse) { log.Debugf("%v Websocket ticker data received", k.GetName()) } - k.wsProcessTickers(channelData, response[1]) + k.wsProcessTickers(&channelData, response[1]) case krakenWsOHLC: if k.Verbose { log.Debugf("%v Websocket OHLC data received", k.GetName()) } - k.wsProcessCandles(channelData, response[1]) + k.wsProcessCandles(&channelData, response[1]) case krakenWsOrderbook: if k.Verbose { log.Debugf("%v Websocket Orderbook data received", k.GetName()) } - k.wsProcessOrderBook(channelData, response[1]) + k.wsProcessOrderBook(&channelData, response[1]) case krakenWsSpread: if k.Verbose { log.Debugf("%v Websocket Spread data received", k.GetName()) } - k.wsProcessSpread(channelData, response[1]) + k.wsProcessSpread(&channelData, response[1]) case krakenWsTrade: if k.Verbose { log.Debugf("%v Websocket Trade data received", k.GetName()) } - k.wsProcessTrades(channelData, response[1]) + k.wsProcessTrades(&channelData, response[1]) default: log.Errorf("%v Unidentified websocket data received: %v", k.GetName(), response) @@ -251,15 +272,18 @@ func (k *Kraken) WsHandleEventResponse(response *WebsocketEventResponse) { switch response.Event { case krakenWsHeartbeat: if k.Verbose { - log.Debugf("%v Websocket heartbeat data received", k.GetName()) + log.Debugf("%v Websocket heartbeat data received", + k.GetName()) } case krakenWsPong: if k.Verbose { - log.Debugf("%v Websocket pong data received", k.GetName()) + log.Debugf("%v Websocket pong data received", + k.GetName()) } case krakenWsSystemStatus: if k.Verbose { - log.Debugf("%v Websocket status data received", k.GetName()) + log.Debugf("%v Websocket status data received", + k.GetName()) } if response.Status != "online" { k.Websocket.DataHandler <- fmt.Errorf("%v Websocket status '%v'", @@ -275,8 +299,11 @@ func (k *Kraken) WsHandleEventResponse(response *WebsocketEventResponse) { k.GetName()) } if response.Status != "subscribed" { - k.Websocket.DataHandler <- fmt.Errorf(response.WebsocketErrorResponse.ErrorMessage) - k.ResubscribeToChannel(response.Subscription.Name, response.Pair) + if response.RequestID > 0 { + k.Websocket.DataHandler <- fmt.Errorf("requestID: '%v'. Error: %v", response.RequestID, response.WebsocketErrorResponse.ErrorMessage) + } else { + k.Websocket.DataHandler <- fmt.Errorf(response.WebsocketErrorResponse.ErrorMessage) + } return } addNewSubscriptionChannelData(response) @@ -355,7 +382,7 @@ func addNewSubscriptionChannelData(response *WebsocketEventResponse) { } // getSubscriptionChannelData retrieves WebsocketChannelData based on response ID -func getSubscriptionChannelData(id float64) WebsocketChannelData { +func getSubscriptionChannelData(id int64) WebsocketChannelData { for i := range subscriptionChannelPair { if id == subscriptionChannelPair[i].ChannelID { return subscriptionChannelPair[i] @@ -406,7 +433,7 @@ func (k *Kraken) ResubscribeToChannel(channel string, pair currency.Pair) { } // wsProcessTickers converts ticker data and sends it to the datahandler -func (k *Kraken) wsProcessTickers(channelData WebsocketChannelData, data interface{}) { +func (k *Kraken) wsProcessTickers(channelData *WebsocketChannelData, data interface{}) { tickerData := data.(map[string]interface{}) closeData := tickerData["c"].([]interface{}) openData := tickerData["o"].([]interface{}) @@ -433,7 +460,7 @@ func (k *Kraken) wsProcessTickers(channelData WebsocketChannelData, data interfa } // wsProcessTickers converts ticker data and sends it to the datahandler -func (k *Kraken) wsProcessSpread(channelData WebsocketChannelData, data interface{}) { +func (k *Kraken) wsProcessSpread(channelData *WebsocketChannelData, data interface{}) { spreadData := data.([]interface{}) bestBid := spreadData[0].(string) bestAsk := spreadData[1].(string) @@ -442,12 +469,15 @@ func (k *Kraken) wsProcessSpread(channelData WebsocketChannelData, data interfac spreadTimestamp := time.Unix(int64(sec), int64(dec*(1e9))) if k.Verbose { log.Debugf("Spread data for '%v' received. Best bid: '%v' Best ask: '%v' Time: '%v'", - channelData.Pair, bestBid, bestAsk, spreadTimestamp) + channelData.Pair, + bestBid, + bestAsk, + spreadTimestamp) } } // wsProcessTrades converts trade data and sends it to the datahandler -func (k *Kraken) wsProcessTrades(channelData WebsocketChannelData, data interface{}) { +func (k *Kraken) wsProcessTrades(channelData *WebsocketChannelData, data interface{}) { tradeData := data.([]interface{}) for i := range tradeData { trade := tradeData[i].([]interface{}) @@ -471,17 +501,29 @@ func (k *Kraken) wsProcessTrades(channelData WebsocketChannelData, data interfac // wsProcessOrderBook determines if the orderbook data is partial or update // Then sends to appropriate fun -func (k *Kraken) wsProcessOrderBook(channelData WebsocketChannelData, data interface{}) { +func (k *Kraken) wsProcessOrderBook(channelData *WebsocketChannelData, data interface{}) { obData := data.(map[string]interface{}) if _, ok := obData["as"]; ok { k.wsProcessOrderBookPartial(channelData, obData) - } else if _, ok := obData["a"]; ok { - k.wsProcessOrderBookUpdate(channelData, obData) + } else { + _, asksExist := obData["a"] + _, bidsExist := obData["b"] + if asksExist || bidsExist { + k.mu.Lock() + defer k.mu.Unlock() + k.wsProcessOrderBookBuffer(channelData, obData) + if len(orderbookBuffer[channelData.ChannelID]) >= orderbookBufferLimit { + err := k.wsProcessOrderBookUpdate(channelData) + if err != nil { + k.ResubscribeToChannel(channelData.Subscription, channelData.Pair) + } + } + } } } // wsProcessOrderBookPartial creates a new orderbook entry for a given currency pair -func (k *Kraken) wsProcessOrderBookPartial(channelData WebsocketChannelData, obData map[string]interface{}) { +func (k *Kraken) wsProcessOrderBookPartial(channelData *WebsocketChannelData, obData map[string]interface{}) { ob := orderbook.Base{ Pair: channelData.Pair, AssetType: krakenWsAssetType, @@ -537,17 +579,20 @@ func (k *Kraken) wsProcessOrderBookPartial(channelData WebsocketChannelData, obD Asset: krakenWsAssetType, Pair: channelData.Pair, } + + if krakenOrderBooks == nil { + krakenOrderBooks = make(map[int64]orderbook.Base) + } + krakenOrderBooks[channelData.ChannelID] = ob } -// wsProcessOrderBookUpdate updates an orderbook entry for a given currency pair -func (k *Kraken) wsProcessOrderBookUpdate(channelData WebsocketChannelData, obData map[string]interface{}) { - ob, err := k.GetOrderbookEx(channelData.Pair, krakenWsAssetType) - if err != nil { - k.Websocket.DataHandler <- err - return +func (k *Kraken) wsProcessOrderBookBuffer(channelData *WebsocketChannelData, obData map[string]interface{}) { + ob := orderbook.Base{ + AssetType: krakenWsAssetType, + ExchangeName: k.GetName(), + Pair: channelData.Pair, } - // Kraken ob data is timestamped per price, GCT orderbook data is timestamped per entry - // Using the highest last update time, we can attempt to respect both within a reasonable degree + var highestLastUpdate time.Time // Ask data is not always sent if _, ok := obData["a"]; ok { @@ -556,16 +601,6 @@ func (k *Kraken) wsProcessOrderBookUpdate(channelData WebsocketChannelData, obDa asks := askData[i].([]interface{}) price, _ := strconv.ParseFloat(asks[0].(string), 64) amount, _ := strconv.ParseFloat(asks[1].(string), 64) - - if amount == 0 { - for j := 0; j < len(ob.Asks); j++ { - if ob.Asks[j].Price == price { - ob.Asks = append(ob.Asks[:j], ob.Asks[j+1:]...) - j-- - continue - } - } - } ob.Asks = append(ob.Asks, orderbook.Item{ Amount: amount, Price: price, @@ -586,17 +621,6 @@ func (k *Kraken) wsProcessOrderBookUpdate(channelData WebsocketChannelData, obDa bids := bidData[i].([]interface{}) price, _ := strconv.ParseFloat(bids[0].(string), 64) amount, _ := strconv.ParseFloat(bids[1].(string), 64) - - if amount == 0 { - for j := 0; j < len(ob.Bids); j++ { - if ob.Bids[j].Price == price { - ob.Bids = append(ob.Bids[:j], ob.Bids[j+1:]...) - j-- - continue - } - } - } - ob.Bids = append(ob.Bids, orderbook.Item{ Amount: amount, Price: price, @@ -610,27 +634,183 @@ func (k *Kraken) wsProcessOrderBookUpdate(channelData WebsocketChannelData, obDa } } } - - if ob.LastUpdated.After(highestLastUpdate) { - log.Errorf("orderbook update out of order. Existing: %v, Attempted: %v", ob.LastUpdated, highestLastUpdate) - k.ResubscribeToChannel(channelData.Subscription, channelData.Pair) - return - } ob.LastUpdated = highestLastUpdate - err = k.Websocket.Orderbook.LoadSnapshot(&ob, k.GetName(), true) + if orderbookBuffer == nil { + orderbookBuffer = make(map[int64][]orderbook.Base) + } + orderbookBuffer[channelData.ChannelID] = append(orderbookBuffer[channelData.ChannelID], ob) + if k.Verbose { + log.Debugf("Adding orderbook to buffer for channel %v. Lastupdated: %v. %v / %v", + channelData.ChannelID, + ob.LastUpdated, + len(orderbookBuffer[channelData.ChannelID]), + orderbookBufferLimit) + } +} + +// wsProcessOrderBookUpdate updates an orderbook entry for a given currency pair +func (k *Kraken) wsProcessOrderBookUpdate(channelData *WebsocketChannelData) error { + if k.Verbose { + log.Debugf("Current orderbook 'LastUpdated': %v", + krakenOrderBooks[channelData.ChannelID].LastUpdated) + } + lowestLastUpdated := orderbookBuffer[channelData.ChannelID][0].LastUpdated + if k.Verbose { + log.Debugf("Sorting orderbook. Earliest 'LastUpdated' entry: %v", + lowestLastUpdated) + } + sort.Slice(orderbookBuffer[channelData.ChannelID], func(i, j int) bool { + return orderbookBuffer[channelData.ChannelID][i].LastUpdated.Before(orderbookBuffer[channelData.ChannelID][j].LastUpdated) + }) + + lowestLastUpdated = orderbookBuffer[channelData.ChannelID][0].LastUpdated + if k.Verbose { + log.Debugf("Sorted orderbook. Earliest 'LastUpdated' entry: %v", + lowestLastUpdated) + } + // The earliest update has to be after the previously stored orderbook + if krakenOrderBooks[channelData.ChannelID].LastUpdated.After(lowestLastUpdated) { + err := fmt.Errorf("orderbook update out of order. Existing: %v, Attempted: %v", + krakenOrderBooks[channelData.ChannelID].LastUpdated, + lowestLastUpdated) + k.Websocket.DataHandler <- err + return err + } + + k.updateChannelOrderbookEntries(channelData) + highestLastUpdate := orderbookBuffer[channelData.ChannelID][len(orderbookBuffer[channelData.ChannelID])-1].LastUpdated + if k.Verbose { + log.Debugf("Saving orderbook. Lastupdated: %v", + highestLastUpdate) + } + + ob := krakenOrderBooks[channelData.ChannelID] + ob.LastUpdated = highestLastUpdate + err := k.Websocket.Orderbook.LoadSnapshot(&ob, k.GetName(), true) if err != nil { k.Websocket.DataHandler <- err - return + return err } + k.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{ Exchange: k.GetName(), Asset: krakenWsAssetType, Pair: channelData.Pair, } + // Reset the buffer + orderbookBuffer[channelData.ChannelID] = []orderbook.Base{} + return nil +} + +func (k *Kraken) updateChannelOrderbookEntries(channelData *WebsocketChannelData) { + for i := 0; i < len(orderbookBuffer[channelData.ChannelID]); i++ { + for j := 0; j < len(orderbookBuffer[channelData.ChannelID][i].Asks); j++ { + k.updateChannelOrderbookAsks(i, j, channelData) + } + for j := 0; j < len(orderbookBuffer[channelData.ChannelID][i].Bids); j++ { + k.updateChannelOrderbookBids(i, j, channelData) + } + } +} + +func (k *Kraken) updateChannelOrderbookAsks(i, j int, channelData *WebsocketChannelData) { + askFound := k.updateChannelOrderbookAsk(i, j, channelData) + if !askFound { + if k.Verbose { + log.Debugf("Adding Ask for channel %v. Price %v. Amount %v", + channelData.ChannelID, + orderbookBuffer[channelData.ChannelID][i].Asks[j].Price, + orderbookBuffer[channelData.ChannelID][i].Asks[j].Amount) + } + ob := krakenOrderBooks[channelData.ChannelID] + ob.Asks = append(ob.Asks, orderbookBuffer[channelData.ChannelID][i].Asks[j]) + krakenOrderBooks[channelData.ChannelID] = ob + } +} + +func (k *Kraken) updateChannelOrderbookAsk(i, j int, channelData *WebsocketChannelData) bool { + askFound := false + for l := 0; l < len(krakenOrderBooks[channelData.ChannelID].Asks); l++ { + if krakenOrderBooks[channelData.ChannelID].Asks[l].Price == orderbookBuffer[channelData.ChannelID][i].Asks[j].Price { + askFound = true + if orderbookBuffer[channelData.ChannelID][i].Asks[j].Amount == 0 { + // Remove existing entry + if k.Verbose { + log.Debugf("Removing Ask for channel %v. Price %v. Old amount %v. Buffer %v", + channelData.ChannelID, + orderbookBuffer[channelData.ChannelID][i].Asks[j].Price, + krakenOrderBooks[channelData.ChannelID].Asks[l].Amount, i) + } + ob := krakenOrderBooks[channelData.ChannelID] + ob.Asks = append(ob.Asks[:l], ob.Asks[l+1:]...) + krakenOrderBooks[channelData.ChannelID] = ob + l-- + } else if krakenOrderBooks[channelData.ChannelID].Asks[l].Amount != orderbookBuffer[channelData.ChannelID][i].Asks[j].Amount { + if k.Verbose { + log.Debugf("Updating Ask for channel %v. Price %v. Old amount %v, New Amount %v", + channelData.ChannelID, + orderbookBuffer[channelData.ChannelID][i].Asks[j].Price, + krakenOrderBooks[channelData.ChannelID].Asks[l].Amount, + orderbookBuffer[channelData.ChannelID][i].Asks[j].Amount) + } + krakenOrderBooks[channelData.ChannelID].Asks[l].Amount = orderbookBuffer[channelData.ChannelID][i].Asks[j].Amount + } + return askFound + } + } + return askFound +} + +func (k *Kraken) updateChannelOrderbookBids(i, j int, channelData *WebsocketChannelData) { + bidFound := k.updateChannelOrderbookBid(i, j, channelData) + if !bidFound { + if k.Verbose { + log.Debugf("Adding Bid for channel %v. Price %v. Amount %v", + channelData.ChannelID, + orderbookBuffer[channelData.ChannelID][i].Bids[j].Price, + orderbookBuffer[channelData.ChannelID][i].Bids[j].Amount) + } + ob := krakenOrderBooks[channelData.ChannelID] + ob.Bids = append(ob.Bids, orderbookBuffer[channelData.ChannelID][i].Bids[j]) + krakenOrderBooks[channelData.ChannelID] = ob + } +} + +func (k *Kraken) updateChannelOrderbookBid(i, j int, channelData *WebsocketChannelData) bool { + bidFound := false + for l := 0; l < len(krakenOrderBooks[channelData.ChannelID].Bids); l++ { + if krakenOrderBooks[channelData.ChannelID].Bids[l].Price == orderbookBuffer[channelData.ChannelID][i].Bids[j].Price { + bidFound = true + if orderbookBuffer[channelData.ChannelID][i].Bids[j].Amount == 0 { + // Remove existing entry + if k.Verbose { + log.Debugf("Removing Bid for channel %v. Price %v. Old amount %v. Buffer %v", + channelData.ChannelID, + orderbookBuffer[channelData.ChannelID][i].Bids[j].Price, + krakenOrderBooks[channelData.ChannelID].Bids[l].Amount, i) + } + ob := krakenOrderBooks[channelData.ChannelID] + ob.Bids = append(ob.Bids[:l], ob.Bids[l+1:]...) + krakenOrderBooks[channelData.ChannelID] = ob + l-- + } else if krakenOrderBooks[channelData.ChannelID].Bids[l].Amount != orderbookBuffer[channelData.ChannelID][i].Bids[j].Amount { + if k.Verbose { + log.Debugf("Updating Bid for channel %v. Price %v. Old amount %v, New Amount %v", + channelData.ChannelID, + orderbookBuffer[channelData.ChannelID][i].Bids[j].Price, + krakenOrderBooks[channelData.ChannelID].Bids[l].Amount, + orderbookBuffer[channelData.ChannelID][i].Bids[j].Amount) + } + krakenOrderBooks[channelData.ChannelID].Bids[l].Amount = orderbookBuffer[channelData.ChannelID][i].Bids[j].Amount + } + return bidFound + } + } + return bidFound } // wsProcessCandles converts candle data and sends it to the data handler -func (k *Kraken) wsProcessCandles(channelData WebsocketChannelData, data interface{}) { +func (k *Kraken) wsProcessCandles(channelData *WebsocketChannelData, data interface{}) { candleData := data.([]interface{}) startTimeData, _ := strconv.ParseInt(candleData[0].(string), 10, 64) startTimeUnix := time.Unix(startTimeData, 0)