Bitstamp: Add subscription configuration (#1620)

* Bitstamp: Add subscription configuration

* Bitstamp: Handle sub/unsub responses

* Bitstamp: Fix TestWsOrderbook
This commit is contained in:
Gareth Kirwan
2024-12-12 03:08:06 +00:00
committed by GitHub
parent ca62b0a4c2
commit 2a4c2d24a7
5 changed files with 224 additions and 185 deletions

File diff suppressed because one or more lines are too long

View File

@@ -1,8 +1,6 @@
package bitstamp
import (
"errors"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/types"
)
@@ -21,8 +19,6 @@ const (
SellOrder
)
var errWSPairParsingError = errors.New("unable to parse currency pair from wsResponse.Channel")
// Ticker holds ticker information
type Ticker struct {
Last float64 `json:"last,string"`
@@ -220,10 +216,8 @@ type websocketData struct {
}
type websocketResponse struct {
Event string `json:"event"`
Channel string `json:"channel"`
channelType string
pair currency.Pair
Event string `json:"event"`
Channel string `json:"channel"`
}
type websocketTradeResponse struct {

View File

@@ -8,13 +8,16 @@ import (
"net/http"
"strconv"
"strings"
"text/template"
"time"
"github.com/buger/jsonparser"
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
@@ -30,19 +33,28 @@ const (
)
var (
errParsingWSField = errors.New("error parsing WS field")
errParsingWSPair = errors.New("unable to parse currency pair from wsResponse.Channel")
errChannelHyphens = errors.New("channel name does not contain exactly 0 or 2 hyphens")
errChannelUnderscores = errors.New("channel name does not contain exactly 2 underscores")
hbMsg = []byte(`{"event":"bts:heartbeat"}`)
defaultSubChannels = []string{
bitstampAPIWSTrades,
bitstampAPIWSOrderbook,
}
defaultAuthSubChannels = []string{
bitstampAPIWSMyOrders,
bitstampAPIWSMyTrades,
}
)
var defaultSubscriptions = subscription.List{
{Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel, Interval: kline.HundredMilliseconds},
{Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.MyOrdersChannel, Authenticated: true},
{Enabled: true, Asset: asset.Spot, Channel: subscription.MyTradesChannel, Authenticated: true},
}
var subscriptionNames = map[string]string{
subscription.OrderbookChannel: bitstampAPIWSOrderbook,
subscription.AllTradesChannel: bitstampAPIWSTrades,
subscription.MyOrdersChannel: bitstampAPIWSMyOrders,
subscription.MyTradesChannel: bitstampAPIWSMyTrades,
}
// WsConnect connects to a websocket feed
func (b *Bitstamp) WsConnect() error {
if !b.Websocket.IsEnabled() || !b.IsEnabled() {
@@ -88,78 +100,55 @@ func (b *Bitstamp) wsReadData() {
}
func (b *Bitstamp) wsHandleData(respRaw []byte) error {
wsResponse := &websocketResponse{}
if err := json.Unmarshal(respRaw, wsResponse); err != nil {
return err
event, err := jsonparser.GetUnsafeString(respRaw, "event")
if err != nil {
return fmt.Errorf("%w `event`: %w", errParsingWSField, err)
}
if err := b.parseChannelName(wsResponse); err != nil {
return err
}
switch wsResponse.Event {
case "bts:heartbeat":
event = strings.TrimPrefix(event, "bts:")
switch event {
case "heartbeat":
return nil
case "bts:subscribe", "bts:subscription_succeeded":
if b.Verbose {
log.Debugf(log.ExchangeSys, "%v - Websocket subscription acknowledgement", b.Name)
}
case "bts:unsubscribe":
if b.Verbose {
log.Debugf(log.ExchangeSys, "%v - Websocket unsubscribe acknowledgement", b.Name)
}
case "bts:request_reconnect":
if b.Verbose {
log.Debugf(log.ExchangeSys, "%v - Websocket reconnection request received", b.Name)
}
case "subscription_succeeded", "unsubscription_succeeded":
return b.handleWSSubscription(event, respRaw)
case "data":
return b.handleWSOrderbook(respRaw)
case "trade":
return b.handleWSTrade(respRaw)
case "order_created", "order_deleted", "order_changed":
return b.handleWSOrder(event, respRaw)
case "request_reconnect":
go func() {
err := b.Websocket.Shutdown()
if err != nil {
if err := b.Websocket.Shutdown(); err != nil { // Connection monitor will reconnect
log.Errorf(log.WebsocketMgr, "%s failed to shutdown websocket: %v", b.Name, err)
}
}() // Connection monitor will reconnect
case "data":
if err := b.handleWSOrderbook(wsResponse, respRaw); err != nil {
return err
}
case "trade":
if err := b.handleWSTrade(wsResponse, respRaw); err != nil {
return err
}
case "order_created", "order_deleted", "order_changed":
// Only process MyOrders, not orders from the LiveOrder channel
if wsResponse.channelType == bitstampAPIWSMyOrders {
if err := b.handleWSOrder(wsResponse, respRaw); err != nil {
return err
}
}
}()
default:
b.Websocket.DataHandler <- stream.UnhandledMessageWarning{Message: b.Name + stream.UnhandledMessage + string(respRaw)}
}
return nil
}
func (b *Bitstamp) handleWSOrderbook(wsResp *websocketResponse, msg []byte) error {
if wsResp.pair.IsEmpty() {
return errWSPairParsingError
}
wsOrderBookTemp := websocketOrderBookResponse{}
err := json.Unmarshal(msg, &wsOrderBookTemp)
func (b *Bitstamp) handleWSSubscription(event string, respRaw []byte) error {
channel, err := jsonparser.GetUnsafeString(respRaw, "channel")
if err != nil {
return err
return fmt.Errorf("%w `channel`: %w", errParsingWSField, err)
}
return b.wsUpdateOrderbook(&wsOrderBookTemp.Data, wsResp.pair, asset.Spot)
event = strings.TrimSuffix(event, "scription_succeeded")
if !b.Websocket.Match.IncomingWithData(event+":"+channel, respRaw) {
return fmt.Errorf("%w: %s", stream.ErrNoMessageListener, event+":"+channel)
}
return nil
}
func (b *Bitstamp) handleWSTrade(wsResp *websocketResponse, msg []byte) error {
func (b *Bitstamp) handleWSTrade(msg []byte) error {
if !b.IsSaveTradeDataEnabled() {
return nil
}
if wsResp.pair.IsEmpty() {
return errWSPairParsingError
_, p, err := b.parseChannelName(msg)
if err != nil {
return err
}
wsTradeTemp := websocketTradeResponse{}
@@ -173,7 +162,7 @@ func (b *Bitstamp) handleWSTrade(wsResp *websocketResponse, msg []byte) error {
}
return trade.AddTradesToBuffer(b.Name, trade.Data{
Timestamp: time.Unix(wsTradeTemp.Data.Timestamp, 0),
CurrencyPair: wsResp.pair,
CurrencyPair: p,
AssetType: asset.Spot,
Exchange: b.Name,
Price: wsTradeTemp.Data.Price,
@@ -183,7 +172,15 @@ func (b *Bitstamp) handleWSTrade(wsResp *websocketResponse, msg []byte) error {
})
}
func (b *Bitstamp) handleWSOrder(wsResp *websocketResponse, msg []byte) error {
func (b *Bitstamp) handleWSOrder(event string, msg []byte) error {
channel, p, err := b.parseChannelName(msg)
if err != nil {
return err
}
if channel != bitstampAPIWSMyOrders {
return nil // Only process MyOrders, not orders from the LiveOrder channel
}
r := &websocketOrderResponse{}
if err := json.Unmarshal(msg, &r); err != nil {
return err
@@ -194,7 +191,7 @@ func (b *Bitstamp) handleWSOrder(wsResp *websocketResponse, msg []byte) error {
}
var status order.Status
switch wsResp.Event {
switch event {
case "order_created":
status = order.New
case "order_changed":
@@ -224,7 +221,7 @@ func (b *Bitstamp) handleWSOrder(wsResp *websocketResponse, msg []byte) error {
Status: status,
AssetType: asset.Spot,
Date: r.Order.Microtimestamp.Time(),
Pair: wsResp.pair,
Pair: p,
}
b.Websocket.DataHandler <- d
@@ -232,101 +229,78 @@ func (b *Bitstamp) handleWSOrder(wsResp *websocketResponse, msg []byte) error {
return nil
}
func (b *Bitstamp) generateDefaultSubscriptions() (subscription.List, error) {
enabledCurrencies, err := b.GetEnabledPairs(asset.Spot)
func (b *Bitstamp) generateSubscriptions() (subscription.List, error) {
return b.Features.Subscriptions.ExpandTemplates(b)
}
// GetSubscriptionTemplate returns a subscription channel template
func (b *Bitstamp) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(template.FuncMap{"channelName": channelName}).Parse(subTplText)
}
// Subscribe sends a websocket message to receive data from a list of channels
func (b *Bitstamp) Subscribe(subs subscription.List) error {
return b.manageSubsWithCreds(subs, "sub")
}
// Unsubscribe sends a websocket message to stop receiving data from a list of channels
func (b *Bitstamp) Unsubscribe(subs subscription.List) error {
return b.manageSubsWithCreds(subs, "unsub")
}
func (b *Bitstamp) manageSubsWithCreds(subs subscription.List, op string) error {
var errs error
var creds *WebsocketAuthResponse
if authed := subs.Private(); len(authed) > 0 {
creds, errs = b.FetchWSAuth(context.TODO())
}
return common.AppendError(errs, b.ParallelChanOp(subs, func(s subscription.List) error { return b.manageSubs(s, op, creds) }, 1))
}
func (b *Bitstamp) manageSubs(subs subscription.List, op string, creds *WebsocketAuthResponse) error {
subs, errs := subs.ExpandTemplates(b)
for _, s := range subs {
req := websocketEventRequest{
Event: "bts:" + op + "scribe",
Data: websocketData{
Channel: s.QualifiedChannel,
},
}
if s.Authenticated {
if creds == nil {
return request.ErrAuthRequestFailed
}
req.Data.Channel = "private-" + req.Data.Channel + "-" + strconv.Itoa(int(creds.UserID))
req.Data.Auth = creds.Token
}
_, err := b.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, op+":"+req.Data.Channel, req)
if err == nil {
if op == "sub" {
err = b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, s)
} else {
err = b.Websocket.RemoveSubscriptions(b.Websocket.Conn, s)
}
}
if err != nil {
errs = common.AppendError(errs, err)
}
}
return errs
}
func (b *Bitstamp) handleWSOrderbook(msg []byte) error {
_, p, err := b.parseChannelName(msg)
if err != nil {
return nil, err
}
var subscriptions subscription.List
for i := range enabledCurrencies {
p, err := b.FormatExchangeCurrency(enabledCurrencies[i], asset.Spot)
if err != nil {
return nil, err
}
for j := range defaultSubChannels {
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: defaultSubChannels[j] + "_" + p.String(),
Asset: asset.Spot,
Pairs: currency.Pairs{p},
})
}
if b.Websocket.CanUseAuthenticatedEndpoints() {
for j := range defaultAuthSubChannels {
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: defaultAuthSubChannels[j] + "_" + p.String(),
Asset: asset.Spot,
Pairs: currency.Pairs{p},
Params: map[string]interface{}{
"auth": struct{}{},
},
})
}
}
}
return subscriptions, nil
}
// Subscribe sends a websocket message to receive data from the channel
func (b *Bitstamp) Subscribe(channelsToSubscribe subscription.List) error {
var errs error
var auth *WebsocketAuthResponse
for i := range channelsToSubscribe {
if _, ok := channelsToSubscribe[i].Params["auth"]; ok {
var err error
auth, err = b.FetchWSAuth(context.TODO())
if err != nil {
errs = common.AppendError(errs, err)
}
break
}
return err
}
for _, s := range channelsToSubscribe {
req := websocketEventRequest{
Event: "bts:subscribe",
Data: websocketData{
Channel: s.Channel,
},
}
if _, ok := s.Params["auth"]; ok && auth != nil {
req.Data.Channel = "private-" + req.Data.Channel + "-" + strconv.Itoa(int(auth.UserID))
req.Data.Auth = auth.Token
}
err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req)
if err == nil {
err = b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, s)
}
if err != nil {
errs = common.AppendError(errs, err)
}
wsOrderBookResp := websocketOrderBookResponse{}
if err := json.Unmarshal(msg, &wsOrderBookResp); err != nil {
return err
}
update := &wsOrderBookResp.Data
return errs
}
// Unsubscribe sends a websocket message to stop receiving data from the channel
func (b *Bitstamp) Unsubscribe(channelsToUnsubscribe subscription.List) error {
var errs error
for _, s := range channelsToUnsubscribe {
req := websocketEventRequest{
Event: "bts:unsubscribe",
Data: websocketData{
Channel: s.Channel,
},
}
err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req)
if err == nil {
err = b.Websocket.RemoveSubscriptions(b.Websocket.Conn, s)
}
if err != nil {
errs = common.AppendError(errs, err)
}
}
return errs
}
func (b *Bitstamp) wsUpdateOrderbook(update *websocketOrderBook, p currency.Pair, assetType asset.Item) error {
if len(update.Asks) == 0 && len(update.Bids) == 0 {
return errors.New("no orderbook data")
}
@@ -336,7 +310,7 @@ func (b *Bitstamp) wsUpdateOrderbook(update *websocketOrderBook, p currency.Pair
Asks: make(orderbook.Tranches, len(update.Asks)),
Pair: p,
LastUpdated: time.UnixMicro(update.Microtimestamp),
Asset: assetType,
Asset: asset.Spot,
Exchange: b.Name,
VerifyOrderbook: b.CanVerifyOrderbook,
}
@@ -427,35 +401,58 @@ func (b *Bitstamp) FetchWSAuth(ctx context.Context) (*WebsocketAuthResponse, err
return resp, nil
}
// parseChannel splits the ws response channel and sets the channel type and pair
func (b *Bitstamp) parseChannelName(r *websocketResponse) error {
if r.Channel == "" {
return nil
// parseChannelName splits the ws message channel and returns the channel name and pair
func (b *Bitstamp) parseChannelName(respRaw []byte) (string, currency.Pair, error) {
channel, err := jsonparser.GetUnsafeString(respRaw, "channel")
if err != nil {
return "", currency.EMPTYPAIR, fmt.Errorf("%w `channel`: %w", errParsingWSField, err)
}
chanName := r.Channel
authParts := strings.Split(r.Channel, "-")
authParts := strings.Split(channel, "-")
switch len(authParts) {
case 1:
// Not an auth channel
case 3:
chanName = authParts[1]
channel = authParts[1]
default:
return fmt.Errorf("channel name does not contain exactly 0 or 2 hyphens: %v", r.Channel)
return "", currency.EMPTYPAIR, fmt.Errorf("%w: %s", errChannelHyphens, channel)
}
parts := strings.Split(chanName, "_")
parts := strings.Split(channel, "_")
if len(parts) != 3 {
return fmt.Errorf("%w: channel name does not contain exactly 2 underscores: %v", errWSPairParsingError, r.Channel)
return "", currency.EMPTYPAIR, fmt.Errorf("%w: %s", errChannelUnderscores, channel)
}
r.channelType = parts[0] + "_" + parts[1]
symbol := parts[2]
enabledPairs, err := b.GetEnabledPairs(asset.Spot)
if err == nil {
r.pair, err = enabledPairs.DeriveFrom(symbol)
if err != nil {
return "", currency.EMPTYPAIR, err
}
return err
pair, err := enabledPairs.DeriveFrom(parts[2])
if err != nil {
return "", currency.EMPTYPAIR, fmt.Errorf("%w: %s", errParsingWSPair, err)
}
return parts[0] + "_" + parts[1], pair, nil
}
// channelName converts global channel Names to exchange specific ones
// panics if name is not supported, so should be called within a recover chain
func channelName(s *subscription.Subscription) string {
if s, ok := subscriptionNames[s.Channel]; ok {
return s
}
panic(fmt.Errorf("%w: %s", subscription.ErrNotSupported, s.Channel))
}
const subTplText = `
{{ range $asset, $pairs := $.AssetPairs }}
{{- with $name := channelName $.S }}
{{- range $p := $pairs -}}
{{- $name -}} _ {{- $p -}}
{{ $.PairSeparator }}
{{- end -}}
{{- end }}
{{ $.AssetSeparator }}
{{- end }}
`

View File

@@ -107,6 +107,7 @@ func (b *Bitstamp) SetDefaults() {
GlobalResultLimit: 1000,
},
},
Subscriptions: defaultSubscriptions.Clone(),
}
b.Requester, err = request.New(b.Name,
@@ -156,7 +157,7 @@ func (b *Bitstamp) Setup(exch *config.Exchange) error {
Connector: b.WsConnect,
Subscriber: b.Subscribe,
Unsubscriber: b.Unsubscribe,
GenerateSubscriptions: b.generateDefaultSubscriptions,
GenerateSubscriptions: b.generateSubscriptions,
Features: &b.Features.Supports.WebsocketCapabilities,
})
if err != nil {