Files
gocryptotrader/exchanges/kraken/kraken_websocket.go
Adrian Gallagher 7ebc392532 build/linters: Bump Go to v1.25 and golangci-lint to v2.4.0 (#2005)
* build/linters: Bump Go to v1.25 and golangci-lint to v2.4.0

* refactor: Update TODO comments for net.Listen and net.DialTimeout; improve variable naming in websocket and exchange methods

* refactor: Rename massageMissingData to backfillMissingData for clarity and update references in RSI and MFI calculations

* fix: Correct typo in TODO comment for net.Listen in RPC server
2025-08-20 11:55:15 +10:00

1167 lines
36 KiB
Go

package kraken
import (
"bytes"
"context"
"errors"
"fmt"
"hash/crc32"
"net/http"
"strconv"
"strings"
"text/template"
"time"
"github.com/buger/jsonparser"
gws "github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/encoding/json"
"github.com/thrasher-corp/gocryptotrader/exchange/websocket"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
"github.com/thrasher-corp/gocryptotrader/log"
)
// List of all websocket channels to subscribe to
const (
krakenWSURL = "wss://ws.kraken.com"
krakenAuthWSURL = "wss://ws-auth.kraken.com"
krakenWSSandboxURL = "wss://sandbox.kraken.com"
krakenWSSupportedVersion = "1.4.0"
// Websocket Channels
krakenWsHeartbeat = "heartbeat"
krakenWsSystemStatus = "systemStatus"
krakenWsSubscribe = "subscribe"
krakenWsUnsubscribe = "unsubscribe"
krakenWsSubscribed = "subscribed"
krakenWsUnsubscribed = "unsubscribed"
krakenWsSubscriptionStatus = "subscriptionStatus"
krakenWsTicker = "ticker"
krakenWsOHLC = "ohlc"
krakenWsTrade = "trade"
krakenWsSpread = "spread"
krakenWsOrderbook = "book"
krakenWsOwnTrades = "ownTrades"
krakenWsOpenOrders = "openOrders"
krakenWsAddOrder = "addOrder"
krakenWsCancelOrder = "cancelOrder"
krakenWsCancelAll = "cancelAll"
krakenWsAddOrderStatus = "addOrderStatus"
krakenWsCancelOrderStatus = "cancelOrderStatus"
krakenWsCancelAllOrderStatus = "cancelAllStatus"
krakenWsPong = "pong"
krakenWsPingDelay = time.Second * 27
)
var channelNames = map[string]string{
subscription.TickerChannel: krakenWsTicker,
subscription.OrderbookChannel: krakenWsOrderbook,
subscription.CandlesChannel: krakenWsOHLC,
subscription.AllTradesChannel: krakenWsTrade,
subscription.MyTradesChannel: krakenWsOwnTrades,
subscription.MyOrdersChannel: krakenWsOpenOrders,
}
var reverseChannelNames = map[string]string{}
func init() {
for k, v := range channelNames {
reverseChannelNames[v] = k
}
}
var (
errCancellingOrder = errors.New("error cancelling order")
errSubPairMissing = errors.New("pair missing from subscription response")
errInvalidChecksum = errors.New("invalid checksum")
)
var defaultSubscriptions = subscription.List{
{Enabled: true, Asset: asset.Spot, Channel: subscription.TickerChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel, Interval: kline.OneMin},
{Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel, Levels: 1000},
{Enabled: true, Channel: subscription.MyOrdersChannel, Authenticated: true},
{Enabled: true, Channel: subscription.MyTradesChannel, Authenticated: true},
}
// WsConnect initiates a websocket connection
func (e *Exchange) WsConnect() error {
ctx := context.TODO()
if !e.Websocket.IsEnabled() || !e.IsEnabled() {
return websocket.ErrWebsocketNotEnabled
}
var dialer gws.Dialer
err := e.Websocket.Conn.Dial(ctx, &dialer, http.Header{})
if err != nil {
return err
}
comms := make(chan websocket.Response)
e.Websocket.Wg.Add(2)
go e.wsReadData(ctx, comms)
go e.wsFunnelConnectionData(e.Websocket.Conn, comms)
if e.IsWebsocketAuthenticationSupported() {
if authToken, err := e.GetWebsocketToken(ctx); err != nil {
e.Websocket.SetCanUseAuthenticatedEndpoints(false)
log.Errorf(log.ExchangeSys, "%s - authentication failed: %v\n", e.Name, err)
} else {
if err := e.Websocket.AuthConn.Dial(ctx, &dialer, http.Header{}); err != nil {
e.Websocket.SetCanUseAuthenticatedEndpoints(false)
log.Errorf(log.ExchangeSys, "%s - failed to connect to authenticated endpoint: %v\n", e.Name, err)
} else {
e.setWebsocketAuthToken(authToken)
e.Websocket.SetCanUseAuthenticatedEndpoints(true)
e.Websocket.Wg.Add(1)
go e.wsFunnelConnectionData(e.Websocket.AuthConn, comms)
e.startWsPingHandler(e.Websocket.AuthConn)
}
}
}
e.startWsPingHandler(e.Websocket.Conn)
return nil
}
// wsFunnelConnectionData funnels both auth and public ws data into one manageable place
func (e *Exchange) wsFunnelConnectionData(ws websocket.Connection, comms chan websocket.Response) {
defer e.Websocket.Wg.Done()
for {
resp := ws.ReadMessage()
if resp.Raw == nil {
return
}
comms <- resp
}
}
// wsReadData receives and passes on websocket messages for processing
func (e *Exchange) wsReadData(ctx context.Context, comms chan websocket.Response) {
defer e.Websocket.Wg.Done()
for {
select {
case <-e.Websocket.ShutdownC:
select {
case resp := <-comms:
err := e.wsHandleData(ctx, resp.Raw)
if err != nil {
select {
case e.Websocket.DataHandler <- err:
default:
log.Errorf(log.WebsocketMgr, "%s websocket handle data error: %v", e.Name, err)
}
}
default:
}
return
case resp := <-comms:
err := e.wsHandleData(ctx, resp.Raw)
if err != nil {
e.Websocket.DataHandler <- err
}
}
}
}
func (e *Exchange) wsHandleData(_ context.Context, respRaw []byte) error {
if strings.HasPrefix(string(respRaw), "[") {
var msg []json.RawMessage
if err := json.Unmarshal(respRaw, &msg); err != nil {
return err
}
if len(msg) < 3 {
return fmt.Errorf("data array too short: %s", respRaw)
}
// For all types of channel second to last field is the channel Name
var chanName string
if err := json.Unmarshal(msg[len(msg)-2], &chanName); err != nil {
return fmt.Errorf("error unmarshalling channel name: %w", err)
}
pair := currency.EMPTYPAIR
var maybePair string
if err := json.Unmarshal(msg[len(msg)-1], &maybePair); err == nil {
p, err := currency.NewPairFromString(maybePair)
if err != nil {
return err
}
pair = p
}
return e.wsReadDataResponse(chanName, pair, msg)
}
event, err := jsonparser.GetString(respRaw, "event")
if err != nil {
return fmt.Errorf("%w parsing: %s", err, respRaw)
}
if event == krakenWsSubscriptionStatus { // Must happen before IncomingWithData to avoid race
e.wsProcessSubStatus(respRaw)
}
reqID, err := jsonparser.GetInt(respRaw, "reqid")
if err == nil && reqID != 0 && e.Websocket.Match.IncomingWithData(reqID, respRaw) {
return nil
}
if event == "" {
return nil
}
switch event {
case krakenWsPong, krakenWsHeartbeat:
return nil
case krakenWsCancelOrderStatus, krakenWsCancelAllOrderStatus, krakenWsAddOrderStatus, krakenWsSubscriptionStatus:
// All of these should have found a listener already
return fmt.Errorf("%w: %s %v", websocket.ErrSignatureNotMatched, event, reqID)
case krakenWsSystemStatus:
return e.wsProcessSystemStatus(respRaw)
default:
e.Websocket.DataHandler <- websocket.UnhandledMessageWarning{
Message: fmt.Sprintf("%s: %s", websocket.UnhandledMessage, respRaw),
}
}
return nil
}
// startWsPingHandler sets up a websocket ping handler to maintain a connection
func (e *Exchange) startWsPingHandler(conn websocket.Connection) {
conn.SetupPingHandler(request.Unset, websocket.PingHandler{
Message: []byte(`{"event":"ping"}`),
Delay: krakenWsPingDelay,
MessageType: gws.TextMessage,
})
}
// wsReadDataResponse classifies the WS response and sends to appropriate handler
func (e *Exchange) wsReadDataResponse(c string, pair currency.Pair, response []json.RawMessage) error {
switch c {
case krakenWsTicker:
return e.wsProcessTickers(response[1], pair)
case krakenWsSpread:
return e.wsProcessSpread(response[1], pair)
case krakenWsTrade:
return e.wsProcessTrades(response[1], pair)
case krakenWsOwnTrades:
return e.wsProcessOwnTrades(response[0])
case krakenWsOpenOrders:
return e.wsProcessOpenOrders(response[0])
}
channelType := strings.TrimRight(c, "-0123456789")
switch channelType {
case krakenWsOHLC:
return e.wsProcessCandle(c, response[1], pair)
case krakenWsOrderbook:
return e.wsProcessOrderBook(c, response, pair)
default:
return fmt.Errorf("received unidentified data for subscription %s: %+v", c, response)
}
}
func (e *Exchange) wsProcessSystemStatus(respRaw []byte) error {
var systemStatus wsSystemStatus
if err := json.Unmarshal(respRaw, &systemStatus); err != nil {
return fmt.Errorf("%s parsing system status: %s", err, respRaw)
}
if systemStatus.Status != "online" {
e.Websocket.DataHandler <- fmt.Errorf("system status not online: %v", systemStatus.Status)
}
if systemStatus.Version > krakenWSSupportedVersion {
log.Warnf(log.ExchangeSys, "%v New version of Websocket API released. Was %v Now %v", e.Name, krakenWSSupportedVersion, systemStatus.Version)
}
return nil
}
func (e *Exchange) wsProcessOwnTrades(ownOrdersRaw json.RawMessage) error {
var result []map[string]*WsOwnTrade
if err := json.Unmarshal(ownOrdersRaw, &result); err != nil {
return err
}
if len(result) == 0 {
return nil
}
for key, val := range result[0] {
oSide, err := order.StringToOrderSide(val.Type)
if err != nil {
e.Websocket.DataHandler <- order.ClassificationError{
Exchange: e.Name,
OrderID: key,
Err: err,
}
}
oType, err := order.StringToOrderType(val.OrderType)
if err != nil {
e.Websocket.DataHandler <- order.ClassificationError{
Exchange: e.Name,
OrderID: key,
Err: err,
}
}
e.Websocket.DataHandler <- &order.Detail{
Exchange: e.Name,
OrderID: val.OrderTransactionID,
Trades: []order.TradeHistory{
{
Price: val.Price,
Amount: val.Vol,
Fee: val.Fee,
Exchange: e.Name,
TID: key,
Type: oType,
Side: oSide,
Timestamp: val.Time.Time(),
},
},
}
}
return nil
}
// wsProcessOpenOrders processes open orders from the websocket response
func (e *Exchange) wsProcessOpenOrders(ownOrdersResp json.RawMessage) error {
var result []map[string]*WsOpenOrder
if err := json.Unmarshal(ownOrdersResp, &result); err != nil {
return err
}
for r := range result {
for key, val := range result[r] {
d := &order.Detail{
Exchange: e.Name,
OrderID: key,
AverageExecutedPrice: val.AveragePrice,
Amount: val.Volume,
LimitPriceUpper: val.LimitPrice,
ExecutedAmount: val.ExecutedVolume,
Fee: val.Fee,
Date: val.OpenTime.Time(),
LastUpdated: val.LastUpdated.Time(),
}
if val.Status != "" {
if s, err := order.StringToOrderStatus(val.Status); err != nil {
e.Websocket.DataHandler <- order.ClassificationError{
Exchange: e.Name,
OrderID: key,
Err: err,
}
} else {
d.Status = s
}
}
if val.Description.Pair != "" {
if strings.Contains(val.Description.Order, "sell") {
d.Side = order.Sell
} else {
if oSide, err := order.StringToOrderSide(val.Description.Type); err != nil {
e.Websocket.DataHandler <- order.ClassificationError{
Exchange: e.Name,
OrderID: key,
Err: err,
}
} else {
d.Side = oSide
}
}
if oType, err := order.StringToOrderType(val.Description.OrderType); err != nil {
e.Websocket.DataHandler <- order.ClassificationError{
Exchange: e.Name,
OrderID: key,
Err: err,
}
} else {
d.Type = oType
}
if p, err := currency.NewPairFromString(val.Description.Pair); err != nil {
e.Websocket.DataHandler <- order.ClassificationError{
Exchange: e.Name,
OrderID: key,
Err: err,
}
} else {
d.Pair = p
if d.AssetType, err = e.GetPairAssetType(p); err != nil {
e.Websocket.DataHandler <- order.ClassificationError{
Exchange: e.Name,
OrderID: key,
Err: err,
}
}
}
}
if val.Description.Price > 0 {
d.Leverage = val.Description.Leverage
d.Price = val.Description.Price
}
if val.Volume > 0 {
// Note: Volume and ExecutedVolume are only populated when status is open
d.RemainingAmount = val.Volume - val.ExecutedVolume
}
e.Websocket.DataHandler <- d
}
}
return nil
}
// wsProcessTickers converts ticker data and sends it to the datahandler
func (e *Exchange) wsProcessTickers(dataRaw json.RawMessage, pair currency.Pair) error {
var t wsTicker
if err := json.Unmarshal(dataRaw, &t); err != nil {
return fmt.Errorf("error unmarshalling ticker data: %w", err)
}
e.Websocket.DataHandler <- &ticker.Price{
ExchangeName: e.Name,
Ask: t.Ask[0].Float64(),
Bid: t.Bid[0].Float64(),
Close: t.Last[0].Float64(),
Volume: t.Volume[0].Float64(),
Low: t.Low[0].Float64(),
High: t.High[0].Float64(),
Open: t.Open[0].Float64(),
AssetType: asset.Spot,
Pair: pair,
}
return nil
}
// wsProcessSpread converts spread/orderbook data and sends it to the datahandler
func (e *Exchange) wsProcessSpread(rawData json.RawMessage, pair currency.Pair) error {
var data wsSpread
if err := json.Unmarshal(rawData, &data); err != nil {
return fmt.Errorf("error unmarshalling spread data: %w", err)
}
if e.Verbose {
log.Debugf(log.ExchangeSys, "%s Spread data for %q received. Best bid: '%v' Best ask: '%v' Time: %q, Bid volume: '%v', Ask volume: '%v'",
e.Name,
pair,
data.Bid.Float64(),
data.Ask.Float64(),
data.Time.Time(),
data.BidVolume.Float64(),
data.AskVolume.Float64())
}
return nil
}
// wsProcessTrades converts trade data and sends it to the datahandler
func (e *Exchange) wsProcessTrades(respRaw json.RawMessage, pair currency.Pair) error {
saveTradeData := e.IsSaveTradeDataEnabled()
tradeFeed := e.IsTradeFeedEnabled()
if !saveTradeData && !tradeFeed {
return nil
}
var t []wsTrades
if err := json.Unmarshal(respRaw, &t); err != nil {
return fmt.Errorf("error unmarshalling trade data: %w", err)
}
trades := make([]trade.Data, len(t))
for i := range trades {
side := order.Buy
if t[i].Side == "s" {
side = order.Sell
}
trades[i] = trade.Data{
AssetType: asset.Spot,
CurrencyPair: pair,
Exchange: e.Name,
Price: t[i].Price.Float64(),
Amount: t[i].Volume.Float64(),
Timestamp: t[i].Time.Time().UTC(),
Side: side,
}
}
if tradeFeed {
for i := range trades {
e.Websocket.DataHandler <- trades[i]
}
}
if saveTradeData {
return trade.AddTradesToBuffer(trades...)
}
return nil
}
func hasKey(raw json.RawMessage, key string) bool {
_, dataType, _, err := jsonparser.Get(raw, key)
if err != nil || dataType == jsonparser.NotExist {
return false
}
return true
}
// wsProcessOrderBook handles both partial and full orderbook updates
func (e *Exchange) wsProcessOrderBook(c string, response []json.RawMessage, pair currency.Pair) error {
key := &subscription.Subscription{
Channel: c,
Asset: asset.Spot,
Pairs: currency.Pairs{pair},
}
if err := fqChannelNameSub(key); err != nil {
return err
}
s := e.Websocket.GetSubscription(key)
if s == nil {
return fmt.Errorf("%w: %s %s %s", subscription.ErrNotFound, asset.Spot, c, pair)
}
if s.State() == subscription.UnsubscribingState {
// We only care if it's currently unsubscribing
return nil
}
if isSnapshot := hasKey(response[1], "as") && hasKey(response[1], "bs"); !isSnapshot {
var update wsUpdate
if err := json.Unmarshal(response[1], &update); err != nil {
return fmt.Errorf("error unmarshalling orderbook update: %w", err)
}
if len(response) == 5 {
var update2 wsUpdate
if err := json.Unmarshal(response[2], &update2); err != nil {
return fmt.Errorf("error unmarshalling orderbook update: %w", err)
}
update.Bids = make([]wsOrderbookItem, len(update2.Bids))
copy(update.Bids, update2.Bids)
update.Checksum = update2.Checksum
}
err := e.wsProcessOrderBookUpdate(pair, &update)
if errors.Is(err, errInvalidChecksum) {
log.Debugf(log.Global, "%s Resubscribing to invalid %s orderbook", e.Name, pair)
go func() {
if e2 := e.Websocket.ResubscribeToChannel(e.Websocket.Conn, s); e2 != nil && !errors.Is(e2, subscription.ErrInStateAlready) {
log.Errorf(log.ExchangeSys, "%s resubscription failure for %v: %v", e.Name, pair, e2)
}
}()
}
return err
}
var snapshot wsSnapshot
if err := json.Unmarshal(response[1], &snapshot); err != nil {
return fmt.Errorf("error unmarshalling orderbook snapshot: %w", err)
}
return e.wsProcessOrderBookPartial(pair, &snapshot, key.Levels)
}
// wsProcessOrderBookPartial creates a new orderbook entry for a given currency pair
func (e *Exchange) wsProcessOrderBookPartial(pair currency.Pair, obSnapshot *wsSnapshot, levels int) error {
base := orderbook.Book{
Pair: pair,
Asset: asset.Spot,
ValidateOrderbook: e.ValidateOrderbook,
Bids: make(orderbook.Levels, len(obSnapshot.Bids)),
Asks: make(orderbook.Levels, len(obSnapshot.Asks)),
MaxDepth: levels,
ChecksumStringRequired: true,
}
// Kraken ob data is timestamped per price, GCT orderbook data is
// timestamped per entry using the highest last update time, we can attempt
// to respect both within a reasonable degree
var highestLastUpdate time.Time
for i := range obSnapshot.Asks {
base.Asks[i].Price = obSnapshot.Asks[i].Price
base.Asks[i].StrPrice = obSnapshot.Asks[i].PriceRaw
base.Asks[i].Amount = obSnapshot.Asks[i].Amount
base.Asks[i].StrAmount = obSnapshot.Asks[i].AmountRaw
askUpdatedTime := obSnapshot.Asks[i].Time.Time()
if highestLastUpdate.Before(askUpdatedTime) {
highestLastUpdate = askUpdatedTime
}
}
for i := range obSnapshot.Bids {
base.Bids[i].Price = obSnapshot.Bids[i].Price
base.Bids[i].StrPrice = obSnapshot.Bids[i].PriceRaw
base.Bids[i].Amount = obSnapshot.Bids[i].Amount
base.Bids[i].StrAmount = obSnapshot.Bids[i].AmountRaw
bidUpdateTime := obSnapshot.Bids[i].Time.Time()
if highestLastUpdate.Before(bidUpdateTime) {
highestLastUpdate = bidUpdateTime
}
}
base.LastUpdated = highestLastUpdate
base.Exchange = e.Name
return e.Websocket.Orderbook.LoadSnapshot(&base)
}
// wsProcessOrderBookUpdate updates an orderbook entry for a given currency pair
func (e *Exchange) wsProcessOrderBookUpdate(pair currency.Pair, wsUpdt *wsUpdate) error {
obUpdate := orderbook.Update{
Asset: asset.Spot,
Pair: pair,
Bids: make(orderbook.Levels, len(wsUpdt.Bids)),
Asks: make(orderbook.Levels, len(wsUpdt.Asks)),
}
// Calculating checksum requires incoming decimal place checks for both
// price and amount as there is no set standard between currency pairs. This
// is calculated per update as opposed to snapshot because changes to
// decimal amounts could occur at any time.
var highestLastUpdate time.Time
// Ask data is not always sent
for i := range wsUpdt.Asks {
obUpdate.Asks[i].Price = wsUpdt.Asks[i].Price
obUpdate.Asks[i].StrPrice = wsUpdt.Asks[i].PriceRaw
obUpdate.Asks[i].Amount = wsUpdt.Asks[i].Amount
obUpdate.Asks[i].StrAmount = wsUpdt.Asks[i].AmountRaw
askUpdatedTime := wsUpdt.Asks[i].Time.Time()
if highestLastUpdate.Before(askUpdatedTime) {
highestLastUpdate = askUpdatedTime
}
}
// Bid data is not always sent
for i := range wsUpdt.Bids {
obUpdate.Bids[i].Price = wsUpdt.Bids[i].Price
obUpdate.Bids[i].StrPrice = wsUpdt.Bids[i].PriceRaw
obUpdate.Bids[i].Amount = wsUpdt.Bids[i].Amount
obUpdate.Bids[i].StrAmount = wsUpdt.Bids[i].AmountRaw
bidUpdatedTime := wsUpdt.Bids[i].Time.Time()
if highestLastUpdate.Before(bidUpdatedTime) {
highestLastUpdate = bidUpdatedTime
}
}
obUpdate.UpdateTime = highestLastUpdate
err := e.Websocket.Orderbook.Update(&obUpdate)
if err != nil {
return err
}
book, err := e.Websocket.Orderbook.GetOrderbook(pair, asset.Spot)
if err != nil {
return fmt.Errorf("cannot calculate websocket checksum: book not found for %s %s %w", pair, asset.Spot, err)
}
return validateCRC32(book, wsUpdt.Checksum)
}
func validateCRC32(b *orderbook.Book, token uint32) error {
if b == nil {
return common.ErrNilPointer
}
var checkStr strings.Builder
for i := 0; i < 10 && i < len(b.Asks); i++ {
_, err := checkStr.WriteString(trim(b.Asks[i].StrPrice + trim(b.Asks[i].StrAmount)))
if err != nil {
return err
}
}
for i := 0; i < 10 && i < len(b.Bids); i++ {
_, err := checkStr.WriteString(trim(b.Bids[i].StrPrice) + trim(b.Bids[i].StrAmount))
if err != nil {
return err
}
}
if check := crc32.ChecksumIEEE([]byte(checkStr.String())); check != token {
return fmt.Errorf("%s %s %w %d, expected %d", b.Pair, b.Asset, errInvalidChecksum, check, token)
}
return nil
}
// trim removes '.' and prefixed '0' from subsequent string
func trim(s string) string {
s = strings.Replace(s, ".", "", 1)
s = strings.TrimLeft(s, "0")
return s
}
// wsProcessCandle converts candle data and sends it to the data handler
func (e *Exchange) wsProcessCandle(c string, resp json.RawMessage, pair currency.Pair) error {
var data wsCandle
if err := json.Unmarshal(resp, &data); err != nil {
return fmt.Errorf("error unmarshalling candle data: %w", err)
}
// Faster than getting it through the subscription
parts := strings.Split(c, "-")
if len(parts) != 2 {
return errBadChannelSuffix
}
interval := parts[1]
e.Websocket.DataHandler <- websocket.KlineData{
AssetType: asset.Spot,
Pair: pair,
Timestamp: time.Now(),
Exchange: e.Name,
StartTime: data.LastUpdateTime.Time(),
CloseTime: data.LastUpdateTime.Time(),
OpenPrice: data.Open.Float64(),
HighPrice: data.High.Float64(),
LowPrice: data.Low.Float64(),
ClosePrice: data.Close.Float64(),
Volume: data.Volume.Float64(),
Interval: interval,
}
return nil
}
// GetSubscriptionTemplate returns a subscription channel template
func (e *Exchange) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(template.FuncMap{"channelName": channelName}).Parse(subTplText)
}
func (e *Exchange) generateSubscriptions() (subscription.List, error) {
return e.Features.Subscriptions.ExpandTemplates(e)
}
// Subscribe adds a channel subscription to the websocket
func (e *Exchange) Subscribe(in subscription.List) error {
ctx := context.TODO()
in, errs := in.ExpandTemplates(e)
// Collect valid new subs and add to websocket in Subscribing state
subs := subscription.List{}
for _, s := range in {
if s.State() != subscription.ResubscribingState {
if err := e.Websocket.AddSubscriptions(e.Websocket.Conn, s); err != nil {
errs = common.AppendError(errs, fmt.Errorf("%w; Channel: %s Pairs: %s", err, s.Channel, s.Pairs.Join()))
continue
}
}
subs = append(subs, s)
}
// Merge subs by grouping pairs for request; We make a single request to subscribe to N+ pairs, but get N+ responses back
groupedSubs := subs.GroupPairs()
errs = common.AppendError(errs,
e.ParallelChanOp(ctx, groupedSubs, func(ctx context.Context, s subscription.List) error { return e.manageSubs(ctx, krakenWsSubscribe, s) }, 1),
)
for _, s := range subs {
if s.State() != subscription.SubscribedState {
_ = s.SetState(subscription.InactiveState)
if err := e.Websocket.RemoveSubscriptions(e.Websocket.Conn, s); err != nil {
errs = common.AppendError(errs, fmt.Errorf("error removing failed subscription: %w; Channel: %s Pairs: %s", err, s.Channel, s.Pairs.Join()))
}
}
}
return errs
}
// Unsubscribe removes a channel subscriptions from the websocket
func (e *Exchange) Unsubscribe(keys subscription.List) error {
ctx := context.TODO()
var errs error
// Make sure we have the concrete subscriptions, since we will change the state
subs := make(subscription.List, 0, len(keys))
for _, key := range keys {
if s := e.Websocket.GetSubscription(key); s == nil {
errs = common.AppendError(errs, fmt.Errorf("%w; Channel: %s Pairs: %s", subscription.ErrNotFound, key.Channel, key.Pairs.Join()))
} else {
if s.State() != subscription.ResubscribingState {
if err := s.SetState(subscription.UnsubscribingState); err != nil {
errs = common.AppendError(errs, fmt.Errorf("%w; Channel: %s Pairs: %s", err, s.Channel, s.Pairs.Join()))
continue
}
}
subs = append(subs, s)
}
}
subs = subs.GroupPairs()
return common.AppendError(errs,
e.ParallelChanOp(ctx, subs, func(ctx context.Context, s subscription.List) error { return e.manageSubs(ctx, krakenWsUnsubscribe, s) }, 1),
)
}
// manageSubs handles both websocket channel subscribe and unsubscribe
func (e *Exchange) manageSubs(ctx context.Context, op string, subs subscription.List) error {
if len(subs) != 1 {
return subscription.ErrBatchingNotSupported
}
s := subs[0]
if err := enforceStandardChannelNames(s); err != nil {
return err
}
reqFmt := currency.PairFormat{Uppercase: true, Delimiter: "/"}
r := &WebsocketSubRequest{
Event: op,
RequestID: e.Websocket.Conn.GenerateMessageID(false),
Subscription: WebsocketSubscriptionData{
Name: s.QualifiedChannel,
Depth: s.Levels,
},
Pairs: s.Pairs.Format(reqFmt).Strings(),
}
if s.Interval != 0 {
// TODO: Can Interval type be a kraken specific type with a MarshalText so we don't have to duplicate this
r.Subscription.Interval = int(time.Duration(s.Interval).Minutes())
}
conn := e.Websocket.Conn
if s.Authenticated {
r.Subscription.Token = e.websocketAuthToken()
conn = e.Websocket.AuthConn
}
resps, err := conn.SendMessageReturnResponses(ctx, request.Unset, r.RequestID, r, len(s.Pairs))
// Ignore an overall timeout, because we'll track individual subscriptions in handleSubResps
err = common.ExcludeError(err, websocket.ErrSignatureTimeout)
if err != nil {
return fmt.Errorf("%w; Channel: %s Pair: %s", err, s.Channel, s.Pairs)
}
return e.handleSubResps(s, resps, op)
}
// handleSubResps takes a collection of subscription responses from Kraken
// We submit a subscription for N+ pairs, and we get N+ individual responses
// Returns an error collection of unique errors and its pairs
func (e *Exchange) handleSubResps(s *subscription.Subscription, resps [][]byte, op string) error {
reqFmt := currency.PairFormat{Uppercase: true, Delimiter: "/"}
errMap := map[string]error{}
pairErrs := map[currency.Pair]error{}
for _, p := range s.Pairs {
pairErrs[p.Format(reqFmt)] = errSubPairMissing
}
subPairs := currency.Pairs{}
for _, resp := range resps {
pName, err := jsonparser.GetUnsafeString(resp, "pair")
if err != nil {
return fmt.Errorf("%w parsing WS pair from message: %s", err, resp)
}
pair, err := currency.NewPairDelimiter(pName, "/")
if err != nil {
return fmt.Errorf("%w parsing WS pair; Channel: %s Pair: %s", err, s.Channel, pName)
}
if err := e.getSubRespErr(resp, op); err != nil {
// Remove the pair name from the error so we can group errors
errStr := strings.TrimSpace(strings.TrimSuffix(err.Error(), pName))
if _, ok := errMap[errStr]; !ok {
errMap[errStr] = errors.New(errStr)
}
pairErrs[pair] = errMap[errStr]
} else {
delete(pairErrs, pair)
if e.Verbose && op == krakenWsSubscribe {
subPairs = subPairs.Add(pair)
}
}
}
// 2) Reverse the collection and report a list of pairs with each unique error, and re-add the missing and error pairs for unsubscribe
errPairs := map[error]currency.Pairs{}
for pair, err := range pairErrs {
errPairs[err] = errPairs[err].Add(pair)
}
var errs error
for err, pairs := range errPairs {
errs = common.AppendError(errs, fmt.Errorf("%w; Channel: %s Pairs: %s", err, s.Channel, pairs.Join()))
}
if e.Verbose && len(subPairs) > 0 {
log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pairs: %s", e.Name, s.Channel, subPairs.Join())
}
return errs
}
// getSubRespErr calls getRespErr and if there's no error from that ensures the status matches the sub operation
func (e *Exchange) getSubRespErr(resp []byte, op string) error {
if err := e.getRespErr(resp); err != nil {
return err
}
exp := op + "d" // subscribed or unsubscribed
if status, err := jsonparser.GetUnsafeString(resp, "status"); err != nil {
return fmt.Errorf("error parsing WS status: %w from message: %s", err, resp)
} else if status != exp {
return fmt.Errorf("wrong WS status: %s; expected: %s from message %s", exp, op, resp)
}
return nil
}
// getRespErr takes a json response string and looks for an error event type
// If found it returns the errorMessage
// It might log parsing errors about the nature of the error
// If the error message is not defined it will return a wrapped errUnknownError
func (e *Exchange) getRespErr(resp []byte) error {
event, err := jsonparser.GetUnsafeString(resp, "event")
switch {
case err != nil:
return fmt.Errorf("error parsing WS event: %w from message: %s", err, resp)
case event != "error":
status, _ := jsonparser.GetUnsafeString(resp, "status") // Error is really irrelevant here
if status != "error" {
return nil
}
}
var msg string
if msg, err = jsonparser.GetString(resp, "errorMessage"); err != nil {
log.Errorf(log.ExchangeSys, "%s error parsing WS errorMessage: %s from message: %s", e.Name, err, resp)
return fmt.Errorf("%w: error message did not contain errorMessage: %s", common.ErrUnknownError, resp)
}
return errors.New(msg)
}
// wsProcessSubStatus handles creating or removing Subscriptions as soon as we receive a message
// It's job is to ensure that subscription state is kept correct sequentially between WS messages
// If this responsibility was moved to Subscribe then we would have a race due to the channel connecting IncomingWithData
func (e *Exchange) wsProcessSubStatus(resp []byte) {
pName, err := jsonparser.GetUnsafeString(resp, "pair")
if err != nil {
return
}
pair, err := currency.NewPairFromString(pName)
if err != nil {
return
}
c, err := jsonparser.GetUnsafeString(resp, "channelName")
if err != nil {
return
}
if err = e.getRespErr(resp); err != nil {
return
}
status, err := jsonparser.GetUnsafeString(resp, "status")
if err != nil {
return
}
key := &subscription.Subscription{
// We don't use asset because it's either Empty or Spot, but not both
Channel: c,
Pairs: currency.Pairs{pair},
}
if err = fqChannelNameSub(key); err != nil {
return
}
s := e.Websocket.GetSubscription(&subscription.IgnoringAssetKey{Subscription: key})
if s == nil {
log.Errorf(log.ExchangeSys, "%s %s Channel: %s Pairs: %s", e.Name, subscription.ErrNotFound, key.Channel, key.Pairs.Join())
return
}
if status == krakenWsSubscribed {
err = s.SetState(subscription.SubscribedState)
} else if s.State() != subscription.ResubscribingState { // Do not remove a resubscribing sub which just unsubbed
err = e.Websocket.RemoveSubscriptions(e.Websocket.Conn, s)
if e2 := s.SetState(subscription.UnsubscribedState); e2 != nil {
err = common.AppendError(err, e2)
}
}
if err != nil {
log.Errorf(log.ExchangeSys, "%s %s Channel: %s Pairs: %s", e.Name, err, s.Channel, s.Pairs.Join())
}
}
// channelName converts a global channel name to kraken bespoke names
func channelName(s *subscription.Subscription) string {
if n, ok := channelNames[s.Channel]; ok {
return n
}
return s.Channel
}
func enforceStandardChannelNames(s *subscription.Subscription) error {
name := strings.Split(s.Channel, "-") // Protect against attempted usage of book-N as a channel name
if n, ok := reverseChannelNames[name[0]]; ok && n != s.Channel {
return fmt.Errorf("%w: %s => subscription.%s%sChannel", subscription.ErrUseConstChannelName, s.Channel, bytes.ToUpper([]byte{n[0]}), n[1:])
}
return nil
}
// fqChannelNameSub converts an fully qualified channel name into standard name and subscription params
// e.g. book-5 => subscription.OrderbookChannel with Levels: 5
func fqChannelNameSub(s *subscription.Subscription) error {
parts := strings.Split(s.Channel, "-")
name := parts[0]
if stdName, ok := reverseChannelNames[name]; ok {
name = stdName
}
if name == subscription.OrderbookChannel || name == subscription.CandlesChannel {
if len(parts) != 2 {
return errBadChannelSuffix
}
i, err := strconv.Atoi(parts[1])
if err != nil {
return errBadChannelSuffix
}
switch name {
case subscription.OrderbookChannel:
s.Levels = i
case subscription.CandlesChannel:
s.Interval = kline.Interval(time.Minute * time.Duration(i))
}
}
s.Channel = name
return nil
}
// wsAddOrder creates an order, returned order ID if success
func (e *Exchange) wsAddOrder(ctx context.Context, req *WsAddOrderRequest) (string, error) {
if req == nil {
return "", common.ErrNilPointer
}
req.RequestID = e.Websocket.AuthConn.GenerateMessageID(false)
req.Event = krakenWsAddOrder
req.Token = e.websocketAuthToken()
jsonResp, err := e.Websocket.AuthConn.SendMessageReturnResponse(ctx, request.Unset, req.RequestID, req)
if err != nil {
return "", err
}
var resp WsAddOrderResponse
err = json.Unmarshal(jsonResp, &resp)
if err != nil {
return "", err
}
if resp.Status == "error" {
return "", errors.New("AddOrder error: " + resp.ErrorMessage)
}
e.Websocket.DataHandler <- &order.Detail{
Exchange: e.Name,
OrderID: resp.TransactionID,
Status: order.New,
}
return resp.TransactionID, nil
}
// wsCancelOrders cancels open orders concurrently
// It does not use the multiple txId facility of the cancelOrder API because the errors are not specific
func (e *Exchange) wsCancelOrders(ctx context.Context, orderIDs []string) error {
errs := common.CollectErrors(len(orderIDs))
for _, id := range orderIDs {
go func() {
defer errs.Wg.Done()
errs.C <- e.wsCancelOrder(ctx, id)
}()
}
return errs.Collect()
}
// wsCancelOrder cancels an open order
func (e *Exchange) wsCancelOrder(ctx context.Context, orderID string) error {
id := e.Websocket.AuthConn.GenerateMessageID(false)
req := WsCancelOrderRequest{
Event: krakenWsCancelOrder,
Token: e.websocketAuthToken(),
TransactionIDs: []string{orderID},
RequestID: id,
}
resp, err := e.Websocket.AuthConn.SendMessageReturnResponse(ctx, request.Unset, id, req)
if err != nil {
return fmt.Errorf("%w %s: %w", errCancellingOrder, orderID, err)
}
status, err := jsonparser.GetUnsafeString(resp, "status")
if err != nil {
return fmt.Errorf("%w 'status': %w from message: %s", common.ErrParsingWSField, err, resp)
} else if status == "ok" {
return nil
}
err = common.ErrUnknownError
if msg, pErr := jsonparser.GetUnsafeString(resp, "errorMessage"); pErr == nil && msg != "" {
err = errors.New(msg)
}
return fmt.Errorf("%w %s: %w", errCancellingOrder, orderID, err)
}
// wsCancelAllOrders cancels all opened orders
// Returns number (count param) of affected orders or 0 if no open orders found
func (e *Exchange) wsCancelAllOrders(ctx context.Context) (*WsCancelOrderResponse, error) {
id := e.Websocket.AuthConn.GenerateMessageID(false)
req := WsCancelOrderRequest{
Event: krakenWsCancelAll,
Token: e.websocketAuthToken(),
RequestID: id,
}
jsonResp, err := e.Websocket.AuthConn.SendMessageReturnResponse(ctx, request.Unset, id, req)
if err != nil {
return &WsCancelOrderResponse{}, err
}
var resp WsCancelOrderResponse
err = json.Unmarshal(jsonResp, &resp)
if err != nil {
return &WsCancelOrderResponse{}, err
}
if resp.ErrorMessage != "" {
return &WsCancelOrderResponse{}, errors.New(resp.ErrorMessage)
}
return &resp, nil
}
/*
One sub per-pair. We don't use one sub with many pairs because:
- Kraken will fan out in responses anyay
- resubscribe is messy when our subs don't match their respsonses
- FlushChannels and GetChannelDiff would incorrectly resub existing subs if we don't generate the same as we've stored
*/
const subTplText = `
{{- if $.S.Asset -}}
{{ range $asset, $pairs := $.AssetPairs }}
{{- range $p := $pairs -}}
{{- channelName $.S }}
{{- $.PairSeparator }}
{{- end -}}
{{ $.AssetSeparator }}
{{- end -}}
{{- else -}}
{{- channelName $.S }}
{{- end }}
`
// websocketAuthToken retrieves the current websocket session's auth token
func (e *Exchange) websocketAuthToken() string {
e.wsAuthMtx.RLock()
defer e.wsAuthMtx.RUnlock()
return e.wsAuthToken
}
func (e *Exchange) setWebsocketAuthToken(token string) {
e.wsAuthMtx.Lock()
e.wsAuthToken = token
e.wsAuthMtx.Unlock()
}