From 107cf76373d861ceb96cb3f09e9d4c43aa19e060 Mon Sep 17 00:00:00 2001 From: Scott Date: Thu, 4 Apr 2019 10:21:44 +1100 Subject: [PATCH] Kraken websocket support (#264) * Initial commit. Adds ticker, candle and trade, subscription support * Adds support for spread and orderbooks * Adds new currency pair delimiter ("/"), Adds dedicated websocket Connected channel handler, Updates Kraken websocket capability definition, Refines websocket tests to connect and disconnect without freezing, separates WebsocketUnsubscribeEventRequest ChannelID into its own struct WebsocketUnsubscribeByChannelIDEventRequest to prevent bad json WS requests, Adds asset type to orderbook, Kraken WS handles connection better * Removes duplicate type, reverts config value * Addresses error returns and changes writeToWebsocket to use byte array. Removes deferred funcs in tests. Increases test listening limit for rare cases * Fixes verbose log. Rearranges WS Connect async ordering. Fixes DATA RACE. Fixes random okex tests. Ensures Kraken WS tests only connect once --- currency/pair.go | 2 +- exchanges/exchange_websocket.go | 18 +- exchanges/kraken/kraken.go | 19 + exchanges/kraken/kraken_test.go | 206 ++++++++- exchanges/kraken/kraken_types.go | 60 +++ exchanges/kraken/kraken_websocket.go | 660 +++++++++++++++++++++++++++ exchanges/kraken/kraken_wrapper.go | 2 +- exchanges/okex/okex_test.go | 2 +- exchanges/okgroup/okgroup_types.go | 8 +- testdata/configtest.json | 2 +- 10 files changed, 960 insertions(+), 19 deletions(-) create mode 100644 exchanges/kraken/kraken_websocket.go diff --git a/currency/pair.go b/currency/pair.go index b9e4df43..0078ced8 100644 --- a/currency/pair.go +++ b/currency/pair.go @@ -62,7 +62,7 @@ func NewPairFromIndex(currencyPair, index string) (Pair, error) { // NewPairFromString converts currency string into a new CurrencyPair // with or without delimeter func NewPairFromString(currencyPair string) Pair { - delimiters := []string{"_", "-"} + delimiters := []string{"_", "-", "/"} var delimiter string for _, x := range delimiters { if strings.Contains(currencyPair, x) { diff --git a/exchanges/exchange_websocket.go b/exchanges/exchange_websocket.go index dcb92a1b..57276ad9 100644 --- a/exchanges/exchange_websocket.go +++ b/exchanges/exchange_websocket.go @@ -141,7 +141,6 @@ func (w *Websocket) trafficMonitor(wg *sync.WaitGroup) { select { case <-w.ShutdownC: // Returns on shutdown channel close return - case <-w.TrafficAlert: // Resets timer on traffic if !w.connected { w.Connected <- struct{}{} @@ -194,20 +193,21 @@ func (w *Websocket) Connect() error { w.ShutdownC = make(chan struct{}, 1) - var anotherWG sync.WaitGroup - anotherWG.Add(1) - go w.trafficMonitor(&anotherWG) - anotherWG.Wait() - err := w.connector() if err != nil { return fmt.Errorf("exchange_websocket.go connection error %s", err) } - // Divert for incoming websocket traffic - w.Connected <- struct{}{} - w.connected = true + if !w.connected { + w.Connected <- struct{}{} + w.connected = true + } + + var anotherWG sync.WaitGroup + anotherWG.Add(1) + go w.trafficMonitor(&anotherWG) + anotherWG.Wait() return nil } diff --git a/exchanges/kraken/kraken.go b/exchanges/kraken/kraken.go index 9b2e70c9..93802410 100644 --- a/exchanges/kraken/kraken.go +++ b/exchanges/kraken/kraken.go @@ -7,8 +7,10 @@ import ( "net/url" "strconv" "strings" + "sync" "time" + "github.com/gorilla/websocket" "github.com/thrasher-/gocryptotrader/common" "github.com/thrasher-/gocryptotrader/config" "github.com/thrasher-/gocryptotrader/currency" @@ -56,7 +58,9 @@ const ( // Kraken is the overarching type across the alphapoint package type Kraken struct { exchange.Base + WebsocketConn *websocket.Conn CryptoFee, FiatFee float64 + mu sync.Mutex } // SetDefaults sets current default settings @@ -86,6 +90,12 @@ func (k *Kraken) SetDefaults() { k.APIUrlDefault = krakenAPIURL k.APIUrl = k.APIUrlDefault k.WebsocketInit() + k.WebsocketURL = krakenWSURL + k.Websocket.Functionality = exchange.WebsocketTickerSupported | + exchange.WebsocketTradeDataSupported | + exchange.WebsocketKlineSupported | + exchange.WebsocketOrderbookSupported + } // Setup sets current exchange configuration @@ -100,6 +110,7 @@ func (k *Kraken) Setup(exch config.ExchangeConfig) { k.SetHTTPClientUserAgent(exch.HTTPUserAgent) k.RESTPollingDelay = exch.RESTPollingDelay k.Verbose = exch.Verbose + k.Websocket.SetWsStatusAndConnection(exch.Websocket) k.BaseCurrencies = exch.BaseCurrencies k.AvailablePairs = exch.AvailablePairs k.EnabledPairs = exch.EnabledPairs @@ -123,6 +134,14 @@ func (k *Kraken) Setup(exch config.ExchangeConfig) { if err != nil { log.Fatal(err) } + err = k.WebsocketSetup(k.WsConnect, + exch.Name, + exch.Websocket, + krakenWSURL, + exch.WebsocketURL) + if err != nil { + log.Fatal(err) + } } } diff --git a/exchanges/kraken/kraken_test.go b/exchanges/kraken/kraken_test.go index da8f5651..3cf48bd6 100644 --- a/exchanges/kraken/kraken_test.go +++ b/exchanges/kraken/kraken_test.go @@ -18,10 +18,12 @@ const ( canManipulateRealOrders = false ) +// TestSetDefaults setup func func TestSetDefaults(t *testing.T) { k.SetDefaults() } +// TestSetup setup func func TestSetup(t *testing.T) { cfg := config.GetConfig() cfg.LoadConfig("../../testdata/configtest.json") @@ -33,10 +35,11 @@ func TestSetup(t *testing.T) { krakenConfig.APIKey = apiKey krakenConfig.APISecret = apiSecret krakenConfig.ClientID = clientID - + krakenConfig.WebsocketURL = k.WebsocketURL k.Setup(krakenConfig) } +// TestGetServerTime API endpoint test func TestGetServerTime(t *testing.T) { t.Parallel() _, err := k.GetServerTime() @@ -45,6 +48,7 @@ func TestGetServerTime(t *testing.T) { } } +// TestGetAssets API endpoint test func TestGetAssets(t *testing.T) { t.Parallel() _, err := k.GetAssets() @@ -53,6 +57,7 @@ func TestGetAssets(t *testing.T) { } } +// TestGetAssetPairs API endpoint test func TestGetAssetPairs(t *testing.T) { t.Parallel() _, err := k.GetAssetPairs() @@ -61,6 +66,7 @@ func TestGetAssetPairs(t *testing.T) { } } +// TestGetTicker API endpoint test func TestGetTicker(t *testing.T) { t.Parallel() _, err := k.GetTicker("BCHEUR") @@ -69,6 +75,7 @@ func TestGetTicker(t *testing.T) { } } +// TestGetTickers API endpoint test func TestGetTickers(t *testing.T) { t.Parallel() _, err := k.GetTickers("LTCUSD,ETCUSD") @@ -77,6 +84,7 @@ func TestGetTickers(t *testing.T) { } } +// TestGetOHLC API endpoint test func TestGetOHLC(t *testing.T) { t.Parallel() _, err := k.GetOHLC("BCHEUR") @@ -85,6 +93,7 @@ func TestGetOHLC(t *testing.T) { } } +// TestGetDepth API endpoint test func TestGetDepth(t *testing.T) { t.Parallel() _, err := k.GetDepth("BCHEUR") @@ -93,6 +102,7 @@ func TestGetDepth(t *testing.T) { } } +// TestGetTrades API endpoint test func TestGetTrades(t *testing.T) { t.Parallel() _, err := k.GetTrades("BCHEUR") @@ -101,6 +111,7 @@ func TestGetTrades(t *testing.T) { } } +// TestGetSpread API endpoint test func TestGetSpread(t *testing.T) { t.Parallel() _, err := k.GetSpread("BCHEUR") @@ -109,6 +120,7 @@ func TestGetSpread(t *testing.T) { } } +// TestGetBalance API endpoint test func TestGetBalance(t *testing.T) { t.Parallel() _, err := k.GetBalance() @@ -117,6 +129,7 @@ func TestGetBalance(t *testing.T) { } } +// TestGetTradeBalance API endpoint test func TestGetTradeBalance(t *testing.T) { t.Parallel() args := TradeBalanceOptions{Asset: "ZEUR"} @@ -126,6 +139,7 @@ func TestGetTradeBalance(t *testing.T) { } } +// TestGetOpenOrders API endpoint test func TestGetOpenOrders(t *testing.T) { t.Parallel() args := OrderInfoOptions{Trades: true} @@ -135,6 +149,7 @@ func TestGetOpenOrders(t *testing.T) { } } +// TestGetClosedOrders API endpoint test func TestGetClosedOrders(t *testing.T) { t.Parallel() args := GetClosedOrdersOptions{Trades: true, Start: "OE4KV4-4FVQ5-V7XGPU"} @@ -144,6 +159,7 @@ func TestGetClosedOrders(t *testing.T) { } } +// TestQueryOrdersInfo API endpoint test func TestQueryOrdersInfo(t *testing.T) { t.Parallel() args := OrderInfoOptions{Trades: true} @@ -153,6 +169,7 @@ func TestQueryOrdersInfo(t *testing.T) { } } +// TestGetTradesHistory API endpoint test func TestGetTradesHistory(t *testing.T) { t.Parallel() args := GetTradesHistoryOptions{Trades: true, Start: "TMZEDR-VBJN2-NGY6DX", End: "TVRXG2-R62VE-RWP3UW"} @@ -162,6 +179,7 @@ func TestGetTradesHistory(t *testing.T) { } } +// TestQueryTrades API endpoint test func TestQueryTrades(t *testing.T) { t.Parallel() _, err := k.QueryTrades(true, "TMZEDR-VBJN2-NGY6DX", "TFLWIB-KTT7L-4TWR3L", "TDVRAH-2H6OS-SLSXRX") @@ -170,6 +188,7 @@ func TestQueryTrades(t *testing.T) { } } +// TestOpenPositions API endpoint test func TestOpenPositions(t *testing.T) { t.Parallel() _, err := k.OpenPositions(false) @@ -178,6 +197,7 @@ func TestOpenPositions(t *testing.T) { } } +// TestGetLedgers API endpoint test func TestGetLedgers(t *testing.T) { t.Parallel() args := GetLedgersOptions{Start: "LRUHXI-IWECY-K4JYGO", End: "L5NIY7-JZQJD-3J4M2V", Ofs: 15} @@ -187,6 +207,7 @@ func TestGetLedgers(t *testing.T) { } } +// TestQueryLedgers API endpoint test func TestQueryLedgers(t *testing.T) { t.Parallel() _, err := k.QueryLedgers("LVTSFS-NHZVM-EXNZ5M") @@ -195,6 +216,7 @@ func TestQueryLedgers(t *testing.T) { } } +// TestGetTradeVolume API endpoint test func TestGetTradeVolume(t *testing.T) { t.Parallel() _, err := k.GetTradeVolume(true, "OAVY7T-MV5VK-KHDF5X") @@ -203,6 +225,7 @@ func TestGetTradeVolume(t *testing.T) { } } +// TestAddOrder API endpoint test func TestAddOrder(t *testing.T) { t.Parallel() args := AddOrderOptions{Oflags: "fcib"} @@ -212,6 +235,7 @@ func TestAddOrder(t *testing.T) { } } +// TestCancelExistingOrder API endpoint test func TestCancelExistingOrder(t *testing.T) { t.Parallel() _, err := k.CancelExistingOrder("OAVY7T-MV5VK-KHDF5X") @@ -231,6 +255,7 @@ func setFeeBuilder() *exchange.FeeBuilder { } } +// TestGetFee logic test func TestGetFee(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -313,6 +338,7 @@ func TestGetFee(t *testing.T) { } } +// TestFormatWithdrawPermissions logic test func TestFormatWithdrawPermissions(t *testing.T) { k.SetDefaults() expectedResult := exchange.AutoWithdrawCryptoWithSetupText + " & " + exchange.WithdrawCryptoWith2FAText + " & " + exchange.AutoWithdrawFiatWithSetupText + " & " + exchange.WithdrawFiatWith2FAText @@ -324,6 +350,7 @@ func TestFormatWithdrawPermissions(t *testing.T) { } } +// TestGetActiveOrders wrapper test func TestGetActiveOrders(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -340,6 +367,7 @@ func TestGetActiveOrders(t *testing.T) { } } +// TestGetOrderHistory wrapper test func TestGetOrderHistory(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -366,6 +394,7 @@ func areTestAPIKeysSet() bool { return false } +// TestSubmitOrder wrapper test func TestSubmitOrder(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -387,6 +416,7 @@ func TestSubmitOrder(t *testing.T) { } } +// TestCancelExchangeOrder wrapper test func TestCancelExchangeOrder(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -413,6 +443,7 @@ func TestCancelExchangeOrder(t *testing.T) { } } +// TestCancelAllExchangeOrders wrapper test func TestCancelAllExchangeOrders(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -444,6 +475,7 @@ func TestCancelAllExchangeOrders(t *testing.T) { } } +// TestGetAccountInfo wrapper test func TestGetAccountInfo(t *testing.T) { if apiKey != "" || apiSecret != "" || clientID != "" { _, err := k.GetAccountInfo() @@ -458,6 +490,7 @@ func TestGetAccountInfo(t *testing.T) { } } +// TestModifyOrder wrapper test func TestModifyOrder(t *testing.T) { _, err := k.ModifyOrder(&exchange.ModifyOrder{}) if err == nil { @@ -465,6 +498,7 @@ func TestModifyOrder(t *testing.T) { } } +// TestWithdraw wrapper test func TestWithdraw(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -489,6 +523,7 @@ func TestWithdraw(t *testing.T) { } } +// TestWithdrawFiat wrapper test func TestWithdrawFiat(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -514,6 +549,7 @@ func TestWithdrawFiat(t *testing.T) { } } +// TestWithdrawInternationalBank wrapper test func TestWithdrawInternationalBank(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -539,6 +575,7 @@ func TestWithdrawInternationalBank(t *testing.T) { } } +// TestGetDepositAddress wrapper test func TestGetDepositAddress(t *testing.T) { if areTestAPIKeysSet() { _, err := k.GetDepositAddress(currency.BTC, "") @@ -553,6 +590,7 @@ func TestGetDepositAddress(t *testing.T) { } } +// TestWithdrawStatus wrapper test func TestWithdrawStatus(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -570,10 +608,10 @@ func TestWithdrawStatus(t *testing.T) { } } +// TestWithdrawCancel wrapper test func TestWithdrawCancel(t *testing.T) { k.SetDefaults() TestSetup(t) - _, err := k.WithdrawCancel(currency.BTC, "") if areTestAPIKeysSet() && err == nil { t.Error("Test Failed - WithdrawCancel() error cannot be nil") @@ -581,3 +619,167 @@ func TestWithdrawCancel(t *testing.T) { t.Errorf("Test Failed - WithdrawCancel() error - expecting an error when no keys are set but received nil") } } + +// ---------------------------- Websocket tests ----------------------------------------- + +// TestSubscribeToChannel websocket test +func TestSubscribeToChannel(t *testing.T) { + if k.Name == "" { + k.SetDefaults() + TestSetup(t) + } + if !k.Websocket.IsEnabled() { + t.Skip("Websocket not enabled, skipping") + } + if !k.Websocket.IsConnected() { + k.Websocket.Connect() + } + + <-k.Websocket.TrafficAlert + err := k.WsSubscribeToChannel("ticker", []string{"XBT/USD"}, 1) + if err != nil { + t.Error(err) + } +} + +// TestSubscribeToNonExistentChannel websocket test +func TestSubscribeToNonExistentChannel(t *testing.T) { + if k.Name == "" { + k.SetDefaults() + TestSetup(t) + } + if !k.Websocket.IsEnabled() { + t.Skip("Websocket not enabled, skipping") + } + if !k.Websocket.IsConnected() { + k.Websocket.Connect() + } + err := k.WsSubscribeToChannel("ticker", []string{"pewdiepie"}, 1) + if err != nil { + t.Error(err) + } + subscriptionError := false + for i := 0; i < 7; i++ { + response := <-k.Websocket.DataHandler + if err, ok := response.(error); ok && err != nil { + subscriptionError = true + break + } + } + if !subscriptionError { + t.Error("Expected error") + } +} + +// TestSubscribeUnsubscribeToChannel websocket test +func TestSubscribeUnsubscribeToChannel(t *testing.T) { + if k.Name == "" { + k.SetDefaults() + TestSetup(t) + } + if !k.Websocket.IsEnabled() { + t.Skip("Websocket not enabled, skipping") + } + if !k.Websocket.IsConnected() { + k.Websocket.Connect() + } + err := k.WsSubscribeToChannel("ticker", []string{"XBT/USD"}, 1) + if err != nil { + t.Error(err) + } + err = k.WsUnsubscribeToChannel("ticker", []string{"XBT/USD"}, 2) + if err != nil { + t.Error(err) + } +} + +// TestUnsubscribeWithoutSubscription websocket test +func TestUnsubscribeWithoutSubscription(t *testing.T) { + if k.Name == "" { + k.SetDefaults() + TestSetup(t) + } + if !k.Websocket.IsEnabled() { + t.Skip("Websocket not enabled, skipping") + } + if !k.Websocket.IsConnected() { + k.Websocket.Connect() + } + err := k.WsUnsubscribeToChannel("ticker", []string{"XBT/USD"}, 3) + if err != nil { + t.Error(err) + } + unsubscriptionError := false + for i := 0; i < 7; i++ { + response := <-k.Websocket.DataHandler + if err, ok := response.(error); ok && err != nil { + if err.Error() == "Subscription Not Found" { + unsubscriptionError = true + break + } + } + } + if !unsubscriptionError { + t.Error("Expected error") + } +} + +// TestUnsubscribeWithChannelID websocket test +func TestUnsubscribeWithChannelID(t *testing.T) { + if k.Name == "" { + k.SetDefaults() + TestSetup(t) + } + if !k.Websocket.IsEnabled() { + t.Skip("Websocket not enabled, skipping") + } + if !k.Websocket.IsConnected() { + k.Websocket.Connect() + } + err := k.WsUnsubscribeToChannelByChannelID(3) + if err != nil { + t.Error(err) + } + unsubscriptionError := false + for i := 0; i < 7; i++ { + response := <-k.Websocket.DataHandler + if err, ok := response.(error); ok && err != nil { + if err.Error() == "Subscription Not Found" { + unsubscriptionError = true + break + } + } + } + if !unsubscriptionError { + t.Error("Expected error") + } +} + +// TestUnsubscribeFromNonExistentChennel websocket test +func TestUnsubscribeFromNonExistentChennel(t *testing.T) { + if k.Name == "" { + k.SetDefaults() + TestSetup(t) + } + if !k.Websocket.IsEnabled() { + t.Skip("Websocket not enabled, skipping") + } + if !k.Websocket.IsConnected() { + k.Websocket.Connect() + } + err := k.WsUnsubscribeToChannel("ticker", []string{"tseries"}, 0) + if err != nil { + t.Error(err) + } + unsubscriptionError := false + for i := 0; i < 7; i++ { + response := <-k.Websocket.DataHandler + if err, ok := response.(error); ok && err != nil { + unsubscriptionError = true + break + } + } + if !unsubscriptionError { + t.Error("Expected error") + } +} diff --git a/exchanges/kraken/kraken_types.go b/exchanges/kraken/kraken_types.go index 3865a415..f38c7cd1 100644 --- a/exchanges/kraken/kraken_types.go +++ b/exchanges/kraken/kraken_types.go @@ -386,3 +386,63 @@ type WithdrawStatusResponse struct { Time float64 `json:"time"` Status string `json:"status"` } + +// WebsocketSubscriptionEventRequest handles WS subscription events +type WebsocketSubscriptionEventRequest struct { + Event string `json:"event"` // subscribe + RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message. + Pairs []string `json:"pair"` // Array of currency pairs (pair1,pair2,pair3). + Subscription WebsocketSubscriptionData `json:"subscription,omitempty"` +} + +// WebsocketUnsubscribeByChannelIDEventRequest handles WS unsubscribe events +type WebsocketUnsubscribeByChannelIDEventRequest struct { + Event string `json:"event"` // unsubscribe + RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message. + Pairs []string `json:"pair,omitempty"` // Array of currency pairs (pair1,pair2,pair3). + ChannelID int64 `json:"channelID,omitempty"` +} + +// WebsocketSubscriptionData contains details on WS channel +type WebsocketSubscriptionData struct { + Name string `json:"name,omitempty"` // ticker|ohlc|trade|book|spread|*, * for all (ohlc interval value is 1 if all channels subscribed) + Interval int64 `json:"interval,omitempty"` // Optional - Time interval associated with ohlc subscription in minutes. Default 1. Valid Interval values: 1|5|15|30|60|240|1440|10080|21600 + Depth int64 `json:"depth,omitempty"` // Optional - depth associated with book subscription in number of levels each side, default 10. Valid Options are: 10, 25, 100, 500, 1000 +} + +// WebsocketDataResponse holds all data response types +type WebsocketEventResponse struct { + Event string `json:"event"` + Status string `json:"status"` + Pair currency.Pair `json:"pair,omitempty"` + Subscription WebsocketSubscriptionResponseData `json:"subscription,omitempty"` + WebsocketSubscriptionEventResponse + WebsocketStatusResponse + WebsocketErrorResponse +} + +type WebsocketSubscriptionEventResponse struct { + ChannelID float64 `json:"channelID"` +} + +type WebsocketSubscriptionResponseData struct { + Name string `json:"name"` +} + +type WebsocketStatusResponse struct { + ConnectionID float64 `json:"connectionID"` + Version string `json:"version"` +} + +type WebsocketDataResponse []interface{} + +type WebsocketErrorResponse struct { + ErrorMessage string `json:"errorMessage"` +} + +// Holds relevant data for channels to identify what we're doing +type WebsocketChannelData struct { + Subscription string + Pair currency.Pair + ChannelID float64 +} diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go new file mode 100644 index 00000000..ce03aa24 --- /dev/null +++ b/exchanges/kraken/kraken_websocket.go @@ -0,0 +1,660 @@ +package kraken + +import ( + "bytes" + "compress/flate" + "errors" + "fmt" + "io/ioutil" + "math" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "github.com/gorilla/websocket" + "github.com/thrasher-/gocryptotrader/common" + "github.com/thrasher-/gocryptotrader/currency" + exchange "github.com/thrasher-/gocryptotrader/exchanges" + "github.com/thrasher-/gocryptotrader/exchanges/orderbook" + log "github.com/thrasher-/gocryptotrader/logger" +) + +// List of all websocket channels to subscribe to +const ( + krakenWSURL = "wss://ws.kraken.com" + krakenWSSandboxURL = "wss://sandbox.kraken.com" + krakenWSSupportedVersion = "0.1.1" + // If a checksum fails, then resubscribing to the channel fails, fatal after these attempts + krakenWsResubscribeFailureLimit = 3 + krakenWsResubscribeDelayInSeconds = 3 + // WS endpoints + krakenWsHeartbeat = "heartbeat" + krakenWsPing = "ping" + krakenWsPong = "pong" + krakenWsSystemStatus = "systemStatus" + krakenWsSubscribe = "subscribe" + krakenWsSubscriptionStatus = "subscriptionStatus" + krakenWsUnsubscribe = "unsubscribe" + krakenWsTicker = "ticker" + krakenWsOHLC = "ohlc" + krakenWsTrade = "trade" + krakenWsSpread = "spread" + krakenWsOrderbook = "book" + // Only supported asset type + krakenWsAssetType = "SPOT" +) + +// orderbookMutex Ensures if two entries arrive at once, only one can be processed at a time +var orderbookMutex sync.Mutex +var subscriptionChannelPair []WebsocketChannelData + +// writeToWebsocket sends a message to the websocket endpoint +func (k *Kraken) writeToWebsocket(message []byte) error { + k.mu.Lock() + defer k.mu.Unlock() + if k.Verbose { + log.Debugf("Sending message to WS: %v", string(message)) + } + return k.WebsocketConn.WriteMessage(websocket.TextMessage, message) +} + +// WsConnect initiates a websocket connection +func (k *Kraken) WsConnect() error { + if !k.Websocket.IsEnabled() || !k.IsEnabled() { + return errors.New(exchange.WebsocketNotEnabled) + } + + var dialer websocket.Dialer + if k.Websocket.GetProxyAddress() != "" { + proxy, err := url.Parse(k.Websocket.GetProxyAddress()) + if err != nil { + return err + } + + dialer.Proxy = http.ProxyURL(proxy) + } + + var err error + if k.Verbose { + log.Debugf("Attempting to connect to %v", k.Websocket.GetWebsocketURL()) + } + k.WebsocketConn, _, err = dialer.Dial(k.Websocket.GetWebsocketURL(), + http.Header{}) + if err != nil { + return fmt.Errorf("%s Unable to connect to Websocket. Error: %s", + k.Name, + err) + } + if k.Verbose { + log.Debugf("Successful connection to %v", k.Websocket.GetWebsocketURL()) + } + go k.WsHandleData() + go k.wsPingHandler() + k.WsSubscribeToDefaults() + return nil +} + +// WsSubscribeToDefaults subscribes to the websocket channels +func (k *Kraken) WsSubscribeToDefaults() { + channelsToSubscribe := []string{krakenWsTicker, krakenWsTrade, krakenWsOrderbook, krakenWsOHLC, krakenWsSpread} + for _, pair := range k.EnabledPairs { + // Kraken WS formats pairs with / but config and REST use - + formattedPair := strings.ToUpper(strings.Replace(pair.String(), "-", "/", 1)) + for _, channel := range channelsToSubscribe { + err := k.WsSubscribeToChannel(channel, []string{formattedPair}, 0) + if err != nil { + k.Websocket.DataHandler <- err + } + } + } +} + +// WsReadData reads data from the websocket connection +func (k *Kraken) WsReadData() (exchange.WebsocketResponse, error) { + mType, resp, err := k.WebsocketConn.ReadMessage() + if err != nil { + return exchange.WebsocketResponse{}, err + } + k.Websocket.TrafficAlert <- struct{}{} + var standardMessage []byte + switch mType { + case websocket.TextMessage: + standardMessage = resp + + case websocket.BinaryMessage: + reader := flate.NewReader(bytes.NewReader(resp)) + standardMessage, err = ioutil.ReadAll(reader) + reader.Close() + if err != nil { + return exchange.WebsocketResponse{}, err + } + } + if k.Verbose { + log.Debugf("%v Websocket message received: %v", k.Name, string(standardMessage)) + } + + return exchange.WebsocketResponse{Raw: standardMessage}, nil +} + +// wsPingHandler sends a message "ping" every 27 to maintain the connection to the websocket +func (k *Kraken) wsPingHandler() { + k.Websocket.Wg.Add(1) + defer k.Websocket.Wg.Done() + ticker := time.NewTicker(time.Second * 27) + for { + select { + case <-k.Websocket.ShutdownC: + return + + case <-ticker.C: + pingEvent := fmt.Sprintf("{\"event\":\"%v\"}", krakenWsPing) + if k.Verbose { + log.Debugf("%v sending ping", k.GetName()) + } + err := k.writeToWebsocket([]byte(pingEvent)) + if err != nil { + k.Websocket.DataHandler <- err + } + } + } +} + +// WsHandleData handles the read data from the websocket connection +func (k *Kraken) WsHandleData() { + k.Websocket.Wg.Add(1) + defer func() { + err := k.WebsocketConn.Close() + if err != nil { + k.Websocket.DataHandler <- fmt.Errorf("%v unable to to close Websocket connection. Error: %s", + k.GetName(), err) + } + k.Websocket.Wg.Done() + }() + + for { + select { + case <-k.Websocket.ShutdownC: + return + default: + resp, err := k.WsReadData() + if err != nil { + k.Websocket.DataHandler <- err + return + } + // event response handling + var eventResponse WebsocketEventResponse + err = common.JSONDecode(resp.Raw, &eventResponse) + if err == nil && eventResponse.Event != "" { + k.WsHandleEventResponse(&eventResponse) + continue + } + // Data response handling + var dataResponse WebsocketDataResponse + err = common.JSONDecode(resp.Raw, &dataResponse) + if err == nil && dataResponse[0].(float64) >= 0 { + k.WsHandleDataResponse(dataResponse) + continue + } + // Unknown data handling + k.Websocket.DataHandler <- fmt.Errorf("unrecognised response: %v", string(resp.Raw)) + continue + } + } +} + +// WsHandleDataResponse classifies the WS response and sends to appropriate handler +func (k *Kraken) WsHandleDataResponse(response WebsocketDataResponse) { + channelID := response[0].(float64) + channelData := getSubscriptionChannelData(channelID) + switch channelData.Subscription { + case krakenWsTicker: + if k.Verbose { + log.Debugf("%v Websocket ticker data received", + k.GetName()) + } + k.wsProcessTickers(channelData, response[1]) + case krakenWsOHLC: + if k.Verbose { + log.Debugf("%v Websocket OHLC data received", + k.GetName()) + } + k.wsProcessCandles(channelData, response[1]) + case krakenWsOrderbook: + if k.Verbose { + log.Debugf("%v Websocket Orderbook data received", + k.GetName()) + } + k.wsProcessOrderBook(channelData, response[1]) + case krakenWsSpread: + if k.Verbose { + log.Debugf("%v Websocket Spread data received", + k.GetName()) + } + k.wsProcessSpread(channelData, response[1]) + case krakenWsTrade: + if k.Verbose { + log.Debugf("%v Websocket Trade data received", + k.GetName()) + } + k.wsProcessTrades(channelData, response[1]) + default: + log.Errorf("%v Unidentified websocket data received: %v", + k.GetName(), response) + } +} + +// WsHandleDataResponse classifies the WS response and sends to appropriate handler +func (k *Kraken) WsHandleEventResponse(response *WebsocketEventResponse) { + switch response.Event { + case krakenWsHeartbeat: + if k.Verbose { + log.Debugf("%v Websocket heartbeat data received", k.GetName()) + } + case krakenWsPong: + if k.Verbose { + log.Debugf("%v Websocket pong data received", k.GetName()) + } + case krakenWsSystemStatus: + if k.Verbose { + log.Debugf("%v Websocket status data received", k.GetName()) + } + if response.Status != "online" { + k.Websocket.DataHandler <- fmt.Errorf("%v Websocket status '%v'", + k.GetName(), response.Status) + } + if response.WebsocketStatusResponse.Version != krakenWSSupportedVersion { + log.Warnf("%v New version of Websocket API released. Was %v Now %v", + k.GetName(), krakenWSSupportedVersion, response.WebsocketStatusResponse.Version) + } + case krakenWsSubscriptionStatus: + if k.Verbose { + log.Debugf("%v Websocket subscription status data received", + k.GetName()) + } + if response.Status != "subscribed" { + k.Websocket.DataHandler <- fmt.Errorf(response.WebsocketErrorResponse.ErrorMessage) + k.ResubscribeToChannel(response.Subscription.Name, response.Pair) + return + } + addNewSubscriptionChannelData(response) + default: + log.Errorf("%v Unidentified websocket data received: %v", k.GetName(), response) + } +} + +// WsSubscribeToChannel sends a request to WS to subscribe to supplied channel name and pairs +func (k *Kraken) WsSubscribeToChannel(topic string, currencies []string, requestID int64) error { + resp := WebsocketSubscriptionEventRequest{ + Event: krakenWsSubscribe, + Pairs: currencies, + Subscription: WebsocketSubscriptionData{ + Name: topic, + }, + } + if requestID > 0 { + resp.RequestID = requestID + } + json, err := common.JSONEncode(resp) + if err != nil { + return err + } + return k.writeToWebsocket(json) +} + +// WsUnsubscribeToChannel sends a request to WS to unsubscribe to supplied channel name and pairs +func (k *Kraken) WsUnsubscribeToChannel(topic string, currencies []string, requestID int64) error { + resp := WebsocketSubscriptionEventRequest{ + Event: krakenWsUnsubscribe, + Pairs: currencies, + Subscription: WebsocketSubscriptionData{ + Name: topic, + }, + } + if requestID > 0 { + resp.RequestID = requestID + } + json, err := common.JSONEncode(resp) + if err != nil { + return err + } + return k.writeToWebsocket(json) +} + +// WsUnsubscribeToChannelByChannelID sends a request to WS to unsubscribe to supplied channel ID +func (k *Kraken) WsUnsubscribeToChannelByChannelID(channelID int64) error { + resp := WebsocketUnsubscribeByChannelIDEventRequest{ + Event: krakenWsUnsubscribe, + ChannelID: channelID, + } + json, err := common.JSONEncode(resp) + if err != nil { + return err + } + return k.writeToWebsocket(json) +} + +// addNewSubscriptionChannelData stores channel ids, pairs and subscription types to an array +// allowing correlation between subscriptions and returned data +func addNewSubscriptionChannelData(response *WebsocketEventResponse) { + for i := range subscriptionChannelPair { + if response.ChannelID == subscriptionChannelPair[i].ChannelID { + return + } + } + + // We change the / to - to maintain compatibility with REST/config + pair := currency.NewPairWithDelimiter(response.Pair.Base.String(), response.Pair.Quote.String(), "-") + subscriptionChannelPair = append(subscriptionChannelPair, WebsocketChannelData{ + Subscription: response.Subscription.Name, + Pair: pair, + ChannelID: response.ChannelID, + }) +} + +// getSubscriptionChannelData retrieves WebsocketChannelData based on response ID +func getSubscriptionChannelData(id float64) WebsocketChannelData { + for i := range subscriptionChannelPair { + if id == subscriptionChannelPair[i].ChannelID { + return subscriptionChannelPair[i] + } + } + return WebsocketChannelData{} +} + +// resubscribeToChannel will attempt to unsubscribe and resubscribe to a channel +func (k *Kraken) ResubscribeToChannel(channel string, pair currency.Pair) { + // Kraken WS formats pairs with / but config and REST use - + formattedPair := strings.ToUpper(strings.Replace(pair.String(), "-", "/", 1)) + if krakenWsResubscribeFailureLimit > 0 { + var successfulUnsubscribe bool + for i := 0; i < krakenWsResubscribeFailureLimit; i++ { + err := k.WsUnsubscribeToChannel(channel, []string{formattedPair}, 0) + if err != nil { + log.Error(err) + time.Sleep(krakenWsResubscribeDelayInSeconds * time.Second) + continue + } + successfulUnsubscribe = true + break + } + if !successfulUnsubscribe { + log.Fatalf("%v websocket channel %v failed to unsubscribe after %v attempts", + k.GetName(), channel, krakenWsResubscribeFailureLimit) + } + successfulSubscribe := true + for i := 0; i < krakenWsResubscribeFailureLimit; i++ { + err := k.WsSubscribeToChannel(channel, []string{formattedPair}, 0) + if err != nil { + log.Error(err) + time.Sleep(krakenWsResubscribeDelayInSeconds * time.Second) + continue + } + successfulSubscribe = true + break + } + if !successfulSubscribe { + log.Fatalf("%v websocket channel %v failed to resubscribe after %v attempts", + k.GetName(), channel, krakenWsResubscribeFailureLimit) + } + } else { + log.Fatalf("%v websocket channel %v cannot resubscribe. Limit: %v", + k.GetName(), channel, krakenWsResubscribeFailureLimit) + } +} + +// wsProcessTickers converts ticker data and sends it to the datahandler +func (k *Kraken) wsProcessTickers(channelData WebsocketChannelData, data interface{}) { + tickerData := data.(map[string]interface{}) + closeData := tickerData["c"].([]interface{}) + openData := tickerData["o"].([]interface{}) + lowData := tickerData["l"].([]interface{}) + highData := tickerData["h"].([]interface{}) + volumeData := tickerData["v"].([]interface{}) + closePrice, _ := strconv.ParseFloat(closeData[0].(string), 64) + openPrice, _ := strconv.ParseFloat(openData[0].(string), 64) + highPrice, _ := strconv.ParseFloat(highData[0].(string), 64) + lowPrice, _ := strconv.ParseFloat(lowData[0].(string), 64) + quantity, _ := strconv.ParseFloat(volumeData[0].(string), 64) + + k.Websocket.DataHandler <- exchange.TickerData{ + Timestamp: time.Now(), + Exchange: k.GetName(), + AssetType: krakenWsAssetType, + Pair: channelData.Pair, + ClosePrice: closePrice, + OpenPrice: openPrice, + HighPrice: highPrice, + LowPrice: lowPrice, + Quantity: quantity, + } +} + +// wsProcessTickers converts ticker data and sends it to the datahandler +func (k *Kraken) wsProcessSpread(channelData WebsocketChannelData, data interface{}) { + spreadData := data.([]interface{}) + bestBid := spreadData[0].(string) + bestAsk := spreadData[1].(string) + timeData, _ := strconv.ParseFloat(spreadData[2].(string), 64) + sec, dec := math.Modf(timeData) + spreadTimestamp := time.Unix(int64(sec), int64(dec*(1e9))) + if k.Verbose { + log.Debugf("Spread data for '%v' received. Best bid: '%v' Best ask: '%v' Time: '%v'", + channelData.Pair, bestBid, bestAsk, spreadTimestamp) + } +} + +// wsProcessTrades converts trade data and sends it to the datahandler +func (k *Kraken) wsProcessTrades(channelData WebsocketChannelData, data interface{}) { + tradeData := data.([]interface{}) + for i := range tradeData { + trade := tradeData[i].([]interface{}) + timeData, _ := strconv.ParseInt(trade[2].(string), 10, 64) + timeUnix := time.Unix(timeData, 0) + price, _ := strconv.ParseFloat(trade[0].(string), 64) + amount, _ := strconv.ParseFloat(trade[1].(string), 64) + + k.Websocket.DataHandler <- exchange.TradeData{ + AssetType: krakenWsAssetType, + CurrencyPair: channelData.Pair, + EventTime: time.Now().Unix(), + Exchange: k.GetName(), + Price: price, + Amount: amount, + Timestamp: timeUnix, + Side: trade[3].(string), + } + } +} + +// wsProcessOrderBook determines if the orderbook data is partial or update +// Then sends to appropriate fun +func (k *Kraken) wsProcessOrderBook(channelData WebsocketChannelData, data interface{}) { + obData := data.(map[string]interface{}) + if _, ok := obData["as"]; ok { + k.wsProcessOrderBookPartial(channelData, obData) + } else if _, ok := obData["a"]; ok { + k.wsProcessOrderBookUpdate(channelData, obData) + } +} + +// wsProcessOrderBookPartial creates a new orderbook entry for a given currency pair +func (k *Kraken) wsProcessOrderBookPartial(channelData WebsocketChannelData, obData map[string]interface{}) { + ob := orderbook.Base{ + Pair: channelData.Pair, + AssetType: krakenWsAssetType, + } + // Kraken ob data is timestamped per price, GCT orderbook data is timestamped per entry + // Using the highest last update time, we can attempt to respect both within a reasonable degree + var highestLastUpdate time.Time + askData := obData["as"].([]interface{}) + for i := range askData { + asks := askData[i].([]interface{}) + price, _ := strconv.ParseFloat(asks[0].(string), 64) + amount, _ := strconv.ParseFloat(asks[1].(string), 64) + ob.Asks = append(ob.Asks, orderbook.Item{ + Amount: amount, + Price: price, + }) + + timeData, _ := strconv.ParseFloat(asks[2].(string), 64) + sec, dec := math.Modf(timeData) + askUpdatedTime := time.Unix(int64(sec), int64(dec*(1e9))) + if highestLastUpdate.Before(askUpdatedTime) { + highestLastUpdate = askUpdatedTime + } + } + + bidData := obData["bs"].([]interface{}) + for i := range bidData { + bids := bidData[i].([]interface{}) + price, _ := strconv.ParseFloat(bids[0].(string), 64) + amount, _ := strconv.ParseFloat(bids[1].(string), 64) + ob.Bids = append(ob.Bids, orderbook.Item{ + Amount: amount, + Price: price, + }) + + timeData, _ := strconv.ParseFloat(bids[2].(string), 64) + sec, dec := math.Modf(timeData) + bidUpdateTime := time.Unix(int64(sec), int64(dec*(1e9))) + if highestLastUpdate.Before(bidUpdateTime) { + highestLastUpdate = bidUpdateTime + } + } + + ob.LastUpdated = highestLastUpdate + err := k.Websocket.Orderbook.LoadSnapshot(&ob, k.GetName(), true) + if err != nil { + k.Websocket.DataHandler <- err + return + } + + k.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{ + Exchange: k.GetName(), + Asset: krakenWsAssetType, + Pair: channelData.Pair, + } +} + +// wsProcessOrderBookUpdate updates an orderbook entry for a given currency pair +func (k *Kraken) wsProcessOrderBookUpdate(channelData WebsocketChannelData, obData map[string]interface{}) { + ob, err := k.GetOrderbookEx(channelData.Pair, krakenWsAssetType) + if err != nil { + k.Websocket.DataHandler <- err + return + } + // Kraken ob data is timestamped per price, GCT orderbook data is timestamped per entry + // Using the highest last update time, we can attempt to respect both within a reasonable degree + var highestLastUpdate time.Time + // Ask data is not always sent + if _, ok := obData["a"]; ok { + askData := obData["a"].([]interface{}) + for i := range askData { + asks := askData[i].([]interface{}) + price, _ := strconv.ParseFloat(asks[0].(string), 64) + amount, _ := strconv.ParseFloat(asks[1].(string), 64) + + if amount == 0 { + for j := 0; j < len(ob.Asks); j++ { + if ob.Asks[j].Price == price { + ob.Asks = append(ob.Asks[:j], ob.Asks[j+1:]...) + j-- + continue + } + } + } + ob.Asks = append(ob.Asks, orderbook.Item{ + Amount: amount, + Price: price, + }) + + timeData, _ := strconv.ParseFloat(asks[2].(string), 64) + sec, dec := math.Modf(timeData) + askUpdatedTime := time.Unix(int64(sec), int64(dec*(1e9))) + if highestLastUpdate.Before(askUpdatedTime) { + highestLastUpdate = askUpdatedTime + } + } + } + // Bid data is not always sent + if _, ok := obData["b"]; ok { + bidData := obData["b"].([]interface{}) + for i := range bidData { + bids := bidData[i].([]interface{}) + price, _ := strconv.ParseFloat(bids[0].(string), 64) + amount, _ := strconv.ParseFloat(bids[1].(string), 64) + + if amount == 0 { + for j := 0; j < len(ob.Bids); j++ { + if ob.Bids[j].Price == price { + ob.Bids = append(ob.Bids[:j], ob.Bids[j+1:]...) + j-- + continue + } + } + } + + ob.Bids = append(ob.Bids, orderbook.Item{ + Amount: amount, + Price: price, + }) + + timeData, _ := strconv.ParseFloat(bids[2].(string), 64) + sec, dec := math.Modf(timeData) + bidUpdatedTime := time.Unix(int64(sec), int64(dec*(1e9))) + if highestLastUpdate.Before(bidUpdatedTime) { + highestLastUpdate = bidUpdatedTime + } + } + } + + if ob.LastUpdated.After(highestLastUpdate) { + log.Errorf("orderbook update out of order. Existing: %v, Attempted: %v", ob.LastUpdated, highestLastUpdate) + k.ResubscribeToChannel(channelData.Subscription, channelData.Pair) + return + } + ob.LastUpdated = highestLastUpdate + err = k.Websocket.Orderbook.LoadSnapshot(&ob, k.GetName(), true) + if err != nil { + k.Websocket.DataHandler <- err + return + } + k.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{ + Exchange: k.GetName(), + Asset: krakenWsAssetType, + Pair: channelData.Pair, + } +} + +// wsProcessCandles converts candle data and sends it to the data handler +func (k *Kraken) wsProcessCandles(channelData WebsocketChannelData, data interface{}) { + candleData := data.([]interface{}) + startTimeData, _ := strconv.ParseInt(candleData[0].(string), 10, 64) + startTimeUnix := time.Unix(startTimeData, 0) + endTimeData, _ := strconv.ParseInt(candleData[1].(string), 10, 64) + endTimeUnix := time.Unix(endTimeData, 0) + openPrice, _ := strconv.ParseFloat(candleData[2].(string), 64) + highPrice, _ := strconv.ParseFloat(candleData[3].(string), 64) + lowPrice, _ := strconv.ParseFloat(candleData[4].(string), 64) + closePrice, _ := strconv.ParseFloat(candleData[5].(string), 64) + volume, _ := strconv.ParseFloat(candleData[7].(string), 64) + + k.Websocket.DataHandler <- exchange.KlineData{ + AssetType: krakenWsAssetType, + Pair: channelData.Pair, + Timestamp: time.Now(), + Exchange: k.GetName(), + StartTime: startTimeUnix, + CloseTime: endTimeUnix, + // Candles are sent every 60 seconds + Interval: "60", + HighPrice: highPrice, + LowPrice: lowPrice, + OpenPrice: openPrice, + ClosePrice: closePrice, + Volume: volume, + } +} diff --git a/exchanges/kraken/kraken_wrapper.go b/exchanges/kraken/kraken_wrapper.go index 1635a899..7749292f 100644 --- a/exchanges/kraken/kraken_wrapper.go +++ b/exchanges/kraken/kraken_wrapper.go @@ -307,7 +307,7 @@ func (k *Kraken) WithdrawFiatFundsToInternationalBank(withdrawRequest *exchange. // GetWebsocket returns a pointer to the exchange websocket func (k *Kraken) GetWebsocket() (*exchange.Websocket, error) { - return nil, common.ErrFunctionNotSupported + return k.Websocket, nil } // GetFeeByType returns an estimate of fee based on type of transaction diff --git a/exchanges/okex/okex_test.go b/exchanges/okex/okex_test.go index bb4a42bc..343a6b49 100644 --- a/exchanges/okex/okex_test.go +++ b/exchanges/okex/okex_test.go @@ -1662,7 +1662,7 @@ func TestSubscribeToNonExistantChannel(t *testing.T) { return } var errorReceived bool - for i := 0; i < 5; i++ { + for i := 0; i < 7; i++ { response := <-o.Websocket.DataHandler if err, ok := response.(error); ok && err != nil { t.Log(response) diff --git a/exchanges/okgroup/okgroup_types.go b/exchanges/okgroup/okgroup_types.go index 53ca27bf..2811e3fe 100644 --- a/exchanges/okgroup/okgroup_types.go +++ b/exchanges/okgroup/okgroup_types.go @@ -1135,12 +1135,12 @@ type GetSwapForceLiquidatedOrdersRequest struct { // GetSwapForceLiquidatedOrdersResponse response data for GetSwapForceLiquidatedOrders type GetSwapForceLiquidatedOrdersResponse struct { - Loss float64 `json:"loss"` - Size int64 `json:"size"` - Price float64 `json:"price"` + Loss float64 `json:"loss,string"` + Size int64 `json:"size,string"` + Price float64 `json:"price,string"` CreatedAt string `json:"created_at"` InstrumentID string `json:"instrument_id"` - Type int64 `json:"type"` + Type int64 `json:"type,string"` } // GetSwapOnHoldAmountForOpenOrdersResponse response data for GetSwapOnHoldAmountForOpenOrders diff --git a/testdata/configtest.json b/testdata/configtest.json index 9ead24e4..67a737bf 100644 --- a/testdata/configtest.json +++ b/testdata/configtest.json @@ -935,7 +935,7 @@ "name": "Kraken", "enabled": true, "verbose": false, - "websocket": false, + "websocket": true, "useSandbox": false, "restPollingDelay": 10, "httpTimeout": 15000000000,