HitBTC: Add subscription configuration (#1639)

* HitBTC: Upgrade test config

* HitBTC: Add subscription configuration
This commit is contained in:
Gareth Kirwan
2024-12-17 00:04:23 +00:00
committed by GitHub
parent 0d7b776e87
commit 16e5398dd5
7 changed files with 221 additions and 117 deletions

View File

@@ -93,12 +93,24 @@ if err != nil {
}
```
### How to do Websocket public/private calls
### Subscriptions
```go
// Exchanges will be abstracted out in further updates and examples will be
// supplied then
```
Subscriptions are for [v2 api](https://hitbtc-com.github.io/hitbtc-api/#socket-api-reference)
All subscriptions are for spot.
Default Public Subscriptions:
- Ticker
- Orderbook
- Candles ( Interval: 30 minutes, History: 100 )
- All Trades ( History: 100 )
Default Authenticated Subscriptions:
- My Account events
Subscriptions are subject to enabled assets and pairs.
Configure Levels for number of history entries to return for applicable APIs.
### Please click GoDocs chevron above to view current GoDoc information for this package
{{template "contributions"}}

View File

@@ -111,12 +111,24 @@ if err != nil {
}
```
### How to do Websocket public/private calls
### Subscriptions
```go
// Exchanges will be abstracted out in further updates and examples will be
// supplied then
```
Subscriptions are for [v2 api](https://hitbtc-com.github.io/hitbtc-api/#socket-api-reference)
All subscriptions are for spot.
Default Public Subscriptions:
- Ticker
- Orderbook
- Candles ( Interval: 30 minutes, History: 100 )
- All Trades ( History: 100 )
Default Authenticated Subscriptions:
- My Account events
Subscriptions are subject to enabled assets and pairs.
Configure Levels for number of history entries to return for applicable APIs.
### Please click GoDocs chevron above to view current GoDoc information for this package

View File

@@ -21,7 +21,9 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)
@@ -1007,7 +1009,7 @@ func Test_FormatExchangeKlineInterval(t *testing.T) {
test := testCases[x]
t.Run(test.name, func(t *testing.T) {
t.Parallel()
ret, err := h.FormatExchangeKlineInterval(test.interval)
ret, err := formatExchangeKlineInterval(test.interval)
if err != nil {
t.Fatal(err)
}
@@ -1090,3 +1092,67 @@ func TestGetCurrencyTradeURL(t *testing.T) {
assert.NotEmpty(t, resp)
}
}
func TestGenerateSubscriptions(t *testing.T) {
t.Parallel()
h := new(HitBTC)
require.NoError(t, testexch.Setup(h), "Test instance Setup must not error")
h.Websocket.SetCanUseAuthenticatedEndpoints(true)
require.True(t, h.Websocket.CanUseAuthenticatedEndpoints(), "CanUseAuthenticatedEndpoints must return true")
subs, err := h.generateSubscriptions()
require.NoError(t, err, "generateSubscriptions must not error")
exp := subscription.List{}
pairs, err := h.GetEnabledPairs(asset.Spot)
require.NoErrorf(t, err, "GetEnabledPairs must not error")
for _, s := range h.Features.Subscriptions {
for _, p := range pairs.Format(currency.PairFormat{Uppercase: true}) {
s = s.Clone()
s.Pairs = currency.Pairs{p}
n := subscriptionNames[s.Channel]
switch s.Channel {
case subscription.MyAccountChannel:
s.QualifiedChannel = `{"method":"` + n + `"}`
case subscription.CandlesChannel:
s.QualifiedChannel = `{"method":"` + n + `","params":{"symbol":"` + p.String() + `","period":"M30","limit":100}}`
case subscription.AllTradesChannel:
s.QualifiedChannel = `{"method":"` + n + `","params":{"symbol":"` + p.String() + `","limit":100}}`
default:
s.QualifiedChannel = `{"method":"` + n + `","params":{"symbol":"` + p.String() + `"}}`
}
exp = append(exp, s)
}
}
testsubs.EqualLists(t, exp, subs)
}
func TestIsSymbolChannel(t *testing.T) {
t.Parallel()
assert.True(t, isSymbolChannel(&subscription.Subscription{Channel: subscription.TickerChannel}))
assert.False(t, isSymbolChannel(&subscription.Subscription{Channel: subscription.MyAccountChannel}))
}
func TestSubToReq(t *testing.T) {
t.Parallel()
p := currency.NewPairWithDelimiter("BTC", "USD", "-")
r := subToReq(&subscription.Subscription{Channel: subscription.TickerChannel}, p)
assert.Equal(t, "Ticker", r.Method)
assert.Equal(t, "BTC-USD", (r.Params.Symbol))
r = subToReq(&subscription.Subscription{Channel: subscription.CandlesChannel, Levels: 4, Interval: kline.OneHour}, p)
assert.Equal(t, "Candles", r.Method)
assert.Equal(t, "H1", r.Params.Period)
assert.Equal(t, 4, r.Params.Limit)
assert.Equal(t, "BTC-USD", (r.Params.Symbol))
r = subToReq(&subscription.Subscription{Channel: subscription.AllTradesChannel, Levels: 150})
assert.Equal(t, "Trades", r.Method)
assert.Equal(t, 150, r.Params.Limit)
assert.PanicsWithError(t,
"subscription channel not supported: myTrades",
func() { subToReq(&subscription.Subscription{Channel: subscription.MyTradesChannel}, p) },
"should panic on invalid channel",
)
}

View File

@@ -294,24 +294,17 @@ type ResponseError struct {
// WsRequest defines a request obj for the JSON-RPC and gets a websocket response
type WsRequest struct {
Method string `json:"method"`
Params WsParams `json:"params,omitempty"`
ID int64 `json:"id"`
}
// WsNotification defines a notification obj for the JSON-RPC this does not get
// a websocket response
type WsNotification struct {
JSONRPCVersion string `json:"jsonrpc,omitempty"`
Method string `json:"method"`
Params WsParams `json:"params"`
JSONRPCVersion string `json:"jsonrpc,omitempty"`
Method string `json:"method"`
Params *WsParams `json:"params,omitempty"`
ID int64 `json:"id,omitempty"`
}
// WsParams are websocket params for a request
type WsParams struct {
Symbol string `json:"symbol,omitempty"`
Period string `json:"period,omitempty"`
Limit int64 `json:"limit,omitempty"`
Limit int `json:"limit,omitempty"`
Symbols []string `json:"symbols,omitempty"`
}

View File

@@ -8,13 +8,16 @@ import (
"net/http"
"strconv"
"strings"
"text/template"
"time"
"github.com/Masterminds/sprig/v3"
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/common/crypto"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
@@ -31,6 +34,22 @@ const (
errAuthFailed = 1002
)
var subscriptionNames = map[string]string{
subscription.TickerChannel: "Ticker",
subscription.OrderbookChannel: "Orderbook",
subscription.CandlesChannel: "Candles",
subscription.AllTradesChannel: "Trades",
subscription.MyAccountChannel: "Reports",
}
var defaultSubscriptions = subscription.List{
{Enabled: true, Asset: asset.Spot, Channel: subscription.TickerChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel, Interval: kline.ThirtyMin, Levels: 100},
{Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel, Levels: 100},
{Enabled: true, Asset: asset.Spot, Channel: subscription.MyAccountChannel, Authenticated: true},
}
// WsConnect starts a new connection with the websocket API
func (h *HitBTC) WsConnect() error {
if !h.Websocket.IsEnabled() || !h.IsEnabled() {
@@ -465,104 +484,54 @@ func (h *HitBTC) WsProcessOrderbookUpdate(update *WsOrderbook) error {
})
}
// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (h *HitBTC) GenerateDefaultSubscriptions() (subscription.List, error) {
var channels = []string{
"Ticker",
"Orderbook",
"Trades",
"Candles",
}
var subscriptions subscription.List
if h.Websocket.CanUseAuthenticatedEndpoints() {
subscriptions = append(subscriptions, &subscription.Subscription{Channel: "Reports"})
}
pairs, err := h.GetEnabledPairs(asset.Spot)
if err != nil {
return nil, err
}
pairFmt, err := h.GetPairFormat(asset.Spot, true)
if err != nil {
return nil, err
}
pairFmt.Delimiter = ""
pairs = pairs.Format(pairFmt)
for i := range channels {
for j := range pairs {
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: channels[i],
Pairs: currency.Pairs{pairs[j]},
Asset: asset.Spot,
})
}
}
return subscriptions, nil
// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature
func (h *HitBTC) generateSubscriptions() (subscription.List, error) {
return h.Features.Subscriptions.ExpandTemplates(h)
}
// GetSubscriptionTemplate returns a subscription channel template
func (h *HitBTC) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(sprig.FuncMap()).Funcs(template.FuncMap{
"subToReq": subToReq,
"isSymbolChannel": isSymbolChannel,
}).Parse(subTplText)
}
const (
subscribeOp = "subscribe"
unsubscribeOp = "unsubscribe"
)
// Subscribe sends a websocket message to receive data from the channel
func (h *HitBTC) Subscribe(channelsToSubscribe subscription.List) error {
var errs error
for _, s := range channelsToSubscribe {
if len(s.Pairs) != 1 {
return subscription.ErrNotSinglePair
}
pair := s.Pairs[0]
r := WsRequest{
Method: "subscribe" + s.Channel,
ID: h.Websocket.Conn.GenerateMessageID(false),
Params: WsParams{
Symbol: pair.String(),
},
}
switch s.Channel {
case "Trades":
r.Params.Limit = 100
case "Candles":
r.Params.Period = "M30"
r.Params.Limit = 100
}
err := h.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, r)
if err == nil {
err = h.Websocket.AddSuccessfulSubscriptions(h.Websocket.Conn, s)
}
if err != nil {
errs = common.AppendError(errs, err)
}
}
return errs
func (h *HitBTC) Subscribe(subs subscription.List) error {
return h.ParallelChanOp(subs, func(subs subscription.List) error { return h.manageSubs(subscribeOp, subs) }, 1)
}
// Unsubscribe sends a websocket message to stop receiving data from the channel
func (h *HitBTC) Unsubscribe(subs subscription.List) error {
return h.ParallelChanOp(subs, func(subs subscription.List) error { return h.manageSubs(unsubscribeOp, subs) }, 1)
}
func (h *HitBTC) manageSubs(op string, subs subscription.List) error {
var errs error
subs, errs = subs.ExpandTemplates(h)
for _, s := range subs {
if len(s.Pairs) != 1 {
return subscription.ErrNotSinglePair
}
pair := s.Pairs[0]
r := WsNotification{
r := WsRequest{
JSONRPCVersion: rpcVersion,
Method: "unsubscribe" + s.Channel,
Params: WsParams{
Symbol: pair.String(),
},
ID: h.Websocket.Conn.GenerateMessageID(false),
}
switch s.Channel {
case "Trades":
r.Params.Limit = 100
case "Candles":
r.Params.Period = "M30"
r.Params.Limit = 100
if err := json.Unmarshal([]byte(s.QualifiedChannel), &r); err != nil {
errs = common.AppendError(errs, err)
continue
}
err := h.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, r)
r.Method = op + r.Method
err := h.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, r) // v2 api does not return an ID with errors, so we don't use ReturnResponse
if err == nil {
err = h.Websocket.RemoveSubscriptions(h.Websocket.Conn, s)
if op == subscribeOp {
err = h.Websocket.AddSuccessfulSubscriptions(h.Websocket.Conn, s)
} else {
err = h.Websocket.RemoveSubscriptions(h.Websocket.Conn, s)
}
}
if err != nil {
errs = common.AppendError(errs, err)
@@ -838,3 +807,50 @@ func (h *HitBTC) wsGetTrades(c currency.Pair, limit int64, sort, by string) (*Ws
}
return &response, nil
}
// subToReq returns the subscription as a map to populate WsRequest
func subToReq(s *subscription.Subscription, maybePair ...currency.Pair) *WsRequest {
name, ok := subscriptionNames[s.Channel]
if !ok {
panic(fmt.Errorf("%w: %s", subscription.ErrNotSupported, s.Channel))
}
r := &WsRequest{
Method: name,
}
if len(maybePair) != 0 {
r.Params = &WsParams{
Symbol: maybePair[0].String(),
Limit: s.Levels,
}
if s.Interval != 0 {
var err error
if r.Params.Period, err = formatExchangeKlineInterval(s.Interval); err != nil {
panic(err)
}
}
} else if s.Levels != 0 {
r.Params = &WsParams{
Limit: s.Levels,
}
}
return r
}
// isSymbolChannel returns if the channel expects receive a symbol
func isSymbolChannel(s *subscription.Subscription) bool {
return s.Channel != subscription.MyAccountChannel
}
const subTplText = `
{{- if isSymbolChannel $.S }}
{{ range $asset, $pairs := $.AssetPairs }}
{{- range $p := $pairs -}}
{{- subToReq $.S $p | mustToJson }}
{{ $.PairSeparator }}
{{- end }}
{{ $.AssetSeparator }}
{{- end }}
{{- else }}
{{- subToReq $.S | mustToJson }}
{{- end }}
`

View File

@@ -108,6 +108,7 @@ func (h *HitBTC) SetDefaults() {
GlobalResultLimit: 1000,
},
},
Subscriptions: defaultSubscriptions.Clone(),
}
h.Requester, err = request.New(h.Name,
@@ -157,7 +158,7 @@ func (h *HitBTC) Setup(exch *config.Exchange) error {
Connector: h.WsConnect,
Subscriber: h.Subscribe,
Unsubscriber: h.Unsubscribe,
GenerateSubscriptions: h.GenerateDefaultSubscriptions,
GenerateSubscriptions: h.generateSubscriptions,
Features: &h.Features.Supports.WebsocketCapabilities,
OrderbookBufferConfig: buffer.Config{
SortBuffer: true,
@@ -780,8 +781,8 @@ func (h *HitBTC) ValidateAPICredentials(ctx context.Context, assetType asset.Ite
return h.CheckTransientError(err)
}
// FormatExchangeKlineInterval returns Interval to exchange formatted string
func (h *HitBTC) FormatExchangeKlineInterval(in kline.Interval) (string, error) {
// formatExchangeKlineInterval returns Interval to exchange formatted string
func formatExchangeKlineInterval(in kline.Interval) (string, error) {
switch in {
case kline.OneMin:
return "M1", nil
@@ -814,7 +815,7 @@ func (h *HitBTC) GetHistoricCandles(ctx context.Context, pair currency.Pair, a a
return nil, err
}
formattedInterval, err := h.FormatExchangeKlineInterval(req.ExchangeInterval)
formattedInterval, err := formatExchangeKlineInterval(req.ExchangeInterval)
if err != nil {
return nil, err
}
@@ -850,7 +851,7 @@ func (h *HitBTC) GetHistoricCandlesExtended(ctx context.Context, pair currency.P
return nil, err
}
formattedInterval, err := h.FormatExchangeKlineInterval(req.ExchangeInterval)
formattedInterval, err := formatExchangeKlineInterval(req.ExchangeInterval)
if err != nil {
return nil, err
}

File diff suppressed because one or more lines are too long