Files
gocryptotrader/exchanges/btcc/btcc_websocket.go
Scott 6c850e73e2 Websocket connection handling and subscription management (#297)
* Step one: Sets up a connection handler for websockets to always be connected until a shutdown event is received.
Sets up a vague subscription handler to ensure subscriptions are subscribed

* Adds support for resubscriptions for bitfinex, bitstamp, bitmex and btcc. Adds subscription params for special websocket subscription requirements. Removes subscription monitor from wait group so that it can exist despite a shutdown and continuously check

* Adds channel subscription support to bitmex, btse, coinbasepro, coinut, gateio, gemini, hitbtc, huobi, hadax, kraken, okgroup, poloniex and zb

* Implements unsubscribe for bitfinex, btcc, btse, coinbasepro, gateio, hitbtc, huobi, hadax

* ManageSubscriptions now called from WSConnect and made private instead of inside individual exchanges. ManageSubscriptions can now unsubscribe. exchange_websocket_types.go now contains all exchange_websocket.go types to avoid clutter

* Adds it to websocket functionality so managesubscriptions will close when not supported

* Separates functions into testable functions to ensure logic works. Adds tests. Updates websocket setup to include verbosity (inherited from exchange). Adds no connection tolerance to fatal on failed reconnects

* More exchange_websocket tests. Updating to use pointers. Creation of equals func to make comparison easier

* Fixes okex, okcoin tests. Fixes race conditions. Removes pointer usage again.

* Adds subscribe and unsubscribe to wrappers

* Fixes deadlock. Fixes ws verbosity.

* Updates all exchanges to properly support subscription/connection feature. Also reintroduces race conditions....

* Moves connection variables to struct from package to allow each websocket to have their own reconnection checks. Neatens up logs

* Fixes lint/critic issues. Fixes tests. Removes unused function.

* Moves websocket ratelimiter to their own const variables. Fixes more race conditions with connecting variable

* Removes redundant subscribe functions. Ensuring only the exchange_websocket.go can manage subscriptions. Fixes debug logs to be verbose wrapped

* Fixes issue with slice copying. Re-adds okgroup default channels

* Adds nolint to append

* Adds comments and adds support for gateio auth request subscriptions

* Adds new test to ensure slices dont point to the same vars

* removes fatals. gofmt goimports

* more gofmts

* Addresses PR comments, removing empty and redundant lines

* Addresses PR comments. Ensures that writing to the websocket is single-threaded by adding a mutex to exchanges. Minimises wrapper code and moves subscription loops to exchange_websocket. Privatises ChannelsToSubscribe, Connecting properties and removeChannelToSubscribe func to prevent unnecessary tampering.

* Removes unused mutex. FMTS and IMPORTS

* Fixes request lock time change

* More specific logs

* Renames ws mutex. Fixes bitmex subscriptions. Increased gateio ratelimiter to 120ms. Removes ratelimiter from bitfinex, bitmex, bitstamp, btcc, btse, coinbasepro, hitbtc, huobi, hadax, poloniex and zb

* changes recieved typo due to not being well received

* Fixes parsing issue with Huobi and hadax

* Fixes data race with more locks

* removes defer locks. fixes huobi/hadax verbose output

* Fixes double JSONEncode for coinut. Fixes verbose output for coinut

* gofmt,goimport for coinut

* Fixes issue where multiple connection monitors can spawn

* Removes defer exchange.WebsocketConn.Close() in defer handledata exit as connectionmonitor handles connections instead

* gofmt and go import

* More fmts
2019-05-16 16:39:16 +10:00

540 lines
12 KiB
Go

package btcc
import (
"errors"
"fmt"
"net/http"
"net/url"
"strconv"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/thrasher-/gocryptotrader/common"
"github.com/thrasher-/gocryptotrader/currency"
exchange "github.com/thrasher-/gocryptotrader/exchanges"
"github.com/thrasher-/gocryptotrader/exchanges/orderbook"
log "github.com/thrasher-/gocryptotrader/logger"
)
// Websocket endpoint and the message type identifiers carried in the MsgType
// field of incoming BTCC websocket messages.
const (
// btccSocketioAddress is the BTCC websocket endpoint URL.
// NOTE(review): not referenced anywhere in this file; presumably used as the
// default websocket URL during exchange setup - confirm.
btccSocketioAddress = "wss://ws.btcc.com"
// The msgType* values below are matched against the MsgType field of a
// decoded message to decide how the message is handled.
msgTypeHeartBeat = "Heartbeat"
msgTypeGetActiveContracts = "GetActiveContractsResponse"
msgTypeQuote = "QuoteResponse"
msgTypeLogin = "LoginResponse"
msgTypeAccountInfo = "AccountInfo"
msgTypeExecReport = "ExecReport"
msgTypePlaceOrder = "PlaceOrderResponse"
msgTypeCancelAllOrders = "CancelAllOrdersResponse"
msgTypeCancelOrder = "CancelOrderResponse"
msgTypeCancelReplaceOrder = "CancelReplaceOrderResponse"
msgTypeGetAccountInfo = "GetAccountInfoResponse"
msgTypeRetrieveOrder = "RetrieveOrderResponse"
msgTypeGetTrades = "GetTradesResponse"
msgTypeAllTickers = "AllTickersResponse"
)
var (
// mtx is held around the ReadMessage call in WsReadData. It is declared at
// package level, so it is shared by every BTCC value in this package.
mtx sync.Mutex
)
// WsConnect dials the configured websocket URL (through the configured proxy,
// if any), waits for the initial currency pair update, starts the data
// handling goroutine and queues the default subscriptions.
//
// It returns an error when the websocket or the exchange is disabled, when
// the proxy address cannot be parsed, when dialing fails, or when the
// currency pair update fails.
func (b *BTCC) WsConnect() error {
	if !(b.Websocket.IsEnabled() && b.IsEnabled()) {
		return errors.New(exchange.WebsocketNotEnabled)
	}

	dialer := websocket.Dialer{}
	if proxyAddr := b.Websocket.GetProxyAddress(); proxyAddr != "" {
		proxyURL, parseErr := url.Parse(proxyAddr)
		if parseErr != nil {
			return parseErr
		}
		dialer.Proxy = http.ProxyURL(proxyURL)
	}

	// The connection field is assigned even when dialing fails, so a failed
	// dial leaves whatever value Dial returned alongside the error.
	conn, _, dialErr := dialer.Dial(b.Websocket.GetWebsocketURL(), http.Header{})
	b.Conn = conn
	if dialErr != nil {
		return dialErr
	}

	if updateErr := b.WsUpdateCurrencyPairs(); updateErr != nil {
		return updateErr
	}

	go b.WsHandleData()
	b.GenerateDefaultSubscriptions()
	return nil
}
// WsReadData reads a single message from the websocket connection while
// holding the package-level read mutex, signals traffic on the TrafficAlert
// channel and returns the raw payload. On a read error an empty response is
// returned together with the error and no traffic alert is sent.
func (b *BTCC) WsReadData() (exchange.WebsocketResponse, error) {
	var out exchange.WebsocketResponse

	mtx.Lock()
	_, payload, readErr := b.Conn.ReadMessage()
	mtx.Unlock()

	if readErr != nil {
		return out, readErr
	}

	b.Websocket.TrafficAlert <- struct{}{}
	out.Raw = payload
	return out, nil
}
// WsHandleData reads messages from the websocket connection in a loop and
// dispatches them by message type. The loop ends when the shutdown channel
// fires or when reading from the connection fails; on exit the connection is
// closed and the websocket wait group is released.
//
// Decode and processing errors are forwarded to the DataHandler channel and
// the loop carries on with the next message. Only a read error terminates it.
func (b *BTCC) WsHandleData() {
	b.Websocket.Wg.Add(1)

	defer func() {
		err := b.Conn.Close()
		if err != nil {
			b.Websocket.DataHandler <- fmt.Errorf("btcc_websocket.go - Unable to close Websocket connection. Error: %s",
				err)
		}
		b.Websocket.Wg.Done()
	}()

	for {
		select {
		case <-b.Websocket.ShutdownC:
			return

		default:
			resp, err := b.WsReadData()
			if err != nil {
				b.Websocket.DataHandler <- err
				return
			}

			var result WsResponseMain
			err = common.JSONDecode(resp.Raw, &result)
			if err != nil {
				b.Websocket.DataHandler <- err
				continue
			}

			switch result.MsgType {
			case msgTypeHeartBeat:
				// Nothing to do for heartbeats.

			case msgTypeGetActiveContracts:
				log.Debugf("Active Contracts: %s", resp.Raw)

			case msgTypeQuote:
				log.Debugf("Quotes: %s", resp.Raw)

			case msgTypeLogin:
				log.Debugf("Login: %s", resp.Raw)

			case msgTypeAccountInfo:
				log.Debugf("Account info: %s", resp.Raw)

			case msgTypeExecReport:
				log.Debugf("Exec Report: %s", resp.Raw)

			case msgTypePlaceOrder:
				log.Debugf("Place order: %s", resp.Raw)

			case msgTypeCancelAllOrders:
				log.Debugf("Cancel All orders: %s", resp.Raw)

			case msgTypeCancelOrder:
				log.Debugf("Cancel order: %s", resp.Raw)

			case msgTypeCancelReplaceOrder:
				log.Debugf("Replace order: %s", resp.Raw)

			case msgTypeGetAccountInfo:
				log.Debugf("Account info: %s", resp.Raw)

			case msgTypeRetrieveOrder:
				log.Debugf("Retrieve order: %s", resp.Raw)

			case msgTypeGetTrades:
				// The payload is only decoded here; the resulting trades are
				// not forwarded anywhere by this handler.
				var trades WsTrades
				err = common.JSONDecode(resp.Raw, &trades)
				if err != nil {
					b.Websocket.DataHandler <- err
					continue
				}

			case "OrderBook":
				// NOTE: This seems to be a websocket update not reflected in
				// current API docs, this comes in conjunction with the other
				// orderbook feeds
				var snapshot WsOrderbookSnapshot
				err = common.JSONDecode(resp.Raw, &snapshot)
				if err != nil {
					b.Websocket.DataHandler <- err
					continue
				}

				// "F" is routed to the snapshot loader and "I" to the
				// incremental updater; any other type is ignored.
				switch snapshot.Type {
				case "F":
					err = b.WsProcessOrderbookSnapshot(&snapshot)
					if err != nil {
						b.Websocket.DataHandler <- err
					}

				case "I":
					err = b.WsProcessOrderbookUpdate(&snapshot)
					if err != nil {
						b.Websocket.DataHandler <- err
					}
				}

			case "SubOrderBookResponse":
				// Subscription acknowledgement; nothing to process.

			case "Ticker":
				var ticker WsTicker
				err = common.JSONDecode(resp.Raw, &ticker)
				if err != nil {
					b.Websocket.DataHandler <- err
					continue
				}

				tick := exchange.TickerData{}
				tick.AssetType = "SPOT"
				tick.ClosePrice = ticker.PrevCls
				tick.Exchange = b.GetName()
				tick.HighPrice = ticker.High
				tick.LowPrice = ticker.Low
				tick.OpenPrice = ticker.Open
				tick.Pair = currency.NewPairFromString(ticker.Symbol)
				tick.Quantity = ticker.Volume
				tick.Timestamp = time.Unix(ticker.Timestamp, 0)

				b.Websocket.DataHandler <- tick

			default:
				// Old style orderbook messages carry the symbol as the
				// second "."-separated segment of the message type.
				if common.StringContains(result.MsgType, "OrderBook") {
					var oldOrderbookType WsOrderbookSnapshotOld
					err = common.JSONDecode(resp.Raw, &oldOrderbookType)
					if err != nil {
						b.Websocket.DataHandler <- err
						continue
					}

					symbol := common.SplitStrings(result.MsgType, ".")
					// A message type that mentions "OrderBook" but has no
					// "." separator would make symbol[1] panic and kill the
					// reader goroutine, so report it instead.
					if len(symbol) < 2 {
						b.Websocket.DataHandler <- fmt.Errorf("btcc_websocket.go - unable to extract symbol from message type %q",
							result.MsgType)
						continue
					}

					err = b.WsProcessOldOrderbookSnapshot(oldOrderbookType, symbol[1])
					if err != nil {
						b.Websocket.DataHandler <- err
						continue
					}
					continue
				}
			}
		}
	}
}
// WsUpdateCurrencyPairs reads messages from the websocket connection until an
// all-tickers response arrives, then updates the exchange's available
// currency pairs from the symbols in that response and returns.
//
// Heartbeat messages received while waiting are skipped. Any other message
// type, a read failure, or a decode failure is returned as an error.
func (b *BTCC) WsUpdateCurrencyPairs() error {
	for {
		_, resp, err := b.Conn.ReadMessage()
		if err != nil {
			return err
		}

		b.Websocket.TrafficAlert <- struct{}{}

		// Decode into a fresh value each iteration so that fields from a
		// previous message can never leak into the current one.
		var currencyResponse WsResponseMain
		err = common.JSONDecode(resp, &currencyResponse)
		if err != nil {
			return err
		}

		switch currencyResponse.MsgType {
		case msgTypeAllTickers:
			var tickers WsAllTickerData
			err = common.JSONDecode(currencyResponse.Data, &tickers)
			if err != nil {
				return err
			}

			var availableTickers currency.Pairs
			for i := range tickers {
				availableTickers = append(availableTickers,
					currency.NewPairFromString(tickers[i].Symbol))
			}

			err = b.UpdateCurrencies(availableTickers, false, true)
			if err != nil {
				return fmt.Errorf("%s failed to update available currencies. %s",
					b.Name,
					err)
			}

			// The pairs are updated, so stop reading. Without this return
			// the loop could only ever exit with an error.
			return nil

		case msgTypeHeartBeat:
			// Keep waiting for the tickers response.

		default:
			return fmt.Errorf("btcc_websocket.go error - Updating currency pairs resp incorrect: %s",
				string(resp))
		}
	}
}
// WsProcessOrderbookSnapshot converts a full orderbook message into asks and
// bids, loads it into the websocket orderbook as a snapshot and then notifies
// the data handler that the orderbook for the pair changed.
//
// Entries whose Side is "1" become asks; every other entry becomes a bid.
// Size may arrive as a float64 or as a numeric string; any other type leaves
// the amount at zero. A string that does not parse as a float is returned as
// an error.
func (b *BTCC) WsProcessOrderbookSnapshot(ob *WsOrderbookSnapshot) error {
	var asks, bids []orderbook.Item

	for _, entry := range ob.List {
		var amount float64
		switch size := entry.Size.(type) {
		case string:
			parsed, err := strconv.ParseFloat(size, 64)
			if err != nil {
				return err
			}
			amount = parsed
		case float64:
			amount = size
		}

		level := orderbook.Item{Price: entry.Price, Amount: amount}
		if entry.Side == "1" {
			asks = append(asks, level)
		} else {
			bids = append(bids, level)
		}
	}

	pair := currency.NewPairFromString(ob.Symbol)

	snapshot := orderbook.Base{
		Asks:      asks,
		Bids:      bids,
		AssetType: "SPOT",
		Pair:      pair,
	}

	if err := b.Websocket.Orderbook.LoadSnapshot(&snapshot, b.GetName(), false); err != nil {
		return err
	}

	b.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{
		Exchange: b.GetName(),
		Asset:    "SPOT",
		Pair:     pair,
	}
	return nil
}
// WsProcessOrderbookUpdate converts an incremental orderbook message into ask
// and bid levels, applies them to the websocket orderbook and then notifies
// the data handler that the orderbook for the pair changed.
//
// Entries whose Side is "1" become asks; every other entry becomes a bid.
// Size may arrive as a float64 or as a numeric string; any other type leaves
// the amount at zero. Negative sizes are recorded as an amount of zero. A
// string that does not parse as a float is returned as an error.
func (b *BTCC) WsProcessOrderbookUpdate(ob *WsOrderbookSnapshot) error {
	var asks, bids []orderbook.Item

	for _, entry := range ob.List {
		var amount float64
		switch size := entry.Size.(type) {
		case string:
			parsed, err := strconv.ParseFloat(size, 64)
			if err != nil {
				return err
			}
			amount = parsed
		case float64:
			amount = size
		}

		// Negative sizes are clamped to zero before being stored.
		if amount < 0 {
			amount = 0
		}

		level := orderbook.Item{Price: entry.Price, Amount: amount}
		if entry.Side == "1" {
			asks = append(asks, level)
		} else {
			bids = append(bids, level)
		}
	}

	pair := currency.NewPairFromString(ob.Symbol)
	if err := b.Websocket.Orderbook.Update(bids, asks, pair, time.Now(), b.GetName(), "SPOT"); err != nil {
		return err
	}

	b.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{
		Exchange: b.GetName(),
		Asset:    "SPOT",
		Pair:     pair,
	}
	return nil
}
// WsProcessOldOrderbookSnapshot converts an old-format orderbook message into
// ask and bid levels, applies them to the websocket orderbook for the given
// symbol and then notifies the data handler that the orderbook changed.
//
// Levels are read from the "Asks" and "Bids" keys of ob.Data. Each level must
// be a list with at least two elements; the first is used as the price and
// the second as the amount (assumes the feed sends [price, amount] pairs -
// TODO confirm against the exchange documentation). A malformed level or an
// unparsable numeric string is returned as an error.
func (b *BTCC) WsProcessOldOrderbookSnapshot(ob WsOrderbookSnapshotOld, symbol string) error {
	var asks, bids []orderbook.Item

	for _, ask := range ob.Data["Asks"] {
		price, amount, err := wsParseOldOrderbookEntry(ask)
		if err != nil {
			return err
		}
		asks = append(asks, orderbook.Item{
			Price:  price,
			Amount: amount,
		})
	}

	for _, bid := range ob.Data["Bids"] {
		price, amount, err := wsParseOldOrderbookEntry(bid)
		if err != nil {
			return err
		}
		bids = append(bids, orderbook.Item{
			Price:  price,
			Amount: amount,
		})
	}

	p := currency.NewPairFromString(symbol)

	err := b.Websocket.Orderbook.Update(bids, asks, p, time.Now(), b.GetName(), "SPOT")
	if err != nil {
		return err
	}

	b.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{
		Exchange: b.GetName(),
		Pair:     p,
		Asset:    "SPOT",
	}
	return nil
}

// wsParseOldOrderbookEntry extracts the price (element 0) and amount
// (element 1) from a single old-format orderbook level. It returns an error
// instead of panicking when the entry is not a list or has fewer than two
// elements.
func wsParseOldOrderbookEntry(entry interface{}) (price, amount float64, err error) {
	fields, ok := entry.([]interface{})
	if !ok {
		return 0, 0, fmt.Errorf("btcc_websocket.go - unexpected orderbook entry type %T", entry)
	}
	if len(fields) < 2 {
		return 0, 0, fmt.Errorf("btcc_websocket.go - orderbook entry has %d fields, expected at least 2",
			len(fields))
	}

	price, err = wsParseOldOrderbookValue(fields[0])
	if err != nil {
		return 0, 0, err
	}
	amount, err = wsParseOldOrderbookValue(fields[1])
	if err != nil {
		return 0, 0, err
	}
	return price, amount, nil
}

// wsParseOldOrderbookValue converts one orderbook field to a float64. Strings
// are parsed as floats and float64 values are returned unchanged; any other
// type yields zero, matching how the other orderbook processors in this file
// treat unrecognised size types.
func wsParseOldOrderbookValue(v interface{}) (float64, error) {
	switch d := v.(type) {
	case string:
		return strconv.ParseFloat(d, 64)
	case float64:
		return d, nil
	}
	return 0, nil
}
// GenerateDefaultSubscriptions builds the default channel subscriptions and
// hands them to the websocket's SubscribeToChannels.
//
// The list always starts with a single "SubscribeAllTickers" entry, followed
// by one entry per enabled currency for each of the "SubOrderBook",
// "GetTrades" and "Subscribe" channels, in that order. Orderbook entries
// carry a "len" parameter and trade entries a "count" parameter, both set to
// the string "100".
func (b *BTCC) GenerateDefaultSubscriptions() {
	subs := []exchange.WebsocketChannelSubscription{
		{Channel: "SubscribeAllTickers"},
	}

	pairs := b.GetEnabledCurrencies()
	for _, channel := range []string{"SubOrderBook", "GetTrades", "Subscribe"} {
		for i := range pairs {
			// Each subscription gets its own parameter map.
			params := map[string]interface{}{}
			switch channel {
			case "SubOrderBook":
				params["len"] = "100"
			case "GetTrades":
				params["count"] = "100"
			}

			subs = append(subs, exchange.WebsocketChannelSubscription{
				Channel:  channel,
				Currency: pairs[i],
				Params:   params,
			})
		}
	}

	b.Websocket.SubscribeToChannels(subs)
}
// Subscribe sends a subscription request for the given channel and currency
// over the websocket. The channel name is used as the request action;
// "SubOrderBook" requests set Len to 100 and "GetTrades" requests set Count
// to 100. The error from sending the request is returned.
func (b *BTCC) Subscribe(channelToSubscribe exchange.WebsocketChannelSubscription) error {
	var request WsOutgoing
	request.Action = channelToSubscribe.Channel
	request.Symbol = channelToSubscribe.Currency.String()

	switch request.Action {
	case "SubOrderBook":
		request.Len = 100
	case "GetTrades":
		request.Count = 100
	}

	return b.wsSend(request)
}
// Unsubscribe sends the request that stops data for the given channel:
// "SubOrderBook" maps to "UnSubOrderBook" and "Subscribe" to "UnSubscribe"
// (both with the currency symbol), and "SubscribeAllTickers" maps to
// "UnSubscribeAllTickers" without a symbol. The error from sending the
// request is returned.
//
// NOTE(review): any other channel (for example "GetTrades") results in an
// empty request being sent - confirm whether that is intended.
func (b *BTCC) Unsubscribe(channelToSubscribe exchange.WebsocketChannelSubscription) error {
	var request WsOutgoing

	switch channelToSubscribe.Channel {
	case "SubscribeAllTickers":
		request.Action = "UnSubscribeAllTickers"
	case "Subscribe":
		request.Action = "UnSubscribe"
		request.Symbol = channelToSubscribe.Currency.String()
	case "SubOrderBook":
		request.Action = "UnSubOrderBook"
		request.Symbol = channelToSubscribe.Currency.String()
	}

	return b.wsSend(request)
}
// wsSend writes data to the websocket connection as JSON while holding the
// request mutex, so only one write runs at a time through this method. When
// verbose mode is on, the outgoing message is logged first.
func (b *BTCC) wsSend(data interface{}) error {
	b.wsRequestMtx.Lock()
	defer b.wsRequestMtx.Unlock()

	if !b.Verbose {
		return b.Conn.WriteJSON(data)
	}

	log.Debugf("%v sending message to websocket %v", b.Name, data)
	return b.Conn.WriteJSON(data)
}