diff --git a/exchanges/kraken/kraken_test.go b/exchanges/kraken/kraken_test.go index 64a57e5e..97973b7e 100644 --- a/exchanges/kraken/kraken_test.go +++ b/exchanges/kraken/kraken_test.go @@ -1,8 +1,11 @@ package kraken import ( + "fmt" + "strings" "testing" + "github.com/thrasher-/gocryptotrader/common" "github.com/thrasher-/gocryptotrader/config" "github.com/thrasher-/gocryptotrader/currency" exchange "github.com/thrasher-/gocryptotrader/exchanges" @@ -36,6 +39,7 @@ func TestSetup(t *testing.T) { krakenConfig.APISecret = apiSecret krakenConfig.ClientID = clientID krakenConfig.WebsocketURL = k.WebsocketURL + subscribeToDefaultChannels = false k.Setup(&krakenConfig) } @@ -638,6 +642,113 @@ func TestWithdrawCancel(t *testing.T) { // ---------------------------- Websocket tests ----------------------------------------- +// TestOrderbookBufferReset websocket test +func TestOrderbookBufferReset(t *testing.T) { + if k.Name == "" { + k.SetDefaults() + TestSetup(t) + } + if !k.Websocket.IsEnabled() { + t.Skip("Websocket not enabled, skipping") + } + if k.WebsocketConn == nil { + k.Websocket.Connect() + } + var obUpdates []string + obpartial := `[0,{"as":[["5541.30000","2.50700000","0"]],"bs":[["5541.20000","1.52900000","0"]]}]` + for i := 1; i < orderbookBufferLimit+2; i++ { + obUpdates = append(obUpdates, fmt.Sprintf(`[0,{"a":[["5541.30000","2.50700000","%v"]],"b":[["5541.30000","1.00000000","%v"]]}]`, i, i)) + } + k.Websocket.DataHandler = make(chan interface{}, 10) + var dataResponse WebsocketDataResponse + err := common.JSONDecode([]byte(obpartial), &dataResponse) + if err != nil { + t.Errorf("Could not parse, %v", err) + } + obData := dataResponse[1].(map[string]interface{}) + channelData := WebsocketChannelData{ + ChannelID: 0, + Subscription: "orderbook", + Pair: currency.NewPairWithDelimiter("XBT", "USD", "/"), + } + + k.wsProcessOrderBookPartial( + &channelData, + obData, + ) + + for i := 0; i < len(obUpdates); i++ { + err = common.JSONDecode([]byte(obUpdates[i]), &dataResponse) + if err != nil { + t.Errorf("Could not parse, %v", err) + } + obData = dataResponse[1].(map[string]interface{}) + if i < len(obUpdates)-1 { + k.wsProcessOrderBookBuffer(&channelData, obData) + } else if i == len(obUpdates)-1 { + k.wsProcessOrderBookUpdate(&channelData) + k.wsProcessOrderBookBuffer(&channelData, obData) + if len(orderbookBuffer[channelData.ChannelID]) != 1 { + t.Error("Buffer should have 1 entry after being reset") + } + } + } +} + +// TestOrderbookBufferReset websocket test +func TestOrderBookOutOfOrder(t *testing.T) { + if k.Name == "" { + k.SetDefaults() + TestSetup(t) + } + if !k.Websocket.IsEnabled() { + t.Skip("Websocket not enabled, skipping") + } + if k.WebsocketConn == nil { + k.Websocket.Connect() + } + obpartial := `[0,{"as":[["5541.30000","2.50700000","0"]],"bs":[["5541.20000","1.52900000","5"]]}]` + obupdate1 := `[0,{"a":[["5541.30000","0.00000000","1"]],"b":[["5541.30000","0.00000000","3"]]}]` + obupdate2 := `[0,{"a":[["5541.30000","2.50700000","2"]],"b":[["5541.30000","0.00000000","1"]]}]` + + k.Websocket.DataHandler = make(chan interface{}, 10) + var dataResponse WebsocketDataResponse + err := common.JSONDecode([]byte(obpartial), &dataResponse) + if err != nil { + t.Errorf("Could not parse, %v", err) + } + obData := dataResponse[1].(map[string]interface{}) + channelData := WebsocketChannelData{ + ChannelID: 0, + Subscription: "orderbook", + Pair: currency.NewPairWithDelimiter("XBT", "USD", "/"), + } + + k.wsProcessOrderBookPartial( + &channelData, + obData, + ) + + err = common.JSONDecode([]byte(obupdate1), &dataResponse) + if err != nil { + t.Errorf("Could not parse, %v", err) + } + obData = dataResponse[1].(map[string]interface{}) + k.wsProcessOrderBookBuffer(&channelData, obData) + + err = common.JSONDecode([]byte(obupdate2), &dataResponse) + if err != nil { + t.Errorf("Could not parse, %v", err) + } + obData = dataResponse[1].(map[string]interface{}) + k.wsProcessOrderBookBuffer(&channelData, obData) + + err = k.wsProcessOrderBookUpdate(&channelData) + if !strings.Contains(err.Error(), "orderbook update out of order") { + t.Error("Expected out of order orderbook error") + } +} + // TestSubscribeToChannel websocket test func TestSubscribeToChannel(t *testing.T) { if k.Name == "" { @@ -647,12 +758,11 @@ func TestSubscribeToChannel(t *testing.T) { if !k.Websocket.IsEnabled() { t.Skip("Websocket not enabled, skipping") } - if !k.Websocket.IsConnected() { + if k.WebsocketConn == nil { k.Websocket.Connect() } - <-k.Websocket.TrafficAlert - err := k.WsSubscribeToChannel("ticker", []string{"XBT/USD"}, 1) + err := k.WsSubscribeToChannel("ticker", []string{"XTZ/USD"}, 1) if err != nil { t.Error(err) } @@ -667,7 +777,7 @@ func TestSubscribeToNonExistentChannel(t *testing.T) { if !k.Websocket.IsEnabled() { t.Skip("Websocket not enabled, skipping") } - if !k.Websocket.IsConnected() { + if k.WebsocketConn == nil { k.Websocket.Connect() } err := k.WsSubscribeToChannel("ticker", []string{"pewdiepie"}, 1) @@ -696,14 +806,14 @@ func TestSubscribeUnsubscribeToChannel(t *testing.T) { if !k.Websocket.IsEnabled() { t.Skip("Websocket not enabled, skipping") } - if !k.Websocket.IsConnected() { + if k.WebsocketConn == nil { k.Websocket.Connect() } - err := k.WsSubscribeToChannel("ticker", []string{"XBT/USD"}, 1) + err := k.WsSubscribeToChannel("ticker", []string{"XRP/JPY"}, 1) if err != nil { t.Error(err) } - err = k.WsUnsubscribeToChannel("ticker", []string{"XBT/USD"}, 2) + err = k.WsUnsubscribeToChannel("ticker", []string{"XRP/JPY"}, 2) if err != nil { t.Error(err) } @@ -718,18 +828,19 @@ func TestUnsubscribeWithoutSubscription(t *testing.T) { if !k.Websocket.IsEnabled() { t.Skip("Websocket not enabled, skipping") } - if !k.Websocket.IsConnected() { + if k.WebsocketConn == nil { k.Websocket.Connect() } - err := k.WsUnsubscribeToChannel("ticker", []string{"XBT/USD"}, 3) + err := k.WsUnsubscribeToChannel("ticker", []string{"QTUM/EUR"}, 3) if err != nil { t.Error(err) } unsubscriptionError := false - for i := 0; i < 7; i++ { + for i := 0; i < 5; i++ { response := <-k.Websocket.DataHandler + t.Log(response) if err, ok := response.(error); ok && err != nil { - if err.Error() == "Subscription Not Found" { + if err.Error() == "requestID: '3'. Error: Subscription Not Found" { unsubscriptionError = true break } @@ -749,18 +860,18 @@ func TestUnsubscribeWithChannelID(t *testing.T) { if !k.Websocket.IsEnabled() { t.Skip("Websocket not enabled, skipping") } - if !k.Websocket.IsConnected() { + if k.WebsocketConn == nil { k.Websocket.Connect() } - err := k.WsUnsubscribeToChannelByChannelID(3) + err := k.WsUnsubscribeToChannelByChannelID(100) if err != nil { t.Error(err) } unsubscriptionError := false - for i := 0; i < 7; i++ { + for i := 0; i < 5; i++ { response := <-k.Websocket.DataHandler if err, ok := response.(error); ok && err != nil { - if err.Error() == "Subscription Not Found" { + if err.Error() == "Not subscribed to the requested channelID" { unsubscriptionError = true break } @@ -771,8 +882,8 @@ func TestUnsubscribeWithChannelID(t *testing.T) { } } -// TestUnsubscribeFromNonExistentChennel websocket test -func TestUnsubscribeFromNonExistentChennel(t *testing.T) { +// TestUnsubscribeFromNonExistentChannel websocket test +func TestUnsubscribeFromNonExistentChannel(t *testing.T) { if k.Name == "" { k.SetDefaults() TestSetup(t) @@ -780,7 +891,7 @@ func TestUnsubscribeFromNonExistentChennel(t *testing.T) { if !k.Websocket.IsEnabled() { t.Skip("Websocket not enabled, skipping") } - if !k.Websocket.IsConnected() { + if k.WebsocketConn == nil { k.Websocket.Connect() } err := k.WsUnsubscribeToChannel("ticker", []string{"tseries"}, 0) @@ -788,11 +899,13 @@ func TestUnsubscribeFromNonExistentChennel(t *testing.T) { t.Error(err) } unsubscriptionError := false - for i := 0; i < 7; i++ { + for i := 0; i < 5; i++ { response := <-k.Websocket.DataHandler if err, ok := response.(error); ok && err != nil { - unsubscriptionError = true - break + if err.Error() == "Currency pair not in ISO 4217-A3 format tseries" { + unsubscriptionError = true + break + } } } if !unsubscriptionError { diff --git a/exchanges/kraken/kraken_types.go b/exchanges/kraken/kraken_types.go index f38c7cd1..eb62d7fd 100644 --- a/exchanges/kraken/kraken_types.go +++ b/exchanges/kraken/kraken_types.go @@ -415,6 +415,7 @@ type WebsocketEventResponse struct { Event string `json:"event"` Status string `json:"status"` Pair currency.Pair `json:"pair,omitempty"` + RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message. Subscription WebsocketSubscriptionResponseData `json:"subscription,omitempty"` WebsocketSubscriptionEventResponse WebsocketStatusResponse @@ -422,7 +423,7 @@ type WebsocketEventResponse struct { } type WebsocketSubscriptionEventResponse struct { - ChannelID float64 `json:"channelID"` + ChannelID int64 `json:"channelID"` } type WebsocketSubscriptionResponseData struct { @@ -444,5 +445,5 @@ type WebsocketErrorResponse struct { type WebsocketChannelData struct { Subscription string Pair currency.Pair - ChannelID float64 + ChannelID int64 } diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index ce03aa24..facd125a 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -9,6 +9,7 @@ import ( "math" "net/http" "net/url" + "sort" "strconv" "strings" "sync" @@ -44,20 +45,33 @@ const ( krakenWsSpread = "spread" krakenWsOrderbook = "book" // Only supported asset type - krakenWsAssetType = "SPOT" + krakenWsAssetType = "SPOT" + orderbookBufferLimit = 3 ) // orderbookMutex Ensures if two entries arrive at once, only one can be processed at a time var orderbookMutex sync.Mutex var subscriptionChannelPair []WebsocketChannelData +// krakenOrderBooks TODO THIS IS A TEMPORARY SOLUTION UNTIL ENGINE BRANCH IS MERGED +// WS orderbook data can only rely on WS orderbook data +// Currently REST and WS runs simultaneously, dirtying the data +var krakenOrderBooks map[int64]orderbook.Base + +// orderbookBuffer Stores orderbook updates per channel +var orderbookBuffer map[int64][]orderbook.Base +var subscribeToDefaultChannels = true + // writeToWebsocket sends a message to the websocket endpoint func (k *Kraken) writeToWebsocket(message []byte) error { k.mu.Lock() defer k.mu.Unlock() if k.Verbose { - log.Debugf("Sending message to WS: %v", string(message)) + log.Debugf("Sending message to WS: %v", + string(message)) } + // Really basic WS rate limit + time.Sleep(30 * time.Millisecond) return k.WebsocketConn.WriteMessage(websocket.TextMessage, message) } @@ -79,7 +93,8 @@ func (k *Kraken) WsConnect() error { var err error if k.Verbose { - log.Debugf("Attempting to connect to %v", k.Websocket.GetWebsocketURL()) + log.Debugf("Attempting to connect to %v", + k.Websocket.GetWebsocketURL()) } k.WebsocketConn, _, err = dialer.Dial(k.Websocket.GetWebsocketURL(), http.Header{}) @@ -89,11 +104,14 @@ func (k *Kraken) WsConnect() error { err) } if k.Verbose { - log.Debugf("Successful connection to %v", k.Websocket.GetWebsocketURL()) + log.Debugf("Successful connection to %v", + k.Websocket.GetWebsocketURL()) } go k.WsHandleData() go k.wsPingHandler() - k.WsSubscribeToDefaults() + if subscribeToDefaultChannels { + k.WsSubscribeToDefaults() + } return nil } @@ -133,7 +151,9 @@ func (k *Kraken) WsReadData() (exchange.WebsocketResponse, error) { } } if k.Verbose { - log.Debugf("%v Websocket message received: %v", k.Name, string(standardMessage)) + log.Debugf("%v Websocket message received: %v", + k.Name, + string(standardMessage)) } return exchange.WebsocketResponse{Raw: standardMessage}, nil @@ -152,7 +172,8 @@ func (k *Kraken) wsPingHandler() { case <-ticker.C: pingEvent := fmt.Sprintf("{\"event\":\"%v\"}", krakenWsPing) if k.Verbose { - log.Debugf("%v sending ping", k.GetName()) + log.Debugf("%v sending ping", + k.GetName()) } err := k.writeToWebsocket([]byte(pingEvent)) if err != nil { @@ -207,7 +228,7 @@ func (k *Kraken) WsHandleData() { // WsHandleDataResponse classifies the WS response and sends to appropriate handler func (k *Kraken) WsHandleDataResponse(response WebsocketDataResponse) { - channelID := response[0].(float64) + channelID := int64(response[0].(float64)) channelData := getSubscriptionChannelData(channelID) switch channelData.Subscription { case krakenWsTicker: @@ -215,31 +236,31 @@ func (k *Kraken) WsHandleDataResponse(response WebsocketDataResponse) { log.Debugf("%v Websocket ticker data received", k.GetName()) } - k.wsProcessTickers(channelData, response[1]) + k.wsProcessTickers(&channelData, response[1]) case krakenWsOHLC: if k.Verbose { log.Debugf("%v Websocket OHLC data received", k.GetName()) } - k.wsProcessCandles(channelData, response[1]) + k.wsProcessCandles(&channelData, response[1]) case krakenWsOrderbook: if k.Verbose { log.Debugf("%v Websocket Orderbook data received", k.GetName()) } - k.wsProcessOrderBook(channelData, response[1]) + k.wsProcessOrderBook(&channelData, response[1]) case krakenWsSpread: if k.Verbose { log.Debugf("%v Websocket Spread data received", k.GetName()) } - k.wsProcessSpread(channelData, response[1]) + k.wsProcessSpread(&channelData, response[1]) case krakenWsTrade: if k.Verbose { log.Debugf("%v Websocket Trade data received", k.GetName()) } - k.wsProcessTrades(channelData, response[1]) + k.wsProcessTrades(&channelData, response[1]) default: log.Errorf("%v Unidentified websocket data received: %v", k.GetName(), response) @@ -251,15 +272,18 @@ func (k *Kraken) WsHandleEventResponse(response *WebsocketEventResponse) { switch response.Event { case krakenWsHeartbeat: if k.Verbose { - log.Debugf("%v Websocket heartbeat data received", k.GetName()) + log.Debugf("%v Websocket heartbeat data received", + k.GetName()) } case krakenWsPong: if k.Verbose { - log.Debugf("%v Websocket pong data received", k.GetName()) + log.Debugf("%v Websocket pong data received", + k.GetName()) } case krakenWsSystemStatus: if k.Verbose { - log.Debugf("%v Websocket status data received", k.GetName()) + log.Debugf("%v Websocket status data received", + k.GetName()) } if response.Status != "online" { k.Websocket.DataHandler <- fmt.Errorf("%v Websocket status '%v'", @@ -275,8 +299,11 @@ func (k *Kraken) WsHandleEventResponse(response *WebsocketEventResponse) { k.GetName()) } if response.Status != "subscribed" { - k.Websocket.DataHandler <- fmt.Errorf(response.WebsocketErrorResponse.ErrorMessage) - k.ResubscribeToChannel(response.Subscription.Name, response.Pair) + if response.RequestID > 0 { + k.Websocket.DataHandler <- fmt.Errorf("requestID: '%v'. Error: %v", response.RequestID, response.WebsocketErrorResponse.ErrorMessage) + } else { + k.Websocket.DataHandler <- fmt.Errorf(response.WebsocketErrorResponse.ErrorMessage) + } return } addNewSubscriptionChannelData(response) @@ -355,7 +382,7 @@ func addNewSubscriptionChannelData(response *WebsocketEventResponse) { } // getSubscriptionChannelData retrieves WebsocketChannelData based on response ID -func getSubscriptionChannelData(id float64) WebsocketChannelData { +func getSubscriptionChannelData(id int64) WebsocketChannelData { for i := range subscriptionChannelPair { if id == subscriptionChannelPair[i].ChannelID { return subscriptionChannelPair[i] @@ -406,7 +433,7 @@ func (k *Kraken) ResubscribeToChannel(channel string, pair currency.Pair) { } // wsProcessTickers converts ticker data and sends it to the datahandler -func (k *Kraken) wsProcessTickers(channelData WebsocketChannelData, data interface{}) { +func (k *Kraken) wsProcessTickers(channelData *WebsocketChannelData, data interface{}) { tickerData := data.(map[string]interface{}) closeData := tickerData["c"].([]interface{}) openData := tickerData["o"].([]interface{}) @@ -433,7 +460,7 @@ func (k *Kraken) wsProcessTickers(channelData WebsocketChannelData, data interfa } // wsProcessTickers converts ticker data and sends it to the datahandler -func (k *Kraken) wsProcessSpread(channelData WebsocketChannelData, data interface{}) { +func (k *Kraken) wsProcessSpread(channelData *WebsocketChannelData, data interface{}) { spreadData := data.([]interface{}) bestBid := spreadData[0].(string) bestAsk := spreadData[1].(string) @@ -442,12 +469,15 @@ func (k *Kraken) wsProcessSpread(channelData WebsocketChannelData, data interfac spreadTimestamp := time.Unix(int64(sec), int64(dec*(1e9))) if k.Verbose { log.Debugf("Spread data for '%v' received. Best bid: '%v' Best ask: '%v' Time: '%v'", - channelData.Pair, bestBid, bestAsk, spreadTimestamp) + channelData.Pair, + bestBid, + bestAsk, + spreadTimestamp) } } // wsProcessTrades converts trade data and sends it to the datahandler -func (k *Kraken) wsProcessTrades(channelData WebsocketChannelData, data interface{}) { +func (k *Kraken) wsProcessTrades(channelData *WebsocketChannelData, data interface{}) { tradeData := data.([]interface{}) for i := range tradeData { trade := tradeData[i].([]interface{}) @@ -471,17 +501,29 @@ func (k *Kraken) wsProcessTrades(channelData WebsocketChannelData, data interfac // wsProcessOrderBook determines if the orderbook data is partial or update // Then sends to appropriate fun -func (k *Kraken) wsProcessOrderBook(channelData WebsocketChannelData, data interface{}) { +func (k *Kraken) wsProcessOrderBook(channelData *WebsocketChannelData, data interface{}) { obData := data.(map[string]interface{}) if _, ok := obData["as"]; ok { k.wsProcessOrderBookPartial(channelData, obData) - } else if _, ok := obData["a"]; ok { - k.wsProcessOrderBookUpdate(channelData, obData) + } else { + _, asksExist := obData["a"] + _, bidsExist := obData["b"] + if asksExist || bidsExist { + k.mu.Lock() + defer k.mu.Unlock() + k.wsProcessOrderBookBuffer(channelData, obData) + if len(orderbookBuffer[channelData.ChannelID]) >= orderbookBufferLimit { + err := k.wsProcessOrderBookUpdate(channelData) + if err != nil { + k.ResubscribeToChannel(channelData.Subscription, channelData.Pair) + } + } + } } } // wsProcessOrderBookPartial creates a new orderbook entry for a given currency pair -func (k *Kraken) wsProcessOrderBookPartial(channelData WebsocketChannelData, obData map[string]interface{}) { +func (k *Kraken) wsProcessOrderBookPartial(channelData *WebsocketChannelData, obData map[string]interface{}) { ob := orderbook.Base{ Pair: channelData.Pair, AssetType: krakenWsAssetType, @@ -537,17 +579,20 @@ func (k *Kraken) wsProcessOrderBookPartial(channelData WebsocketChannelData, obD Asset: krakenWsAssetType, Pair: channelData.Pair, } + + if krakenOrderBooks == nil { + krakenOrderBooks = make(map[int64]orderbook.Base) + } + krakenOrderBooks[channelData.ChannelID] = ob } -// wsProcessOrderBookUpdate updates an orderbook entry for a given currency pair -func (k *Kraken) wsProcessOrderBookUpdate(channelData WebsocketChannelData, obData map[string]interface{}) { - ob, err := k.GetOrderbookEx(channelData.Pair, krakenWsAssetType) - if err != nil { - k.Websocket.DataHandler <- err - return +func (k *Kraken) wsProcessOrderBookBuffer(channelData *WebsocketChannelData, obData map[string]interface{}) { + ob := orderbook.Base{ + AssetType: krakenWsAssetType, + ExchangeName: k.GetName(), + Pair: channelData.Pair, } - // Kraken ob data is timestamped per price, GCT orderbook data is timestamped per entry - // Using the highest last update time, we can attempt to respect both within a reasonable degree + var highestLastUpdate time.Time // Ask data is not always sent if _, ok := obData["a"]; ok { @@ -556,16 +601,6 @@ func (k *Kraken) wsProcessOrderBookUpdate(channelData WebsocketChannelData, obDa asks := askData[i].([]interface{}) price, _ := strconv.ParseFloat(asks[0].(string), 64) amount, _ := strconv.ParseFloat(asks[1].(string), 64) - - if amount == 0 { - for j := 0; j < len(ob.Asks); j++ { - if ob.Asks[j].Price == price { - ob.Asks = append(ob.Asks[:j], ob.Asks[j+1:]...) - j-- - continue - } - } - } ob.Asks = append(ob.Asks, orderbook.Item{ Amount: amount, Price: price, @@ -586,17 +621,6 @@ func (k *Kraken) wsProcessOrderBookUpdate(channelData WebsocketChannelData, obDa bids := bidData[i].([]interface{}) price, _ := strconv.ParseFloat(bids[0].(string), 64) amount, _ := strconv.ParseFloat(bids[1].(string), 64) - - if amount == 0 { - for j := 0; j < len(ob.Bids); j++ { - if ob.Bids[j].Price == price { - ob.Bids = append(ob.Bids[:j], ob.Bids[j+1:]...) - j-- - continue - } - } - } - ob.Bids = append(ob.Bids, orderbook.Item{ Amount: amount, Price: price, @@ -610,27 +634,183 @@ func (k *Kraken) wsProcessOrderBookUpdate(channelData WebsocketChannelData, obDa } } } - - if ob.LastUpdated.After(highestLastUpdate) { - log.Errorf("orderbook update out of order. Existing: %v, Attempted: %v", ob.LastUpdated, highestLastUpdate) - k.ResubscribeToChannel(channelData.Subscription, channelData.Pair) - return - } ob.LastUpdated = highestLastUpdate - err = k.Websocket.Orderbook.LoadSnapshot(&ob, k.GetName(), true) + if orderbookBuffer == nil { + orderbookBuffer = make(map[int64][]orderbook.Base) + } + orderbookBuffer[channelData.ChannelID] = append(orderbookBuffer[channelData.ChannelID], ob) + if k.Verbose { + log.Debugf("Adding orderbook to buffer for channel %v. Lastupdated: %v. %v / %v", + channelData.ChannelID, + ob.LastUpdated, + len(orderbookBuffer[channelData.ChannelID]), + orderbookBufferLimit) + } +} + +// wsProcessOrderBookUpdate updates an orderbook entry for a given currency pair +func (k *Kraken) wsProcessOrderBookUpdate(channelData *WebsocketChannelData) error { + if k.Verbose { + log.Debugf("Current orderbook 'LastUpdated': %v", + krakenOrderBooks[channelData.ChannelID].LastUpdated) + } + lowestLastUpdated := orderbookBuffer[channelData.ChannelID][0].LastUpdated + if k.Verbose { + log.Debugf("Sorting orderbook. Earliest 'LastUpdated' entry: %v", + lowestLastUpdated) + } + sort.Slice(orderbookBuffer[channelData.ChannelID], func(i, j int) bool { + return orderbookBuffer[channelData.ChannelID][i].LastUpdated.Before(orderbookBuffer[channelData.ChannelID][j].LastUpdated) + }) + + lowestLastUpdated = orderbookBuffer[channelData.ChannelID][0].LastUpdated + if k.Verbose { + log.Debugf("Sorted orderbook. Earliest 'LastUpdated' entry: %v", + lowestLastUpdated) + } + // The earliest update has to be after the previously stored orderbook + if krakenOrderBooks[channelData.ChannelID].LastUpdated.After(lowestLastUpdated) { + err := fmt.Errorf("orderbook update out of order. Existing: %v, Attempted: %v", + krakenOrderBooks[channelData.ChannelID].LastUpdated, + lowestLastUpdated) + k.Websocket.DataHandler <- err + return err + } + + k.updateChannelOrderbookEntries(channelData) + highestLastUpdate := orderbookBuffer[channelData.ChannelID][len(orderbookBuffer[channelData.ChannelID])-1].LastUpdated + if k.Verbose { + log.Debugf("Saving orderbook. Lastupdated: %v", + highestLastUpdate) + } + + ob := krakenOrderBooks[channelData.ChannelID] + ob.LastUpdated = highestLastUpdate + err := k.Websocket.Orderbook.LoadSnapshot(&ob, k.GetName(), true) if err != nil { k.Websocket.DataHandler <- err - return + return err } + k.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{ Exchange: k.GetName(), Asset: krakenWsAssetType, Pair: channelData.Pair, } + // Reset the buffer + orderbookBuffer[channelData.ChannelID] = []orderbook.Base{} + return nil +} + +func (k *Kraken) updateChannelOrderbookEntries(channelData *WebsocketChannelData) { + for i := 0; i < len(orderbookBuffer[channelData.ChannelID]); i++ { + for j := 0; j < len(orderbookBuffer[channelData.ChannelID][i].Asks); j++ { + k.updateChannelOrderbookAsks(i, j, channelData) + } + for j := 0; j < len(orderbookBuffer[channelData.ChannelID][i].Bids); j++ { + k.updateChannelOrderbookBids(i, j, channelData) + } + } +} + +func (k *Kraken) updateChannelOrderbookAsks(i, j int, channelData *WebsocketChannelData) { + askFound := k.updateChannelOrderbookAsk(i, j, channelData) + if !askFound { + if k.Verbose { + log.Debugf("Adding Ask for channel %v. Price %v. Amount %v", + channelData.ChannelID, + orderbookBuffer[channelData.ChannelID][i].Asks[j].Price, + orderbookBuffer[channelData.ChannelID][i].Asks[j].Amount) + } + ob := krakenOrderBooks[channelData.ChannelID] + ob.Asks = append(ob.Asks, orderbookBuffer[channelData.ChannelID][i].Asks[j]) + krakenOrderBooks[channelData.ChannelID] = ob + } +} + +func (k *Kraken) updateChannelOrderbookAsk(i, j int, channelData *WebsocketChannelData) bool { + askFound := false + for l := 0; l < len(krakenOrderBooks[channelData.ChannelID].Asks); l++ { + if krakenOrderBooks[channelData.ChannelID].Asks[l].Price == orderbookBuffer[channelData.ChannelID][i].Asks[j].Price { + askFound = true + if orderbookBuffer[channelData.ChannelID][i].Asks[j].Amount == 0 { + // Remove existing entry + if k.Verbose { + log.Debugf("Removing Ask for channel %v. Price %v. Old amount %v. Buffer %v", + channelData.ChannelID, + orderbookBuffer[channelData.ChannelID][i].Asks[j].Price, + krakenOrderBooks[channelData.ChannelID].Asks[l].Amount, i) + } + ob := krakenOrderBooks[channelData.ChannelID] + ob.Asks = append(ob.Asks[:l], ob.Asks[l+1:]...) + krakenOrderBooks[channelData.ChannelID] = ob + l-- + } else if krakenOrderBooks[channelData.ChannelID].Asks[l].Amount != orderbookBuffer[channelData.ChannelID][i].Asks[j].Amount { + if k.Verbose { + log.Debugf("Updating Ask for channel %v. Price %v. Old amount %v, New Amount %v", + channelData.ChannelID, + orderbookBuffer[channelData.ChannelID][i].Asks[j].Price, + krakenOrderBooks[channelData.ChannelID].Asks[l].Amount, + orderbookBuffer[channelData.ChannelID][i].Asks[j].Amount) + } + krakenOrderBooks[channelData.ChannelID].Asks[l].Amount = orderbookBuffer[channelData.ChannelID][i].Asks[j].Amount + } + return askFound + } + } + return askFound +} + +func (k *Kraken) updateChannelOrderbookBids(i, j int, channelData *WebsocketChannelData) { + bidFound := k.updateChannelOrderbookBid(i, j, channelData) + if !bidFound { + if k.Verbose { + log.Debugf("Adding Bid for channel %v. Price %v. Amount %v", + channelData.ChannelID, + orderbookBuffer[channelData.ChannelID][i].Bids[j].Price, + orderbookBuffer[channelData.ChannelID][i].Bids[j].Amount) + } + ob := krakenOrderBooks[channelData.ChannelID] + ob.Bids = append(ob.Bids, orderbookBuffer[channelData.ChannelID][i].Bids[j]) + krakenOrderBooks[channelData.ChannelID] = ob + } +} + +func (k *Kraken) updateChannelOrderbookBid(i, j int, channelData *WebsocketChannelData) bool { + bidFound := false + for l := 0; l < len(krakenOrderBooks[channelData.ChannelID].Bids); l++ { + if krakenOrderBooks[channelData.ChannelID].Bids[l].Price == orderbookBuffer[channelData.ChannelID][i].Bids[j].Price { + bidFound = true + if orderbookBuffer[channelData.ChannelID][i].Bids[j].Amount == 0 { + // Remove existing entry + if k.Verbose { + log.Debugf("Removing Bid for channel %v. Price %v. Old amount %v. Buffer %v", + channelData.ChannelID, + orderbookBuffer[channelData.ChannelID][i].Bids[j].Price, + krakenOrderBooks[channelData.ChannelID].Bids[l].Amount, i) + } + ob := krakenOrderBooks[channelData.ChannelID] + ob.Bids = append(ob.Bids[:l], ob.Bids[l+1:]...) + krakenOrderBooks[channelData.ChannelID] = ob + l-- + } else if krakenOrderBooks[channelData.ChannelID].Bids[l].Amount != orderbookBuffer[channelData.ChannelID][i].Bids[j].Amount { + if k.Verbose { + log.Debugf("Updating Bid for channel %v. Price %v. Old amount %v, New Amount %v", + channelData.ChannelID, + orderbookBuffer[channelData.ChannelID][i].Bids[j].Price, + krakenOrderBooks[channelData.ChannelID].Bids[l].Amount, + orderbookBuffer[channelData.ChannelID][i].Bids[j].Amount) + } + krakenOrderBooks[channelData.ChannelID].Bids[l].Amount = orderbookBuffer[channelData.ChannelID][i].Bids[j].Amount + } + return bidFound + } + } + return bidFound } // wsProcessCandles converts candle data and sends it to the data handler -func (k *Kraken) wsProcessCandles(channelData WebsocketChannelData, data interface{}) { +func (k *Kraken) wsProcessCandles(channelData *WebsocketChannelData, data interface{}) { candleData := data.([]interface{}) startTimeData, _ := strconv.ParseInt(candleData[0].(string), 10, 64) startTimeUnix := time.Unix(startTimeData, 0)