Files
gocryptotrader/exchanges/btse/btse_websocket.go
Samuael A. 3f534a15f1 cmd/exchange_template, exchanges: Update templates and propogate to exchanges (#1777)
* Added TimeInForce type and updated related files

* Linter issue fix and minor coinbasepro type update

* Bitrex consts update

* added unit test and minor changes in bittrex

* Unit tests update

* Fix minor linter issues

* Update TestStringToTimeInForce unit test

* Exchange test template change

* A different approach

* fix conflict with gateio timeInForce

* minor exchange template update

* Minor fix to test_files template

* Update order tests

* Complete updating the order unit tests

* Updating exchange wrapper and test template files

* update kucoin and deribit wrapper to match the time in force change

* minor comment update

* fix time-in-force related test errors

* linter issue fix

* ADD_NEW_EXCHANGE documentation update

* time in force constants, functions and unit tests update

* shift tif policies to TimeInForce

* Update time-in-force, related functions, and unit tests

* fix linter issue and time-in-force processing

* added a good till crossing tif value

* order type fix and fix related tim-in-force entries

* update time-in-force unmarshaling and unit test

* consistency guideline added

* fix time-in-force error in gateio

* linter issue fix

* update based on review comments

* add unit test and fix missing issues

* minor fix and added benchmark unit test

* change GTT to GTC for limit

* fix linter issue

* added time-in-force value to place order param

* fix minor issues based on review comment and move tif code to separate files

* update on exchanges linked to time-in-force

* resolve missing review comments

* minor linter issues fix

* added time-in-force handler and update timeInForce parametered endpoint

* minor fixes based on review

* nits fix

* update based on review

* linter fix

* rm getTimeInForce func and minor change to time-in-force

* minor change

* update based on review comments

* wrappers and time-in-force calling approach

* minor change

* update gateio string to timeInForce conversion and unit test

* update exchange template

* update wrapper template file

* policy comments, and template files update

* rename all exchange types name to Exchange

* update on template files and template generation

* templates and generation code and other updates

* linter issue fix

* added subscriptions and websocket templates

* update ADD_NEW_EXCHANGE.md with recent binance functions and implementations

* rename template files and update unit tests

* minor template and unit test fix

* rename templates and fix on unit tests

* update on template files and documentation

* removed unnecessary tag fix and update templates

* fix Add_NEW_EXCHANGE.md doc file

* formatting, comments, and error checks update on template files

* rename exchange receivers to e and ex for consistency

* rename unit test exchange receiver and minor updates

* linter issues fix

* fix deribit issue and minor style update

* fix test issues caused by receiver change

* raname local variables exchange declaration variables

* update templates comments

* update templates and related comments

* renamed ex to e

* update template comments

* toggle WS to false to improve coverage

* template comments update

* added test coverage to Ws enabled and minor changes

---------

Co-authored-by: Samuel Reid <43227667+cranktakular@users.noreply.github.com>
2025-07-17 10:46:36 +10:00

443 lines
12 KiB
Go

package btse
import (
"context"
"encoding/hex"
"errors"
"net/http"
"strconv"
"strings"
"text/template"
"time"
gws "github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common/crypto"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/encoding/json"
"github.com/thrasher-corp/gocryptotrader/exchange/websocket"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
"github.com/thrasher-corp/gocryptotrader/log"
)
const (
btseWebsocket = "wss://ws.btse.com/ws/spot"
btseWebsocketTimer = time.Second * 57
)
var subscriptionNames = map[string]string{
subscription.MyTradesChannel: "notificationApi",
subscription.AllTradesChannel: "tradeHistoryApi",
}
var defaultSubscriptions = subscription.List{
{Enabled: true, Asset: asset.Spot, Channel: subscription.AllTradesChannel},
{Enabled: true, Channel: subscription.MyTradesChannel, Authenticated: true},
}
// WsConnect connects the websocket client
func (e *Exchange) WsConnect() error {
ctx := context.TODO()
if !e.Websocket.IsEnabled() || !e.IsEnabled() {
return websocket.ErrWebsocketNotEnabled
}
var dialer gws.Dialer
err := e.Websocket.Conn.Dial(ctx, &dialer, http.Header{})
if err != nil {
return err
}
e.Websocket.Conn.SetupPingHandler(request.Unset, websocket.PingHandler{
MessageType: gws.PingMessage,
Delay: btseWebsocketTimer,
})
e.Websocket.Wg.Add(1)
go e.wsReadData(ctx)
if e.IsWebsocketAuthenticationSupported() {
err = e.WsAuthenticate(ctx)
if err != nil {
e.Websocket.DataHandler <- err
e.Websocket.SetCanUseAuthenticatedEndpoints(false)
}
}
return nil
}
// WsAuthenticate Send an authentication message to receive auth data
func (e *Exchange) WsAuthenticate(ctx context.Context) error {
creds, err := e.GetCredentials(ctx)
if err != nil {
return err
}
nonce := strconv.FormatInt(time.Now().UnixMilli(), 10)
path := "/ws/spot" + nonce
hmac, err := crypto.GetHMAC(crypto.HashSHA512_384, []byte((path)), []byte(creds.Secret))
if err != nil {
return err
}
req := wsSub{
Operation: "authKeyExpires",
Arguments: []string{creds.Key, nonce, hex.EncodeToString(hmac)},
}
return e.Websocket.Conn.SendJSONMessage(ctx, request.Unset, req)
}
func stringToOrderStatus(status string) (order.Status, error) {
switch status {
case "ORDER_INSERTED", "TRIGGER_INSERTED":
return order.New, nil
case "ORDER_CANCELLED":
return order.Cancelled, nil
case "ORDER_FULL_TRANSACTED":
return order.Filled, nil
case "ORDER_PARTIALLY_TRANSACTED":
return order.PartiallyFilled, nil
case "TRIGGER_ACTIVATED":
return order.Active, nil
case "INSUFFICIENT_BALANCE":
return order.InsufficientBalance, nil
case "MARKET_UNAVAILABLE":
return order.MarketUnavailable, nil
default:
return order.UnknownStatus, errors.New(status + " not recognised as order status")
}
}
// wsReadData receives and passes on websocket messages for processing
func (e *Exchange) wsReadData(ctx context.Context) {
defer e.Websocket.Wg.Done()
for {
resp := e.Websocket.Conn.ReadMessage()
if resp.Raw == nil {
return
}
err := e.wsHandleData(ctx, resp.Raw)
if err != nil {
e.Websocket.DataHandler <- err
}
}
}
func (e *Exchange) wsHandleData(_ context.Context, respRaw []byte) error {
type Result map[string]any
var result Result
err := json.Unmarshal(respRaw, &result)
if err != nil {
if strings.Contains(string(respRaw), "connect success") {
return nil
}
return err
}
if result == nil {
return nil
}
if result["event"] != nil {
event, ok := result["event"].(string)
if !ok {
return errors.New(e.Name + websocket.UnhandledMessage + string(respRaw))
}
switch event {
case "subscribe":
var subscribe WsSubscriptionAcknowledgement
err = json.Unmarshal(respRaw, &subscribe)
if err != nil {
return err
}
if e.Verbose {
log.Infof(log.WebsocketMgr, "%v subscribed to %v", e.Name, strings.Join(subscribe.Channel, ", "))
}
case "login":
var login WsLoginAcknowledgement
err = json.Unmarshal(respRaw, &login)
if err != nil {
return err
}
e.Websocket.SetCanUseAuthenticatedEndpoints(login.Success)
if e.Verbose {
log.Infof(log.WebsocketMgr, "%v websocket authenticated: %v", e.Name, login.Success)
}
default:
return errors.New(e.Name + websocket.UnhandledMessage + string(respRaw))
}
return nil
}
topic, ok := result["topic"].(string)
if !ok {
return errors.New(e.Name + websocket.UnhandledMessage + string(respRaw))
}
switch {
case topic == "notificationApi":
var notification wsNotification
err = json.Unmarshal(respRaw, &notification)
if err != nil {
return err
}
for i := range notification.Data {
var oType order.Type
var oSide order.Side
var oStatus order.Status
oType, err = order.StringToOrderType(notification.Data[i].Type)
if err != nil {
e.Websocket.DataHandler <- order.ClassificationError{
Exchange: e.Name,
OrderID: notification.Data[i].OrderID,
Err: err,
}
}
oSide, err = order.StringToOrderSide(notification.Data[i].OrderMode)
if err != nil {
e.Websocket.DataHandler <- order.ClassificationError{
Exchange: e.Name,
OrderID: notification.Data[i].OrderID,
Err: err,
}
}
oStatus, err = stringToOrderStatus(notification.Data[i].Status)
if err != nil {
e.Websocket.DataHandler <- order.ClassificationError{
Exchange: e.Name,
OrderID: notification.Data[i].OrderID,
Err: err,
}
}
var p currency.Pair
p, err = currency.NewPairFromString(notification.Data[i].Symbol)
if err != nil {
return err
}
var a asset.Item
a, err = e.GetPairAssetType(p)
if err != nil {
return err
}
e.Websocket.DataHandler <- &order.Detail{
Price: notification.Data[i].Price,
Amount: notification.Data[i].Size,
TriggerPrice: notification.Data[i].TriggerPrice,
Exchange: e.Name,
OrderID: notification.Data[i].OrderID,
Type: oType,
Side: oSide,
Status: oStatus,
AssetType: a,
Date: notification.Data[i].Timestamp.Time(),
Pair: p,
}
}
case strings.Contains(topic, "tradeHistoryApi"):
saveTradeData := e.IsSaveTradeDataEnabled()
tradeFeed := e.IsTradeFeedEnabled()
if !saveTradeData && !tradeFeed {
return nil
}
var tradeHistory wsTradeHistory
err = json.Unmarshal(respRaw, &tradeHistory)
if err != nil {
return err
}
var trades []trade.Data
for x := range tradeHistory.Data {
var p currency.Pair
p, err = currency.NewPairFromString(tradeHistory.Data[x].Symbol)
if err != nil {
return err
}
var a asset.Item
a, err = e.GetPairAssetType(p)
if err != nil {
return err
}
trades = append(trades, trade.Data{
Timestamp: tradeHistory.Data[x].Timestamp.Time().UTC(),
CurrencyPair: p,
AssetType: a,
Exchange: e.Name,
Price: tradeHistory.Data[x].Price,
Amount: tradeHistory.Data[x].Size,
Side: tradeHistory.Data[x].Side,
TID: strconv.FormatInt(tradeHistory.Data[x].TID, 10),
})
}
if tradeFeed {
for i := range trades {
e.Websocket.DataHandler <- trades[i]
}
}
if saveTradeData {
return trade.AddTradesToBuffer(trades...)
}
case strings.Contains(topic, "orderBookL2Api"): // TODO: Fix orderbook updates.
var t wsOrderBook
err = json.Unmarshal(respRaw, &t)
if err != nil {
return err
}
newOB := orderbook.Book{
Bids: make(orderbook.Levels, 0, len(t.Data.BuyQuote)),
Asks: make(orderbook.Levels, 0, len(t.Data.SellQuote)),
}
var price, amount float64
for i := range t.Data.SellQuote {
p := strings.ReplaceAll(t.Data.SellQuote[i].Price, ",", "")
price, err = strconv.ParseFloat(p, 64)
if err != nil {
return err
}
a := strings.ReplaceAll(t.Data.SellQuote[i].Size, ",", "")
amount, err = strconv.ParseFloat(a, 64)
if err != nil {
return err
}
if e.orderbookFilter(price, amount) {
continue
}
newOB.Asks = append(newOB.Asks, orderbook.Level{
Price: price,
Amount: amount,
})
}
for j := range t.Data.BuyQuote {
p := strings.ReplaceAll(t.Data.BuyQuote[j].Price, ",", "")
price, err = strconv.ParseFloat(p, 64)
if err != nil {
return err
}
a := strings.ReplaceAll(t.Data.BuyQuote[j].Size, ",", "")
amount, err = strconv.ParseFloat(a, 64)
if err != nil {
return err
}
if e.orderbookFilter(price, amount) {
continue
}
newOB.Bids = append(newOB.Bids, orderbook.Level{
Price: price,
Amount: amount,
})
}
p, err := currency.NewPairFromString(t.Topic[strings.Index(t.Topic, ":")+1 : strings.Index(t.Topic, currency.UnderscoreDelimiter)])
if err != nil {
return err
}
var a asset.Item
a, err = e.GetPairAssetType(p)
if err != nil {
return err
}
newOB.Pair = p
newOB.Asset = a
newOB.Exchange = e.Name
newOB.Asks.Reverse() // Reverse asks for correct alignment
newOB.ValidateOrderbook = e.ValidateOrderbook
newOB.LastUpdated = time.Now() // NOTE: Temp to fix test.
err = e.Websocket.Orderbook.LoadSnapshot(&newOB)
if err != nil {
return err
}
default:
return errors.New(e.Name + websocket.UnhandledMessage + string(respRaw))
}
return nil
}
// orderbookFilter is needed on book levels from this exchange as their data
// is incorrect
func (e *Exchange) orderbookFilter(price, amount float64) bool {
// Amount filtering occurs when the amount exceeds the decimal returned.
// e.g. {"price":"1.37","size":"0.00"} currency: SFI-ETH
// Opted to not round up to 0.01 as this might skew calculations
// more than removing from the books completely.
// Price filtering occurs when we are deep in the bid book and there are
// prices that are less than 4 decimal places
// e.g. {"price":"0.0000","size":"14219"} currency: TRX-PAX
// We cannot load a zero price and this will ruin calculations
return price == 0 || amount == 0
}
// generateSubscriptions returns a list of subscriptions from the configured subscriptions feature
func (e *Exchange) generateSubscriptions() (subscription.List, error) {
return e.Features.Subscriptions.ExpandTemplates(e)
}
// GetSubscriptionTemplate returns a subscription channel template
func (e *Exchange) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) {
return template.New("master.tmpl").Funcs(template.FuncMap{
"channelName": channelName,
"isSymbolChannel": isSymbolChannel,
}).Parse(subTplText)
}
// Subscribe sends a websocket message to receive data from a list of channels
func (e *Exchange) Subscribe(subs subscription.List) error {
ctx := context.TODO()
req := wsSub{Operation: "subscribe"}
for _, s := range subs {
req.Arguments = append(req.Arguments, s.QualifiedChannel)
}
err := e.Websocket.Conn.SendJSONMessage(ctx, request.Unset, req)
if err == nil {
err = e.Websocket.AddSuccessfulSubscriptions(e.Websocket.Conn, subs...)
}
return err
}
// Unsubscribe sends a websocket message to stop receiving data from a list of channels
func (e *Exchange) Unsubscribe(subs subscription.List) error {
ctx := context.TODO()
req := wsSub{Operation: "unsubscribe"}
for _, s := range subs {
req.Arguments = append(req.Arguments, s.QualifiedChannel)
}
err := e.Websocket.Conn.SendJSONMessage(ctx, request.Unset, req)
if err == nil {
err = e.Websocket.RemoveSubscriptions(e.Websocket.Conn, subs...)
}
return err
}
// channelName returns the correct channel name for the asset
func channelName(s *subscription.Subscription) string {
if name, ok := subscriptionNames[s.Channel]; ok {
return name
}
panic("Channel not supported: " + s.Channel)
}
// isSymbolChannel returns if the channel expects receive a symbol
func isSymbolChannel(s *subscription.Subscription) bool {
return s.Channel != subscription.MyTradesChannel
}
const subTplText = `
{{- with $name := channelName $.S }}
{{ range $asset, $pairs := $.AssetPairs }}
{{- if isSymbolChannel $.S }}
{{- range $p := $pairs -}}
{{- $name -}} : {{- $p -}}
{{- $.PairSeparator }}
{{- end }}
{{- else -}}
{{ $name }}
{{- end }}
{{- $.AssetSeparator }}
{{- end }}
{{- end }}
`