Kraken websocket support (#264)

* Initial commit. Adds ticker, candle and trade, subscription support

* Adds support for spread and orderbooks

* Adds new currency pair delimiter ("/"), Adds dedicated websocket Connected channel handler, Updates Kraken websocket capability definition, Refines websocket tests to connect and disconnect without freezing, separates WebsocketUnsubscribeEventRequest ChannelID into its own struct WebsocketUnsubscribeByChannelIDEventRequest to prevent bad json WS requests, Adds asset type to orderbook, Kraken WS handles connection better

* Removes duplicate type, reverts config value

* Addresses error returns and changes writeToWebsocket to use byte array. Removes deferred funcs in tests. Increases test listening limit for rare cases

* Fixes verbose log. Rearranges WS Connect async ordering. Fixes DATA RACE. Fixes random okex tests. Ensures Kraken WS tests only connect once
This commit is contained in:
Scott
2019-04-04 10:21:44 +11:00
committed by Adrian Gallagher
parent a0e291097e
commit 107cf76373
10 changed files with 960 additions and 19 deletions

View File

@@ -62,7 +62,7 @@ func NewPairFromIndex(currencyPair, index string) (Pair, error) {
// NewPairFromString converts currency string into a new CurrencyPair
// with or without delimeter
func NewPairFromString(currencyPair string) Pair {
delimiters := []string{"_", "-"}
delimiters := []string{"_", "-", "/"}
var delimiter string
for _, x := range delimiters {
if strings.Contains(currencyPair, x) {

View File

@@ -141,7 +141,6 @@ func (w *Websocket) trafficMonitor(wg *sync.WaitGroup) {
select {
case <-w.ShutdownC: // Returns on shutdown channel close
return
case <-w.TrafficAlert: // Resets timer on traffic
if !w.connected {
w.Connected <- struct{}{}
@@ -194,20 +193,21 @@ func (w *Websocket) Connect() error {
w.ShutdownC = make(chan struct{}, 1)
var anotherWG sync.WaitGroup
anotherWG.Add(1)
go w.trafficMonitor(&anotherWG)
anotherWG.Wait()
err := w.connector()
if err != nil {
return fmt.Errorf("exchange_websocket.go connection error %s",
err)
}
// Divert for incoming websocket traffic
w.Connected <- struct{}{}
w.connected = true
if !w.connected {
w.Connected <- struct{}{}
w.connected = true
}
var anotherWG sync.WaitGroup
anotherWG.Add(1)
go w.trafficMonitor(&anotherWG)
anotherWG.Wait()
return nil
}

View File

@@ -7,8 +7,10 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/thrasher-/gocryptotrader/common"
"github.com/thrasher-/gocryptotrader/config"
"github.com/thrasher-/gocryptotrader/currency"
@@ -56,7 +58,9 @@ const (
// Kraken is the overarching type across the alphapoint package
type Kraken struct {
exchange.Base
WebsocketConn *websocket.Conn
CryptoFee, FiatFee float64
mu sync.Mutex
}
// SetDefaults sets current default settings
@@ -86,6 +90,12 @@ func (k *Kraken) SetDefaults() {
k.APIUrlDefault = krakenAPIURL
k.APIUrl = k.APIUrlDefault
k.WebsocketInit()
k.WebsocketURL = krakenWSURL
k.Websocket.Functionality = exchange.WebsocketTickerSupported |
exchange.WebsocketTradeDataSupported |
exchange.WebsocketKlineSupported |
exchange.WebsocketOrderbookSupported
}
// Setup sets current exchange configuration
@@ -100,6 +110,7 @@ func (k *Kraken) Setup(exch config.ExchangeConfig) {
k.SetHTTPClientUserAgent(exch.HTTPUserAgent)
k.RESTPollingDelay = exch.RESTPollingDelay
k.Verbose = exch.Verbose
k.Websocket.SetWsStatusAndConnection(exch.Websocket)
k.BaseCurrencies = exch.BaseCurrencies
k.AvailablePairs = exch.AvailablePairs
k.EnabledPairs = exch.EnabledPairs
@@ -123,6 +134,14 @@ func (k *Kraken) Setup(exch config.ExchangeConfig) {
if err != nil {
log.Fatal(err)
}
err = k.WebsocketSetup(k.WsConnect,
exch.Name,
exch.Websocket,
krakenWSURL,
exch.WebsocketURL)
if err != nil {
log.Fatal(err)
}
}
}

View File

@@ -18,10 +18,12 @@ const (
canManipulateRealOrders = false
)
// TestSetDefaults setup func
func TestSetDefaults(t *testing.T) {
k.SetDefaults()
}
// TestSetup setup func
func TestSetup(t *testing.T) {
cfg := config.GetConfig()
cfg.LoadConfig("../../testdata/configtest.json")
@@ -33,10 +35,11 @@ func TestSetup(t *testing.T) {
krakenConfig.APIKey = apiKey
krakenConfig.APISecret = apiSecret
krakenConfig.ClientID = clientID
krakenConfig.WebsocketURL = k.WebsocketURL
k.Setup(krakenConfig)
}
// TestGetServerTime API endpoint test
func TestGetServerTime(t *testing.T) {
t.Parallel()
_, err := k.GetServerTime()
@@ -45,6 +48,7 @@ func TestGetServerTime(t *testing.T) {
}
}
// TestGetAssets API endpoint test
func TestGetAssets(t *testing.T) {
t.Parallel()
_, err := k.GetAssets()
@@ -53,6 +57,7 @@ func TestGetAssets(t *testing.T) {
}
}
// TestGetAssetPairs API endpoint test
func TestGetAssetPairs(t *testing.T) {
t.Parallel()
_, err := k.GetAssetPairs()
@@ -61,6 +66,7 @@ func TestGetAssetPairs(t *testing.T) {
}
}
// TestGetTicker API endpoint test
func TestGetTicker(t *testing.T) {
t.Parallel()
_, err := k.GetTicker("BCHEUR")
@@ -69,6 +75,7 @@ func TestGetTicker(t *testing.T) {
}
}
// TestGetTickers API endpoint test
func TestGetTickers(t *testing.T) {
t.Parallel()
_, err := k.GetTickers("LTCUSD,ETCUSD")
@@ -77,6 +84,7 @@ func TestGetTickers(t *testing.T) {
}
}
// TestGetOHLC API endpoint test
func TestGetOHLC(t *testing.T) {
t.Parallel()
_, err := k.GetOHLC("BCHEUR")
@@ -85,6 +93,7 @@ func TestGetOHLC(t *testing.T) {
}
}
// TestGetDepth API endpoint test
func TestGetDepth(t *testing.T) {
t.Parallel()
_, err := k.GetDepth("BCHEUR")
@@ -93,6 +102,7 @@ func TestGetDepth(t *testing.T) {
}
}
// TestGetTrades API endpoint test
func TestGetTrades(t *testing.T) {
t.Parallel()
_, err := k.GetTrades("BCHEUR")
@@ -101,6 +111,7 @@ func TestGetTrades(t *testing.T) {
}
}
// TestGetSpread API endpoint test
func TestGetSpread(t *testing.T) {
t.Parallel()
_, err := k.GetSpread("BCHEUR")
@@ -109,6 +120,7 @@ func TestGetSpread(t *testing.T) {
}
}
// TestGetBalance API endpoint test
func TestGetBalance(t *testing.T) {
t.Parallel()
_, err := k.GetBalance()
@@ -117,6 +129,7 @@ func TestGetBalance(t *testing.T) {
}
}
// TestGetTradeBalance API endpoint test
func TestGetTradeBalance(t *testing.T) {
t.Parallel()
args := TradeBalanceOptions{Asset: "ZEUR"}
@@ -126,6 +139,7 @@ func TestGetTradeBalance(t *testing.T) {
}
}
// TestGetOpenOrders API endpoint test
func TestGetOpenOrders(t *testing.T) {
t.Parallel()
args := OrderInfoOptions{Trades: true}
@@ -135,6 +149,7 @@ func TestGetOpenOrders(t *testing.T) {
}
}
// TestGetClosedOrders API endpoint test
func TestGetClosedOrders(t *testing.T) {
t.Parallel()
args := GetClosedOrdersOptions{Trades: true, Start: "OE4KV4-4FVQ5-V7XGPU"}
@@ -144,6 +159,7 @@ func TestGetClosedOrders(t *testing.T) {
}
}
// TestQueryOrdersInfo API endpoint test
func TestQueryOrdersInfo(t *testing.T) {
t.Parallel()
args := OrderInfoOptions{Trades: true}
@@ -153,6 +169,7 @@ func TestQueryOrdersInfo(t *testing.T) {
}
}
// TestGetTradesHistory API endpoint test
func TestGetTradesHistory(t *testing.T) {
t.Parallel()
args := GetTradesHistoryOptions{Trades: true, Start: "TMZEDR-VBJN2-NGY6DX", End: "TVRXG2-R62VE-RWP3UW"}
@@ -162,6 +179,7 @@ func TestGetTradesHistory(t *testing.T) {
}
}
// TestQueryTrades API endpoint test
func TestQueryTrades(t *testing.T) {
t.Parallel()
_, err := k.QueryTrades(true, "TMZEDR-VBJN2-NGY6DX", "TFLWIB-KTT7L-4TWR3L", "TDVRAH-2H6OS-SLSXRX")
@@ -170,6 +188,7 @@ func TestQueryTrades(t *testing.T) {
}
}
// TestOpenPositions API endpoint test
func TestOpenPositions(t *testing.T) {
t.Parallel()
_, err := k.OpenPositions(false)
@@ -178,6 +197,7 @@ func TestOpenPositions(t *testing.T) {
}
}
// TestGetLedgers API endpoint test
func TestGetLedgers(t *testing.T) {
t.Parallel()
args := GetLedgersOptions{Start: "LRUHXI-IWECY-K4JYGO", End: "L5NIY7-JZQJD-3J4M2V", Ofs: 15}
@@ -187,6 +207,7 @@ func TestGetLedgers(t *testing.T) {
}
}
// TestQueryLedgers API endpoint test
func TestQueryLedgers(t *testing.T) {
t.Parallel()
_, err := k.QueryLedgers("LVTSFS-NHZVM-EXNZ5M")
@@ -195,6 +216,7 @@ func TestQueryLedgers(t *testing.T) {
}
}
// TestGetTradeVolume API endpoint test
func TestGetTradeVolume(t *testing.T) {
t.Parallel()
_, err := k.GetTradeVolume(true, "OAVY7T-MV5VK-KHDF5X")
@@ -203,6 +225,7 @@ func TestGetTradeVolume(t *testing.T) {
}
}
// TestAddOrder API endpoint test
func TestAddOrder(t *testing.T) {
t.Parallel()
args := AddOrderOptions{Oflags: "fcib"}
@@ -212,6 +235,7 @@ func TestAddOrder(t *testing.T) {
}
}
// TestCancelExistingOrder API endpoint test
func TestCancelExistingOrder(t *testing.T) {
t.Parallel()
_, err := k.CancelExistingOrder("OAVY7T-MV5VK-KHDF5X")
@@ -231,6 +255,7 @@ func setFeeBuilder() *exchange.FeeBuilder {
}
}
// TestGetFee logic test
func TestGetFee(t *testing.T) {
k.SetDefaults()
TestSetup(t)
@@ -313,6 +338,7 @@ func TestGetFee(t *testing.T) {
}
}
// TestFormatWithdrawPermissions logic test
func TestFormatWithdrawPermissions(t *testing.T) {
k.SetDefaults()
expectedResult := exchange.AutoWithdrawCryptoWithSetupText + " & " + exchange.WithdrawCryptoWith2FAText + " & " + exchange.AutoWithdrawFiatWithSetupText + " & " + exchange.WithdrawFiatWith2FAText
@@ -324,6 +350,7 @@ func TestFormatWithdrawPermissions(t *testing.T) {
}
}
// TestGetActiveOrders wrapper test
func TestGetActiveOrders(t *testing.T) {
k.SetDefaults()
TestSetup(t)
@@ -340,6 +367,7 @@ func TestGetActiveOrders(t *testing.T) {
}
}
// TestGetOrderHistory wrapper test
func TestGetOrderHistory(t *testing.T) {
k.SetDefaults()
TestSetup(t)
@@ -366,6 +394,7 @@ func areTestAPIKeysSet() bool {
return false
}
// TestSubmitOrder wrapper test
func TestSubmitOrder(t *testing.T) {
k.SetDefaults()
TestSetup(t)
@@ -387,6 +416,7 @@ func TestSubmitOrder(t *testing.T) {
}
}
// TestCancelExchangeOrder wrapper test
func TestCancelExchangeOrder(t *testing.T) {
k.SetDefaults()
TestSetup(t)
@@ -413,6 +443,7 @@ func TestCancelExchangeOrder(t *testing.T) {
}
}
// TestCancelAllExchangeOrders wrapper test
func TestCancelAllExchangeOrders(t *testing.T) {
k.SetDefaults()
TestSetup(t)
@@ -444,6 +475,7 @@ func TestCancelAllExchangeOrders(t *testing.T) {
}
}
// TestGetAccountInfo wrapper test
func TestGetAccountInfo(t *testing.T) {
if apiKey != "" || apiSecret != "" || clientID != "" {
_, err := k.GetAccountInfo()
@@ -458,6 +490,7 @@ func TestGetAccountInfo(t *testing.T) {
}
}
// TestModifyOrder wrapper test
func TestModifyOrder(t *testing.T) {
_, err := k.ModifyOrder(&exchange.ModifyOrder{})
if err == nil {
@@ -465,6 +498,7 @@ func TestModifyOrder(t *testing.T) {
}
}
// TestWithdraw wrapper test
func TestWithdraw(t *testing.T) {
k.SetDefaults()
TestSetup(t)
@@ -489,6 +523,7 @@ func TestWithdraw(t *testing.T) {
}
}
// TestWithdrawFiat wrapper test
func TestWithdrawFiat(t *testing.T) {
k.SetDefaults()
TestSetup(t)
@@ -514,6 +549,7 @@ func TestWithdrawFiat(t *testing.T) {
}
}
// TestWithdrawInternationalBank wrapper test
func TestWithdrawInternationalBank(t *testing.T) {
k.SetDefaults()
TestSetup(t)
@@ -539,6 +575,7 @@ func TestWithdrawInternationalBank(t *testing.T) {
}
}
// TestGetDepositAddress wrapper test
func TestGetDepositAddress(t *testing.T) {
if areTestAPIKeysSet() {
_, err := k.GetDepositAddress(currency.BTC, "")
@@ -553,6 +590,7 @@ func TestGetDepositAddress(t *testing.T) {
}
}
// TestWithdrawStatus wrapper test
func TestWithdrawStatus(t *testing.T) {
k.SetDefaults()
TestSetup(t)
@@ -570,10 +608,10 @@ func TestWithdrawStatus(t *testing.T) {
}
}
// TestWithdrawCancel wrapper test
func TestWithdrawCancel(t *testing.T) {
k.SetDefaults()
TestSetup(t)
_, err := k.WithdrawCancel(currency.BTC, "")
if areTestAPIKeysSet() && err == nil {
t.Error("Test Failed - WithdrawCancel() error cannot be nil")
@@ -581,3 +619,167 @@ func TestWithdrawCancel(t *testing.T) {
t.Errorf("Test Failed - WithdrawCancel() error - expecting an error when no keys are set but received nil")
}
}
// ---------------------------- Websocket tests -----------------------------------------
// TestSubscribeToChannel websocket test
func TestSubscribeToChannel(t *testing.T) {
if k.Name == "" {
k.SetDefaults()
TestSetup(t)
}
if !k.Websocket.IsEnabled() {
t.Skip("Websocket not enabled, skipping")
}
if !k.Websocket.IsConnected() {
k.Websocket.Connect()
}
<-k.Websocket.TrafficAlert
err := k.WsSubscribeToChannel("ticker", []string{"XBT/USD"}, 1)
if err != nil {
t.Error(err)
}
}
// TestSubscribeToNonExistentChannel websocket test
func TestSubscribeToNonExistentChannel(t *testing.T) {
if k.Name == "" {
k.SetDefaults()
TestSetup(t)
}
if !k.Websocket.IsEnabled() {
t.Skip("Websocket not enabled, skipping")
}
if !k.Websocket.IsConnected() {
k.Websocket.Connect()
}
err := k.WsSubscribeToChannel("ticker", []string{"pewdiepie"}, 1)
if err != nil {
t.Error(err)
}
subscriptionError := false
for i := 0; i < 7; i++ {
response := <-k.Websocket.DataHandler
if err, ok := response.(error); ok && err != nil {
subscriptionError = true
break
}
}
if !subscriptionError {
t.Error("Expected error")
}
}
// TestSubscribeUnsubscribeToChannel websocket test
func TestSubscribeUnsubscribeToChannel(t *testing.T) {
if k.Name == "" {
k.SetDefaults()
TestSetup(t)
}
if !k.Websocket.IsEnabled() {
t.Skip("Websocket not enabled, skipping")
}
if !k.Websocket.IsConnected() {
k.Websocket.Connect()
}
err := k.WsSubscribeToChannel("ticker", []string{"XBT/USD"}, 1)
if err != nil {
t.Error(err)
}
err = k.WsUnsubscribeToChannel("ticker", []string{"XBT/USD"}, 2)
if err != nil {
t.Error(err)
}
}
// TestUnsubscribeWithoutSubscription websocket test
func TestUnsubscribeWithoutSubscription(t *testing.T) {
if k.Name == "" {
k.SetDefaults()
TestSetup(t)
}
if !k.Websocket.IsEnabled() {
t.Skip("Websocket not enabled, skipping")
}
if !k.Websocket.IsConnected() {
k.Websocket.Connect()
}
err := k.WsUnsubscribeToChannel("ticker", []string{"XBT/USD"}, 3)
if err != nil {
t.Error(err)
}
unsubscriptionError := false
for i := 0; i < 7; i++ {
response := <-k.Websocket.DataHandler
if err, ok := response.(error); ok && err != nil {
if err.Error() == "Subscription Not Found" {
unsubscriptionError = true
break
}
}
}
if !unsubscriptionError {
t.Error("Expected error")
}
}
// TestUnsubscribeWithChannelID websocket test
func TestUnsubscribeWithChannelID(t *testing.T) {
if k.Name == "" {
k.SetDefaults()
TestSetup(t)
}
if !k.Websocket.IsEnabled() {
t.Skip("Websocket not enabled, skipping")
}
if !k.Websocket.IsConnected() {
k.Websocket.Connect()
}
err := k.WsUnsubscribeToChannelByChannelID(3)
if err != nil {
t.Error(err)
}
unsubscriptionError := false
for i := 0; i < 7; i++ {
response := <-k.Websocket.DataHandler
if err, ok := response.(error); ok && err != nil {
if err.Error() == "Subscription Not Found" {
unsubscriptionError = true
break
}
}
}
if !unsubscriptionError {
t.Error("Expected error")
}
}
// TestUnsubscribeFromNonExistentChennel websocket test
func TestUnsubscribeFromNonExistentChennel(t *testing.T) {
if k.Name == "" {
k.SetDefaults()
TestSetup(t)
}
if !k.Websocket.IsEnabled() {
t.Skip("Websocket not enabled, skipping")
}
if !k.Websocket.IsConnected() {
k.Websocket.Connect()
}
err := k.WsUnsubscribeToChannel("ticker", []string{"tseries"}, 0)
if err != nil {
t.Error(err)
}
unsubscriptionError := false
for i := 0; i < 7; i++ {
response := <-k.Websocket.DataHandler
if err, ok := response.(error); ok && err != nil {
unsubscriptionError = true
break
}
}
if !unsubscriptionError {
t.Error("Expected error")
}
}

View File

@@ -386,3 +386,63 @@ type WithdrawStatusResponse struct {
Time float64 `json:"time"`
Status string `json:"status"`
}
// WebsocketSubscriptionEventRequest handles WS subscription events
type WebsocketSubscriptionEventRequest struct {
Event string `json:"event"` // subscribe
RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message.
Pairs []string `json:"pair"` // Array of currency pairs (pair1,pair2,pair3).
Subscription WebsocketSubscriptionData `json:"subscription,omitempty"`
}
// WebsocketUnsubscribeByChannelIDEventRequest handles WS unsubscribe events
type WebsocketUnsubscribeByChannelIDEventRequest struct {
Event string `json:"event"` // unsubscribe
RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message.
Pairs []string `json:"pair,omitempty"` // Array of currency pairs (pair1,pair2,pair3).
ChannelID int64 `json:"channelID,omitempty"`
}
// WebsocketSubscriptionData contains details on WS channel
type WebsocketSubscriptionData struct {
Name string `json:"name,omitempty"` // ticker|ohlc|trade|book|spread|*, * for all (ohlc interval value is 1 if all channels subscribed)
Interval int64 `json:"interval,omitempty"` // Optional - Time interval associated with ohlc subscription in minutes. Default 1. Valid Interval values: 1|5|15|30|60|240|1440|10080|21600
Depth int64 `json:"depth,omitempty"` // Optional - depth associated with book subscription in number of levels each side, default 10. Valid Options are: 10, 25, 100, 500, 1000
}
// WebsocketDataResponse holds all data response types
type WebsocketEventResponse struct {
Event string `json:"event"`
Status string `json:"status"`
Pair currency.Pair `json:"pair,omitempty"`
Subscription WebsocketSubscriptionResponseData `json:"subscription,omitempty"`
WebsocketSubscriptionEventResponse
WebsocketStatusResponse
WebsocketErrorResponse
}
type WebsocketSubscriptionEventResponse struct {
ChannelID float64 `json:"channelID"`
}
type WebsocketSubscriptionResponseData struct {
Name string `json:"name"`
}
type WebsocketStatusResponse struct {
ConnectionID float64 `json:"connectionID"`
Version string `json:"version"`
}
type WebsocketDataResponse []interface{}
type WebsocketErrorResponse struct {
ErrorMessage string `json:"errorMessage"`
}
// Holds relevant data for channels to identify what we're doing
type WebsocketChannelData struct {
Subscription string
Pair currency.Pair
ChannelID float64
}

View File

@@ -0,0 +1,660 @@
package kraken
import (
"bytes"
"compress/flate"
"errors"
"fmt"
"io/ioutil"
"math"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/thrasher-/gocryptotrader/common"
"github.com/thrasher-/gocryptotrader/currency"
exchange "github.com/thrasher-/gocryptotrader/exchanges"
"github.com/thrasher-/gocryptotrader/exchanges/orderbook"
log "github.com/thrasher-/gocryptotrader/logger"
)
// List of all websocket channels to subscribe to
const (
krakenWSURL = "wss://ws.kraken.com"
krakenWSSandboxURL = "wss://sandbox.kraken.com"
krakenWSSupportedVersion = "0.1.1"
// If a checksum fails, then resubscribing to the channel fails, fatal after these attempts
krakenWsResubscribeFailureLimit = 3
krakenWsResubscribeDelayInSeconds = 3
// WS endpoints
krakenWsHeartbeat = "heartbeat"
krakenWsPing = "ping"
krakenWsPong = "pong"
krakenWsSystemStatus = "systemStatus"
krakenWsSubscribe = "subscribe"
krakenWsSubscriptionStatus = "subscriptionStatus"
krakenWsUnsubscribe = "unsubscribe"
krakenWsTicker = "ticker"
krakenWsOHLC = "ohlc"
krakenWsTrade = "trade"
krakenWsSpread = "spread"
krakenWsOrderbook = "book"
// Only supported asset type
krakenWsAssetType = "SPOT"
)
// orderbookMutex Ensures if two entries arrive at once, only one can be processed at a time
var orderbookMutex sync.Mutex
var subscriptionChannelPair []WebsocketChannelData
// writeToWebsocket sends a message to the websocket endpoint
func (k *Kraken) writeToWebsocket(message []byte) error {
k.mu.Lock()
defer k.mu.Unlock()
if k.Verbose {
log.Debugf("Sending message to WS: %v", string(message))
}
return k.WebsocketConn.WriteMessage(websocket.TextMessage, message)
}
// WsConnect initiates a websocket connection
func (k *Kraken) WsConnect() error {
if !k.Websocket.IsEnabled() || !k.IsEnabled() {
return errors.New(exchange.WebsocketNotEnabled)
}
var dialer websocket.Dialer
if k.Websocket.GetProxyAddress() != "" {
proxy, err := url.Parse(k.Websocket.GetProxyAddress())
if err != nil {
return err
}
dialer.Proxy = http.ProxyURL(proxy)
}
var err error
if k.Verbose {
log.Debugf("Attempting to connect to %v", k.Websocket.GetWebsocketURL())
}
k.WebsocketConn, _, err = dialer.Dial(k.Websocket.GetWebsocketURL(),
http.Header{})
if err != nil {
return fmt.Errorf("%s Unable to connect to Websocket. Error: %s",
k.Name,
err)
}
if k.Verbose {
log.Debugf("Successful connection to %v", k.Websocket.GetWebsocketURL())
}
go k.WsHandleData()
go k.wsPingHandler()
k.WsSubscribeToDefaults()
return nil
}
// WsSubscribeToDefaults subscribes to the websocket channels
func (k *Kraken) WsSubscribeToDefaults() {
channelsToSubscribe := []string{krakenWsTicker, krakenWsTrade, krakenWsOrderbook, krakenWsOHLC, krakenWsSpread}
for _, pair := range k.EnabledPairs {
// Kraken WS formats pairs with / but config and REST use -
formattedPair := strings.ToUpper(strings.Replace(pair.String(), "-", "/", 1))
for _, channel := range channelsToSubscribe {
err := k.WsSubscribeToChannel(channel, []string{formattedPair}, 0)
if err != nil {
k.Websocket.DataHandler <- err
}
}
}
}
// WsReadData reads data from the websocket connection
func (k *Kraken) WsReadData() (exchange.WebsocketResponse, error) {
mType, resp, err := k.WebsocketConn.ReadMessage()
if err != nil {
return exchange.WebsocketResponse{}, err
}
k.Websocket.TrafficAlert <- struct{}{}
var standardMessage []byte
switch mType {
case websocket.TextMessage:
standardMessage = resp
case websocket.BinaryMessage:
reader := flate.NewReader(bytes.NewReader(resp))
standardMessage, err = ioutil.ReadAll(reader)
reader.Close()
if err != nil {
return exchange.WebsocketResponse{}, err
}
}
if k.Verbose {
log.Debugf("%v Websocket message received: %v", k.Name, string(standardMessage))
}
return exchange.WebsocketResponse{Raw: standardMessage}, nil
}
// wsPingHandler sends a message "ping" every 27 to maintain the connection to the websocket
func (k *Kraken) wsPingHandler() {
k.Websocket.Wg.Add(1)
defer k.Websocket.Wg.Done()
ticker := time.NewTicker(time.Second * 27)
for {
select {
case <-k.Websocket.ShutdownC:
return
case <-ticker.C:
pingEvent := fmt.Sprintf("{\"event\":\"%v\"}", krakenWsPing)
if k.Verbose {
log.Debugf("%v sending ping", k.GetName())
}
err := k.writeToWebsocket([]byte(pingEvent))
if err != nil {
k.Websocket.DataHandler <- err
}
}
}
}
// WsHandleData handles the read data from the websocket connection
func (k *Kraken) WsHandleData() {
k.Websocket.Wg.Add(1)
defer func() {
err := k.WebsocketConn.Close()
if err != nil {
k.Websocket.DataHandler <- fmt.Errorf("%v unable to to close Websocket connection. Error: %s",
k.GetName(), err)
}
k.Websocket.Wg.Done()
}()
for {
select {
case <-k.Websocket.ShutdownC:
return
default:
resp, err := k.WsReadData()
if err != nil {
k.Websocket.DataHandler <- err
return
}
// event response handling
var eventResponse WebsocketEventResponse
err = common.JSONDecode(resp.Raw, &eventResponse)
if err == nil && eventResponse.Event != "" {
k.WsHandleEventResponse(&eventResponse)
continue
}
// Data response handling
var dataResponse WebsocketDataResponse
err = common.JSONDecode(resp.Raw, &dataResponse)
if err == nil && dataResponse[0].(float64) >= 0 {
k.WsHandleDataResponse(dataResponse)
continue
}
// Unknown data handling
k.Websocket.DataHandler <- fmt.Errorf("unrecognised response: %v", string(resp.Raw))
continue
}
}
}
// WsHandleDataResponse classifies the WS response and sends to appropriate handler
func (k *Kraken) WsHandleDataResponse(response WebsocketDataResponse) {
channelID := response[0].(float64)
channelData := getSubscriptionChannelData(channelID)
switch channelData.Subscription {
case krakenWsTicker:
if k.Verbose {
log.Debugf("%v Websocket ticker data received",
k.GetName())
}
k.wsProcessTickers(channelData, response[1])
case krakenWsOHLC:
if k.Verbose {
log.Debugf("%v Websocket OHLC data received",
k.GetName())
}
k.wsProcessCandles(channelData, response[1])
case krakenWsOrderbook:
if k.Verbose {
log.Debugf("%v Websocket Orderbook data received",
k.GetName())
}
k.wsProcessOrderBook(channelData, response[1])
case krakenWsSpread:
if k.Verbose {
log.Debugf("%v Websocket Spread data received",
k.GetName())
}
k.wsProcessSpread(channelData, response[1])
case krakenWsTrade:
if k.Verbose {
log.Debugf("%v Websocket Trade data received",
k.GetName())
}
k.wsProcessTrades(channelData, response[1])
default:
log.Errorf("%v Unidentified websocket data received: %v",
k.GetName(), response)
}
}
// WsHandleDataResponse classifies the WS response and sends to appropriate handler
func (k *Kraken) WsHandleEventResponse(response *WebsocketEventResponse) {
switch response.Event {
case krakenWsHeartbeat:
if k.Verbose {
log.Debugf("%v Websocket heartbeat data received", k.GetName())
}
case krakenWsPong:
if k.Verbose {
log.Debugf("%v Websocket pong data received", k.GetName())
}
case krakenWsSystemStatus:
if k.Verbose {
log.Debugf("%v Websocket status data received", k.GetName())
}
if response.Status != "online" {
k.Websocket.DataHandler <- fmt.Errorf("%v Websocket status '%v'",
k.GetName(), response.Status)
}
if response.WebsocketStatusResponse.Version != krakenWSSupportedVersion {
log.Warnf("%v New version of Websocket API released. Was %v Now %v",
k.GetName(), krakenWSSupportedVersion, response.WebsocketStatusResponse.Version)
}
case krakenWsSubscriptionStatus:
if k.Verbose {
log.Debugf("%v Websocket subscription status data received",
k.GetName())
}
if response.Status != "subscribed" {
k.Websocket.DataHandler <- fmt.Errorf(response.WebsocketErrorResponse.ErrorMessage)
k.ResubscribeToChannel(response.Subscription.Name, response.Pair)
return
}
addNewSubscriptionChannelData(response)
default:
log.Errorf("%v Unidentified websocket data received: %v", k.GetName(), response)
}
}
// WsSubscribeToChannel sends a request to WS to subscribe to supplied channel name and pairs
func (k *Kraken) WsSubscribeToChannel(topic string, currencies []string, requestID int64) error {
resp := WebsocketSubscriptionEventRequest{
Event: krakenWsSubscribe,
Pairs: currencies,
Subscription: WebsocketSubscriptionData{
Name: topic,
},
}
if requestID > 0 {
resp.RequestID = requestID
}
json, err := common.JSONEncode(resp)
if err != nil {
return err
}
return k.writeToWebsocket(json)
}
// WsUnsubscribeToChannel sends a request to WS to unsubscribe to supplied channel name and pairs
func (k *Kraken) WsUnsubscribeToChannel(topic string, currencies []string, requestID int64) error {
resp := WebsocketSubscriptionEventRequest{
Event: krakenWsUnsubscribe,
Pairs: currencies,
Subscription: WebsocketSubscriptionData{
Name: topic,
},
}
if requestID > 0 {
resp.RequestID = requestID
}
json, err := common.JSONEncode(resp)
if err != nil {
return err
}
return k.writeToWebsocket(json)
}
// WsUnsubscribeToChannelByChannelID sends a request to WS to unsubscribe to supplied channel ID
func (k *Kraken) WsUnsubscribeToChannelByChannelID(channelID int64) error {
resp := WebsocketUnsubscribeByChannelIDEventRequest{
Event: krakenWsUnsubscribe,
ChannelID: channelID,
}
json, err := common.JSONEncode(resp)
if err != nil {
return err
}
return k.writeToWebsocket(json)
}
// addNewSubscriptionChannelData stores channel ids, pairs and subscription types to an array
// allowing correlation between subscriptions and returned data
func addNewSubscriptionChannelData(response *WebsocketEventResponse) {
for i := range subscriptionChannelPair {
if response.ChannelID == subscriptionChannelPair[i].ChannelID {
return
}
}
// We change the / to - to maintain compatibility with REST/config
pair := currency.NewPairWithDelimiter(response.Pair.Base.String(), response.Pair.Quote.String(), "-")
subscriptionChannelPair = append(subscriptionChannelPair, WebsocketChannelData{
Subscription: response.Subscription.Name,
Pair: pair,
ChannelID: response.ChannelID,
})
}
// getSubscriptionChannelData retrieves WebsocketChannelData based on response ID
func getSubscriptionChannelData(id float64) WebsocketChannelData {
for i := range subscriptionChannelPair {
if id == subscriptionChannelPair[i].ChannelID {
return subscriptionChannelPair[i]
}
}
return WebsocketChannelData{}
}
// resubscribeToChannel will attempt to unsubscribe and resubscribe to a channel
func (k *Kraken) ResubscribeToChannel(channel string, pair currency.Pair) {
// Kraken WS formats pairs with / but config and REST use -
formattedPair := strings.ToUpper(strings.Replace(pair.String(), "-", "/", 1))
if krakenWsResubscribeFailureLimit > 0 {
var successfulUnsubscribe bool
for i := 0; i < krakenWsResubscribeFailureLimit; i++ {
err := k.WsUnsubscribeToChannel(channel, []string{formattedPair}, 0)
if err != nil {
log.Error(err)
time.Sleep(krakenWsResubscribeDelayInSeconds * time.Second)
continue
}
successfulUnsubscribe = true
break
}
if !successfulUnsubscribe {
log.Fatalf("%v websocket channel %v failed to unsubscribe after %v attempts",
k.GetName(), channel, krakenWsResubscribeFailureLimit)
}
successfulSubscribe := true
for i := 0; i < krakenWsResubscribeFailureLimit; i++ {
err := k.WsSubscribeToChannel(channel, []string{formattedPair}, 0)
if err != nil {
log.Error(err)
time.Sleep(krakenWsResubscribeDelayInSeconds * time.Second)
continue
}
successfulSubscribe = true
break
}
if !successfulSubscribe {
log.Fatalf("%v websocket channel %v failed to resubscribe after %v attempts",
k.GetName(), channel, krakenWsResubscribeFailureLimit)
}
} else {
log.Fatalf("%v websocket channel %v cannot resubscribe. Limit: %v",
k.GetName(), channel, krakenWsResubscribeFailureLimit)
}
}
// wsProcessTickers converts ticker data and sends it to the datahandler
func (k *Kraken) wsProcessTickers(channelData WebsocketChannelData, data interface{}) {
tickerData := data.(map[string]interface{})
closeData := tickerData["c"].([]interface{})
openData := tickerData["o"].([]interface{})
lowData := tickerData["l"].([]interface{})
highData := tickerData["h"].([]interface{})
volumeData := tickerData["v"].([]interface{})
closePrice, _ := strconv.ParseFloat(closeData[0].(string), 64)
openPrice, _ := strconv.ParseFloat(openData[0].(string), 64)
highPrice, _ := strconv.ParseFloat(highData[0].(string), 64)
lowPrice, _ := strconv.ParseFloat(lowData[0].(string), 64)
quantity, _ := strconv.ParseFloat(volumeData[0].(string), 64)
k.Websocket.DataHandler <- exchange.TickerData{
Timestamp: time.Now(),
Exchange: k.GetName(),
AssetType: krakenWsAssetType,
Pair: channelData.Pair,
ClosePrice: closePrice,
OpenPrice: openPrice,
HighPrice: highPrice,
LowPrice: lowPrice,
Quantity: quantity,
}
}
// wsProcessTickers converts ticker data and sends it to the datahandler
func (k *Kraken) wsProcessSpread(channelData WebsocketChannelData, data interface{}) {
spreadData := data.([]interface{})
bestBid := spreadData[0].(string)
bestAsk := spreadData[1].(string)
timeData, _ := strconv.ParseFloat(spreadData[2].(string), 64)
sec, dec := math.Modf(timeData)
spreadTimestamp := time.Unix(int64(sec), int64(dec*(1e9)))
if k.Verbose {
log.Debugf("Spread data for '%v' received. Best bid: '%v' Best ask: '%v' Time: '%v'",
channelData.Pair, bestBid, bestAsk, spreadTimestamp)
}
}
// wsProcessTrades converts trade data and sends it to the datahandler
func (k *Kraken) wsProcessTrades(channelData WebsocketChannelData, data interface{}) {
tradeData := data.([]interface{})
for i := range tradeData {
trade := tradeData[i].([]interface{})
timeData, _ := strconv.ParseInt(trade[2].(string), 10, 64)
timeUnix := time.Unix(timeData, 0)
price, _ := strconv.ParseFloat(trade[0].(string), 64)
amount, _ := strconv.ParseFloat(trade[1].(string), 64)
k.Websocket.DataHandler <- exchange.TradeData{
AssetType: krakenWsAssetType,
CurrencyPair: channelData.Pair,
EventTime: time.Now().Unix(),
Exchange: k.GetName(),
Price: price,
Amount: amount,
Timestamp: timeUnix,
Side: trade[3].(string),
}
}
}
// wsProcessOrderBook determines if the orderbook data is partial or update
// Then sends to appropriate fun
func (k *Kraken) wsProcessOrderBook(channelData WebsocketChannelData, data interface{}) {
obData := data.(map[string]interface{})
if _, ok := obData["as"]; ok {
k.wsProcessOrderBookPartial(channelData, obData)
} else if _, ok := obData["a"]; ok {
k.wsProcessOrderBookUpdate(channelData, obData)
}
}
// wsProcessOrderBookPartial creates a new orderbook entry for a given currency pair
func (k *Kraken) wsProcessOrderBookPartial(channelData WebsocketChannelData, obData map[string]interface{}) {
ob := orderbook.Base{
Pair: channelData.Pair,
AssetType: krakenWsAssetType,
}
// Kraken ob data is timestamped per price, GCT orderbook data is timestamped per entry
// Using the highest last update time, we can attempt to respect both within a reasonable degree
var highestLastUpdate time.Time
askData := obData["as"].([]interface{})
for i := range askData {
asks := askData[i].([]interface{})
price, _ := strconv.ParseFloat(asks[0].(string), 64)
amount, _ := strconv.ParseFloat(asks[1].(string), 64)
ob.Asks = append(ob.Asks, orderbook.Item{
Amount: amount,
Price: price,
})
timeData, _ := strconv.ParseFloat(asks[2].(string), 64)
sec, dec := math.Modf(timeData)
askUpdatedTime := time.Unix(int64(sec), int64(dec*(1e9)))
if highestLastUpdate.Before(askUpdatedTime) {
highestLastUpdate = askUpdatedTime
}
}
bidData := obData["bs"].([]interface{})
for i := range bidData {
bids := bidData[i].([]interface{})
price, _ := strconv.ParseFloat(bids[0].(string), 64)
amount, _ := strconv.ParseFloat(bids[1].(string), 64)
ob.Bids = append(ob.Bids, orderbook.Item{
Amount: amount,
Price: price,
})
timeData, _ := strconv.ParseFloat(bids[2].(string), 64)
sec, dec := math.Modf(timeData)
bidUpdateTime := time.Unix(int64(sec), int64(dec*(1e9)))
if highestLastUpdate.Before(bidUpdateTime) {
highestLastUpdate = bidUpdateTime
}
}
ob.LastUpdated = highestLastUpdate
err := k.Websocket.Orderbook.LoadSnapshot(&ob, k.GetName(), true)
if err != nil {
k.Websocket.DataHandler <- err
return
}
k.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{
Exchange: k.GetName(),
Asset: krakenWsAssetType,
Pair: channelData.Pair,
}
}
// wsProcessOrderBookUpdate updates an orderbook entry for a given currency pair
func (k *Kraken) wsProcessOrderBookUpdate(channelData WebsocketChannelData, obData map[string]interface{}) {
ob, err := k.GetOrderbookEx(channelData.Pair, krakenWsAssetType)
if err != nil {
k.Websocket.DataHandler <- err
return
}
// Kraken ob data is timestamped per price, GCT orderbook data is timestamped per entry
// Using the highest last update time, we can attempt to respect both within a reasonable degree
var highestLastUpdate time.Time
// Ask data is not always sent
if _, ok := obData["a"]; ok {
askData := obData["a"].([]interface{})
for i := range askData {
asks := askData[i].([]interface{})
price, _ := strconv.ParseFloat(asks[0].(string), 64)
amount, _ := strconv.ParseFloat(asks[1].(string), 64)
if amount == 0 {
for j := 0; j < len(ob.Asks); j++ {
if ob.Asks[j].Price == price {
ob.Asks = append(ob.Asks[:j], ob.Asks[j+1:]...)
j--
continue
}
}
}
ob.Asks = append(ob.Asks, orderbook.Item{
Amount: amount,
Price: price,
})
timeData, _ := strconv.ParseFloat(asks[2].(string), 64)
sec, dec := math.Modf(timeData)
askUpdatedTime := time.Unix(int64(sec), int64(dec*(1e9)))
if highestLastUpdate.Before(askUpdatedTime) {
highestLastUpdate = askUpdatedTime
}
}
}
// Bid data is not always sent
if _, ok := obData["b"]; ok {
bidData := obData["b"].([]interface{})
for i := range bidData {
bids := bidData[i].([]interface{})
price, _ := strconv.ParseFloat(bids[0].(string), 64)
amount, _ := strconv.ParseFloat(bids[1].(string), 64)
if amount == 0 {
for j := 0; j < len(ob.Bids); j++ {
if ob.Bids[j].Price == price {
ob.Bids = append(ob.Bids[:j], ob.Bids[j+1:]...)
j--
continue
}
}
}
ob.Bids = append(ob.Bids, orderbook.Item{
Amount: amount,
Price: price,
})
timeData, _ := strconv.ParseFloat(bids[2].(string), 64)
sec, dec := math.Modf(timeData)
bidUpdatedTime := time.Unix(int64(sec), int64(dec*(1e9)))
if highestLastUpdate.Before(bidUpdatedTime) {
highestLastUpdate = bidUpdatedTime
}
}
}
if ob.LastUpdated.After(highestLastUpdate) {
log.Errorf("orderbook update out of order. Existing: %v, Attempted: %v", ob.LastUpdated, highestLastUpdate)
k.ResubscribeToChannel(channelData.Subscription, channelData.Pair)
return
}
ob.LastUpdated = highestLastUpdate
err = k.Websocket.Orderbook.LoadSnapshot(&ob, k.GetName(), true)
if err != nil {
k.Websocket.DataHandler <- err
return
}
k.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{
Exchange: k.GetName(),
Asset: krakenWsAssetType,
Pair: channelData.Pair,
}
}
// wsProcessCandles converts candle data and sends it to the data handler
func (k *Kraken) wsProcessCandles(channelData WebsocketChannelData, data interface{}) {
candleData := data.([]interface{})
startTimeData, _ := strconv.ParseInt(candleData[0].(string), 10, 64)
startTimeUnix := time.Unix(startTimeData, 0)
endTimeData, _ := strconv.ParseInt(candleData[1].(string), 10, 64)
endTimeUnix := time.Unix(endTimeData, 0)
openPrice, _ := strconv.ParseFloat(candleData[2].(string), 64)
highPrice, _ := strconv.ParseFloat(candleData[3].(string), 64)
lowPrice, _ := strconv.ParseFloat(candleData[4].(string), 64)
closePrice, _ := strconv.ParseFloat(candleData[5].(string), 64)
volume, _ := strconv.ParseFloat(candleData[7].(string), 64)
k.Websocket.DataHandler <- exchange.KlineData{
AssetType: krakenWsAssetType,
Pair: channelData.Pair,
Timestamp: time.Now(),
Exchange: k.GetName(),
StartTime: startTimeUnix,
CloseTime: endTimeUnix,
// Candles are sent every 60 seconds
Interval: "60",
HighPrice: highPrice,
LowPrice: lowPrice,
OpenPrice: openPrice,
ClosePrice: closePrice,
Volume: volume,
}
}

View File

@@ -307,7 +307,7 @@ func (k *Kraken) WithdrawFiatFundsToInternationalBank(withdrawRequest *exchange.
// GetWebsocket returns a pointer to the exchange websocket
func (k *Kraken) GetWebsocket() (*exchange.Websocket, error) {
return nil, common.ErrFunctionNotSupported
return k.Websocket, nil
}
// GetFeeByType returns an estimate of fee based on type of transaction

View File

@@ -1662,7 +1662,7 @@ func TestSubscribeToNonExistantChannel(t *testing.T) {
return
}
var errorReceived bool
for i := 0; i < 5; i++ {
for i := 0; i < 7; i++ {
response := <-o.Websocket.DataHandler
if err, ok := response.(error); ok && err != nil {
t.Log(response)

View File

@@ -1135,12 +1135,12 @@ type GetSwapForceLiquidatedOrdersRequest struct {
// GetSwapForceLiquidatedOrdersResponse response data for GetSwapForceLiquidatedOrders
type GetSwapForceLiquidatedOrdersResponse struct {
Loss float64 `json:"loss"`
Size int64 `json:"size"`
Price float64 `json:"price"`
Loss float64 `json:"loss,string"`
Size int64 `json:"size,string"`
Price float64 `json:"price,string"`
CreatedAt string `json:"created_at"`
InstrumentID string `json:"instrument_id"`
Type int64 `json:"type"`
Type int64 `json:"type,string"`
}
// GetSwapOnHoldAmountForOpenOrdersResponse response data for GetSwapOnHoldAmountForOpenOrders

View File

@@ -935,7 +935,7 @@
"name": "Kraken",
"enabled": true,
"verbose": false,
"websocket": false,
"websocket": true,
"useSandbox": false,
"restPollingDelay": 10,
"httpTimeout": 15000000000,