Files
gocryptotrader/exchanges/coinbase/coinbase_websocket.go
cranktakular fd9aaf00a2 Coinbase: Update exchange implementation (#1480)
* Slight enhance of Coinbase tests

Continual enhance of Coinbase tests

The revamp continues

Oh jeez the Orderbook part's unfinished don't look

Coinbase revamp, Orderbook still unfinished

* Coinbase revamp; CreateReport is still WIP

* More coinbase improvements; onto sandbox testing

* Coinbase revamp continues

* Coinbase revamp continues

* Coinbasepro revamp is ceaseless

* Coinbase revamp, starting on advanced trade API

* Coinbase Advanced Trade Starts in Ernest

V3 done, onto V2

Coinbase revamp nears completion

Coinbase revamp nears completion

Test commit should fail

Coinbase revamp nears completion

* Coinbase revamp stage wrapper

* Coinbase wrapper coherence continues

* Coinbase wrapper continues writhing

* Coinbase wrapper & codebase cleanup

* Coinbase updates & wrap progress

* More Coinbase wrapper progress

* Wrapper is wrapped, kinda

* Test & type checking

* Coinbase REST revamp finished

* Post-merge fix

* WS revamp begins

* WS Main Revamp Done?

* CB websocket tidying up

* Coinbase WS wrapperupperer

* Coinbase revamp done??

* Linter progress

* Continued lint cleanup

* Further lint cleanup

* Increased lint coverage

* Does this fix all sloppy reassigns & shadowing?

* Undoing retry policy change

* Documentation regeneration

* Coinbase code improvements

* Providing warning about known issue

* Updating an error to new format

* Making gocritic happy

* Review adherence

* Endpoints moved to V3 & nil pointer fixes

* Removing seemingly superfluous constant

* Glorious improvements

* Removing unused error

* Partial public endpoint addition

* Slight improvements

* Wrapper improvements; still a few errors left in other packages

* A lil Coinbase progress

* Json cleaning

* Lint appeasement

* Config repair

* Config fix (real)

* Little fix

* New public endpoint incorporation

* Additional fixes

* Improvements & Appeasements

* LineSaver

* Additional fixes

* Another fix

* Fixing picked nits

* Quick fixies

* Lil fixes

* Subscriptions: Add List.Enabled

* CoinbasePro: Add subscription templating

* fixup! CoinbasePro: Add subscription templating

* fixup! CoinbasePro: Add subscription templating

* Comment fix

* Subsequent fixes

* Issues hopefully fixed

* Lint fix

* Glorious fixes

* Json formatting

* ShazNits

* (L/N)i(n/)t

* Adding a test

* Tiny test improvement

* Template patch testing

* Fixes

* Further shaznits

* Lint nit

* JWT move and other fixes

* Small nits

* Shaznit, singular

* Post-merge fix

* Post-merge fixes

* Typo fix

* Some glorious nits

* Required changes

* Stop going

* Alias attempt

* Alias fix & test cleanup

* Test fix

* GetDepositAddress logic improvement

* Status update: Fixed

* Lint fix

* Happy birthday to PR 1480

* Cleanups

* Necessary nit corrections

* Fixing sillybug

* As per request

* Programming progress

* Order fixes

* Further fixies

* Test fix

* Pre-merge fixes

* More shaznits

* Context

* Sonic error handling

* Import fix

* Better Sonic error handling

* Perfect Sonic error handling?

* F purge

* Coinbase improvements

* API Update Conformity

* Coinbase continuation

* Coinbase order improvements

* Coinbase order improvements

* CreateOrderConfig improvements

* Managing API updates

* Coinbase API update progression

* jwt rename

* Comment link fix

* Coinbase v2 cleanup

* Post-merge fixes

* Review fixes

* GK's suggestions

* Linter fix

* Minor gbjk fixes

* Nit fixes

* Merge fix

* Lint fixes

* Coinbase rename stage 1

* Coinbase rename stage 2

* Coinbase rename stage 3

* Coinbase rename stage 4

* Coinbase rename final fix

* Coinbase: PoC on converting to request structs

* Applying requested changes

* Many review fixes, handled

* Thrashed by nits

* More minor modifications

* The last nit!?

---------

Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>
2025-09-16 13:37:00 +10:00

607 lines
19 KiB
Go

package coinbase
import (
"context"
"fmt"
"net/http"
"slices"
"strconv"
"text/template"
"time"
gws "github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/encoding/json"
"github.com/thrasher-corp/gocryptotrader/exchange/websocket"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/margin"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
)
const (
coinbaseWebsocketURL = "wss://advanced-trade-ws.coinbase.com"
)
var subscriptionNames = map[string]string{
subscription.HeartbeatChannel: "heartbeats",
subscription.TickerChannel: "ticker",
subscription.CandlesChannel: "candles",
subscription.AllTradesChannel: "market_trades",
subscription.OrderbookChannel: "level2",
subscription.MyAccountChannel: "user",
"status": "status",
"ticker_batch": "ticker_batch",
/* Not Implemented:
"futures_balance_summary": "futures_balance_summary",
*/
}
var defaultSubscriptions = subscription.List{
{Enabled: true, Channel: subscription.HeartbeatChannel},
{Enabled: true, Asset: asset.All, Channel: "status"},
{Enabled: true, Asset: asset.Spot, Channel: subscription.TickerChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel},
{Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel},
{Enabled: true, Asset: asset.All, Channel: subscription.MyAccountChannel, Authenticated: true},
{Enabled: true, Asset: asset.Spot, Channel: "ticker_batch"},
/* Not Implemented:
{Enabled: false, Asset: asset.Spot, Channel: "futures_balance_summary", Authenticated: true},
*/
}
// WsConnect initiates a websocket connection
func (e *Exchange) WsConnect() error {
ctx := context.TODO()
if !e.Websocket.IsEnabled() || !e.IsEnabled() {
return websocket.ErrWebsocketNotEnabled
}
var dialer gws.Dialer
if err := e.Websocket.Conn.Dial(ctx, &dialer, http.Header{}); err != nil {
return err
}
e.Websocket.Wg.Add(1)
go e.wsReadData()
return nil
}
// wsReadData receives and passes on websocket messages for processing
func (e *Exchange) wsReadData() {
defer e.Websocket.Wg.Done()
var seqCount uint64
for {
resp := e.Websocket.Conn.ReadMessage()
if resp.Raw == nil {
return
}
sequence, err := e.wsHandleData(resp.Raw)
if err != nil {
e.Websocket.DataHandler <- err
}
if sequence != nil {
if *sequence != seqCount {
e.Websocket.DataHandler <- fmt.Errorf("%w: received %v, expected %v", errOutOfSequence, sequence, seqCount)
seqCount = *sequence
}
seqCount++
}
}
}
// wsProcessTicker handles ticker data from the websocket
func (e *Exchange) wsProcessTicker(resp *StandardWebsocketResponse) error {
var wsTickers []WebsocketTickerHolder
if err := json.Unmarshal(resp.Events, &wsTickers); err != nil {
return err
}
var allTickers []ticker.Price
aliases := e.pairAliases.GetAliases()
for i := range wsTickers {
for j := range wsTickers[i].Tickers {
symbolAliases := aliases[wsTickers[i].Tickers[j].ProductID]
t := ticker.Price{
LastUpdated: resp.Timestamp,
AssetType: asset.Spot,
ExchangeName: e.Name,
High: wsTickers[i].Tickers[j].High24H.Float64(),
Low: wsTickers[i].Tickers[j].Low24H.Float64(),
Last: wsTickers[i].Tickers[j].Price.Float64(),
Volume: wsTickers[i].Tickers[j].Volume24H.Float64(),
Bid: wsTickers[i].Tickers[j].BestBid.Float64(),
BidSize: wsTickers[i].Tickers[j].BestBidQuantity.Float64(),
Ask: wsTickers[i].Tickers[j].BestAsk.Float64(),
AskSize: wsTickers[i].Tickers[j].BestAskQuantity.Float64(),
}
var errs error
for k := range symbolAliases {
if isEnabled, err := e.CurrencyPairs.IsPairEnabled(symbolAliases[k], asset.Spot); err != nil {
errs = common.AppendError(errs, err)
continue
} else if isEnabled {
t.Pair = symbolAliases[k]
allTickers = append(allTickers, t)
}
}
}
}
e.Websocket.DataHandler <- allTickers
return nil
}
// wsProcessCandle handles candle data from the websocket
func (e *Exchange) wsProcessCandle(resp *StandardWebsocketResponse) error {
var wsCandles []WebsocketCandleHolder
if err := json.Unmarshal(resp.Events, &wsCandles); err != nil {
return err
}
var allCandles []websocket.KlineData
for i := range wsCandles {
for j := range wsCandles[i].Candles {
allCandles = append(allCandles, websocket.KlineData{
Timestamp: resp.Timestamp,
Pair: wsCandles[i].Candles[j].ProductID,
AssetType: asset.Spot,
Exchange: e.Name,
StartTime: wsCandles[i].Candles[j].Start.Time(),
OpenPrice: wsCandles[i].Candles[j].Open.Float64(),
ClosePrice: wsCandles[i].Candles[j].Close.Float64(),
HighPrice: wsCandles[i].Candles[j].High.Float64(),
LowPrice: wsCandles[i].Candles[j].Low.Float64(),
Volume: wsCandles[i].Candles[j].Volume.Float64(),
})
}
}
e.Websocket.DataHandler <- allCandles
return nil
}
// wsProcessMarketTrades handles market trades data from the websocket
func (e *Exchange) wsProcessMarketTrades(resp *StandardWebsocketResponse) error {
var wsTrades []WebsocketMarketTradeHolder
if err := json.Unmarshal(resp.Events, &wsTrades); err != nil {
return err
}
var allTrades []trade.Data
for i := range wsTrades {
for j := range wsTrades[i].Trades {
allTrades = append(allTrades, trade.Data{
TID: wsTrades[i].Trades[j].TradeID,
Exchange: e.Name,
CurrencyPair: wsTrades[i].Trades[j].ProductID,
AssetType: asset.Spot,
Side: wsTrades[i].Trades[j].Side,
Price: wsTrades[i].Trades[j].Price.Float64(),
Amount: wsTrades[i].Trades[j].Size.Float64(),
Timestamp: wsTrades[i].Trades[j].Time,
})
}
}
e.Websocket.DataHandler <- allTrades
return nil
}
// wsProcessL2 handles l2 orderbook data from the websocket
func (e *Exchange) wsProcessL2(resp *StandardWebsocketResponse) error {
var wsL2 []WebsocketOrderbookDataHolder
err := json.Unmarshal(resp.Events, &wsL2)
if err != nil {
return err
}
for i := range wsL2 {
switch wsL2[i].Type {
case "snapshot":
err = e.ProcessSnapshot(&wsL2[i], resp.Timestamp)
case "update":
err = e.ProcessUpdate(&wsL2[i], resp.Timestamp)
default:
err = fmt.Errorf("%w %v", errUnknownL2DataType, wsL2[i].Type)
}
if err != nil {
return err
}
}
return nil
}
// wsProcessUser handles user data from the websocket
func (e *Exchange) wsProcessUser(resp *StandardWebsocketResponse) error {
var wsUser []WebsocketOrderDataHolder
err := json.Unmarshal(resp.Events, &wsUser)
if err != nil {
return err
}
var allOrders []order.Detail
for i := range wsUser {
for j := range wsUser[i].Orders {
var oType order.Type
if oType, err = stringToStandardType(wsUser[i].Orders[j].OrderType); err != nil {
e.Websocket.DataHandler <- order.ClassificationError{
Exchange: e.Name,
Err: err,
}
}
var oSide order.Side
if oSide, err = order.StringToOrderSide(wsUser[i].Orders[j].OrderSide); err != nil {
e.Websocket.DataHandler <- order.ClassificationError{
Exchange: e.Name,
Err: err,
}
}
var oStatus order.Status
if oStatus, err = statusToStandardStatus(wsUser[i].Orders[j].Status); err != nil {
e.Websocket.DataHandler <- order.ClassificationError{
Exchange: e.Name,
Err: err,
}
}
price := wsUser[i].Orders[j].AveragePrice
if wsUser[i].Orders[j].LimitPrice != 0 {
price = wsUser[i].Orders[j].LimitPrice
}
var assetType asset.Item
if assetType, err = stringToStandardAsset(wsUser[i].Orders[j].ProductType); err != nil {
e.Websocket.DataHandler <- order.ClassificationError{
Exchange: e.Name,
Err: err,
}
}
var tif order.TimeInForce
if tif, err = strategyDecoder(wsUser[i].Orders[j].TimeInForce); err != nil {
e.Websocket.DataHandler <- order.ClassificationError{
Exchange: e.Name,
Err: err,
}
}
if wsUser[i].Orders[j].PostOnly {
tif |= order.PostOnly
}
allOrders = append(allOrders, order.Detail{
Price: price.Float64(),
ClientOrderID: wsUser[i].Orders[j].ClientOrderID,
ExecutedAmount: wsUser[i].Orders[j].CumulativeQuantity.Float64(),
RemainingAmount: wsUser[i].Orders[j].LeavesQuantity.Float64(),
Amount: wsUser[i].Orders[j].CumulativeQuantity.Float64() + wsUser[i].Orders[j].LeavesQuantity.Float64(),
OrderID: wsUser[i].Orders[j].OrderID,
Side: oSide,
Type: oType,
Pair: wsUser[i].Orders[j].ProductID,
AssetType: assetType,
Status: oStatus,
TriggerPrice: wsUser[i].Orders[j].StopPrice.Float64(),
TimeInForce: tif,
Fee: wsUser[i].Orders[j].TotalFees.Float64(),
Date: wsUser[i].Orders[j].CreationTime,
CloseTime: wsUser[i].Orders[j].EndTime,
Exchange: e.Name,
})
}
for j := range wsUser[i].Positions.PerpetualFuturesPositions {
var oSide order.Side
if oSide, err = order.StringToOrderSide(wsUser[i].Positions.PerpetualFuturesPositions[j].PositionSide); err != nil {
e.Websocket.DataHandler <- order.ClassificationError{
Exchange: e.Name,
Err: err,
}
}
var mType margin.Type
if mType, err = margin.StringToMarginType(wsUser[i].Positions.PerpetualFuturesPositions[j].MarginType); err != nil {
e.Websocket.DataHandler <- order.ClassificationError{
Exchange: e.Name,
Err: err,
}
}
allOrders = append(allOrders, order.Detail{
Pair: wsUser[i].Positions.PerpetualFuturesPositions[j].ProductID,
Side: oSide,
MarginType: mType,
Amount: wsUser[i].Positions.PerpetualFuturesPositions[j].NetSize.Float64(),
Leverage: wsUser[i].Positions.PerpetualFuturesPositions[j].Leverage.Float64(),
AssetType: asset.Futures,
Exchange: e.Name,
})
}
for j := range wsUser[i].Positions.ExpiringFuturesPositions {
var oSide order.Side
if oSide, err = order.StringToOrderSide(wsUser[i].Positions.ExpiringFuturesPositions[j].Side); err != nil {
e.Websocket.DataHandler <- order.ClassificationError{
Exchange: e.Name,
Err: err,
}
}
allOrders = append(allOrders, order.Detail{
Pair: wsUser[i].Positions.ExpiringFuturesPositions[j].ProductID,
Side: oSide,
ContractAmount: wsUser[i].Positions.ExpiringFuturesPositions[j].NumberOfContracts.Float64(),
Price: wsUser[i].Positions.ExpiringFuturesPositions[j].EntryPrice.Float64(),
})
}
}
e.Websocket.DataHandler <- allOrders
return nil
}
// wsHandleData handles all the websocket data coming from the websocket connection
func (e *Exchange) wsHandleData(respRaw []byte) (*uint64, error) {
var resp StandardWebsocketResponse
if err := json.Unmarshal(respRaw, &resp); err != nil {
return nil, err
}
if resp.Error != "" {
return &resp.Sequence, errors.New(resp.Error)
}
switch resp.Channel {
case "subscriptions", "heartbeats":
return &resp.Sequence, nil
case "status":
var wsStatus []WebsocketProductHolder
if err := json.Unmarshal(resp.Events, &wsStatus); err != nil {
return &resp.Sequence, err
}
e.Websocket.DataHandler <- wsStatus
case "ticker", "ticker_batch":
if err := e.wsProcessTicker(&resp); err != nil {
return &resp.Sequence, err
}
case "candles":
if err := e.wsProcessCandle(&resp); err != nil {
return &resp.Sequence, err
}
case "market_trades":
if err := e.wsProcessMarketTrades(&resp); err != nil {
return &resp.Sequence, err
}
case "l2_data":
if err := e.wsProcessL2(&resp); err != nil {
return &resp.Sequence, err
}
case "user":
if err := e.wsProcessUser(&resp); err != nil {
return &resp.Sequence, err
}
default:
return &resp.Sequence, errChannelNameUnknown
}
return &resp.Sequence, nil
}
// ProcessSnapshot processes the initial orderbook snap shot
func (e *Exchange) ProcessSnapshot(snapshot *WebsocketOrderbookDataHolder, timestamp time.Time) error {
bids, asks, err := processBidAskArray(snapshot, true)
if err != nil {
return err
}
book := &orderbook.Book{
Bids: bids,
Asks: asks,
Exchange: e.Name,
Pair: snapshot.ProductID,
Asset: asset.Spot,
LastUpdated: timestamp,
ValidateOrderbook: e.ValidateOrderbook,
}
for _, a := range e.pairAliases.GetAlias(snapshot.ProductID) {
isEnabled, err := e.IsPairEnabled(a, asset.Spot)
if err != nil {
return err
}
if isEnabled {
book.Pair = a
if err := e.Websocket.Orderbook.LoadSnapshot(book); err != nil {
return err
}
}
}
return nil
}
// ProcessUpdate updates the orderbook local cache
func (e *Exchange) ProcessUpdate(update *WebsocketOrderbookDataHolder, timestamp time.Time) error {
bids, asks, err := processBidAskArray(update, false)
if err != nil {
return err
}
obU := &orderbook.Update{
Bids: bids,
Asks: asks,
Pair: update.ProductID,
UpdateTime: timestamp,
Asset: asset.Spot,
}
for _, a := range e.pairAliases.GetAlias(update.ProductID) {
isEnabled, err := e.IsPairEnabled(a, asset.Spot)
if err != nil {
return err
}
if isEnabled {
obU.Pair = a
if err := e.Websocket.Orderbook.Update(obU); err != nil {
return err
}
}
}
return nil
}
// GenerateSubscriptions adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (e *Exchange) generateSubscriptions() (subscription.List, error) {
return e.Features.Subscriptions.ExpandTemplates(e)
}
// GetSubscriptionTemplate returns a subscription channel template
func (e *Exchange) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(template.FuncMap{"channelName": channelName}).Parse(subTplText)
}
// Subscribe sends a websocket message to receive data from a list of channels
func (e *Exchange) Subscribe(subs subscription.List) error {
return e.ParallelChanOp(context.TODO(), subs, func(ctx context.Context, subs subscription.List) error { return e.manageSubs(ctx, "subscribe", subs) }, 1)
}
// Unsubscribe sends a websocket message to stop receiving data from a list of channels
func (e *Exchange) Unsubscribe(subs subscription.List) error {
return e.ParallelChanOp(context.TODO(), subs, func(ctx context.Context, subs subscription.List) error { return e.manageSubs(ctx, "unsubscribe", subs) }, 1)
}
// manageSubs subscribes or unsubscribes from a list of websocket channels
func (e *Exchange) manageSubs(ctx context.Context, op string, subs subscription.List) error {
var errs error
subs, errs = subs.ExpandTemplates(e)
for _, s := range subs {
r := &WebsocketRequest{
Type: op,
ProductIDs: s.Pairs,
Channel: s.QualifiedChannel,
Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
}
var err error
limitType := WSUnauthRate
if s.Authenticated {
limitType = WSAuthRate
if r.JWT, err = e.GetWSJWT(ctx); err != nil {
return err
}
}
if err = e.Websocket.Conn.SendJSONMessage(ctx, limitType, r); err == nil {
switch op {
case "subscribe":
err = e.Websocket.AddSuccessfulSubscriptions(e.Websocket.Conn, s)
case "unsubscribe":
err = e.Websocket.RemoveSubscriptions(e.Websocket.Conn, s)
}
}
errs = common.AppendError(errs, err)
}
return errs
}
// GetWSJWT returns a JWT, using a stored one of it's provided, and generating a new one otherwise
func (e *Exchange) GetWSJWT(ctx context.Context) (string, error) {
e.jwt.m.RLock()
if e.jwt.expiresAt.After(time.Now()) {
retStr := e.jwt.token
e.jwt.m.RUnlock()
return retStr, nil
}
e.jwt.m.RUnlock()
e.jwt.m.Lock()
defer e.jwt.m.Unlock()
var err error
e.jwt.token, e.jwt.expiresAt, err = e.GetJWT(ctx, "")
return e.jwt.token, err
}
// processBidAskArray is a helper function that turns WebsocketOrderbookDataHolder into arrays of bids and asks
func processBidAskArray(data *WebsocketOrderbookDataHolder, snapshot bool) (bids, asks orderbook.Levels, err error) {
bids = make(orderbook.Levels, 0, len(data.Changes))
asks = make(orderbook.Levels, 0, len(data.Changes))
for i := range data.Changes {
change := orderbook.Level{Price: data.Changes[i].PriceLevel.Float64(), Amount: data.Changes[i].NewQuantity.Float64()}
switch data.Changes[i].Side {
case "bid":
bids = append(bids, change)
case "offer":
asks = append(asks, change)
default:
return nil, nil, fmt.Errorf("%w %v", order.ErrSideIsInvalid, data.Changes[i].Side)
}
}
if snapshot {
return slices.Clip(bids), slices.Clip(asks), nil
}
return bids, asks, nil
}
// statusToStandardStatus is a helper function that converts a Coinbase Pro status string to a standardised order.Status type
func statusToStandardStatus(stat string) (order.Status, error) {
switch stat {
case "PENDING":
return order.New, nil
case "OPEN":
return order.Active, nil
case "FILLED":
return order.Filled, nil
case "CANCELLED":
return order.Cancelled, nil
case "EXPIRED":
return order.Expired, nil
case "FAILED":
return order.Rejected, nil
default:
return order.UnknownStatus, fmt.Errorf("%w %v", order.ErrUnsupportedStatusType, stat)
}
}
// stringToStandardType is a helper function that converts a Coinbase Pro side string to a standardised order.Type type
func stringToStandardType(str string) (order.Type, error) {
switch str {
case "LIMIT_ORDER_TYPE":
return order.Limit, nil
case "MARKET_ORDER_TYPE":
return order.Market, nil
case "STOP_LIMIT_ORDER_TYPE":
return order.StopLimit, nil
default:
return order.UnknownType, fmt.Errorf("%w %v", order.ErrUnrecognisedOrderType, str)
}
}
// stringToStandardAsset is a helper function that converts a Coinbase Pro asset string to a standardised asset.Item type
func stringToStandardAsset(str string) (asset.Item, error) {
switch str {
case "SPOT":
return asset.Spot, nil
case "FUTURE":
return asset.Futures, nil
default:
return asset.Empty, asset.ErrNotSupported
}
}
// strategyDecoder is a helper function that converts a Coinbase Pro time in force string to a few standardised bools
func strategyDecoder(str string) (tif order.TimeInForce, err error) {
switch str {
case "IMMEDIATE_OR_CANCEL":
return order.ImmediateOrCancel, nil
case "FILL_OR_KILL":
return order.FillOrKill, nil
case "GOOD_UNTIL_CANCELLED":
return order.GoodTillCancel, nil
case "GOOD_UNTIL_DATE_TIME":
return order.GoodTillDay | order.GoodTillTime, nil
default:
return order.UnknownTIF, fmt.Errorf("%w %v", errUnrecognisedStrategyType, str)
}
}
// checkSubscriptions looks for incompatible subscriptions and if found replaces all with defaults
// This should be unnecessary and removable by mid-2025
func (e *Exchange) checkSubscriptions() {
for _, s := range e.Config.Features.Subscriptions {
switch s.Channel {
case "level2_batch", "matches":
e.Config.Features.Subscriptions = defaultSubscriptions.Clone()
e.Features.Subscriptions = e.Config.Features.Subscriptions.Enabled()
return
}
}
}
func channelName(s *subscription.Subscription) (string, error) {
if n, ok := subscriptionNames[s.Channel]; ok {
return n, nil
}
return "", fmt.Errorf("%w: %s", subscription.ErrNotSupported, s.Channel)
}
const subTplText = `
{{ range $asset, $pairs := $.AssetPairs }}
{{- channelName $.S -}}
{{- $.AssetSeparator }}
{{- end }}
`