diff --git a/currency/pair.go b/currency/pair.go index b9e4df43..0078ced8 100644 --- a/currency/pair.go +++ b/currency/pair.go @@ -62,7 +62,7 @@ func NewPairFromIndex(currencyPair, index string) (Pair, error) { // NewPairFromString converts currency string into a new CurrencyPair // with or without delimeter func NewPairFromString(currencyPair string) Pair { - delimiters := []string{"_", "-"} + delimiters := []string{"_", "-", "/"} var delimiter string for _, x := range delimiters { if strings.Contains(currencyPair, x) { diff --git a/exchanges/exchange_websocket.go b/exchanges/exchange_websocket.go index dcb92a1b..57276ad9 100644 --- a/exchanges/exchange_websocket.go +++ b/exchanges/exchange_websocket.go @@ -141,7 +141,6 @@ func (w *Websocket) trafficMonitor(wg *sync.WaitGroup) { select { case <-w.ShutdownC: // Returns on shutdown channel close return - case <-w.TrafficAlert: // Resets timer on traffic if !w.connected { w.Connected <- struct{}{} @@ -194,20 +193,21 @@ func (w *Websocket) Connect() error { w.ShutdownC = make(chan struct{}, 1) - var anotherWG sync.WaitGroup - anotherWG.Add(1) - go w.trafficMonitor(&anotherWG) - anotherWG.Wait() - err := w.connector() if err != nil { return fmt.Errorf("exchange_websocket.go connection error %s", err) } - // Divert for incoming websocket traffic - w.Connected <- struct{}{} - w.connected = true + if !w.connected { + w.Connected <- struct{}{} + w.connected = true + } + + var anotherWG sync.WaitGroup + anotherWG.Add(1) + go w.trafficMonitor(&anotherWG) + anotherWG.Wait() return nil } diff --git a/exchanges/kraken/kraken.go b/exchanges/kraken/kraken.go index 9b2e70c9..93802410 100644 --- a/exchanges/kraken/kraken.go +++ b/exchanges/kraken/kraken.go @@ -7,8 +7,10 @@ import ( "net/url" "strconv" "strings" + "sync" "time" + "github.com/gorilla/websocket" "github.com/thrasher-/gocryptotrader/common" "github.com/thrasher-/gocryptotrader/config" "github.com/thrasher-/gocryptotrader/currency" @@ -56,7 +58,9 @@ const ( // Kraken is the overarching type across the alphapoint package type Kraken struct { exchange.Base + WebsocketConn *websocket.Conn CryptoFee, FiatFee float64 + mu sync.Mutex } // SetDefaults sets current default settings @@ -86,6 +90,12 @@ func (k *Kraken) SetDefaults() { k.APIUrlDefault = krakenAPIURL k.APIUrl = k.APIUrlDefault k.WebsocketInit() + k.WebsocketURL = krakenWSURL + k.Websocket.Functionality = exchange.WebsocketTickerSupported | + exchange.WebsocketTradeDataSupported | + exchange.WebsocketKlineSupported | + exchange.WebsocketOrderbookSupported + } // Setup sets current exchange configuration @@ -100,6 +110,7 @@ func (k *Kraken) Setup(exch config.ExchangeConfig) { k.SetHTTPClientUserAgent(exch.HTTPUserAgent) k.RESTPollingDelay = exch.RESTPollingDelay k.Verbose = exch.Verbose + k.Websocket.SetWsStatusAndConnection(exch.Websocket) k.BaseCurrencies = exch.BaseCurrencies k.AvailablePairs = exch.AvailablePairs k.EnabledPairs = exch.EnabledPairs @@ -123,6 +134,14 @@ func (k *Kraken) Setup(exch config.ExchangeConfig) { if err != nil { log.Fatal(err) } + err = k.WebsocketSetup(k.WsConnect, + exch.Name, + exch.Websocket, + krakenWSURL, + exch.WebsocketURL) + if err != nil { + log.Fatal(err) + } } } diff --git a/exchanges/kraken/kraken_test.go b/exchanges/kraken/kraken_test.go index da8f5651..3cf48bd6 100644 --- a/exchanges/kraken/kraken_test.go +++ b/exchanges/kraken/kraken_test.go @@ -18,10 +18,12 @@ const ( canManipulateRealOrders = false ) +// TestSetDefaults setup func func TestSetDefaults(t *testing.T) { k.SetDefaults() } +// TestSetup setup func func TestSetup(t *testing.T) { cfg := config.GetConfig() cfg.LoadConfig("../../testdata/configtest.json") @@ -33,10 +35,11 @@ func TestSetup(t *testing.T) { krakenConfig.APIKey = apiKey krakenConfig.APISecret = apiSecret krakenConfig.ClientID = clientID - + krakenConfig.WebsocketURL = k.WebsocketURL k.Setup(krakenConfig) } +// TestGetServerTime API endpoint test func TestGetServerTime(t *testing.T) { t.Parallel() _, err := k.GetServerTime() @@ -45,6 +48,7 @@ func TestGetServerTime(t *testing.T) { } } +// TestGetAssets API endpoint test func TestGetAssets(t *testing.T) { t.Parallel() _, err := k.GetAssets() @@ -53,6 +57,7 @@ func TestGetAssets(t *testing.T) { } } +// TestGetAssetPairs API endpoint test func TestGetAssetPairs(t *testing.T) { t.Parallel() _, err := k.GetAssetPairs() @@ -61,6 +66,7 @@ func TestGetAssetPairs(t *testing.T) { } } +// TestGetTicker API endpoint test func TestGetTicker(t *testing.T) { t.Parallel() _, err := k.GetTicker("BCHEUR") @@ -69,6 +75,7 @@ func TestGetTicker(t *testing.T) { } } +// TestGetTickers API endpoint test func TestGetTickers(t *testing.T) { t.Parallel() _, err := k.GetTickers("LTCUSD,ETCUSD") @@ -77,6 +84,7 @@ func TestGetTickers(t *testing.T) { } } +// TestGetOHLC API endpoint test func TestGetOHLC(t *testing.T) { t.Parallel() _, err := k.GetOHLC("BCHEUR") @@ -85,6 +93,7 @@ func TestGetOHLC(t *testing.T) { } } +// TestGetDepth API endpoint test func TestGetDepth(t *testing.T) { t.Parallel() _, err := k.GetDepth("BCHEUR") @@ -93,6 +102,7 @@ func TestGetDepth(t *testing.T) { } } +// TestGetTrades API endpoint test func TestGetTrades(t *testing.T) { t.Parallel() _, err := k.GetTrades("BCHEUR") @@ -101,6 +111,7 @@ func TestGetTrades(t *testing.T) { } } +// TestGetSpread API endpoint test func TestGetSpread(t *testing.T) { t.Parallel() _, err := k.GetSpread("BCHEUR") @@ -109,6 +120,7 @@ func TestGetSpread(t *testing.T) { } } +// TestGetBalance API endpoint test func TestGetBalance(t *testing.T) { t.Parallel() _, err := k.GetBalance() @@ -117,6 +129,7 @@ func TestGetBalance(t *testing.T) { } } +// TestGetTradeBalance API endpoint test func TestGetTradeBalance(t *testing.T) { t.Parallel() args := TradeBalanceOptions{Asset: "ZEUR"} @@ -126,6 +139,7 @@ func TestGetTradeBalance(t *testing.T) { } } +// TestGetOpenOrders API endpoint test func TestGetOpenOrders(t *testing.T) { t.Parallel() args := OrderInfoOptions{Trades: true} @@ -135,6 +149,7 @@ func TestGetOpenOrders(t *testing.T) { } } +// TestGetClosedOrders API endpoint test func TestGetClosedOrders(t *testing.T) { t.Parallel() args := GetClosedOrdersOptions{Trades: true, Start: "OE4KV4-4FVQ5-V7XGPU"} @@ -144,6 +159,7 @@ func TestGetClosedOrders(t *testing.T) { } } +// TestQueryOrdersInfo API endpoint test func TestQueryOrdersInfo(t *testing.T) { t.Parallel() args := OrderInfoOptions{Trades: true} @@ -153,6 +169,7 @@ func TestQueryOrdersInfo(t *testing.T) { } } +// TestGetTradesHistory API endpoint test func TestGetTradesHistory(t *testing.T) { t.Parallel() args := GetTradesHistoryOptions{Trades: true, Start: "TMZEDR-VBJN2-NGY6DX", End: "TVRXG2-R62VE-RWP3UW"} @@ -162,6 +179,7 @@ func TestGetTradesHistory(t *testing.T) { } } +// TestQueryTrades API endpoint test func TestQueryTrades(t *testing.T) { t.Parallel() _, err := k.QueryTrades(true, "TMZEDR-VBJN2-NGY6DX", "TFLWIB-KTT7L-4TWR3L", "TDVRAH-2H6OS-SLSXRX") @@ -170,6 +188,7 @@ func TestQueryTrades(t *testing.T) { } } +// TestOpenPositions API endpoint test func TestOpenPositions(t *testing.T) { t.Parallel() _, err := k.OpenPositions(false) @@ -178,6 +197,7 @@ func TestOpenPositions(t *testing.T) { } } +// TestGetLedgers API endpoint test func TestGetLedgers(t *testing.T) { t.Parallel() args := GetLedgersOptions{Start: "LRUHXI-IWECY-K4JYGO", End: "L5NIY7-JZQJD-3J4M2V", Ofs: 15} @@ -187,6 +207,7 @@ func TestGetLedgers(t *testing.T) { } } +// TestQueryLedgers API endpoint test func TestQueryLedgers(t *testing.T) { t.Parallel() _, err := k.QueryLedgers("LVTSFS-NHZVM-EXNZ5M") @@ -195,6 +216,7 @@ func TestQueryLedgers(t *testing.T) { } } +// TestGetTradeVolume API endpoint test func TestGetTradeVolume(t *testing.T) { t.Parallel() _, err := k.GetTradeVolume(true, "OAVY7T-MV5VK-KHDF5X") @@ -203,6 +225,7 @@ func TestGetTradeVolume(t *testing.T) { } } +// TestAddOrder API endpoint test func TestAddOrder(t *testing.T) { t.Parallel() args := AddOrderOptions{Oflags: "fcib"} @@ -212,6 +235,7 @@ func TestAddOrder(t *testing.T) { } } +// TestCancelExistingOrder API endpoint test func TestCancelExistingOrder(t *testing.T) { t.Parallel() _, err := k.CancelExistingOrder("OAVY7T-MV5VK-KHDF5X") @@ -231,6 +255,7 @@ func setFeeBuilder() *exchange.FeeBuilder { } } +// TestGetFee logic test func TestGetFee(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -313,6 +338,7 @@ func TestGetFee(t *testing.T) { } } +// TestFormatWithdrawPermissions logic test func TestFormatWithdrawPermissions(t *testing.T) { k.SetDefaults() expectedResult := exchange.AutoWithdrawCryptoWithSetupText + " & " + exchange.WithdrawCryptoWith2FAText + " & " + exchange.AutoWithdrawFiatWithSetupText + " & " + exchange.WithdrawFiatWith2FAText @@ -324,6 +350,7 @@ func TestFormatWithdrawPermissions(t *testing.T) { } } +// TestGetActiveOrders wrapper test func TestGetActiveOrders(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -340,6 +367,7 @@ func TestGetActiveOrders(t *testing.T) { } } +// TestGetOrderHistory wrapper test func TestGetOrderHistory(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -366,6 +394,7 @@ func areTestAPIKeysSet() bool { return false } +// TestSubmitOrder wrapper test func TestSubmitOrder(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -387,6 +416,7 @@ func TestSubmitOrder(t *testing.T) { } } +// TestCancelExchangeOrder wrapper test func TestCancelExchangeOrder(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -413,6 +443,7 @@ func TestCancelExchangeOrder(t *testing.T) { } } +// TestCancelAllExchangeOrders wrapper test func TestCancelAllExchangeOrders(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -444,6 +475,7 @@ func TestCancelAllExchangeOrders(t *testing.T) { } } +// TestGetAccountInfo wrapper test func TestGetAccountInfo(t *testing.T) { if apiKey != "" || apiSecret != "" || clientID != "" { _, err := k.GetAccountInfo() @@ -458,6 +490,7 @@ func TestGetAccountInfo(t *testing.T) { } } +// TestModifyOrder wrapper test func TestModifyOrder(t *testing.T) { _, err := k.ModifyOrder(&exchange.ModifyOrder{}) if err == nil { @@ -465,6 +498,7 @@ func TestModifyOrder(t *testing.T) { } } +// TestWithdraw wrapper test func TestWithdraw(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -489,6 +523,7 @@ func TestWithdraw(t *testing.T) { } } +// TestWithdrawFiat wrapper test func TestWithdrawFiat(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -514,6 +549,7 @@ func TestWithdrawFiat(t *testing.T) { } } +// TestWithdrawInternationalBank wrapper test func TestWithdrawInternationalBank(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -539,6 +575,7 @@ func TestWithdrawInternationalBank(t *testing.T) { } } +// TestGetDepositAddress wrapper test func TestGetDepositAddress(t *testing.T) { if areTestAPIKeysSet() { _, err := k.GetDepositAddress(currency.BTC, "") @@ -553,6 +590,7 @@ func TestGetDepositAddress(t *testing.T) { } } +// TestWithdrawStatus wrapper test func TestWithdrawStatus(t *testing.T) { k.SetDefaults() TestSetup(t) @@ -570,10 +608,10 @@ func TestWithdrawStatus(t *testing.T) { } } +// TestWithdrawCancel wrapper test func TestWithdrawCancel(t *testing.T) { k.SetDefaults() TestSetup(t) - _, err := k.WithdrawCancel(currency.BTC, "") if areTestAPIKeysSet() && err == nil { t.Error("Test Failed - WithdrawCancel() error cannot be nil") @@ -581,3 +619,167 @@ func TestWithdrawCancel(t *testing.T) { t.Errorf("Test Failed - WithdrawCancel() error - expecting an error when no keys are set but received nil") } } + +// ---------------------------- Websocket tests ----------------------------------------- + +// TestSubscribeToChannel websocket test +func TestSubscribeToChannel(t *testing.T) { + if k.Name == "" { + k.SetDefaults() + TestSetup(t) + } + if !k.Websocket.IsEnabled() { + t.Skip("Websocket not enabled, skipping") + } + if !k.Websocket.IsConnected() { + k.Websocket.Connect() + } + + <-k.Websocket.TrafficAlert + err := k.WsSubscribeToChannel("ticker", []string{"XBT/USD"}, 1) + if err != nil { + t.Error(err) + } +} + +// TestSubscribeToNonExistentChannel websocket test +func TestSubscribeToNonExistentChannel(t *testing.T) { + if k.Name == "" { + k.SetDefaults() + TestSetup(t) + } + if !k.Websocket.IsEnabled() { + t.Skip("Websocket not enabled, skipping") + } + if !k.Websocket.IsConnected() { + k.Websocket.Connect() + } + err := k.WsSubscribeToChannel("ticker", []string{"pewdiepie"}, 1) + if err != nil { + t.Error(err) + } + subscriptionError := false + for i := 0; i < 7; i++ { + response := <-k.Websocket.DataHandler + if err, ok := response.(error); ok && err != nil { + subscriptionError = true + break + } + } + if !subscriptionError { + t.Error("Expected error") + } +} + +// TestSubscribeUnsubscribeToChannel websocket test +func TestSubscribeUnsubscribeToChannel(t *testing.T) { + if k.Name == "" { + k.SetDefaults() + TestSetup(t) + } + if !k.Websocket.IsEnabled() { + t.Skip("Websocket not enabled, skipping") + } + if !k.Websocket.IsConnected() { + k.Websocket.Connect() + } + err := k.WsSubscribeToChannel("ticker", []string{"XBT/USD"}, 1) + if err != nil { + t.Error(err) + } + err = k.WsUnsubscribeToChannel("ticker", []string{"XBT/USD"}, 2) + if err != nil { + t.Error(err) + } +} + +// TestUnsubscribeWithoutSubscription websocket test +func TestUnsubscribeWithoutSubscription(t *testing.T) { + if k.Name == "" { + k.SetDefaults() + TestSetup(t) + } + if !k.Websocket.IsEnabled() { + t.Skip("Websocket not enabled, skipping") + } + if !k.Websocket.IsConnected() { + k.Websocket.Connect() + } + err := k.WsUnsubscribeToChannel("ticker", []string{"XBT/USD"}, 3) + if err != nil { + t.Error(err) + } + unsubscriptionError := false + for i := 0; i < 7; i++ { + response := <-k.Websocket.DataHandler + if err, ok := response.(error); ok && err != nil { + if err.Error() == "Subscription Not Found" { + unsubscriptionError = true + break + } + } + } + if !unsubscriptionError { + t.Error("Expected error") + } +} + +// TestUnsubscribeWithChannelID websocket test +func TestUnsubscribeWithChannelID(t *testing.T) { + if k.Name == "" { + k.SetDefaults() + TestSetup(t) + } + if !k.Websocket.IsEnabled() { + t.Skip("Websocket not enabled, skipping") + } + if !k.Websocket.IsConnected() { + k.Websocket.Connect() + } + err := k.WsUnsubscribeToChannelByChannelID(3) + if err != nil { + t.Error(err) + } + unsubscriptionError := false + for i := 0; i < 7; i++ { + response := <-k.Websocket.DataHandler + if err, ok := response.(error); ok && err != nil { + if err.Error() == "Subscription Not Found" { + unsubscriptionError = true + break + } + } + } + if !unsubscriptionError { + t.Error("Expected error") + } +} + +// TestUnsubscribeFromNonExistentChennel websocket test +func TestUnsubscribeFromNonExistentChennel(t *testing.T) { + if k.Name == "" { + k.SetDefaults() + TestSetup(t) + } + if !k.Websocket.IsEnabled() { + t.Skip("Websocket not enabled, skipping") + } + if !k.Websocket.IsConnected() { + k.Websocket.Connect() + } + err := k.WsUnsubscribeToChannel("ticker", []string{"tseries"}, 0) + if err != nil { + t.Error(err) + } + unsubscriptionError := false + for i := 0; i < 7; i++ { + response := <-k.Websocket.DataHandler + if err, ok := response.(error); ok && err != nil { + unsubscriptionError = true + break + } + } + if !unsubscriptionError { + t.Error("Expected error") + } +} diff --git a/exchanges/kraken/kraken_types.go b/exchanges/kraken/kraken_types.go index 3865a415..f38c7cd1 100644 --- a/exchanges/kraken/kraken_types.go +++ b/exchanges/kraken/kraken_types.go @@ -386,3 +386,63 @@ type WithdrawStatusResponse struct { Time float64 `json:"time"` Status string `json:"status"` } + +// WebsocketSubscriptionEventRequest handles WS subscription events +type WebsocketSubscriptionEventRequest struct { + Event string `json:"event"` // subscribe + RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message. + Pairs []string `json:"pair"` // Array of currency pairs (pair1,pair2,pair3). + Subscription WebsocketSubscriptionData `json:"subscription,omitempty"` +} + +// WebsocketUnsubscribeByChannelIDEventRequest handles WS unsubscribe events +type WebsocketUnsubscribeByChannelIDEventRequest struct { + Event string `json:"event"` // unsubscribe + RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message. + Pairs []string `json:"pair,omitempty"` // Array of currency pairs (pair1,pair2,pair3). + ChannelID int64 `json:"channelID,omitempty"` +} + +// WebsocketSubscriptionData contains details on WS channel +type WebsocketSubscriptionData struct { + Name string `json:"name,omitempty"` // ticker|ohlc|trade|book|spread|*, * for all (ohlc interval value is 1 if all channels subscribed) + Interval int64 `json:"interval,omitempty"` // Optional - Time interval associated with ohlc subscription in minutes. Default 1. Valid Interval values: 1|5|15|30|60|240|1440|10080|21600 + Depth int64 `json:"depth,omitempty"` // Optional - depth associated with book subscription in number of levels each side, default 10. Valid Options are: 10, 25, 100, 500, 1000 +} + +// WebsocketDataResponse holds all data response types +type WebsocketEventResponse struct { + Event string `json:"event"` + Status string `json:"status"` + Pair currency.Pair `json:"pair,omitempty"` + Subscription WebsocketSubscriptionResponseData `json:"subscription,omitempty"` + WebsocketSubscriptionEventResponse + WebsocketStatusResponse + WebsocketErrorResponse +} + +type WebsocketSubscriptionEventResponse struct { + ChannelID float64 `json:"channelID"` +} + +type WebsocketSubscriptionResponseData struct { + Name string `json:"name"` +} + +type WebsocketStatusResponse struct { + ConnectionID float64 `json:"connectionID"` + Version string `json:"version"` +} + +type WebsocketDataResponse []interface{} + +type WebsocketErrorResponse struct { + ErrorMessage string `json:"errorMessage"` +} + +// Holds relevant data for channels to identify what we're doing +type WebsocketChannelData struct { + Subscription string + Pair currency.Pair + ChannelID float64 +} diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go new file mode 100644 index 00000000..ce03aa24 --- /dev/null +++ b/exchanges/kraken/kraken_websocket.go @@ -0,0 +1,660 @@ +package kraken + +import ( + "bytes" + "compress/flate" + "errors" + "fmt" + "io/ioutil" + "math" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "github.com/gorilla/websocket" + "github.com/thrasher-/gocryptotrader/common" + "github.com/thrasher-/gocryptotrader/currency" + exchange "github.com/thrasher-/gocryptotrader/exchanges" + "github.com/thrasher-/gocryptotrader/exchanges/orderbook" + log "github.com/thrasher-/gocryptotrader/logger" +) + +// List of all websocket channels to subscribe to +const ( + krakenWSURL = "wss://ws.kraken.com" + krakenWSSandboxURL = "wss://sandbox.kraken.com" + krakenWSSupportedVersion = "0.1.1" + // If a checksum fails, then resubscribing to the channel fails, fatal after these attempts + krakenWsResubscribeFailureLimit = 3 + krakenWsResubscribeDelayInSeconds = 3 + // WS endpoints + krakenWsHeartbeat = "heartbeat" + krakenWsPing = "ping" + krakenWsPong = "pong" + krakenWsSystemStatus = "systemStatus" + krakenWsSubscribe = "subscribe" + krakenWsSubscriptionStatus = "subscriptionStatus" + krakenWsUnsubscribe = "unsubscribe" + krakenWsTicker = "ticker" + krakenWsOHLC = "ohlc" + krakenWsTrade = "trade" + krakenWsSpread = "spread" + krakenWsOrderbook = "book" + // Only supported asset type + krakenWsAssetType = "SPOT" +) + +// orderbookMutex Ensures if two entries arrive at once, only one can be processed at a time +var orderbookMutex sync.Mutex +var subscriptionChannelPair []WebsocketChannelData + +// writeToWebsocket sends a message to the websocket endpoint +func (k *Kraken) writeToWebsocket(message []byte) error { + k.mu.Lock() + defer k.mu.Unlock() + if k.Verbose { + log.Debugf("Sending message to WS: %v", string(message)) + } + return k.WebsocketConn.WriteMessage(websocket.TextMessage, message) +} + +// WsConnect initiates a websocket connection +func (k *Kraken) WsConnect() error { + if !k.Websocket.IsEnabled() || !k.IsEnabled() { + return errors.New(exchange.WebsocketNotEnabled) + } + + var dialer websocket.Dialer + if k.Websocket.GetProxyAddress() != "" { + proxy, err := url.Parse(k.Websocket.GetProxyAddress()) + if err != nil { + return err + } + + dialer.Proxy = http.ProxyURL(proxy) + } + + var err error + if k.Verbose { + log.Debugf("Attempting to connect to %v", k.Websocket.GetWebsocketURL()) + } + k.WebsocketConn, _, err = dialer.Dial(k.Websocket.GetWebsocketURL(), + http.Header{}) + if err != nil { + return fmt.Errorf("%s Unable to connect to Websocket. Error: %s", + k.Name, + err) + } + if k.Verbose { + log.Debugf("Successful connection to %v", k.Websocket.GetWebsocketURL()) + } + go k.WsHandleData() + go k.wsPingHandler() + k.WsSubscribeToDefaults() + return nil +} + +// WsSubscribeToDefaults subscribes to the websocket channels +func (k *Kraken) WsSubscribeToDefaults() { + channelsToSubscribe := []string{krakenWsTicker, krakenWsTrade, krakenWsOrderbook, krakenWsOHLC, krakenWsSpread} + for _, pair := range k.EnabledPairs { + // Kraken WS formats pairs with / but config and REST use - + formattedPair := strings.ToUpper(strings.Replace(pair.String(), "-", "/", 1)) + for _, channel := range channelsToSubscribe { + err := k.WsSubscribeToChannel(channel, []string{formattedPair}, 0) + if err != nil { + k.Websocket.DataHandler <- err + } + } + } +} + +// WsReadData reads data from the websocket connection +func (k *Kraken) WsReadData() (exchange.WebsocketResponse, error) { + mType, resp, err := k.WebsocketConn.ReadMessage() + if err != nil { + return exchange.WebsocketResponse{}, err + } + k.Websocket.TrafficAlert <- struct{}{} + var standardMessage []byte + switch mType { + case websocket.TextMessage: + standardMessage = resp + + case websocket.BinaryMessage: + reader := flate.NewReader(bytes.NewReader(resp)) + standardMessage, err = ioutil.ReadAll(reader) + reader.Close() + if err != nil { + return exchange.WebsocketResponse{}, err + } + } + if k.Verbose { + log.Debugf("%v Websocket message received: %v", k.Name, string(standardMessage)) + } + + return exchange.WebsocketResponse{Raw: standardMessage}, nil +} + +// wsPingHandler sends a message "ping" every 27 to maintain the connection to the websocket +func (k *Kraken) wsPingHandler() { + k.Websocket.Wg.Add(1) + defer k.Websocket.Wg.Done() + ticker := time.NewTicker(time.Second * 27) + for { + select { + case <-k.Websocket.ShutdownC: + return + + case <-ticker.C: + pingEvent := fmt.Sprintf("{\"event\":\"%v\"}", krakenWsPing) + if k.Verbose { + log.Debugf("%v sending ping", k.GetName()) + } + err := k.writeToWebsocket([]byte(pingEvent)) + if err != nil { + k.Websocket.DataHandler <- err + } + } + } +} + +// WsHandleData handles the read data from the websocket connection +func (k *Kraken) WsHandleData() { + k.Websocket.Wg.Add(1) + defer func() { + err := k.WebsocketConn.Close() + if err != nil { + k.Websocket.DataHandler <- fmt.Errorf("%v unable to to close Websocket connection. Error: %s", + k.GetName(), err) + } + k.Websocket.Wg.Done() + }() + + for { + select { + case <-k.Websocket.ShutdownC: + return + default: + resp, err := k.WsReadData() + if err != nil { + k.Websocket.DataHandler <- err + return + } + // event response handling + var eventResponse WebsocketEventResponse + err = common.JSONDecode(resp.Raw, &eventResponse) + if err == nil && eventResponse.Event != "" { + k.WsHandleEventResponse(&eventResponse) + continue + } + // Data response handling + var dataResponse WebsocketDataResponse + err = common.JSONDecode(resp.Raw, &dataResponse) + if err == nil && dataResponse[0].(float64) >= 0 { + k.WsHandleDataResponse(dataResponse) + continue + } + // Unknown data handling + k.Websocket.DataHandler <- fmt.Errorf("unrecognised response: %v", string(resp.Raw)) + continue + } + } +} + +// WsHandleDataResponse classifies the WS response and sends to appropriate handler +func (k *Kraken) WsHandleDataResponse(response WebsocketDataResponse) { + channelID := response[0].(float64) + channelData := getSubscriptionChannelData(channelID) + switch channelData.Subscription { + case krakenWsTicker: + if k.Verbose { + log.Debugf("%v Websocket ticker data received", + k.GetName()) + } + k.wsProcessTickers(channelData, response[1]) + case krakenWsOHLC: + if k.Verbose { + log.Debugf("%v Websocket OHLC data received", + k.GetName()) + } + k.wsProcessCandles(channelData, response[1]) + case krakenWsOrderbook: + if k.Verbose { + log.Debugf("%v Websocket Orderbook data received", + k.GetName()) + } + k.wsProcessOrderBook(channelData, response[1]) + case krakenWsSpread: + if k.Verbose { + log.Debugf("%v Websocket Spread data received", + k.GetName()) + } + k.wsProcessSpread(channelData, response[1]) + case krakenWsTrade: + if k.Verbose { + log.Debugf("%v Websocket Trade data received", + k.GetName()) + } + k.wsProcessTrades(channelData, response[1]) + default: + log.Errorf("%v Unidentified websocket data received: %v", + k.GetName(), response) + } +} + +// WsHandleDataResponse classifies the WS response and sends to appropriate handler +func (k *Kraken) WsHandleEventResponse(response *WebsocketEventResponse) { + switch response.Event { + case krakenWsHeartbeat: + if k.Verbose { + log.Debugf("%v Websocket heartbeat data received", k.GetName()) + } + case krakenWsPong: + if k.Verbose { + log.Debugf("%v Websocket pong data received", k.GetName()) + } + case krakenWsSystemStatus: + if k.Verbose { + log.Debugf("%v Websocket status data received", k.GetName()) + } + if response.Status != "online" { + k.Websocket.DataHandler <- fmt.Errorf("%v Websocket status '%v'", + k.GetName(), response.Status) + } + if response.WebsocketStatusResponse.Version != krakenWSSupportedVersion { + log.Warnf("%v New version of Websocket API released. Was %v Now %v", + k.GetName(), krakenWSSupportedVersion, response.WebsocketStatusResponse.Version) + } + case krakenWsSubscriptionStatus: + if k.Verbose { + log.Debugf("%v Websocket subscription status data received", + k.GetName()) + } + if response.Status != "subscribed" { + k.Websocket.DataHandler <- fmt.Errorf(response.WebsocketErrorResponse.ErrorMessage) + k.ResubscribeToChannel(response.Subscription.Name, response.Pair) + return + } + addNewSubscriptionChannelData(response) + default: + log.Errorf("%v Unidentified websocket data received: %v", k.GetName(), response) + } +} + +// WsSubscribeToChannel sends a request to WS to subscribe to supplied channel name and pairs +func (k *Kraken) WsSubscribeToChannel(topic string, currencies []string, requestID int64) error { + resp := WebsocketSubscriptionEventRequest{ + Event: krakenWsSubscribe, + Pairs: currencies, + Subscription: WebsocketSubscriptionData{ + Name: topic, + }, + } + if requestID > 0 { + resp.RequestID = requestID + } + json, err := common.JSONEncode(resp) + if err != nil { + return err + } + return k.writeToWebsocket(json) +} + +// WsUnsubscribeToChannel sends a request to WS to unsubscribe to supplied channel name and pairs +func (k *Kraken) WsUnsubscribeToChannel(topic string, currencies []string, requestID int64) error { + resp := WebsocketSubscriptionEventRequest{ + Event: krakenWsUnsubscribe, + Pairs: currencies, + Subscription: WebsocketSubscriptionData{ + Name: topic, + }, + } + if requestID > 0 { + resp.RequestID = requestID + } + json, err := common.JSONEncode(resp) + if err != nil { + return err + } + return k.writeToWebsocket(json) +} + +// WsUnsubscribeToChannelByChannelID sends a request to WS to unsubscribe to supplied channel ID +func (k *Kraken) WsUnsubscribeToChannelByChannelID(channelID int64) error { + resp := WebsocketUnsubscribeByChannelIDEventRequest{ + Event: krakenWsUnsubscribe, + ChannelID: channelID, + } + json, err := common.JSONEncode(resp) + if err != nil { + return err + } + return k.writeToWebsocket(json) +} + +// addNewSubscriptionChannelData stores channel ids, pairs and subscription types to an array +// allowing correlation between subscriptions and returned data +func addNewSubscriptionChannelData(response *WebsocketEventResponse) { + for i := range subscriptionChannelPair { + if response.ChannelID == subscriptionChannelPair[i].ChannelID { + return + } + } + + // We change the / to - to maintain compatibility with REST/config + pair := currency.NewPairWithDelimiter(response.Pair.Base.String(), response.Pair.Quote.String(), "-") + subscriptionChannelPair = append(subscriptionChannelPair, WebsocketChannelData{ + Subscription: response.Subscription.Name, + Pair: pair, + ChannelID: response.ChannelID, + }) +} + +// getSubscriptionChannelData retrieves WebsocketChannelData based on response ID +func getSubscriptionChannelData(id float64) WebsocketChannelData { + for i := range subscriptionChannelPair { + if id == subscriptionChannelPair[i].ChannelID { + return subscriptionChannelPair[i] + } + } + return WebsocketChannelData{} +} + +// resubscribeToChannel will attempt to unsubscribe and resubscribe to a channel +func (k *Kraken) ResubscribeToChannel(channel string, pair currency.Pair) { + // Kraken WS formats pairs with / but config and REST use - + formattedPair := strings.ToUpper(strings.Replace(pair.String(), "-", "/", 1)) + if krakenWsResubscribeFailureLimit > 0 { + var successfulUnsubscribe bool + for i := 0; i < krakenWsResubscribeFailureLimit; i++ { + err := k.WsUnsubscribeToChannel(channel, []string{formattedPair}, 0) + if err != nil { + log.Error(err) + time.Sleep(krakenWsResubscribeDelayInSeconds * time.Second) + continue + } + successfulUnsubscribe = true + break + } + if !successfulUnsubscribe { + log.Fatalf("%v websocket channel %v failed to unsubscribe after %v attempts", + k.GetName(), channel, krakenWsResubscribeFailureLimit) + } + successfulSubscribe := true + for i := 0; i < krakenWsResubscribeFailureLimit; i++ { + err := k.WsSubscribeToChannel(channel, []string{formattedPair}, 0) + if err != nil { + log.Error(err) + time.Sleep(krakenWsResubscribeDelayInSeconds * time.Second) + continue + } + successfulSubscribe = true + break + } + if !successfulSubscribe { + log.Fatalf("%v websocket channel %v failed to resubscribe after %v attempts", + k.GetName(), channel, krakenWsResubscribeFailureLimit) + } + } else { + log.Fatalf("%v websocket channel %v cannot resubscribe. Limit: %v", + k.GetName(), channel, krakenWsResubscribeFailureLimit) + } +} + +// wsProcessTickers converts ticker data and sends it to the datahandler +func (k *Kraken) wsProcessTickers(channelData WebsocketChannelData, data interface{}) { + tickerData := data.(map[string]interface{}) + closeData := tickerData["c"].([]interface{}) + openData := tickerData["o"].([]interface{}) + lowData := tickerData["l"].([]interface{}) + highData := tickerData["h"].([]interface{}) + volumeData := tickerData["v"].([]interface{}) + closePrice, _ := strconv.ParseFloat(closeData[0].(string), 64) + openPrice, _ := strconv.ParseFloat(openData[0].(string), 64) + highPrice, _ := strconv.ParseFloat(highData[0].(string), 64) + lowPrice, _ := strconv.ParseFloat(lowData[0].(string), 64) + quantity, _ := strconv.ParseFloat(volumeData[0].(string), 64) + + k.Websocket.DataHandler <- exchange.TickerData{ + Timestamp: time.Now(), + Exchange: k.GetName(), + AssetType: krakenWsAssetType, + Pair: channelData.Pair, + ClosePrice: closePrice, + OpenPrice: openPrice, + HighPrice: highPrice, + LowPrice: lowPrice, + Quantity: quantity, + } +} + +// wsProcessTickers converts ticker data and sends it to the datahandler +func (k *Kraken) wsProcessSpread(channelData WebsocketChannelData, data interface{}) { + spreadData := data.([]interface{}) + bestBid := spreadData[0].(string) + bestAsk := spreadData[1].(string) + timeData, _ := strconv.ParseFloat(spreadData[2].(string), 64) + sec, dec := math.Modf(timeData) + spreadTimestamp := time.Unix(int64(sec), int64(dec*(1e9))) + if k.Verbose { + log.Debugf("Spread data for '%v' received. Best bid: '%v' Best ask: '%v' Time: '%v'", + channelData.Pair, bestBid, bestAsk, spreadTimestamp) + } +} + +// wsProcessTrades converts trade data and sends it to the datahandler +func (k *Kraken) wsProcessTrades(channelData WebsocketChannelData, data interface{}) { + tradeData := data.([]interface{}) + for i := range tradeData { + trade := tradeData[i].([]interface{}) + timeData, _ := strconv.ParseInt(trade[2].(string), 10, 64) + timeUnix := time.Unix(timeData, 0) + price, _ := strconv.ParseFloat(trade[0].(string), 64) + amount, _ := strconv.ParseFloat(trade[1].(string), 64) + + k.Websocket.DataHandler <- exchange.TradeData{ + AssetType: krakenWsAssetType, + CurrencyPair: channelData.Pair, + EventTime: time.Now().Unix(), + Exchange: k.GetName(), + Price: price, + Amount: amount, + Timestamp: timeUnix, + Side: trade[3].(string), + } + } +} + +// wsProcessOrderBook determines if the orderbook data is partial or update +// Then sends to appropriate fun +func (k *Kraken) wsProcessOrderBook(channelData WebsocketChannelData, data interface{}) { + obData := data.(map[string]interface{}) + if _, ok := obData["as"]; ok { + k.wsProcessOrderBookPartial(channelData, obData) + } else if _, ok := obData["a"]; ok { + k.wsProcessOrderBookUpdate(channelData, obData) + } +} + +// wsProcessOrderBookPartial creates a new orderbook entry for a given currency pair +func (k *Kraken) wsProcessOrderBookPartial(channelData WebsocketChannelData, obData map[string]interface{}) { + ob := orderbook.Base{ + Pair: channelData.Pair, + AssetType: krakenWsAssetType, + } + // Kraken ob data is timestamped per price, GCT orderbook data is timestamped per entry + // Using the highest last update time, we can attempt to respect both within a reasonable degree + var highestLastUpdate time.Time + askData := obData["as"].([]interface{}) + for i := range askData { + asks := askData[i].([]interface{}) + price, _ := strconv.ParseFloat(asks[0].(string), 64) + amount, _ := strconv.ParseFloat(asks[1].(string), 64) + ob.Asks = append(ob.Asks, orderbook.Item{ + Amount: amount, + Price: price, + }) + + timeData, _ := strconv.ParseFloat(asks[2].(string), 64) + sec, dec := math.Modf(timeData) + askUpdatedTime := time.Unix(int64(sec), int64(dec*(1e9))) + if highestLastUpdate.Before(askUpdatedTime) { + highestLastUpdate = askUpdatedTime + } + } + + bidData := obData["bs"].([]interface{}) + for i := range bidData { + bids := bidData[i].([]interface{}) + price, _ := strconv.ParseFloat(bids[0].(string), 64) + amount, _ := strconv.ParseFloat(bids[1].(string), 64) + ob.Bids = append(ob.Bids, orderbook.Item{ + Amount: amount, + Price: price, + }) + + timeData, _ := strconv.ParseFloat(bids[2].(string), 64) + sec, dec := math.Modf(timeData) + bidUpdateTime := time.Unix(int64(sec), int64(dec*(1e9))) + if highestLastUpdate.Before(bidUpdateTime) { + highestLastUpdate = bidUpdateTime + } + } + + ob.LastUpdated = highestLastUpdate + err := k.Websocket.Orderbook.LoadSnapshot(&ob, k.GetName(), true) + if err != nil { + k.Websocket.DataHandler <- err + return + } + + k.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{ + Exchange: k.GetName(), + Asset: krakenWsAssetType, + Pair: channelData.Pair, + } +} + +// wsProcessOrderBookUpdate updates an orderbook entry for a given currency pair +func (k *Kraken) wsProcessOrderBookUpdate(channelData WebsocketChannelData, obData map[string]interface{}) { + ob, err := k.GetOrderbookEx(channelData.Pair, krakenWsAssetType) + if err != nil { + k.Websocket.DataHandler <- err + return + } + // Kraken ob data is timestamped per price, GCT orderbook data is timestamped per entry + // Using the highest last update time, we can attempt to respect both within a reasonable degree + var highestLastUpdate time.Time + // Ask data is not always sent + if _, ok := obData["a"]; ok { + askData := obData["a"].([]interface{}) + for i := range askData { + asks := askData[i].([]interface{}) + price, _ := strconv.ParseFloat(asks[0].(string), 64) + amount, _ := strconv.ParseFloat(asks[1].(string), 64) + + if amount == 0 { + for j := 0; j < len(ob.Asks); j++ { + if ob.Asks[j].Price == price { + ob.Asks = append(ob.Asks[:j], ob.Asks[j+1:]...) + j-- + continue + } + } + } + ob.Asks = append(ob.Asks, orderbook.Item{ + Amount: amount, + Price: price, + }) + + timeData, _ := strconv.ParseFloat(asks[2].(string), 64) + sec, dec := math.Modf(timeData) + askUpdatedTime := time.Unix(int64(sec), int64(dec*(1e9))) + if highestLastUpdate.Before(askUpdatedTime) { + highestLastUpdate = askUpdatedTime + } + } + } + // Bid data is not always sent + if _, ok := obData["b"]; ok { + bidData := obData["b"].([]interface{}) + for i := range bidData { + bids := bidData[i].([]interface{}) + price, _ := strconv.ParseFloat(bids[0].(string), 64) + amount, _ := strconv.ParseFloat(bids[1].(string), 64) + + if amount == 0 { + for j := 0; j < len(ob.Bids); j++ { + if ob.Bids[j].Price == price { + ob.Bids = append(ob.Bids[:j], ob.Bids[j+1:]...) + j-- + continue + } + } + } + + ob.Bids = append(ob.Bids, orderbook.Item{ + Amount: amount, + Price: price, + }) + + timeData, _ := strconv.ParseFloat(bids[2].(string), 64) + sec, dec := math.Modf(timeData) + bidUpdatedTime := time.Unix(int64(sec), int64(dec*(1e9))) + if highestLastUpdate.Before(bidUpdatedTime) { + highestLastUpdate = bidUpdatedTime + } + } + } + + if ob.LastUpdated.After(highestLastUpdate) { + log.Errorf("orderbook update out of order. Existing: %v, Attempted: %v", ob.LastUpdated, highestLastUpdate) + k.ResubscribeToChannel(channelData.Subscription, channelData.Pair) + return + } + ob.LastUpdated = highestLastUpdate + err = k.Websocket.Orderbook.LoadSnapshot(&ob, k.GetName(), true) + if err != nil { + k.Websocket.DataHandler <- err + return + } + k.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{ + Exchange: k.GetName(), + Asset: krakenWsAssetType, + Pair: channelData.Pair, + } +} + +// wsProcessCandles converts candle data and sends it to the data handler +func (k *Kraken) wsProcessCandles(channelData WebsocketChannelData, data interface{}) { + candleData := data.([]interface{}) + startTimeData, _ := strconv.ParseInt(candleData[0].(string), 10, 64) + startTimeUnix := time.Unix(startTimeData, 0) + endTimeData, _ := strconv.ParseInt(candleData[1].(string), 10, 64) + endTimeUnix := time.Unix(endTimeData, 0) + openPrice, _ := strconv.ParseFloat(candleData[2].(string), 64) + highPrice, _ := strconv.ParseFloat(candleData[3].(string), 64) + lowPrice, _ := strconv.ParseFloat(candleData[4].(string), 64) + closePrice, _ := strconv.ParseFloat(candleData[5].(string), 64) + volume, _ := strconv.ParseFloat(candleData[7].(string), 64) + + k.Websocket.DataHandler <- exchange.KlineData{ + AssetType: krakenWsAssetType, + Pair: channelData.Pair, + Timestamp: time.Now(), + Exchange: k.GetName(), + StartTime: startTimeUnix, + CloseTime: endTimeUnix, + // Candles are sent every 60 seconds + Interval: "60", + HighPrice: highPrice, + LowPrice: lowPrice, + OpenPrice: openPrice, + ClosePrice: closePrice, + Volume: volume, + } +} diff --git a/exchanges/kraken/kraken_wrapper.go b/exchanges/kraken/kraken_wrapper.go index 1635a899..7749292f 100644 --- a/exchanges/kraken/kraken_wrapper.go +++ b/exchanges/kraken/kraken_wrapper.go @@ -307,7 +307,7 @@ func (k *Kraken) WithdrawFiatFundsToInternationalBank(withdrawRequest *exchange. // GetWebsocket returns a pointer to the exchange websocket func (k *Kraken) GetWebsocket() (*exchange.Websocket, error) { - return nil, common.ErrFunctionNotSupported + return k.Websocket, nil } // GetFeeByType returns an estimate of fee based on type of transaction diff --git a/exchanges/okex/okex_test.go b/exchanges/okex/okex_test.go index bb4a42bc..343a6b49 100644 --- a/exchanges/okex/okex_test.go +++ b/exchanges/okex/okex_test.go @@ -1662,7 +1662,7 @@ func TestSubscribeToNonExistantChannel(t *testing.T) { return } var errorReceived bool - for i := 0; i < 5; i++ { + for i := 0; i < 7; i++ { response := <-o.Websocket.DataHandler if err, ok := response.(error); ok && err != nil { t.Log(response) diff --git a/exchanges/okgroup/okgroup_types.go b/exchanges/okgroup/okgroup_types.go index 53ca27bf..2811e3fe 100644 --- a/exchanges/okgroup/okgroup_types.go +++ b/exchanges/okgroup/okgroup_types.go @@ -1135,12 +1135,12 @@ type GetSwapForceLiquidatedOrdersRequest struct { // GetSwapForceLiquidatedOrdersResponse response data for GetSwapForceLiquidatedOrders type GetSwapForceLiquidatedOrdersResponse struct { - Loss float64 `json:"loss"` - Size int64 `json:"size"` - Price float64 `json:"price"` + Loss float64 `json:"loss,string"` + Size int64 `json:"size,string"` + Price float64 `json:"price,string"` CreatedAt string `json:"created_at"` InstrumentID string `json:"instrument_id"` - Type int64 `json:"type"` + Type int64 `json:"type,string"` } // GetSwapOnHoldAmountForOpenOrdersResponse response data for GetSwapOnHoldAmountForOpenOrders diff --git a/testdata/configtest.json b/testdata/configtest.json index 9ead24e4..67a737bf 100644 --- a/testdata/configtest.json +++ b/testdata/configtest.json @@ -935,7 +935,7 @@ "name": "Kraken", "enabled": true, "verbose": false, - "websocket": false, + "websocket": true, "useSandbox": false, "restPollingDelay": 10, "httpTimeout": 15000000000,