Gemini: Add subscription configuration (#1625)

* Gemini: Upgrade test config

* Gemini: Add subscription configuration

* Gemini: Fix negative waitgroup on shutdown

Fixes #1738
This commit is contained in:
Gareth Kirwan
2024-12-06 09:13:32 +07:00
committed by GitHub
parent 0c4b070ebc
commit 66a3ff021e
5 changed files with 101 additions and 45 deletions

View File

@@ -16,10 +16,13 @@ import (
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)
@@ -1299,3 +1302,34 @@ func TestGetCurrencyTradeURL(t *testing.T) {
assert.NotEmpty(t, resp)
}
}
func TestGenerateSubscriptions(t *testing.T) {
t.Parallel()
g := new(Gemini)
require.NoError(t, testexch.Setup(g), "Test instance Setup must not error")
p := currency.Pairs{currency.NewPairWithDelimiter("BTC", "USD", ""), currency.NewPairWithDelimiter("ETH", "BTC", "")}
require.NoError(t, g.CurrencyPairs.StorePairs(asset.Spot, p, false))
require.NoError(t, g.CurrencyPairs.StorePairs(asset.Spot, p, true))
subs, err := g.generateSubscriptions()
require.NoError(t, err)
exp := subscription.List{
{Asset: asset.Spot, Channel: subscription.CandlesChannel, Pairs: p, QualifiedChannel: "candles_1d", Interval: kline.OneDay},
{Asset: asset.Spot, Channel: subscription.OrderbookChannel, Pairs: p, QualifiedChannel: "l2"},
}
testsubs.EqualLists(t, exp, subs)
for _, i := range []kline.Interval{kline.OneMin, kline.FiveMin, kline.FifteenMin, kline.ThirtyMin, kline.OneHour, kline.SixHour} {
subs, err = subscription.List{{Asset: asset.Spot, Channel: subscription.CandlesChannel, Pairs: p, Interval: i}}.ExpandTemplates(g)
assert.NoErrorf(t, err, "ExpandTemplates should not error on interval %s", i)
require.NotEmpty(t, subs)
assert.Equal(t, "candles_"+i.Short(), subs[0].QualifiedChannel)
}
_, err = subscription.List{{Asset: asset.Spot, Channel: subscription.CandlesChannel, Pairs: p, Interval: kline.FourHour}}.ExpandTemplates(g)
assert.ErrorIs(t, err, kline.ErrUnsupportedInterval, "ExpandTemplates should error on invalid interval")
assert.PanicsWithError(t,
"subscription channel not supported: wibble",
func() { channelName(&subscription.Subscription{Channel: "wibble"}) },
"should panic on invalid channel",
)
}

View File

