mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-06-07 15:11:03 +00:00
Merge branch 'master' into engine
This commit is contained in:
@@ -4,19 +4,19 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/thrasher-/gocryptotrader/common"
|
||||
"github.com/thrasher-/gocryptotrader/common/crypto"
|
||||
"github.com/thrasher-/gocryptotrader/currency"
|
||||
exchange "github.com/thrasher-/gocryptotrader/exchanges"
|
||||
"github.com/thrasher-/gocryptotrader/exchanges/asset"
|
||||
"github.com/thrasher-/gocryptotrader/exchanges/orderbook"
|
||||
log "github.com/thrasher-/gocryptotrader/logger"
|
||||
"github.com/thrasher-corp/gocryptotrader/common"
|
||||
"github.com/thrasher-corp/gocryptotrader/common/crypto"
|
||||
"github.com/thrasher-corp/gocryptotrader/currency"
|
||||
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
|
||||
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
|
||||
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
|
||||
"github.com/thrasher-corp/gocryptotrader/exchanges/wshandler"
|
||||
log "github.com/thrasher-corp/gocryptotrader/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -68,34 +68,21 @@ var (
|
||||
// WsConnector initiates a new websocket connection
|
||||
func (b *Bitmex) WsConnector() error {
|
||||
if !b.Websocket.IsEnabled() || !b.IsEnabled() {
|
||||
return errors.New(exchange.WebsocketNotEnabled)
|
||||
return errors.New(wshandler.WebsocketNotEnabled)
|
||||
}
|
||||
|
||||
var dialer websocket.Dialer
|
||||
var err error
|
||||
|
||||
if b.Websocket.GetProxyAddress() != "" {
|
||||
var proxy *url.URL
|
||||
proxy, err = url.Parse(b.Websocket.GetProxyAddress())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dialer.Proxy = http.ProxyURL(proxy)
|
||||
}
|
||||
|
||||
b.WebsocketConn, _, err = dialer.Dial(b.Websocket.GetWebsocketURL(), nil)
|
||||
err := b.WebsocketConn.Dial(&dialer, http.Header{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, p, err := b.WebsocketConn.ReadMessage()
|
||||
p, err := b.WebsocketConn.ReadMessage()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.Websocket.TrafficAlert <- struct{}{}
|
||||
var welcomeResp WebsocketWelcome
|
||||
err = common.JSONDecode(p, &welcomeResp)
|
||||
err = common.JSONDecode(p.Raw, &welcomeResp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -118,19 +105,6 @@ func (b *Bitmex) WsConnector() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Bitmex) wsReadData() (exchange.WebsocketResponse, error) {
|
||||
_, resp, err := b.WebsocketConn.ReadMessage()
|
||||
if err != nil {
|
||||
return exchange.WebsocketResponse{}, err
|
||||
}
|
||||
|
||||
b.Websocket.TrafficAlert <- struct{}{}
|
||||
|
||||
return exchange.WebsocketResponse{
|
||||
Raw: resp,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// wsHandleIncomingData services incoming data from the websocket connection
|
||||
func (b *Bitmex) wsHandleIncomingData() {
|
||||
b.Websocket.Wg.Add(1)
|
||||
@@ -145,12 +119,12 @@ func (b *Bitmex) wsHandleIncomingData() {
|
||||
return
|
||||
|
||||
default:
|
||||
resp, err := b.wsReadData()
|
||||
resp, err := b.WebsocketConn.ReadMessage()
|
||||
if err != nil {
|
||||
b.Websocket.DataHandler <- err
|
||||
return
|
||||
}
|
||||
|
||||
b.Websocket.TrafficAlert <- struct{}{}
|
||||
message := string(resp.Raw)
|
||||
if strings.Contains(message, "pong") {
|
||||
pongChan <- 1
|
||||
@@ -158,7 +132,7 @@ func (b *Bitmex) wsHandleIncomingData() {
|
||||
}
|
||||
|
||||
if strings.Contains(message, "ping") {
|
||||
err = b.wsSend("pong")
|
||||
err = b.WebsocketConn.SendMessage("pong")
|
||||
if err != nil {
|
||||
b.Websocket.DataHandler <- err
|
||||
continue
|
||||
@@ -257,7 +231,7 @@ func (b *Bitmex) wsHandleIncomingData() {
|
||||
}
|
||||
|
||||
// TODO: update this to support multiple asset types
|
||||
b.Websocket.DataHandler <- exchange.TradeData{
|
||||
b.Websocket.DataHandler <- wshandler.TradeData{
|
||||
Timestamp: timestamp,
|
||||
Price: trade.Price,
|
||||
Amount: float64(trade.Size),
|
||||
@@ -407,7 +381,7 @@ func (b *Bitmex) processOrderbook(data []OrderBookL2, action string, currencyPai
|
||||
err)
|
||||
}
|
||||
snapshotloaded[currencyPair][assetType] = true
|
||||
b.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{
|
||||
b.Websocket.DataHandler <- wshandler.WebsocketOrderbookUpdate{
|
||||
Pair: currencyPair,
|
||||
Asset: assetType,
|
||||
Exchange: b.GetName(),
|
||||
@@ -442,7 +416,7 @@ func (b *Bitmex) processOrderbook(data []OrderBookL2, action string, currencyPai
|
||||
return err
|
||||
}
|
||||
|
||||
b.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{
|
||||
b.Websocket.DataHandler <- wshandler.WebsocketOrderbookUpdate{
|
||||
Pair: currencyPair,
|
||||
Asset: assetType,
|
||||
Exchange: b.GetName(),
|
||||
@@ -456,7 +430,7 @@ func (b *Bitmex) processOrderbook(data []OrderBookL2, action string, currencyPai
|
||||
func (b *Bitmex) GenerateDefaultSubscriptions() {
|
||||
contracts := b.GetEnabledPairs(asset.PerpetualContract)
|
||||
channels := []string{bitmexWSOrderbookL2, bitmexWSTrade}
|
||||
subscriptions := []exchange.WebsocketChannelSubscription{
|
||||
subscriptions := []wshandler.WebsocketChannelSubscription{
|
||||
{
|
||||
Channel: bitmexWSAnnouncement,
|
||||
},
|
||||
@@ -464,7 +438,7 @@ func (b *Bitmex) GenerateDefaultSubscriptions() {
|
||||
|
||||
for i := range channels {
|
||||
for j := range contracts {
|
||||
subscriptions = append(subscriptions, exchange.WebsocketChannelSubscription{
|
||||
subscriptions = append(subscriptions, wshandler.WebsocketChannelSubscription{
|
||||
Channel: fmt.Sprintf("%v:%v", channels[i], contracts[j].String()),
|
||||
Currency: contracts[j],
|
||||
})
|
||||
@@ -482,7 +456,7 @@ func (b *Bitmex) GenerateAuthenticatedSubscriptions() {
|
||||
channels := []string{bitmexWSExecution,
|
||||
bitmexWSPosition,
|
||||
}
|
||||
subscriptions := []exchange.WebsocketChannelSubscription{
|
||||
subscriptions := []wshandler.WebsocketChannelSubscription{
|
||||
{
|
||||
Channel: bitmexWSAffiliate,
|
||||
},
|
||||
@@ -504,7 +478,7 @@ func (b *Bitmex) GenerateAuthenticatedSubscriptions() {
|
||||
}
|
||||
for i := range channels {
|
||||
for j := range contracts {
|
||||
subscriptions = append(subscriptions, exchange.WebsocketChannelSubscription{
|
||||
subscriptions = append(subscriptions, wshandler.WebsocketChannelSubscription{
|
||||
Channel: fmt.Sprintf("%v:%v", channels[i], contracts[j].String()),
|
||||
Currency: contracts[j],
|
||||
})
|
||||
@@ -514,21 +488,21 @@ func (b *Bitmex) GenerateAuthenticatedSubscriptions() {
|
||||
}
|
||||
|
||||
// Subscribe subscribes to a websocket channel
|
||||
func (b *Bitmex) Subscribe(channelToSubscribe exchange.WebsocketChannelSubscription) error {
|
||||
func (b *Bitmex) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscription) error {
|
||||
var subscriber WebsocketRequest
|
||||
subscriber.Command = "subscribe"
|
||||
subscriber.Arguments = append(subscriber.Arguments, channelToSubscribe.Channel)
|
||||
return b.wsSend(subscriber)
|
||||
return b.WebsocketConn.SendMessage(subscriber)
|
||||
}
|
||||
|
||||
// Unsubscribe sends a websocket message to stop receiving data from the channel
|
||||
func (b *Bitmex) Unsubscribe(channelToSubscribe exchange.WebsocketChannelSubscription) error {
|
||||
func (b *Bitmex) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubscription) error {
|
||||
var subscriber WebsocketRequest
|
||||
subscriber.Command = "unsubscribe"
|
||||
subscriber.Arguments = append(subscriber.Arguments,
|
||||
channelToSubscribe.Params["args"],
|
||||
channelToSubscribe.Channel+":"+channelToSubscribe.Currency.String())
|
||||
return b.wsSend(subscriber)
|
||||
return b.WebsocketConn.SendMessage(subscriber)
|
||||
}
|
||||
|
||||
// WebsocketSendAuth sends an authenticated subscription
|
||||
@@ -548,20 +522,10 @@ func (b *Bitmex) websocketSendAuth() error {
|
||||
sendAuth.Command = "authKeyExpires"
|
||||
sendAuth.Arguments = append(sendAuth.Arguments, b.API.Credentials.Key, timestamp,
|
||||
signature)
|
||||
err := b.wsSend(sendAuth)
|
||||
err := b.WebsocketConn.SendMessage(sendAuth)
|
||||
if err != nil {
|
||||
b.Websocket.SetCanUseAuthenticatedEndpoints(false)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WsSend sends data to the websocket server
|
||||
func (b *Bitmex) wsSend(data interface{}) error {
|
||||
b.wsRequestMtx.Lock()
|
||||
defer b.wsRequestMtx.Unlock()
|
||||
if b.Verbose {
|
||||
log.Debugf(log.ExchangeSys, "%v sending message to websocket %v", b.Name, data)
|
||||
}
|
||||
return b.WebsocketConn.WriteJSON(data)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user