Adds support for kraken websocket 0.2.0 (#324)

* Adds support for kraken 0.2.0

* Updates Kraken websocket logs. Removes commented out code

* Runs gofmt a second time
This commit is contained in:
Scott
2019-07-05 06:02:55 +10:00
committed by Adrian Gallagher
parent e60dda0879
commit 5b270f5ec9
2 changed files with 67 additions and 43 deletions

View File

@@ -417,6 +417,7 @@ type WebsocketEventResponse struct {
Pair currency.Pair `json:"pair,omitempty"`
RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message.
Subscription WebsocketSubscriptionResponseData `json:"subscription,omitempty"`
ChannelName string `json:"channelName,omitempty"`
WebsocketSubscriptionEventResponse
WebsocketStatusResponse
WebsocketErrorResponse

View File

@@ -26,7 +26,7 @@ import (
const (
krakenWSURL = "wss://ws.kraken.com"
krakenWSSandboxURL = "wss://sandbox.kraken.com"
krakenWSSupportedVersion = "0.1.1"
krakenWSSupportedVersion = "0.2.0"
// If a checksum fails, then resubscribing to the channel fails, fatal after these attempts
krakenWsResubscribeFailureLimit = 3
krakenWsResubscribeDelayInSeconds = 3
@@ -71,7 +71,8 @@ func (k *Kraken) writeToWebsocket(message []byte) error {
k.wsRequestMtx.Lock()
defer k.wsRequestMtx.Unlock()
if k.Verbose {
log.Debugf("Sending message to WS: %v",
log.Debugf("%v Sending message to WS: %v",
k.Name,
string(message))
}
// Really basic WS rate limit
@@ -97,7 +98,8 @@ func (k *Kraken) WsConnect() error {
var err error
if k.Verbose {
log.Debugf("Attempting to connect to %v",
log.Debugf("%v Attempting to connect to %v",
k.Name,
k.Websocket.GetWebsocketURL())
}
k.WebsocketConn, _, err = dialer.Dial(k.Websocket.GetWebsocketURL(),
@@ -108,7 +110,8 @@ func (k *Kraken) WsConnect() error {
err)
}
if k.Verbose {
log.Debugf("Successful connection to %v",
log.Debugf("%v Successful connection to %v",
k.Name,
k.Websocket.GetWebsocketURL())
}
go k.WsHandleData()
@@ -165,7 +168,7 @@ func (k *Kraken) wsPingHandler() {
pingEvent := fmt.Sprintf("{\"event\":\"%v\"}", krakenWsPing)
if k.Verbose {
log.Debugf("%v sending ping",
k.GetName())
k.Name)
}
err := k.writeToWebsocket([]byte(pingEvent))
if err != nil {
@@ -192,7 +195,7 @@ func (k *Kraken) WsHandleData() {
k.Websocket.DataHandler <- fmt.Errorf("%v WsHandleData: %v",
k.Name,
err)
time.Sleep(time.Second)
return
}
// event response handling
var eventResponse WebsocketEventResponse
@@ -221,36 +224,37 @@ func (k *Kraken) WsHandleDataResponse(response WebsocketDataResponse) {
case krakenWsTicker:
if k.Verbose {
log.Debugf("%v Websocket ticker data received",
k.GetName())
k.Name)
}
k.wsProcessTickers(&channelData, response[1])
case krakenWsOHLC:
if k.Verbose {
log.Debugf("%v Websocket OHLC data received",
k.GetName())
k.Name)
}
k.wsProcessCandles(&channelData, response[1])
case krakenWsOrderbook:
if k.Verbose {
log.Debugf("%v Websocket Orderbook data received",
k.GetName())
k.Name)
}
k.wsProcessOrderBook(&channelData, response[1])
case krakenWsSpread:
if k.Verbose {
log.Debugf("%v Websocket Spread data received",
k.GetName())
k.Name)
}
k.wsProcessSpread(&channelData, response[1])
case krakenWsTrade:
if k.Verbose {
log.Debugf("%v Websocket Trade data received",
k.GetName())
k.Name)
}
k.wsProcessTrades(&channelData, response[1])
default:
log.Errorf("%v Unidentified websocket data received: %v",
k.GetName(), response)
k.Name,
response)
}
}
@@ -260,34 +264,37 @@ func (k *Kraken) WsHandleEventResponse(response *WebsocketEventResponse) {
case krakenWsHeartbeat:
if k.Verbose {
log.Debugf("%v Websocket heartbeat data received",
k.GetName())
k.Name)
}
case krakenWsPong:
if k.Verbose {
log.Debugf("%v Websocket pong data received",
k.GetName())
k.Name)
}
case krakenWsSystemStatus:
if k.Verbose {
log.Debugf("%v Websocket status data received",
k.GetName())
k.Name)
}
if response.Status != "online" {
k.Websocket.DataHandler <- fmt.Errorf("%v Websocket status '%v'",
k.GetName(), response.Status)
k.Name, response.Status)
}
if response.WebsocketStatusResponse.Version != krakenWSSupportedVersion {
log.Warnf("%v New version of Websocket API released. Was %v Now %v",
k.GetName(), krakenWSSupportedVersion, response.WebsocketStatusResponse.Version)
k.Name, krakenWSSupportedVersion, response.WebsocketStatusResponse.Version)
}
case krakenWsSubscriptionStatus:
if k.Verbose {
log.Debugf("%v Websocket subscription status data received",
k.GetName())
k.Name)
}
if response.Status != "subscribed" {
if response.RequestID > 0 {
k.Websocket.DataHandler <- fmt.Errorf("requestID: '%v'. Error: %v", response.RequestID, response.WebsocketErrorResponse.ErrorMessage)
k.Websocket.DataHandler <- fmt.Errorf("%v requestID: '%v'. Error: %v",
k.Name,
response.RequestID,
response.WebsocketErrorResponse.ErrorMessage)
} else {
k.Websocket.DataHandler <- fmt.Errorf(response.WebsocketErrorResponse.ErrorMessage)
}
@@ -295,7 +302,7 @@ func (k *Kraken) WsHandleEventResponse(response *WebsocketEventResponse) {
}
addNewSubscriptionChannelData(response)
default:
log.Errorf("%v Unidentified websocket data received: %v", k.GetName(), response)
log.Errorf("%v Unidentified websocket data received: %v", k.Name, response)
}
}
@@ -353,7 +360,7 @@ func (k *Kraken) wsProcessTickers(channelData *WebsocketChannelData, data interf
k.Websocket.DataHandler <- exchange.TickerData{
Timestamp: time.Now(),
Exchange: k.GetName(),
Exchange: k.Name,
AssetType: krakenWsAssetType,
Pair: channelData.Pair,
ClosePrice: closePrice,
@@ -370,14 +377,19 @@ func (k *Kraken) wsProcessSpread(channelData *WebsocketChannelData, data interfa
bestBid := spreadData[0].(string)
bestAsk := spreadData[1].(string)
timeData, _ := strconv.ParseFloat(spreadData[2].(string), 64)
bidVolume := spreadData[3].(string)
askVolume := spreadData[4].(string)
sec, dec := math.Modf(timeData)
spreadTimestamp := time.Unix(int64(sec), int64(dec*(1e9)))
if k.Verbose {
log.Debugf("Spread data for '%v' received. Best bid: '%v' Best ask: '%v' Time: '%v'",
log.Debugf("%v Spread data for '%v' received. Best bid: '%v' Best ask: '%v' Time: '%v', Bid volume '%v', Ask volume '%v'",
k.Name,
channelData.Pair,
bestBid,
bestAsk,
spreadTimestamp)
spreadTimestamp,
bidVolume,
askVolume)
}
}
@@ -395,7 +407,7 @@ func (k *Kraken) wsProcessTrades(channelData *WebsocketChannelData, data interfa
AssetType: krakenWsAssetType,
CurrencyPair: channelData.Pair,
EventTime: time.Now().Unix(),
Exchange: k.GetName(),
Exchange: k.Name,
Price: price,
Amount: amount,
Timestamp: timeUnix,
@@ -477,14 +489,14 @@ func (k *Kraken) wsProcessOrderBookPartial(channelData *WebsocketChannelData, ob
}
ob.LastUpdated = highestLastUpdate
err := k.Websocket.Orderbook.LoadSnapshot(&ob, k.GetName(), true)
err := k.Websocket.Orderbook.LoadSnapshot(&ob, k.Name, true)
if err != nil {
k.Websocket.DataHandler <- err
return
}
k.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{
Exchange: k.GetName(),
Exchange: k.Name,
Asset: krakenWsAssetType,
Pair: channelData.Pair,
}
@@ -498,7 +510,7 @@ func (k *Kraken) wsProcessOrderBookPartial(channelData *WebsocketChannelData, ob
func (k *Kraken) wsProcessOrderBookBuffer(channelData *WebsocketChannelData, obData map[string]interface{}) {
ob := orderbook.Base{
AssetType: krakenWsAssetType,
ExchangeName: k.GetName(),
ExchangeName: k.Name,
Pair: channelData.Pair,
}
@@ -534,7 +546,6 @@ func (k *Kraken) wsProcessOrderBookBuffer(channelData *WebsocketChannelData, obD
Amount: amount,
Price: price,
})
timeData, _ := strconv.ParseFloat(bids[2].(string), 64)
sec, dec := math.Modf(timeData)
bidUpdatedTime := time.Unix(int64(sec), int64(dec*(1e9)))
@@ -549,7 +560,8 @@ func (k *Kraken) wsProcessOrderBookBuffer(channelData *WebsocketChannelData, obD
}
orderbookBuffer[channelData.ChannelID] = append(orderbookBuffer[channelData.ChannelID], ob)
if k.Verbose {
log.Debugf("Adding orderbook to buffer for channel %v. Lastupdated: %v. %v / %v",
log.Debugf("%v Adding orderbook to buffer for channel %v. Lastupdated: %v. %v / %v",
k.Name,
channelData.ChannelID,
ob.LastUpdated,
len(orderbookBuffer[channelData.ChannelID]),
@@ -560,12 +572,14 @@ func (k *Kraken) wsProcessOrderBookBuffer(channelData *WebsocketChannelData, obD
// wsProcessOrderBookUpdate updates an orderbook entry for a given currency pair
func (k *Kraken) wsProcessOrderBookUpdate(channelData *WebsocketChannelData) error {
if k.Verbose {
log.Debugf("Current orderbook 'LastUpdated': %v",
log.Debugf("%v Current orderbook 'LastUpdated': %v",
k.Name,
krakenOrderBooks[channelData.ChannelID].LastUpdated)
}
lowestLastUpdated := orderbookBuffer[channelData.ChannelID][0].LastUpdated
if k.Verbose {
log.Debugf("Sorting orderbook. Earliest 'LastUpdated' entry: %v",
log.Debugf("%v Sorting orderbook. Earliest 'LastUpdated' entry: %v",
k.Name,
lowestLastUpdated)
}
sort.Slice(orderbookBuffer[channelData.ChannelID], func(i, j int) bool {
@@ -574,12 +588,14 @@ func (k *Kraken) wsProcessOrderBookUpdate(channelData *WebsocketChannelData) err
lowestLastUpdated = orderbookBuffer[channelData.ChannelID][0].LastUpdated
if k.Verbose {
log.Debugf("Sorted orderbook. Earliest 'LastUpdated' entry: %v",
log.Debugf("%v Sorted orderbook. Earliest 'LastUpdated' entry: %v",
k.Name,
lowestLastUpdated)
}
// The earliest update has to be after the previously stored orderbook
if krakenOrderBooks[channelData.ChannelID].LastUpdated.After(lowestLastUpdated) {
err := fmt.Errorf("orderbook update out of order. Existing: %v, Attempted: %v",
err := fmt.Errorf("%v orderbook update out of order. Existing: %v, Attempted: %v",
k.Name,
krakenOrderBooks[channelData.ChannelID].LastUpdated,
lowestLastUpdated)
k.Websocket.DataHandler <- err
@@ -589,20 +605,21 @@ func (k *Kraken) wsProcessOrderBookUpdate(channelData *WebsocketChannelData) err
k.updateChannelOrderbookEntries(channelData)
highestLastUpdate := orderbookBuffer[channelData.ChannelID][len(orderbookBuffer[channelData.ChannelID])-1].LastUpdated
if k.Verbose {
log.Debugf("Saving orderbook. Lastupdated: %v",
log.Debugf("%v Saving orderbook. Lastupdated: %v",
k.Name,
highestLastUpdate)
}
ob := krakenOrderBooks[channelData.ChannelID]
ob.LastUpdated = highestLastUpdate
err := k.Websocket.Orderbook.LoadSnapshot(&ob, k.GetName(), true)
err := k.Websocket.Orderbook.LoadSnapshot(&ob, k.Name, true)
if err != nil {
k.Websocket.DataHandler <- err
return err
}
k.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{
Exchange: k.GetName(),
Exchange: k.Name,
Asset: krakenWsAssetType,
Pair: channelData.Pair,
}
@@ -626,7 +643,8 @@ func (k *Kraken) updateChannelOrderbookAsks(i, j int, channelData *WebsocketChan
askFound := k.updateChannelOrderbookAsk(i, j, channelData)
if !askFound {
if k.Verbose {
log.Debugf("Adding Ask for channel %v. Price %v. Amount %v",
log.Debugf("%v Adding Ask for channel %v. Price %v. Amount %v",
k.Name,
channelData.ChannelID,
orderbookBuffer[channelData.ChannelID][i].Asks[j].Price,
orderbookBuffer[channelData.ChannelID][i].Asks[j].Amount)
@@ -645,7 +663,8 @@ func (k *Kraken) updateChannelOrderbookAsk(i, j int, channelData *WebsocketChann
if orderbookBuffer[channelData.ChannelID][i].Asks[j].Amount == 0 {
// Remove existing entry
if k.Verbose {
log.Debugf("Removing Ask for channel %v. Price %v. Old amount %v. Buffer %v",
log.Debugf("%v Removing Ask for channel %v. Price %v. Old amount %v. Buffer %v",
k.Name,
channelData.ChannelID,
orderbookBuffer[channelData.ChannelID][i].Asks[j].Price,
krakenOrderBooks[channelData.ChannelID].Asks[l].Amount, i)
@@ -656,7 +675,8 @@ func (k *Kraken) updateChannelOrderbookAsk(i, j int, channelData *WebsocketChann
l--
} else if krakenOrderBooks[channelData.ChannelID].Asks[l].Amount != orderbookBuffer[channelData.ChannelID][i].Asks[j].Amount {
if k.Verbose {
log.Debugf("Updating Ask for channel %v. Price %v. Old amount %v, New Amount %v",
log.Debugf("%v Updating Ask for channel %v. Price %v. Old amount %v, New Amount %v",
k.Name,
channelData.ChannelID,
orderbookBuffer[channelData.ChannelID][i].Asks[j].Price,
krakenOrderBooks[channelData.ChannelID].Asks[l].Amount,
@@ -674,7 +694,8 @@ func (k *Kraken) updateChannelOrderbookBids(i, j int, channelData *WebsocketChan
bidFound := k.updateChannelOrderbookBid(i, j, channelData)
if !bidFound {
if k.Verbose {
log.Debugf("Adding Bid for channel %v. Price %v. Amount %v",
log.Debugf("%v Adding Bid for channel %v. Price %v. Amount %v",
k.Name,
channelData.ChannelID,
orderbookBuffer[channelData.ChannelID][i].Bids[j].Price,
orderbookBuffer[channelData.ChannelID][i].Bids[j].Amount)
@@ -693,7 +714,8 @@ func (k *Kraken) updateChannelOrderbookBid(i, j int, channelData *WebsocketChann
if orderbookBuffer[channelData.ChannelID][i].Bids[j].Amount == 0 {
// Remove existing entry
if k.Verbose {
log.Debugf("Removing Bid for channel %v. Price %v. Old amount %v. Buffer %v",
log.Debugf("%v Removing Bid for channel %v. Price %v. Old amount %v. Buffer %v",
k.Name,
channelData.ChannelID,
orderbookBuffer[channelData.ChannelID][i].Bids[j].Price,
krakenOrderBooks[channelData.ChannelID].Bids[l].Amount, i)
@@ -704,7 +726,8 @@ func (k *Kraken) updateChannelOrderbookBid(i, j int, channelData *WebsocketChann
l--
} else if krakenOrderBooks[channelData.ChannelID].Bids[l].Amount != orderbookBuffer[channelData.ChannelID][i].Bids[j].Amount {
if k.Verbose {
log.Debugf("Updating Bid for channel %v. Price %v. Old amount %v, New Amount %v",
log.Debugf("%v Updating Bid for channel %v. Price %v. Old amount %v, New Amount %v",
k.Name,
channelData.ChannelID,
orderbookBuffer[channelData.ChannelID][i].Bids[j].Price,
krakenOrderBooks[channelData.ChannelID].Bids[l].Amount,
@@ -735,7 +758,7 @@ func (k *Kraken) wsProcessCandles(channelData *WebsocketChannelData, data interf
AssetType: krakenWsAssetType,
Pair: channelData.Pair,
Timestamp: time.Now(),
Exchange: k.GetName(),
Exchange: k.Name,
StartTime: startTimeUnix,
CloseTime: endTimeUnix,
// Candles are sent every 60 seconds