From 5b270f5ec9fd1370600e3adc27f70c023f9ab111 Mon Sep 17 00:00:00 2001 From: Scott Date: Fri, 5 Jul 2019 06:02:55 +1000 Subject: [PATCH] Adds support for kraken websocket 0.2.0 (#324) * Adds support for kraken 0.2.0 * Updates Kraken websocket logs. Removes commented out code * Runs gofmt a second time --- exchanges/kraken/kraken_types.go | 1 + exchanges/kraken/kraken_websocket.go | 109 ++++++++++++++++----------- 2 files changed, 67 insertions(+), 43 deletions(-) diff --git a/exchanges/kraken/kraken_types.go b/exchanges/kraken/kraken_types.go index cd245fdc..f72c4ecf 100644 --- a/exchanges/kraken/kraken_types.go +++ b/exchanges/kraken/kraken_types.go @@ -417,6 +417,7 @@ type WebsocketEventResponse struct { Pair currency.Pair `json:"pair,omitempty"` RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message. Subscription WebsocketSubscriptionResponseData `json:"subscription,omitempty"` + ChannelName string `json:"channelName,omitempty"` WebsocketSubscriptionEventResponse WebsocketStatusResponse WebsocketErrorResponse diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index 2126f192..12f14cd0 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -26,7 +26,7 @@ import ( const ( krakenWSURL = "wss://ws.kraken.com" krakenWSSandboxURL = "wss://sandbox.kraken.com" - krakenWSSupportedVersion = "0.1.1" + krakenWSSupportedVersion = "0.2.0" // If a checksum fails, then resubscribing to the channel fails, fatal after these attempts krakenWsResubscribeFailureLimit = 3 krakenWsResubscribeDelayInSeconds = 3 @@ -71,7 +71,8 @@ func (k *Kraken) writeToWebsocket(message []byte) error { k.wsRequestMtx.Lock() defer k.wsRequestMtx.Unlock() if k.Verbose { - log.Debugf("Sending message to WS: %v", + log.Debugf("%v Sending message to WS: %v", + k.Name, string(message)) } // Really basic WS rate limit @@ -97,7 +98,8 @@ func (k *Kraken) WsConnect() error { var err error if k.Verbose { - log.Debugf("Attempting to connect to %v", + log.Debugf("%v Attempting to connect to %v", + k.Name, k.Websocket.GetWebsocketURL()) } k.WebsocketConn, _, err = dialer.Dial(k.Websocket.GetWebsocketURL(), @@ -108,7 +110,8 @@ func (k *Kraken) WsConnect() error { err) } if k.Verbose { - log.Debugf("Successful connection to %v", + log.Debugf("%v Successful connection to %v", + k.Name, k.Websocket.GetWebsocketURL()) } go k.WsHandleData() @@ -165,7 +168,7 @@ func (k *Kraken) wsPingHandler() { pingEvent := fmt.Sprintf("{\"event\":\"%v\"}", krakenWsPing) if k.Verbose { log.Debugf("%v sending ping", - k.GetName()) + k.Name) } err := k.writeToWebsocket([]byte(pingEvent)) if err != nil { @@ -192,7 +195,7 @@ func (k *Kraken) WsHandleData() { k.Websocket.DataHandler <- fmt.Errorf("%v WsHandleData: %v", k.Name, err) - time.Sleep(time.Second) + return } // event response handling var eventResponse WebsocketEventResponse @@ -221,36 +224,37 @@ func (k *Kraken) WsHandleDataResponse(response WebsocketDataResponse) { case krakenWsTicker: if k.Verbose { log.Debugf("%v Websocket ticker data received", - k.GetName()) + k.Name) } k.wsProcessTickers(&channelData, response[1]) case krakenWsOHLC: if k.Verbose { log.Debugf("%v Websocket OHLC data received", - k.GetName()) + k.Name) } k.wsProcessCandles(&channelData, response[1]) case krakenWsOrderbook: if k.Verbose { log.Debugf("%v Websocket Orderbook data received", - k.GetName()) + k.Name) } k.wsProcessOrderBook(&channelData, response[1]) case krakenWsSpread: if k.Verbose { log.Debugf("%v Websocket Spread data received", - k.GetName()) + k.Name) } k.wsProcessSpread(&channelData, response[1]) case krakenWsTrade: if k.Verbose { log.Debugf("%v Websocket Trade data received", - k.GetName()) + k.Name) } k.wsProcessTrades(&channelData, response[1]) default: log.Errorf("%v Unidentified websocket data received: %v", - k.GetName(), response) + k.Name, + response) } } @@ -260,34 +264,37 @@ func (k *Kraken) WsHandleEventResponse(response *WebsocketEventResponse) { case krakenWsHeartbeat: if k.Verbose { log.Debugf("%v Websocket heartbeat data received", - k.GetName()) + k.Name) } case krakenWsPong: if k.Verbose { log.Debugf("%v Websocket pong data received", - k.GetName()) + k.Name) } case krakenWsSystemStatus: if k.Verbose { log.Debugf("%v Websocket status data received", - k.GetName()) + k.Name) } if response.Status != "online" { k.Websocket.DataHandler <- fmt.Errorf("%v Websocket status '%v'", - k.GetName(), response.Status) + k.Name, response.Status) } if response.WebsocketStatusResponse.Version != krakenWSSupportedVersion { log.Warnf("%v New version of Websocket API released. Was %v Now %v", - k.GetName(), krakenWSSupportedVersion, response.WebsocketStatusResponse.Version) + k.Name, krakenWSSupportedVersion, response.WebsocketStatusResponse.Version) } case krakenWsSubscriptionStatus: if k.Verbose { log.Debugf("%v Websocket subscription status data received", - k.GetName()) + k.Name) } if response.Status != "subscribed" { if response.RequestID > 0 { - k.Websocket.DataHandler <- fmt.Errorf("requestID: '%v'. Error: %v", response.RequestID, response.WebsocketErrorResponse.ErrorMessage) + k.Websocket.DataHandler <- fmt.Errorf("%v requestID: '%v'. Error: %v", + k.Name, + response.RequestID, + response.WebsocketErrorResponse.ErrorMessage) } else { k.Websocket.DataHandler <- fmt.Errorf(response.WebsocketErrorResponse.ErrorMessage) } @@ -295,7 +302,7 @@ func (k *Kraken) WsHandleEventResponse(response *WebsocketEventResponse) { } addNewSubscriptionChannelData(response) default: - log.Errorf("%v Unidentified websocket data received: %v", k.GetName(), response) + log.Errorf("%v Unidentified websocket data received: %v", k.Name, response) } } @@ -353,7 +360,7 @@ func (k *Kraken) wsProcessTickers(channelData *WebsocketChannelData, data interf k.Websocket.DataHandler <- exchange.TickerData{ Timestamp: time.Now(), - Exchange: k.GetName(), + Exchange: k.Name, AssetType: krakenWsAssetType, Pair: channelData.Pair, ClosePrice: closePrice, @@ -370,14 +377,19 @@ func (k *Kraken) wsProcessSpread(channelData *WebsocketChannelData, data interfa bestBid := spreadData[0].(string) bestAsk := spreadData[1].(string) timeData, _ := strconv.ParseFloat(spreadData[2].(string), 64) + bidVolume := spreadData[3].(string) + askVolume := spreadData[4].(string) sec, dec := math.Modf(timeData) spreadTimestamp := time.Unix(int64(sec), int64(dec*(1e9))) if k.Verbose { - log.Debugf("Spread data for '%v' received. Best bid: '%v' Best ask: '%v' Time: '%v'", + log.Debugf("%v Spread data for '%v' received. Best bid: '%v' Best ask: '%v' Time: '%v', Bid volume '%v', Ask volume '%v'", + k.Name, channelData.Pair, bestBid, bestAsk, - spreadTimestamp) + spreadTimestamp, + bidVolume, + askVolume) } } @@ -395,7 +407,7 @@ func (k *Kraken) wsProcessTrades(channelData *WebsocketChannelData, data interfa AssetType: krakenWsAssetType, CurrencyPair: channelData.Pair, EventTime: time.Now().Unix(), - Exchange: k.GetName(), + Exchange: k.Name, Price: price, Amount: amount, Timestamp: timeUnix, @@ -477,14 +489,14 @@ func (k *Kraken) wsProcessOrderBookPartial(channelData *WebsocketChannelData, ob } ob.LastUpdated = highestLastUpdate - err := k.Websocket.Orderbook.LoadSnapshot(&ob, k.GetName(), true) + err := k.Websocket.Orderbook.LoadSnapshot(&ob, k.Name, true) if err != nil { k.Websocket.DataHandler <- err return } k.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{ - Exchange: k.GetName(), + Exchange: k.Name, Asset: krakenWsAssetType, Pair: channelData.Pair, } @@ -498,7 +510,7 @@ func (k *Kraken) wsProcessOrderBookPartial(channelData *WebsocketChannelData, ob func (k *Kraken) wsProcessOrderBookBuffer(channelData *WebsocketChannelData, obData map[string]interface{}) { ob := orderbook.Base{ AssetType: krakenWsAssetType, - ExchangeName: k.GetName(), + ExchangeName: k.Name, Pair: channelData.Pair, } @@ -534,7 +546,6 @@ func (k *Kraken) wsProcessOrderBookBuffer(channelData *WebsocketChannelData, obD Amount: amount, Price: price, }) - timeData, _ := strconv.ParseFloat(bids[2].(string), 64) sec, dec := math.Modf(timeData) bidUpdatedTime := time.Unix(int64(sec), int64(dec*(1e9))) @@ -549,7 +560,8 @@ func (k *Kraken) wsProcessOrderBookBuffer(channelData *WebsocketChannelData, obD } orderbookBuffer[channelData.ChannelID] = append(orderbookBuffer[channelData.ChannelID], ob) if k.Verbose { - log.Debugf("Adding orderbook to buffer for channel %v. Lastupdated: %v. %v / %v", + log.Debugf("%v Adding orderbook to buffer for channel %v. Lastupdated: %v. %v / %v", + k.Name, channelData.ChannelID, ob.LastUpdated, len(orderbookBuffer[channelData.ChannelID]), @@ -560,12 +572,14 @@ func (k *Kraken) wsProcessOrderBookBuffer(channelData *WebsocketChannelData, obD // wsProcessOrderBookUpdate updates an orderbook entry for a given currency pair func (k *Kraken) wsProcessOrderBookUpdate(channelData *WebsocketChannelData) error { if k.Verbose { - log.Debugf("Current orderbook 'LastUpdated': %v", + log.Debugf("%v Current orderbook 'LastUpdated': %v", + k.Name, krakenOrderBooks[channelData.ChannelID].LastUpdated) } lowestLastUpdated := orderbookBuffer[channelData.ChannelID][0].LastUpdated if k.Verbose { - log.Debugf("Sorting orderbook. Earliest 'LastUpdated' entry: %v", + log.Debugf("%v Sorting orderbook. Earliest 'LastUpdated' entry: %v", + k.Name, lowestLastUpdated) } sort.Slice(orderbookBuffer[channelData.ChannelID], func(i, j int) bool { @@ -574,12 +588,14 @@ func (k *Kraken) wsProcessOrderBookUpdate(channelData *WebsocketChannelData) err lowestLastUpdated = orderbookBuffer[channelData.ChannelID][0].LastUpdated if k.Verbose { - log.Debugf("Sorted orderbook. Earliest 'LastUpdated' entry: %v", + log.Debugf("%v Sorted orderbook. Earliest 'LastUpdated' entry: %v", + k.Name, lowestLastUpdated) } // The earliest update has to be after the previously stored orderbook if krakenOrderBooks[channelData.ChannelID].LastUpdated.After(lowestLastUpdated) { - err := fmt.Errorf("orderbook update out of order. Existing: %v, Attempted: %v", + err := fmt.Errorf("%v orderbook update out of order. Existing: %v, Attempted: %v", + k.Name, krakenOrderBooks[channelData.ChannelID].LastUpdated, lowestLastUpdated) k.Websocket.DataHandler <- err @@ -589,20 +605,21 @@ func (k *Kraken) wsProcessOrderBookUpdate(channelData *WebsocketChannelData) err k.updateChannelOrderbookEntries(channelData) highestLastUpdate := orderbookBuffer[channelData.ChannelID][len(orderbookBuffer[channelData.ChannelID])-1].LastUpdated if k.Verbose { - log.Debugf("Saving orderbook. Lastupdated: %v", + log.Debugf("%v Saving orderbook. Lastupdated: %v", + k.Name, highestLastUpdate) } ob := krakenOrderBooks[channelData.ChannelID] ob.LastUpdated = highestLastUpdate - err := k.Websocket.Orderbook.LoadSnapshot(&ob, k.GetName(), true) + err := k.Websocket.Orderbook.LoadSnapshot(&ob, k.Name, true) if err != nil { k.Websocket.DataHandler <- err return err } k.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{ - Exchange: k.GetName(), + Exchange: k.Name, Asset: krakenWsAssetType, Pair: channelData.Pair, } @@ -626,7 +643,8 @@ func (k *Kraken) updateChannelOrderbookAsks(i, j int, channelData *WebsocketChan askFound := k.updateChannelOrderbookAsk(i, j, channelData) if !askFound { if k.Verbose { - log.Debugf("Adding Ask for channel %v. Price %v. Amount %v", + log.Debugf("%v Adding Ask for channel %v. Price %v. Amount %v", + k.Name, channelData.ChannelID, orderbookBuffer[channelData.ChannelID][i].Asks[j].Price, orderbookBuffer[channelData.ChannelID][i].Asks[j].Amount) @@ -645,7 +663,8 @@ func (k *Kraken) updateChannelOrderbookAsk(i, j int, channelData *WebsocketChann if orderbookBuffer[channelData.ChannelID][i].Asks[j].Amount == 0 { // Remove existing entry if k.Verbose { - log.Debugf("Removing Ask for channel %v. Price %v. Old amount %v. Buffer %v", + log.Debugf("%v Removing Ask for channel %v. Price %v. Old amount %v. Buffer %v", + k.Name, channelData.ChannelID, orderbookBuffer[channelData.ChannelID][i].Asks[j].Price, krakenOrderBooks[channelData.ChannelID].Asks[l].Amount, i) @@ -656,7 +675,8 @@ func (k *Kraken) updateChannelOrderbookAsk(i, j int, channelData *WebsocketChann l-- } else if krakenOrderBooks[channelData.ChannelID].Asks[l].Amount != orderbookBuffer[channelData.ChannelID][i].Asks[j].Amount { if k.Verbose { - log.Debugf("Updating Ask for channel %v. Price %v. Old amount %v, New Amount %v", + log.Debugf("%v Updating Ask for channel %v. Price %v. Old amount %v, New Amount %v", + k.Name, channelData.ChannelID, orderbookBuffer[channelData.ChannelID][i].Asks[j].Price, krakenOrderBooks[channelData.ChannelID].Asks[l].Amount, @@ -674,7 +694,8 @@ func (k *Kraken) updateChannelOrderbookBids(i, j int, channelData *WebsocketChan bidFound := k.updateChannelOrderbookBid(i, j, channelData) if !bidFound { if k.Verbose { - log.Debugf("Adding Bid for channel %v. Price %v. Amount %v", + log.Debugf("%v Adding Bid for channel %v. Price %v. Amount %v", + k.Name, channelData.ChannelID, orderbookBuffer[channelData.ChannelID][i].Bids[j].Price, orderbookBuffer[channelData.ChannelID][i].Bids[j].Amount) @@ -693,7 +714,8 @@ func (k *Kraken) updateChannelOrderbookBid(i, j int, channelData *WebsocketChann if orderbookBuffer[channelData.ChannelID][i].Bids[j].Amount == 0 { // Remove existing entry if k.Verbose { - log.Debugf("Removing Bid for channel %v. Price %v. Old amount %v. Buffer %v", + log.Debugf("%v Removing Bid for channel %v. Price %v. Old amount %v. Buffer %v", + k.Name, channelData.ChannelID, orderbookBuffer[channelData.ChannelID][i].Bids[j].Price, krakenOrderBooks[channelData.ChannelID].Bids[l].Amount, i) @@ -704,7 +726,8 @@ func (k *Kraken) updateChannelOrderbookBid(i, j int, channelData *WebsocketChann l-- } else if krakenOrderBooks[channelData.ChannelID].Bids[l].Amount != orderbookBuffer[channelData.ChannelID][i].Bids[j].Amount { if k.Verbose { - log.Debugf("Updating Bid for channel %v. Price %v. Old amount %v, New Amount %v", + log.Debugf("%v Updating Bid for channel %v. Price %v. Old amount %v, New Amount %v", + k.Name, channelData.ChannelID, orderbookBuffer[channelData.ChannelID][i].Bids[j].Price, krakenOrderBooks[channelData.ChannelID].Bids[l].Amount, @@ -735,7 +758,7 @@ func (k *Kraken) wsProcessCandles(channelData *WebsocketChannelData, data interf AssetType: krakenWsAssetType, Pair: channelData.Pair, Timestamp: time.Now(), - Exchange: k.GetName(), + Exchange: k.Name, StartTime: startTimeUnix, CloseTime: endTimeUnix, // Candles are sent every 60 seconds