diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index d768e7a4..7a6c4c27 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -114,14 +114,12 @@ func (k *Kraken) WsConnect() error { err) } else { go k.wsFunnelConnectionData(k.Websocket.AuthConn, comms) - var authsubs []stream.ChannelSubscription - authsubs, err = k.GenerateAuthenticatedSubscriptions() + err = k.wsAuthPingHandler() if err != nil { - return err - } - err = k.Websocket.SubscribeToChannels(authsubs) - if err != nil { - return err + log.Errorf(log.ExchangeSys, + "%v - failed setup ping handler for auth connection. Websocket may disconnect unexpectedly. %v\n", + k.Name, + err) } } } @@ -376,6 +374,20 @@ func (k *Kraken) wsPingHandler() error { return nil } +// wsAuthPingHandler sends a message "ping" every 27 to maintain the connection to the websocket +func (k *Kraken) wsAuthPingHandler() error { + message, err := json.Marshal(pingRequest) + if err != nil { + return err + } + k.Websocket.AuthConn.SetupPingHandler(stream.PingHandler{ + Message: message, + Delay: krakenWsPingDelay, + MessageType: websocket.TextMessage, + }) + return nil +} + // wsReadDataResponse classifies the WS response and sends to appropriate handler func (k *Kraken) wsReadDataResponse(response WebsocketDataResponse) error { if cID, ok := response[0].(float64); ok { @@ -1127,18 +1139,12 @@ func (k *Kraken) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, e }) } } - return subscriptions, nil -} - -// GenerateAuthenticatedSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions() -func (k *Kraken) GenerateAuthenticatedSubscriptions() ([]stream.ChannelSubscription, error) { - var subscriptions []stream.ChannelSubscription - for i := range authenticatedChannels { - params := make(map[string]interface{}) - subscriptions = append(subscriptions, stream.ChannelSubscription{ - Channel: authenticatedChannels[i], - Params: params, - }) + if k.Websocket.CanUseAuthenticatedEndpoints() { + for i := range authenticatedChannels { + subscriptions = append(subscriptions, stream.ChannelSubscription{ + Channel: authenticatedChannels[i], + }) + } } return subscriptions, nil } @@ -1179,7 +1185,7 @@ channels: if !channelsToSubscribe[i].Currency.IsEmpty() { outbound.Pairs = []string{channelsToSubscribe[i].Currency.String()} } - if channelsToSubscribe[i].Params != nil { + if common.StringDataContains(authenticatedChannels, channelsToSubscribe[i].Channel) { outbound.Subscription.Token = authToken } @@ -1255,6 +1261,9 @@ channels: }, RequestID: id, } + if common.StringDataContains(authenticatedChannels, channelsToUnsubscribe[x].Channel) { + unsub.Subscription.Token = authToken + } unsub.Channels = append(unsub.Channels, channelsToUnsubscribe[x]) unsubs = append(unsubs, unsub) }