Huobi: Add subscription configuration (#1604)

* Huobi: Update test config format

* Huobi: Add subscription configuration

* Huobi: Add subscription documentation

* Huobi: Clarify OB sub Levels usage

* Huobi: Enable websocket for tests

* Subscriptions: Rename ErrPrivateChannelName

Rename ErrPrivateChannelName to ErrUseConstChannelName
This commit is contained in:
Gareth Kirwan
2024-11-11 04:19:25 +01:00
committed by GitHub
parent d4c4bf1bcd
commit a2a7fed273
9 changed files with 209 additions and 98 deletions

View File

@@ -110,12 +110,24 @@ if err != nil {
}
```
### How to do Websocket public/private calls
### Subscriptions
```go
// Exchanges will be abstracted out in further updates and examples will be
// supplied then
All subscriptions are for spot only.
Default Public Subscriptions:
- Ticker
- Candles ( Interval: 1min )
- Orderbook ( Level: 0 - No aggregation )
- Configure Level: 1-5 for depth aggregation, for example:
```json
{"enabled": true, "channel": "orderbook", "asset": "spot", "levels": 1}
```
- Trades
Default Authenticated Subscriptions:
- Account Trades
- Account Orders
- Account Updates
### Please click GoDocs chevron above to view current GoDoc information for this package

View File

@@ -25,8 +25,10 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)
@@ -2927,3 +2929,65 @@ func TestGetCurrencyTradeURL(t *testing.T) {
}
}
}
func TestGenerateSubscriptions(t *testing.T) {
t.Parallel()
h := new(HUOBI)
require.NoError(t, testexch.Setup(h), "Test instance Setup must not error")
subs, err := h.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions must not error")
exp := subscription.List{}
for _, s := range h.Features.Subscriptions {
if s.Authenticated && !h.Websocket.CanUseAuthenticatedEndpoints() {
continue
}
for _, a := range h.GetAssetTypes(true) {
if s.Asset != asset.All && s.Asset != a {
continue
}
pairs, err := h.GetEnabledPairs(a)
require.NoErrorf(t, err, "GetEnabledPairs %s must not error", a)
pairs = common.SortStrings(pairs).Format(currency.PairFormat{Uppercase: false, Delimiter: ""})
s := s.Clone() //nolint:govet // Intentional lexical scope shadow
s.Asset = a
for i, p := range pairs {
s := s.Clone() //nolint:govet // Intentional lexical scope shadow
s.QualifiedChannel = channelName(s, p)
switch s.Channel {
case subscription.OrderbookChannel:
s.QualifiedChannel += ".step0"
case subscription.CandlesChannel:
s.QualifiedChannel += ".1min"
}
s.Pairs = pairs[i : i+1]
exp = append(exp, s)
}
}
}
testsubs.EqualLists(t, exp, subs)
}
// TestSubscribe exercises live public subscriptions
func TestSubscribe(t *testing.T) {
t.Parallel()
h := new(HUOBI)
require.NoError(t, testexch.Setup(h), "Test instance Setup must not error")
subs, err := h.Features.Subscriptions.ExpandTemplates(h)
require.NoError(t, err, "ExpandTemplates must not error")
testexch.SetupWs(t, h)
err = h.Subscribe(subs)
require.NoError(t, err, "Subscribe must not error")
got := h.Websocket.GetSubscriptions()
require.Equal(t, 4, len(got), "Must get correct number of subscriptions")
for _, s := range got {
assert.Equal(t, subscription.SubscribedState, s.State())
}
}
func TestChannelName(t *testing.T) {
p := currency.NewPair(currency.BTC, currency.USD)
assert.Equal(t, "market.BTCUSD.kline", channelName(&subscription.Subscription{Channel: subscription.CandlesChannel}, p))
assert.Panics(t, func() { channelName(&subscription.Subscription{Channel: wsOrderbookChannel}, p) })
assert.Panics(t, func() { channelName(&subscription.Subscription{Channel: subscription.MyAccountChannel}, p) }, "Should panic on V2 endpoints until implemented")
}

View File