@@ -5,17 +5,6 @@ import (
"github.com/thrasher-corp/gocryptotrader/types"
)
const (
marketDataLevel2 = "l2"
candles1m = "candles_1m"
candles5m = "candles_5m"
candles15m = "candles_15m"
candles30m = "candles_30m"
candles1hr = "candles_1h"
candles6hr = "candles_6h"
candles1d = "candles_1d"
)
// Ticker holds returned ticker data from the exchange
type Ticker struct {
Ask float64 `json:"ask,string"`

View File

@@ -10,6 +10,7 @@ import (
"net/http"
"strconv"
"strings"
"text/template"
"time"
"github.com/gorilla/websocket"
@@ -17,6 +18,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
@@ -33,6 +35,23 @@ const (
geminiWsOrderEvents = "order/events"
)
const (
marketDataLevel2 = "l2"
candlesChannel = "candles"
)
var defaultSubscriptions = subscription.List{
{Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel, Interval: kline.OneDay},
{Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel},
// Authenticated connection is directly to the orders URI, so this is implicit
// {Enabled: true, Channel: subscription.MyOrdersChannel, Authenticated: true},
}
var subscriptionNames = map[string]string{
subscription.CandlesChannel: candlesChannel,
subscription.OrderbookChannel: marketDataLevel2,
}
// Instantiates a communications channel between websocket connections
var comms = make(chan stream.Response)
@@ -62,28 +81,17 @@ func (g *Gemini) WsConnect() error {
return nil
}
// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (g *Gemini) GenerateDefaultSubscriptions() (subscription.List, error) {
// See gemini_types.go for more subscription/candle vars
var channels = []string{
marketDataLevel2,
candles1d,
}
// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature
func (g *Gemini) generateSubscriptions() (subscription.List, error) {
return g.Features.Subscriptions.ExpandTemplates(g)
}
pairs, err := g.GetEnabledPairs(asset.Spot)
if err != nil {
return nil, err
}
var subscriptions subscription.List
for x := range channels {
subscriptions = append(subscriptions, &subscription.Subscription{
Channel: channels[x],
Pairs: pairs,
Asset: asset.Spot,
})
}
return subscriptions, nil
// GetSubscriptionTemplate returns a subscription channel template
func (g *Gemini) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(template.FuncMap{
"channelName": channelName,
"interval": channelInterval,
}).Parse(subTplText)
}
// Subscribe sends a websocket message to receive data from the channel
@@ -97,19 +105,14 @@ func (g *Gemini) Unsubscribe(subs subscription.List) error {
}
func (g *Gemini) manageSubs(subs subscription.List, op wsSubOp) error {
format, err := g.GetPairFormat(asset.Spot, true)
if err != nil {
return err
}
req := wsSubscribeRequest{
Type: op,
Subscriptions: make([]wsSubscriptions, 0, len(subs)),
}
for _, s := range subs {
req.Subscriptions = append(req.Subscriptions, wsSubscriptions{
Name: s.Channel,
Symbols: s.Pairs.Format(format).Strings(),
Name: s.QualifiedChannel,
Symbols: s.Pairs.Strings(),
})
}
@@ -166,6 +169,7 @@ func (g *Gemini) WsAuth(ctx context.Context, dialer *websocket.Dialer) error {
if err != nil {
return fmt.Errorf("%v Websocket connection %v error. Error %v", g.Name, endpoint, err)
}
g.Websocket.Wg.Add(1)
go g.wsFunnelConnectionData(g.Websocket.AuthConn)
return nil
}
@@ -561,3 +565,28 @@ func (g *Gemini) wsProcessUpdate(result *wsL2MarketData) error {
return trade.AddTradesToBuffer(g.Name, trades...)
}
func channelName(s *subscription.Subscription) string {
if n, ok := subscriptionNames[s.Channel]; ok {
return n
}
panic(fmt.Errorf("%w: %s", subscription.ErrNotSupported, s.Channel))
}
func channelInterval(i kline.Interval) string {
switch i {
case kline.OneMin, kline.FiveMin, kline.FifteenMin, kline.ThirtyMin, kline.OneHour, kline.SixHour:
return i.Short()
case kline.OneDay:
return "1d"
}
panic(fmt.Errorf("%w: %s", kline.ErrUnsupportedInterval, i.Short()))
}
const subTplText = `
{{ range $asset, $pairs := $.AssetPairs }}
{{- channelName $.S -}}
{{- with $i := $.S.Interval -}} _ {{- interval $i }}{{ end -}}
{{- $.AssetSeparator }}
{{- end }}
`

View File

@@ -89,6 +89,7 @@ func (g *Gemini) SetDefaults() {
Enabled: exchange.FeaturesEnabled{
AutoPairUpdates: true,
},
Subscriptions: defaultSubscriptions.Clone(),
}
g.Requester, err = request.New(g.Name,
@@ -145,7 +146,7 @@ func (g *Gemini) Setup(exch *config.Exchange) error {
Connector: g.WsConnect,
Subscriber: g.Subscribe,
Unsubscriber: g.Unsubscribe,
GenerateSubscriptions: g.GenerateDefaultSubscriptions,
GenerateSubscriptions: g.generateSubscriptions,
Features: &g.Features.Supports.WebsocketCapabilities,
})
if err != nil {

View File

@@ -1638,7 +1638,6 @@
"websocketResponseCheckTimeout": 30000000,
"websocketResponseMaxLimit": 7000000000,
"websocketTrafficTimeout": 30000000000,
"websocketOrderbookBufferLimit": 5,
"baseCurrencies": "USD",
"currencyPairs": {
"requestFormat": {
@@ -1648,11 +1647,9 @@
"uppercase": true
},
"useGlobalFormat": true,
"assetTypes": [
"spot"
],
"pairs": {
"spot": {
"assetEnabled": true,
"enabled": "BTCUSD",
"available": "BTCUSD,ETHBTC,ETHUSD,BCHUSD,BCHBTC,BCHETH,LTCUSD,LTCBTC,LTCETH,LTCBCH,ZECUSD,ZECBTC,ZECETH,ZECBCH,ZECLTC"
}
@@ -1703,7 +1700,13 @@
"iban": "",
"supportedCurrencies": ""
}
]
],
"orderbook": {
"verificationBypass": false,
"websocketBufferLimit": 5,
"websocketBufferEnabled": false,
"publishPeriod": 10000000000
}
},
{
"name": "HitBTC",