mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-05-13 23:16:45 +00:00
* Slight enhance of Coinbase tests Continual enhance of Coinbase tests The revamp continues Oh jeez the Orderbook part's unfinished don't look Coinbase revamp, Orderbook still unfinished * Coinbase revamp; CreateReport is still WIP * More coinbase improvements; onto sandbox testing * Coinbase revamp continues * Coinbase revamp continues * Coinbasepro revamp is ceaseless * Coinbase revamp, starting on advanced trade API * Coinbase Advanced Trade Starts in Ernest V3 done, onto V2 Coinbase revamp nears completion Coinbase revamp nears completion Test commit should fail Coinbase revamp nears completion * Coinbase revamp stage wrapper * Coinbase wrapper coherence continues * Coinbase wrapper continues writhing * Coinbase wrapper & codebase cleanup * Coinbase updates & wrap progress * More Coinbase wrapper progress * Wrapper is wrapped, kinda * Test & type checking * Coinbase REST revamp finished * Post-merge fix * WS revamp begins * WS Main Revamp Done? * CB websocket tidying up * Coinbase WS wrapperupperer * Coinbase revamp done?? * Linter progress * Continued lint cleanup * Further lint cleanup * Increased lint coverage * Does this fix all sloppy reassigns & shadowing? 
* Undoing retry policy change * Documentation regeneration * Coinbase code improvements * Providing warning about known issue * Updating an error to new format * Making gocritic happy * Review adherence * Endpoints moved to V3 & nil pointer fixes * Removing seemingly superfluous constant * Glorious improvements * Removing unused error * Partial public endpoint addition * Slight improvements * Wrapper improvements; still a few errors left in other packages * A lil Coinbase progress * Json cleaning * Lint appeasement * Config repair * Config fix (real) * Little fix * New public endpoint incorporation * Additional fixes * Improvements & Appeasements * LineSaver * Additional fixes * Another fix * Fixing picked nits * Quick fixies * Lil fixes * Subscriptions: Add List.Enabled * CoinbasePro: Add subscription templating * fixup! CoinbasePro: Add subscription templating * fixup! CoinbasePro: Add subscription templating * Comment fix * Subsequent fixes * Issues hopefully fixed * Lint fix * Glorious fixes * Json formatting * ShazNits * (L/N)i(n/)t * Adding a test * Tiny test improvement * Template patch testing * Fixes * Further shaznits * Lint nit * JWT move and other fixes * Small nits * Shaznit, singular * Post-merge fix * Post-merge fixes * Typo fix * Some glorious nits * Required changes * Stop going * Alias attempt * Alias fix & test cleanup * Test fix * GetDepositAddress logic improvement * Status update: Fixed * Lint fix * Happy birthday to PR 1480 * Cleanups * Necessary nit corrections * Fixing sillybug * As per request * Programming progress * Order fixes * Further fixies * Test fix * Pre-merge fixes * More shaznits * Context * Sonic error handling * Import fix * Better Sonic error handling * Perfect Sonic error handling? 
* F purge * Coinbase improvements * API Update Conformity * Coinbase continuation * Coinbase order improvements * Coinbase order improvements * CreateOrderConfig improvements * Managing API updates * Coinbase API update progression * jwt rename * Comment link fix * Coinbase v2 cleanup * Post-merge fixes * Review fixes * GK's suggestions * Linter fix * Minor gbjk fixes * Nit fixes * Merge fix * Lint fixes * Coinbase rename stage 1 * Coinbase rename stage 2 * Coinbase rename stage 3 * Coinbase rename stage 4 * Coinbase rename final fix * Coinbase: PoC on converting to request structs * Applying requested changes * Many review fixes, handled * Thrashed by nits * More minor modifications * The last nit!? --------- Co-authored-by: Gareth Kirwan <gbjkirwan@gmail.com>
607 lines
19 KiB
Go
607 lines
19 KiB
Go
package coinbase
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"slices"
|
|
"strconv"
|
|
"text/template"
|
|
"time"
|
|
|
|
gws "github.com/gorilla/websocket"
|
|
"github.com/pkg/errors"
|
|
"github.com/thrasher-corp/gocryptotrader/common"
|
|
"github.com/thrasher-corp/gocryptotrader/encoding/json"
|
|
"github.com/thrasher-corp/gocryptotrader/exchange/websocket"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/margin"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
|
|
)
|
|
|
|
// coinbaseWebsocketURL is the address of the Coinbase Advanced Trade websocket endpoint
const coinbaseWebsocketURL = "wss://advanced-trade-ws.coinbase.com"
|
|
|
|
var subscriptionNames = map[string]string{
|
|
subscription.HeartbeatChannel: "heartbeats",
|
|
subscription.TickerChannel: "ticker",
|
|
subscription.CandlesChannel: "candles",
|
|
subscription.AllTradesChannel: "market_trades",
|
|
subscription.OrderbookChannel: "level2",
|
|
subscription.MyAccountChannel: "user",
|
|
"status": "status",
|
|
"ticker_batch": "ticker_batch",
|
|
/* Not Implemented:
|
|
"futures_balance_summary": "futures_balance_summary",
|
|
*/
|
|
}
|
|
|
|
var defaultSubscriptions = subscription.List{
|
|
{Enabled: true, Channel: subscription.HeartbeatChannel},
|
|
{Enabled: true, Asset: asset.All, Channel: "status"},
|
|
{Enabled: true, Asset: asset.Spot, Channel: subscription.TickerChannel},
|
|
{Enabled: true, Asset: asset.Spot, Channel: subscription.CandlesChannel},
|
|
{Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel},
|
|
{Enabled: true, Asset: asset.Spot, Channel: subscription.OrderbookChannel},
|
|
{Enabled: true, Asset: asset.All, Channel: subscription.MyAccountChannel, Authenticated: true},
|
|
{Enabled: true, Asset: asset.Spot, Channel: "ticker_batch"},
|
|
/* Not Implemented:
|
|
{Enabled: false, Asset: asset.Spot, Channel: "futures_balance_summary", Authenticated: true},
|
|
*/
|
|
}
|
|
|
|
// WsConnect initiates a websocket connection
|
|
func (e *Exchange) WsConnect() error {
|
|
ctx := context.TODO()
|
|
if !e.Websocket.IsEnabled() || !e.IsEnabled() {
|
|
return websocket.ErrWebsocketNotEnabled
|
|
}
|
|
var dialer gws.Dialer
|
|
if err := e.Websocket.Conn.Dial(ctx, &dialer, http.Header{}); err != nil {
|
|
return err
|
|
}
|
|
e.Websocket.Wg.Add(1)
|
|
go e.wsReadData()
|
|
return nil
|
|
}
|
|
|
|
// wsReadData receives and passes on websocket messages for processing
|
|
func (e *Exchange) wsReadData() {
|
|
defer e.Websocket.Wg.Done()
|
|
var seqCount uint64
|
|
for {
|
|
resp := e.Websocket.Conn.ReadMessage()
|
|
if resp.Raw == nil {
|
|
return
|
|
}
|
|
sequence, err := e.wsHandleData(resp.Raw)
|
|
if err != nil {
|
|
e.Websocket.DataHandler <- err
|
|
}
|
|
if sequence != nil {
|
|
if *sequence != seqCount {
|
|
e.Websocket.DataHandler <- fmt.Errorf("%w: received %v, expected %v", errOutOfSequence, sequence, seqCount)
|
|
seqCount = *sequence
|
|
}
|
|
seqCount++
|
|
}
|
|
}
|
|
}
|
|
|
|
// wsProcessTicker handles ticker data from the websocket
|
|
func (e *Exchange) wsProcessTicker(resp *StandardWebsocketResponse) error {
|
|
var wsTickers []WebsocketTickerHolder
|
|
if err := json.Unmarshal(resp.Events, &wsTickers); err != nil {
|
|
return err
|
|
}
|
|
var allTickers []ticker.Price
|
|
aliases := e.pairAliases.GetAliases()
|
|
for i := range wsTickers {
|
|
for j := range wsTickers[i].Tickers {
|
|
symbolAliases := aliases[wsTickers[i].Tickers[j].ProductID]
|
|
t := ticker.Price{
|
|
LastUpdated: resp.Timestamp,
|
|
AssetType: asset.Spot,
|
|
ExchangeName: e.Name,
|
|
High: wsTickers[i].Tickers[j].High24H.Float64(),
|
|
Low: wsTickers[i].Tickers[j].Low24H.Float64(),
|
|
Last: wsTickers[i].Tickers[j].Price.Float64(),
|
|
Volume: wsTickers[i].Tickers[j].Volume24H.Float64(),
|
|
Bid: wsTickers[i].Tickers[j].BestBid.Float64(),
|
|
BidSize: wsTickers[i].Tickers[j].BestBidQuantity.Float64(),
|
|
Ask: wsTickers[i].Tickers[j].BestAsk.Float64(),
|
|
AskSize: wsTickers[i].Tickers[j].BestAskQuantity.Float64(),
|
|
}
|
|
var errs error
|
|
for k := range symbolAliases {
|
|
if isEnabled, err := e.CurrencyPairs.IsPairEnabled(symbolAliases[k], asset.Spot); err != nil {
|
|
errs = common.AppendError(errs, err)
|
|
continue
|
|
} else if isEnabled {
|
|
t.Pair = symbolAliases[k]
|
|
allTickers = append(allTickers, t)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
e.Websocket.DataHandler <- allTickers
|
|
return nil
|
|
}
|
|
|
|
// wsProcessCandle handles candle data from the websocket
|
|
func (e *Exchange) wsProcessCandle(resp *StandardWebsocketResponse) error {
|
|
var wsCandles []WebsocketCandleHolder
|
|
if err := json.Unmarshal(resp.Events, &wsCandles); err != nil {
|
|
return err
|
|
}
|
|
var allCandles []websocket.KlineData
|
|
for i := range wsCandles {
|
|
for j := range wsCandles[i].Candles {
|
|
allCandles = append(allCandles, websocket.KlineData{
|
|
Timestamp: resp.Timestamp,
|
|
Pair: wsCandles[i].Candles[j].ProductID,
|
|
AssetType: asset.Spot,
|
|
Exchange: e.Name,
|
|
StartTime: wsCandles[i].Candles[j].Start.Time(),
|
|
OpenPrice: wsCandles[i].Candles[j].Open.Float64(),
|
|
ClosePrice: wsCandles[i].Candles[j].Close.Float64(),
|
|
HighPrice: wsCandles[i].Candles[j].High.Float64(),
|
|
LowPrice: wsCandles[i].Candles[j].Low.Float64(),
|
|
Volume: wsCandles[i].Candles[j].Volume.Float64(),
|
|
})
|
|
}
|
|
}
|
|
e.Websocket.DataHandler <- allCandles
|
|
return nil
|
|
}
|
|
|
|
// wsProcessMarketTrades handles market trades data from the websocket
|
|
func (e *Exchange) wsProcessMarketTrades(resp *StandardWebsocketResponse) error {
|
|
var wsTrades []WebsocketMarketTradeHolder
|
|
if err := json.Unmarshal(resp.Events, &wsTrades); err != nil {
|
|
return err
|
|
}
|
|
var allTrades []trade.Data
|
|
for i := range wsTrades {
|
|
for j := range wsTrades[i].Trades {
|
|
allTrades = append(allTrades, trade.Data{
|
|
TID: wsTrades[i].Trades[j].TradeID,
|
|
Exchange: e.Name,
|
|
CurrencyPair: wsTrades[i].Trades[j].ProductID,
|
|
AssetType: asset.Spot,
|
|
Side: wsTrades[i].Trades[j].Side,
|
|
Price: wsTrades[i].Trades[j].Price.Float64(),
|
|
Amount: wsTrades[i].Trades[j].Size.Float64(),
|
|
Timestamp: wsTrades[i].Trades[j].Time,
|
|
})
|
|
}
|
|
}
|
|
e.Websocket.DataHandler <- allTrades
|
|
return nil
|
|
}
|
|
|
|
// wsProcessL2 handles l2 orderbook data from the websocket
|
|
func (e *Exchange) wsProcessL2(resp *StandardWebsocketResponse) error {
|
|
var wsL2 []WebsocketOrderbookDataHolder
|
|
err := json.Unmarshal(resp.Events, &wsL2)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for i := range wsL2 {
|
|
switch wsL2[i].Type {
|
|
case "snapshot":
|
|
err = e.ProcessSnapshot(&wsL2[i], resp.Timestamp)
|
|
case "update":
|
|
err = e.ProcessUpdate(&wsL2[i], resp.Timestamp)
|
|
default:
|
|
err = fmt.Errorf("%w %v", errUnknownL2DataType, wsL2[i].Type)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// wsProcessUser handles user data from the websocket
|
|
func (e *Exchange) wsProcessUser(resp *StandardWebsocketResponse) error {
|
|
var wsUser []WebsocketOrderDataHolder
|
|
err := json.Unmarshal(resp.Events, &wsUser)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var allOrders []order.Detail
|
|
for i := range wsUser {
|
|
for j := range wsUser[i].Orders {
|
|
var oType order.Type
|
|
if oType, err = stringToStandardType(wsUser[i].Orders[j].OrderType); err != nil {
|
|
e.Websocket.DataHandler <- order.ClassificationError{
|
|
Exchange: e.Name,
|
|
Err: err,
|
|
}
|
|
}
|
|
var oSide order.Side
|
|
if oSide, err = order.StringToOrderSide(wsUser[i].Orders[j].OrderSide); err != nil {
|
|
e.Websocket.DataHandler <- order.ClassificationError{
|
|
Exchange: e.Name,
|
|
Err: err,
|
|
}
|
|
}
|
|
var oStatus order.Status
|
|
if oStatus, err = statusToStandardStatus(wsUser[i].Orders[j].Status); err != nil {
|
|
e.Websocket.DataHandler <- order.ClassificationError{
|
|
Exchange: e.Name,
|
|
Err: err,
|
|
}
|
|
}
|
|
price := wsUser[i].Orders[j].AveragePrice
|
|
if wsUser[i].Orders[j].LimitPrice != 0 {
|
|
price = wsUser[i].Orders[j].LimitPrice
|
|
}
|
|
var assetType asset.Item
|
|
if assetType, err = stringToStandardAsset(wsUser[i].Orders[j].ProductType); err != nil {
|
|
e.Websocket.DataHandler <- order.ClassificationError{
|
|
Exchange: e.Name,
|
|
Err: err,
|
|
}
|
|
}
|
|
var tif order.TimeInForce
|
|
if tif, err = strategyDecoder(wsUser[i].Orders[j].TimeInForce); err != nil {
|
|
e.Websocket.DataHandler <- order.ClassificationError{
|
|
Exchange: e.Name,
|
|
Err: err,
|
|
}
|
|
}
|
|
if wsUser[i].Orders[j].PostOnly {
|
|
tif |= order.PostOnly
|
|
}
|
|
allOrders = append(allOrders, order.Detail{
|
|
Price: price.Float64(),
|
|
ClientOrderID: wsUser[i].Orders[j].ClientOrderID,
|
|
ExecutedAmount: wsUser[i].Orders[j].CumulativeQuantity.Float64(),
|
|
RemainingAmount: wsUser[i].Orders[j].LeavesQuantity.Float64(),
|
|
Amount: wsUser[i].Orders[j].CumulativeQuantity.Float64() + wsUser[i].Orders[j].LeavesQuantity.Float64(),
|
|
OrderID: wsUser[i].Orders[j].OrderID,
|
|
Side: oSide,
|
|
Type: oType,
|
|
Pair: wsUser[i].Orders[j].ProductID,
|
|
AssetType: assetType,
|
|
Status: oStatus,
|
|
TriggerPrice: wsUser[i].Orders[j].StopPrice.Float64(),
|
|
TimeInForce: tif,
|
|
Fee: wsUser[i].Orders[j].TotalFees.Float64(),
|
|
Date: wsUser[i].Orders[j].CreationTime,
|
|
CloseTime: wsUser[i].Orders[j].EndTime,
|
|
Exchange: e.Name,
|
|
})
|
|
}
|
|
for j := range wsUser[i].Positions.PerpetualFuturesPositions {
|
|
var oSide order.Side
|
|
if oSide, err = order.StringToOrderSide(wsUser[i].Positions.PerpetualFuturesPositions[j].PositionSide); err != nil {
|
|
e.Websocket.DataHandler <- order.ClassificationError{
|
|
Exchange: e.Name,
|
|
Err: err,
|
|
}
|
|
}
|
|
var mType margin.Type
|
|
if mType, err = margin.StringToMarginType(wsUser[i].Positions.PerpetualFuturesPositions[j].MarginType); err != nil {
|
|
e.Websocket.DataHandler <- order.ClassificationError{
|
|
Exchange: e.Name,
|
|
Err: err,
|
|
}
|
|
}
|
|
allOrders = append(allOrders, order.Detail{
|
|
Pair: wsUser[i].Positions.PerpetualFuturesPositions[j].ProductID,
|
|
Side: oSide,
|
|
MarginType: mType,
|
|
Amount: wsUser[i].Positions.PerpetualFuturesPositions[j].NetSize.Float64(),
|
|
Leverage: wsUser[i].Positions.PerpetualFuturesPositions[j].Leverage.Float64(),
|
|
AssetType: asset.Futures,
|
|
Exchange: e.Name,
|
|
})
|
|
}
|
|
for j := range wsUser[i].Positions.ExpiringFuturesPositions {
|
|
var oSide order.Side
|
|
if oSide, err = order.StringToOrderSide(wsUser[i].Positions.ExpiringFuturesPositions[j].Side); err != nil {
|
|
e.Websocket.DataHandler <- order.ClassificationError{
|
|
Exchange: e.Name,
|
|
Err: err,
|
|
}
|
|
}
|
|
allOrders = append(allOrders, order.Detail{
|
|
Pair: wsUser[i].Positions.ExpiringFuturesPositions[j].ProductID,
|
|
Side: oSide,
|
|
ContractAmount: wsUser[i].Positions.ExpiringFuturesPositions[j].NumberOfContracts.Float64(),
|
|
Price: wsUser[i].Positions.ExpiringFuturesPositions[j].EntryPrice.Float64(),
|
|
})
|
|
}
|
|
}
|
|
e.Websocket.DataHandler <- allOrders
|
|
return nil
|
|
}
|
|
|
|
// wsHandleData handles all the websocket data coming from the websocket connection
|
|
func (e *Exchange) wsHandleData(respRaw []byte) (*uint64, error) {
|
|
var resp StandardWebsocketResponse
|
|
if err := json.Unmarshal(respRaw, &resp); err != nil {
|
|
return nil, err
|
|
}
|
|
if resp.Error != "" {
|
|
return &resp.Sequence, errors.New(resp.Error)
|
|
}
|
|
switch resp.Channel {
|
|
case "subscriptions", "heartbeats":
|
|
return &resp.Sequence, nil
|
|
case "status":
|
|
var wsStatus []WebsocketProductHolder
|
|
if err := json.Unmarshal(resp.Events, &wsStatus); err != nil {
|
|
return &resp.Sequence, err
|
|
}
|
|
e.Websocket.DataHandler <- wsStatus
|
|
case "ticker", "ticker_batch":
|
|
if err := e.wsProcessTicker(&resp); err != nil {
|
|
return &resp.Sequence, err
|
|
}
|
|
case "candles":
|
|
if err := e.wsProcessCandle(&resp); err != nil {
|
|
return &resp.Sequence, err
|
|
}
|
|
case "market_trades":
|
|
if err := e.wsProcessMarketTrades(&resp); err != nil {
|
|
return &resp.Sequence, err
|
|
}
|
|
case "l2_data":
|
|
if err := e.wsProcessL2(&resp); err != nil {
|
|
return &resp.Sequence, err
|
|
}
|
|
case "user":
|
|
if err := e.wsProcessUser(&resp); err != nil {
|
|
return &resp.Sequence, err
|
|
}
|
|
default:
|
|
return &resp.Sequence, errChannelNameUnknown
|
|
}
|
|
return &resp.Sequence, nil
|
|
}
|
|
|
|
// ProcessSnapshot processes the initial orderbook snap shot
|
|
func (e *Exchange) ProcessSnapshot(snapshot *WebsocketOrderbookDataHolder, timestamp time.Time) error {
|
|
bids, asks, err := processBidAskArray(snapshot, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
book := &orderbook.Book{
|
|
Bids: bids,
|
|
Asks: asks,
|
|
Exchange: e.Name,
|
|
Pair: snapshot.ProductID,
|
|
Asset: asset.Spot,
|
|
LastUpdated: timestamp,
|
|
ValidateOrderbook: e.ValidateOrderbook,
|
|
}
|
|
for _, a := range e.pairAliases.GetAlias(snapshot.ProductID) {
|
|
isEnabled, err := e.IsPairEnabled(a, asset.Spot)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if isEnabled {
|
|
book.Pair = a
|
|
if err := e.Websocket.Orderbook.LoadSnapshot(book); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ProcessUpdate updates the orderbook local cache
|
|
func (e *Exchange) ProcessUpdate(update *WebsocketOrderbookDataHolder, timestamp time.Time) error {
|
|
bids, asks, err := processBidAskArray(update, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
obU := &orderbook.Update{
|
|
Bids: bids,
|
|
Asks: asks,
|
|
Pair: update.ProductID,
|
|
UpdateTime: timestamp,
|
|
Asset: asset.Spot,
|
|
}
|
|
for _, a := range e.pairAliases.GetAlias(update.ProductID) {
|
|
isEnabled, err := e.IsPairEnabled(a, asset.Spot)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if isEnabled {
|
|
obU.Pair = a
|
|
if err := e.Websocket.Orderbook.Update(obU); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GenerateSubscriptions adds default subscriptions to websocket to be handled by ManageSubscriptions()
|
|
func (e *Exchange) generateSubscriptions() (subscription.List, error) {
|
|
return e.Features.Subscriptions.ExpandTemplates(e)
|
|
}
|
|
|
|
// GetSubscriptionTemplate returns a subscription channel template
|
|
func (e *Exchange) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
|
|
return template.New("master.tmpl").Funcs(template.FuncMap{"channelName": channelName}).Parse(subTplText)
|
|
}
|
|
|
|
// Subscribe sends a websocket message to receive data from a list of channels
|
|
func (e *Exchange) Subscribe(subs subscription.List) error {
|
|
return e.ParallelChanOp(context.TODO(), subs, func(ctx context.Context, subs subscription.List) error { return e.manageSubs(ctx, "subscribe", subs) }, 1)
|
|
}
|
|
|
|
// Unsubscribe sends a websocket message to stop receiving data from a list of channels
|
|
func (e *Exchange) Unsubscribe(subs subscription.List) error {
|
|
return e.ParallelChanOp(context.TODO(), subs, func(ctx context.Context, subs subscription.List) error { return e.manageSubs(ctx, "unsubscribe", subs) }, 1)
|
|
}
|
|
|
|
// manageSubs subscribes or unsubscribes from a list of websocket channels
|
|
func (e *Exchange) manageSubs(ctx context.Context, op string, subs subscription.List) error {
|
|
var errs error
|
|
subs, errs = subs.ExpandTemplates(e)
|
|
for _, s := range subs {
|
|
r := &WebsocketRequest{
|
|
Type: op,
|
|
ProductIDs: s.Pairs,
|
|
Channel: s.QualifiedChannel,
|
|
Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
|
|
}
|
|
var err error
|
|
limitType := WSUnauthRate
|
|
if s.Authenticated {
|
|
limitType = WSAuthRate
|
|
if r.JWT, err = e.GetWSJWT(ctx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if err = e.Websocket.Conn.SendJSONMessage(ctx, limitType, r); err == nil {
|
|
switch op {
|
|
case "subscribe":
|
|
err = e.Websocket.AddSuccessfulSubscriptions(e.Websocket.Conn, s)
|
|
case "unsubscribe":
|
|
err = e.Websocket.RemoveSubscriptions(e.Websocket.Conn, s)
|
|
}
|
|
}
|
|
errs = common.AppendError(errs, err)
|
|
}
|
|
return errs
|
|
}
|
|
|
|
// GetWSJWT returns a JWT, using a stored one of it's provided, and generating a new one otherwise
|
|
func (e *Exchange) GetWSJWT(ctx context.Context) (string, error) {
|
|
e.jwt.m.RLock()
|
|
if e.jwt.expiresAt.After(time.Now()) {
|
|
retStr := e.jwt.token
|
|
e.jwt.m.RUnlock()
|
|
return retStr, nil
|
|
}
|
|
e.jwt.m.RUnlock()
|
|
e.jwt.m.Lock()
|
|
defer e.jwt.m.Unlock()
|
|
var err error
|
|
e.jwt.token, e.jwt.expiresAt, err = e.GetJWT(ctx, "")
|
|
return e.jwt.token, err
|
|
}
|
|
|
|
// processBidAskArray is a helper function that turns WebsocketOrderbookDataHolder into arrays of bids and asks
|
|
func processBidAskArray(data *WebsocketOrderbookDataHolder, snapshot bool) (bids, asks orderbook.Levels, err error) {
|
|
bids = make(orderbook.Levels, 0, len(data.Changes))
|
|
asks = make(orderbook.Levels, 0, len(data.Changes))
|
|
for i := range data.Changes {
|
|
change := orderbook.Level{Price: data.Changes[i].PriceLevel.Float64(), Amount: data.Changes[i].NewQuantity.Float64()}
|
|
switch data.Changes[i].Side {
|
|
case "bid":
|
|
bids = append(bids, change)
|
|
case "offer":
|
|
asks = append(asks, change)
|
|
default:
|
|
return nil, nil, fmt.Errorf("%w %v", order.ErrSideIsInvalid, data.Changes[i].Side)
|
|
}
|
|
}
|
|
if snapshot {
|
|
return slices.Clip(bids), slices.Clip(asks), nil
|
|
}
|
|
return bids, asks, nil
|
|
}
|
|
|
|
// statusToStandardStatus is a helper function that converts a Coinbase Pro status string to a standardised order.Status type
|
|
func statusToStandardStatus(stat string) (order.Status, error) {
|
|
switch stat {
|
|
case "PENDING":
|
|
return order.New, nil
|
|
case "OPEN":
|
|
return order.Active, nil
|
|
case "FILLED":
|
|
return order.Filled, nil
|
|
case "CANCELLED":
|
|
return order.Cancelled, nil
|
|
case "EXPIRED":
|
|
return order.Expired, nil
|
|
case "FAILED":
|
|
return order.Rejected, nil
|
|
default:
|
|
return order.UnknownStatus, fmt.Errorf("%w %v", order.ErrUnsupportedStatusType, stat)
|
|
}
|
|
}
|
|
|
|
// stringToStandardType is a helper function that converts a Coinbase Pro side string to a standardised order.Type type
|
|
func stringToStandardType(str string) (order.Type, error) {
|
|
switch str {
|
|
case "LIMIT_ORDER_TYPE":
|
|
return order.Limit, nil
|
|
case "MARKET_ORDER_TYPE":
|
|
return order.Market, nil
|
|
case "STOP_LIMIT_ORDER_TYPE":
|
|
return order.StopLimit, nil
|
|
default:
|
|
return order.UnknownType, fmt.Errorf("%w %v", order.ErrUnrecognisedOrderType, str)
|
|
}
|
|
}
|
|
|
|
// stringToStandardAsset is a helper function that converts a Coinbase Pro asset string to a standardised asset.Item type
|
|
func stringToStandardAsset(str string) (asset.Item, error) {
|
|
switch str {
|
|
case "SPOT":
|
|
return asset.Spot, nil
|
|
case "FUTURE":
|
|
return asset.Futures, nil
|
|
default:
|
|
return asset.Empty, asset.ErrNotSupported
|
|
}
|
|
}
|
|
|
|
// strategyDecoder is a helper function that converts a Coinbase Pro time in force string to a few standardised bools
|
|
func strategyDecoder(str string) (tif order.TimeInForce, err error) {
|
|
switch str {
|
|
case "IMMEDIATE_OR_CANCEL":
|
|
return order.ImmediateOrCancel, nil
|
|
case "FILL_OR_KILL":
|
|
return order.FillOrKill, nil
|
|
case "GOOD_UNTIL_CANCELLED":
|
|
return order.GoodTillCancel, nil
|
|
case "GOOD_UNTIL_DATE_TIME":
|
|
return order.GoodTillDay | order.GoodTillTime, nil
|
|
default:
|
|
return order.UnknownTIF, fmt.Errorf("%w %v", errUnrecognisedStrategyType, str)
|
|
}
|
|
}
|
|
|
|
// checkSubscriptions looks for incompatible subscriptions and if found replaces all with defaults
|
|
// This should be unnecessary and removable by mid-2025
|
|
func (e *Exchange) checkSubscriptions() {
|
|
for _, s := range e.Config.Features.Subscriptions {
|
|
switch s.Channel {
|
|
case "level2_batch", "matches":
|
|
e.Config.Features.Subscriptions = defaultSubscriptions.Clone()
|
|
e.Features.Subscriptions = e.Config.Features.Subscriptions.Enabled()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func channelName(s *subscription.Subscription) (string, error) {
|
|
if n, ok := subscriptionNames[s.Channel]; ok {
|
|
return n, nil
|
|
}
|
|
return "", fmt.Errorf("%w: %s", subscription.ErrNotSupported, s.Channel)
|
|
}
|
|
|
|
// subTplText is the template text parsed by GetSubscriptionTemplate.
// For every asset in AssetPairs it emits the channel name of the subscription followed by the asset separator
const subTplText = `
{{ range $asset, $pairs := $.AssetPairs }}
{{- channelName $.S -}}
{{- $.AssetSeparator }}
{{- end }}
`
|