@@ -9,6 +9,7 @@ import (
"net/url"
"strconv"
"strings"
"text/template"
"time"
"github.com/gorilla/websocket"
@@ -17,6 +18,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
@@ -31,11 +33,13 @@ const (
baseWSURL = "wss://api.huobi.pro"
futuresWSURL = "wss://api.hbdm.com/"
wsMarketURL = baseWSURL + "/ws"
wsMarketKline = "market.%s.kline.1min"
wsMarketDepth = "market.%s.depth.step0"
wsMarketTrade = "market.%s.trade.detail"
wsMarketTicker = "market.%s.detail"
wsMarketURL = baseWSURL + "/ws"
wsCandlesChannel = "market.%s.kline"
wsOrderbookChannel = "market.%s.depth"
wsTradesChannel = "market.%s.trade.detail"
wsMarketDetailChannel = "market.%s.detail"
wsMyOrdersChannel = "orders.%s"
wsMyTradesChannel = "orders.%s.update"
wsAccountsOrdersEndPoint = "/ws/v1"
wsAccountsList = "accounts.list"
@@ -56,6 +60,28 @@ const (
loginDelay = 50 * time.Millisecond
)
var defaultSubscriptions = subscription.List{
{Enabled: true, Asset: asset.Spot, Channel: subscription.TickerChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel, Interval: kline.OneMin},
{Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel, Levels: 0}, // Aggregation Levels; 0 is no depth aggregation
{Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.MyOrdersChannel, Authenticated: true},
{Enabled: true, Asset: asset.Spot, Channel: subscription.MyTradesChannel, Authenticated: true},
{Enabled: true, Channel: subscription.MyAccountChannel, Authenticated: true},
}
var subscriptionNames = map[string]string{
subscription.TickerChannel: wsMarketDetailChannel,
subscription.CandlesChannel: wsCandlesChannel,
subscription.OrderbookChannel: wsOrderbookChannel,
subscription.AllTradesChannel: wsTradesChannel,
/* TODO: Pending upcoming V2 support, these are dropped from the translation table so that the sub conf will be correct and not need upgrading, but will error on usage
subscription.MyTradesChannel: wsMyOrdersChannel,
subscription.MyOrdersChannel: wsMyTradesChannel,
subscription.MyAccountChannel: wsMyAccountChannel,
*/
}
// Instantiates a communications channel between websocket connections
var comms = make(chan WsMessage)
@@ -514,101 +540,66 @@ func (h *HUOBI) WsProcessOrderbook(update *WsDepth, symbol string) error {
return h.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
}
// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (h *HUOBI) GenerateDefaultSubscriptions() (subscription.List, error) {
var channels = []string{wsMarketKline,
wsMarketDepth,
wsMarketTrade,
wsMarketTicker}
var subscriptions subscription.List
if h.Websocket.CanUseAuthenticatedEndpoints() {
channels = append(channels, "orders.%v", "orders.%v.update")
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: "accounts",
})
}
enabledCurrencies, err := h.GetEnabledPairs(asset.Spot)
if err != nil {
return nil, err
}
for i := range channels {
for j := range enabledCurrencies {
enabledCurrencies[j].Delimiter = ""
channel := fmt.Sprintf(channels[i],
enabledCurrencies[j].Lower().String())
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: channel,
Pairs: currency.Pairs{enabledCurrencies[j]},
})
}
}
return subscriptions, nil
// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature
func (h *HUOBI) generateSubscriptions() (subscription.List, error) {
return h.Features.Subscriptions.ExpandTemplates(h)
}
// GetSubscriptionTemplate returns a subscription channel template
func (h *HUOBI) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(template.FuncMap{
"channelName": channelName,
"interval": h.FormatExchangeKlineInterval,
}).Parse(subTplText)
}
// Subscribe sends a websocket message to receive data from the channel
func (h *HUOBI) Subscribe(channelsToSubscribe subscription.List) error {
func (h *HUOBI) Subscribe(subs subscription.List) error {
ctx := context.Background()
var errs error
var creds *account.Credentials
if h.Websocket.CanUseAuthenticatedEndpoints() {
var err error
creds, err = h.GetCredentials(context.TODO())
if err != nil {
return err
if len(subs.Private()) > 0 {
if creds, errs = h.GetCredentials(ctx); errs != nil {
return errs
}
}
var errs error
for i := range channelsToSubscribe {
for _, s := range subs {
var err error
if (strings.Contains(channelsToSubscribe[i].Channel, "orders.") ||
strings.Contains(channelsToSubscribe[i].Channel, "accounts")) && creds != nil {
err = h.wsAuthenticatedSubscribe(creds,
"sub",
wsAccountsOrdersEndPoint+channelsToSubscribe[i].Channel,
channelsToSubscribe[i].Channel)
if s.Authenticated {
if err = h.wsAuthenticatedSubscribe(creds, "sub", wsAccountsOrdersEndPoint+"/"+s.QualifiedChannel, s.QualifiedChannel); err == nil {
err = h.Websocket.AddSuccessfulSubscriptions(h.Websocket.Conn, s)
}
} else {
err = h.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, WsRequest{
Subscribe: channelsToSubscribe[i].Channel,
})
}
if err == nil {
err = h.Websocket.AddSuccessfulSubscriptions(h.Websocket.Conn, channelsToSubscribe[i])
}
if err != nil {
errs = common.AppendError(errs, err)
if err = h.Websocket.Conn.SendJSONMessage(ctx, request.Unset, WsRequest{Subscribe: s.QualifiedChannel}); err == nil {
err = h.Websocket.AddSuccessfulSubscriptions(h.Websocket.AuthConn, s)
}
}
errs = common.AppendError(errs, err)
}
return nil
}
// Unsubscribe sends a websocket message to stop receiving data from the channel
func (h *HUOBI) Unsubscribe(channelsToUnsubscribe subscription.List) error {
func (h *HUOBI) Unsubscribe(subs subscription.List) error {
ctx := context.Background()
var errs error
var creds *account.Credentials
if h.Websocket.CanUseAuthenticatedEndpoints() {
var err error
creds, err = h.GetCredentials(context.TODO())
if err != nil {
return err
if len(subs.Private()) > 0 {
if creds, errs = h.GetCredentials(ctx); errs != nil {
return errs
}
}
var errs error
for i := range channelsToUnsubscribe {
for _, s := range subs {
var err error
if (strings.Contains(channelsToUnsubscribe[i].Channel, "orders.") ||
strings.Contains(channelsToUnsubscribe[i].Channel, "accounts")) && creds != nil {
err = h.wsAuthenticatedSubscribe(creds,
"unsub",
wsAccountsOrdersEndPoint+channelsToUnsubscribe[i].Channel,
channelsToUnsubscribe[i].Channel)
if s.Authenticated {
err = h.wsAuthenticatedSubscribe(creds, "unsub", wsAccountsOrdersEndPoint+"/"+s.QualifiedChannel, s.QualifiedChannel)
} else {
err = h.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, WsRequest{
Unsubscribe: channelsToUnsubscribe[i].Channel,
})
err = h.Websocket.Conn.SendJSONMessage(ctx, request.Unset, WsRequest{Unsubscribe: s.QualifiedChannel})
}
if err == nil {
err = h.Websocket.RemoveSubscriptions(h.Websocket.Conn, channelsToUnsubscribe[i])
}
if err != nil {
errs = common.AppendError(errs, err)
err = h.Websocket.RemoveSubscriptions(h.Websocket.Conn, s)
}
errs = common.AppendError(errs, err)
}
return errs
}
@@ -810,3 +801,31 @@ func (h *HUOBI) wsGetOrderDetails(ctx context.Context, orderID string) (*WsAuthe
}
return &response, nil
}
// channelName converts global channel Names used in config of channel input into exchange channel names
// returns the name unchanged if no match is found
func channelName(s *subscription.Subscription, p currency.Pair) string {
if n, ok := subscriptionNames[s.Channel]; ok {
return fmt.Sprintf(n, p)
}
if s.Authenticated {
panic(fmt.Errorf("%w: Private endpoints not currently supported", common.ErrNotYetImplemented))
}
panic(subscription.ErrUseConstChannelName)
}
const subTplText = `
{{- if $.S.Asset }}
{{ range $asset, $pairs := $.AssetPairs }}
{{- range $p := $pairs }}
{{- channelName $.S $p -}}
{{- if eq $.S.Channel "candles" -}} . {{- interval $.S.Interval }}{{ end }}
{{- if eq $.S.Channel "orderbook" -}} .step {{- $.S.Levels }}{{ end }}
{{ $.PairSeparator }}
{{- end }}
{{ $.AssetSeparator }}
{{- end }}
{{- else -}}
{{ channelName $.S nil }}
{{- end }}
`

View File

@@ -162,6 +162,7 @@ func (h *HUOBI) SetDefaults() {
GlobalResultLimit: 2000,
},
},
Subscriptions: defaultSubscriptions.Clone(),
}
h.Requester, err = request.New(h.Name,
@@ -213,7 +214,7 @@ func (h *HUOBI) Setup(exch *config.Exchange) error {
Connector: h.WsConnect,
Subscriber: h.Subscribe,
Unsubscriber: h.Unsubscribe,
GenerateSubscriptions: h.GenerateDefaultSubscriptions,
GenerateSubscriptions: h.generateSubscriptions,
Features: &h.Features.Supports.WebsocketCapabilities,
})
if err != nil {

View File

@@ -1001,7 +1001,7 @@ func TestWsSubscribe(t *testing.T) {
Channel: c,
Pairs: currency.Pairs{spotTestPair},
}})
assert.ErrorIs(t, err, subscription.ErrPrivateChannelName, "Must error when trying to use a private channel name")
assert.ErrorIs(t, err, subscription.ErrUseConstChannelName, "Must error when trying to use a private channel name")
assert.ErrorContains(t, err, c+" => subscription.CandlesChannel", "Must error when trying to use a private channel name")
}
}
@@ -1703,6 +1703,6 @@ func TestEnforceStandardChannelNames(t *testing.T) {
}
for _, n := range []string{krakenWsOrderbook, krakenWsOHLC, krakenWsTrade, krakenWsOwnTrades, krakenWsOpenOrders, krakenWsOrderbook + "-5"} {
err := enforceStandardChannelNames(&subscription.Subscription{Channel: n})
assert.ErrorIsf(t, err, subscription.ErrPrivateChannelName, "Private channel names should not be allowed for %s", n)
assert.ErrorIsf(t, err, subscription.ErrUseConstChannelName, "Private channel names should not be allowed for %s", n)
}
}

View File

@@ -1237,7 +1237,7 @@ func channelName(s *subscription.Subscription) string {
func enforceStandardChannelNames(s *subscription.Subscription) error {
name := strings.Split(s.Channel, "-") // Protect against attempted usage of book-N as a channel name
if n, ok := reverseChannelNames[name[0]]; ok && n != s.Channel {
return fmt.Errorf("%w: %s => subscription.%s%sChannel", subscription.ErrPrivateChannelName, s.Channel, bytes.ToUpper([]byte{n[0]}), n[1:])
return fmt.Errorf("%w: %s => subscription.%s%sChannel", subscription.ErrUseConstChannelName, s.Channel, bytes.ToUpper([]byte{n[0]}), n[1:])
}
return nil
}

View File

@@ -32,6 +32,7 @@ const (
MyTradesChannel = "myTrades"
MyOrdersChannel = "myOrders"
MyWalletChannel = "myWallet"
MyAccountChannel = "myAccount"
)
// Public errors
@@ -42,7 +43,7 @@ var (
ErrInStateAlready = errors.New("subscription already in state")
ErrInvalidState = errors.New("invalid subscription state")
ErrDuplicate = errors.New("duplicate subscription")
ErrPrivateChannelName = errors.New("must use standard channel name constants")
ErrUseConstChannelName = errors.New("must use standard channel name constants")
)
// State tracks the status of a subscription channel