mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-06-02 07:26:53 +00:00
Authenticated Websocket support (#315)
* Improves subscribing by not allowing duplicates. Adds bitmex auth support * Adds coinbase pro support. Partial BTCC support. Adds WebsocketAuthenticatedEndpointsSupported websocket feature. Adds GateIO support * Adds Coinut support * Moves Coinut WS types to file. Implements Gemini's secure WS endpoint * Adds HitBTC ws authenticated support. Fixes var names * Adds huobi and hadax authenticated websocket support * Adds auth to okgroup (okex, okcoin). Fixes some linting * Adds Poloniex support * Adds ZB support * Adds proper bitmex support * Improves bitfinex support, improves websocket functionality definitions * Fixes coinbasepro auth * Tests all endpoints * go formatting, importing, linting run * Adds wrapper supports * General clean up. Data race destruction * Improves testing on all exchanges except ZB * Fixes ZB hashing, parsing and tests * minor nits before someone else sees them <_< * Fixes some nits pertaining to variable usage, comments, typos and rate limiting * Addresses nits regarding types and test responses where applicable * fmt import * Fixes linting issues * No longer returns an error on failure to authenticate, just logs. Adds new AuthenticatedWebsocketAPISupport config value to allow a user to seperate auth from REST and WS. Prevents WS auth if AuthenticatedWebsocketAPISupport is false, adds additional login check 'CanUseAuthenticatedEndpoints' for when login only occurs once (not per request). Removes unnecessary time.Sleeps from code. Moves WS auth error logic to auth function so that wrappers can get involved in all the auth fun. New-fandangled shared test package, used exclusively in testing, will be the store of all the constant boilerplate things like timeout values. Moves WS test setup function to only run once when there are multiple WS endpoint tests. Cleans up some struct types * Increases test coverage with tests for config.areAuthenticatedCredentialsValid config.CheckExchangeConfigValues, exchange.SetAPIKeys, exchange.GetAuthenticatedAPISupport, exchange_websocket.CanUseAuthenticatedEndpoitns and exchange_websocket.SetCanUseAuthenticatedEndpoints. Adds b.Websocket.SetCanUseAuthenticatedEndpoints(false) when bitfinex fails to authenticate Fixes a typo. gofmt and goimport * Trim Test Typos * Reformats various websocket types. Adds more specific error messaging to config.areAuthenticatedCredentialsValid
This commit is contained in:
@@ -123,7 +123,8 @@ func (g *Gemini) SetDefaults() {
|
||||
g.APIUrl = g.APIUrlDefault
|
||||
g.WebsocketInit()
|
||||
g.Websocket.Functionality = exchange.WebsocketOrderbookSupported |
|
||||
exchange.WebsocketTradeDataSupported
|
||||
exchange.WebsocketTradeDataSupported |
|
||||
exchange.WebsocketAuthenticatedEndpointsSupported
|
||||
}
|
||||
|
||||
// Setup sets exchange configuration parameters
|
||||
@@ -133,6 +134,7 @@ func (g *Gemini) Setup(exch *config.ExchangeConfig) {
|
||||
} else {
|
||||
g.Enabled = true
|
||||
g.AuthenticatedAPISupport = exch.AuthenticatedAPISupport
|
||||
g.AuthenticatedWebsocketAPISupport = exch.AuthenticatedWebsocketAPISupport
|
||||
g.SetAPIKeys(exch.APIKey, exch.APISecret, "", false)
|
||||
g.SetHTTPClientTimeout(exch.HTTPTimeout)
|
||||
g.SetHTTPClientUserAgent(exch.HTTPUserAgent)
|
||||
@@ -142,7 +144,7 @@ func (g *Gemini) Setup(exch *config.ExchangeConfig) {
|
||||
g.BaseCurrencies = exch.BaseCurrencies
|
||||
g.AvailablePairs = exch.AvailablePairs
|
||||
g.EnabledPairs = exch.EnabledPairs
|
||||
|
||||
g.WebsocketURL = geminiWebsocketEndpoint
|
||||
err := g.SetCurrencyPairFormat()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
@@ -161,6 +163,7 @@ func (g *Gemini) Setup(exch *config.ExchangeConfig) {
|
||||
}
|
||||
if exch.UseSandbox {
|
||||
g.APIUrl = geminiSandboxAPIURL
|
||||
g.WebsocketURL = geminiWebsocketSandboxEndpoint
|
||||
}
|
||||
err = g.SetClientProxyAddress(exch.ProxyAddress)
|
||||
if err != nil {
|
||||
@@ -172,8 +175,8 @@ func (g *Gemini) Setup(exch *config.ExchangeConfig) {
|
||||
exch.Name,
|
||||
exch.Websocket,
|
||||
exch.Verbose,
|
||||
geminiWebsocketEndpoint,
|
||||
exch.WebsocketURL)
|
||||
g.WebsocketURL,
|
||||
g.WebsocketURL)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -3,11 +3,14 @@ package gemini
|
||||
import (
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/thrasher-/gocryptotrader/common"
|
||||
"github.com/thrasher-/gocryptotrader/config"
|
||||
"github.com/thrasher-/gocryptotrader/currency"
|
||||
exchange "github.com/thrasher-/gocryptotrader/exchanges"
|
||||
"github.com/thrasher-/gocryptotrader/exchanges/sharedtestvalues"
|
||||
)
|
||||
|
||||
// Please enter sandbox API keys & assigned roles for better testing procedures
|
||||
@@ -61,8 +64,9 @@ func TestSetup(t *testing.T) {
|
||||
t.Error("Test Failed - Gemini Setup() init error")
|
||||
}
|
||||
|
||||
geminiConfig.AuthenticatedWebsocketAPISupport = true
|
||||
geminiConfig.AuthenticatedAPISupport = true
|
||||
|
||||
geminiConfig.Websocket = true
|
||||
Session[1].Setup(&geminiConfig)
|
||||
Session[2].Setup(&geminiConfig)
|
||||
|
||||
@@ -554,3 +558,32 @@ func TestGetDepositAddress(t *testing.T) {
|
||||
t.Error("Test Failed - GetDepositAddress error cannot be nil")
|
||||
}
|
||||
}
|
||||
|
||||
// TestWsAuth dials websocket, sends login request.
|
||||
func TestWsAuth(t *testing.T) {
|
||||
TestAddSession(t)
|
||||
TestSetDefaults(t)
|
||||
TestSetup(t)
|
||||
g := Session[1]
|
||||
g.WebsocketURL = geminiWebsocketSandboxEndpoint
|
||||
|
||||
if !g.Websocket.IsEnabled() && !g.AuthenticatedWebsocketAPISupport || !areTestAPIKeysSet() {
|
||||
t.Skip(exchange.WebsocketNotEnabled)
|
||||
}
|
||||
var dialer websocket.Dialer
|
||||
go g.WsHandleData()
|
||||
err := g.WsSecureSubscribe(&dialer, geminiWsOrderEvents)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
timer := time.NewTimer(sharedtestvalues.WebsocketResponseDefaultTimeout)
|
||||
select {
|
||||
case resp := <-g.Websocket.DataHandler:
|
||||
if resp.(WsSubscriptionAcknowledgementResponse).Type != "subscription_ack" {
|
||||
t.Error("Login failed")
|
||||
}
|
||||
case <-timer.C:
|
||||
t.Error("Expected response")
|
||||
}
|
||||
timer.Stop()
|
||||
}
|
||||
|
||||
@@ -195,8 +195,13 @@ type ErrorCapture struct {
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
// Response defines the main response type
|
||||
type Response struct {
|
||||
// WsResponse generic response
|
||||
type WsResponse struct {
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
// WsMarketUpdateResponse defines the main response type
|
||||
type WsMarketUpdateResponse struct {
|
||||
Type string `json:"type"`
|
||||
EventID int64 `json:"eventId"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
@@ -221,5 +226,192 @@ type Event struct {
|
||||
type ReadData struct {
|
||||
Raw []byte
|
||||
Currency currency.Pair
|
||||
FeedType string
|
||||
}
|
||||
|
||||
// WsRequestPayload Request info to subscribe to a WS enpoint
|
||||
type WsRequestPayload struct {
|
||||
Request string `json:"request"`
|
||||
Nonce int64 `json:"nonce"`
|
||||
}
|
||||
|
||||
// WsSubscriptionAcknowledgementResponse The first message you receive acknowledges your subscription
|
||||
type WsSubscriptionAcknowledgementResponse struct {
|
||||
Type string `json:"type"`
|
||||
AccountID int64 `json:"accountId"`
|
||||
SubscriptionID string `json:"subscriptionId"`
|
||||
SymbolFilter []string `json:"symbolFilter"`
|
||||
APISessionFilter []string `json:"apiSessionFilter"`
|
||||
EventTypeFilter []string `json:"eventTypeFilter"`
|
||||
}
|
||||
|
||||
// WsHeartbeatResponse Gemini will send a heartbeat every five seconds so you'll know your WebSocket connection is active.
|
||||
type WsHeartbeatResponse struct {
|
||||
Type string `json:"type"`
|
||||
Timestampms int64 `json:"timestampms"`
|
||||
Sequence int64 `json:"sequence"`
|
||||
TraceID string `json:"trace_id"`
|
||||
SocketSequence int64 `json:"socket_sequence"`
|
||||
}
|
||||
|
||||
// WsActiveOrdersResponse contains active orders
|
||||
type WsActiveOrdersResponse struct {
|
||||
Type string `json:"type"`
|
||||
OrderID string `json:"order_id"`
|
||||
APISession string `json:"api_session"`
|
||||
Symbol currency.Pair `json:"symbol"`
|
||||
Side string `json:"side"`
|
||||
OrderType string `json:"order_type"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
Timestampms int64 `json:"timestampms"`
|
||||
IsLive bool `json:"is_live"`
|
||||
IsCancelled bool `json:"is_cancelled"`
|
||||
IsHidden bool `json:"is_hidden"`
|
||||
AvgExecutionPrice float64 `json:"avg_execution_price,string"`
|
||||
ExecutedAmount float64 `json:"executed_amount,string"`
|
||||
RemainingAmount float64 `json:"remaining_amount,string"`
|
||||
OriginalAmount float64 `json:"original_amount,string"`
|
||||
Price float64 `json:"price,string"`
|
||||
SocketSequence int64 `json:"socket_sequence"`
|
||||
}
|
||||
|
||||
// WsOrderRejectedResponse ws response
|
||||
type WsOrderRejectedResponse struct {
|
||||
Type string `json:"type"`
|
||||
OrderID string `json:"order_id"`
|
||||
EventID string `json:"event_id"`
|
||||
Reason string `json:"reason"`
|
||||
APISession string `json:"api_session"`
|
||||
Symbol currency.Pair `json:"symbol"`
|
||||
Side string `json:"side"`
|
||||
OrderType string `json:"order_type"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
Timestampms int64 `json:"timestampms"`
|
||||
IsLive bool `json:"is_live"`
|
||||
OriginalAmount float64 `json:"original_amount,string"`
|
||||
Price float64 `json:"price,string"`
|
||||
SocketSequence int64 `json:"socket_sequence"`
|
||||
}
|
||||
|
||||
// WsOrderBookedResponse ws response
|
||||
type WsOrderBookedResponse struct {
|
||||
Type string `json:"type"`
|
||||
OrderID string `json:"order_id"`
|
||||
EventID string `json:"event_id"`
|
||||
APISession string `json:"api_session"`
|
||||
Symbol currency.Pair `json:"symbol"`
|
||||
Side string `json:"side"`
|
||||
OrderType string `json:"order_type"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
Timestampms int64 `json:"timestampms"`
|
||||
IsLive bool `json:"is_live"`
|
||||
IsCancelled bool `json:"is_cancelled"`
|
||||
IsHidden bool `json:"is_hidden"`
|
||||
AvgExecutionPrice float64 `json:"avg_execution_price,string"`
|
||||
ExecutedAmount float64 `json:"executed_amount,string"`
|
||||
RemainingAmount float64 `json:"remaining_amount,string"`
|
||||
OriginalAmount float64 `json:"original_amount,string"`
|
||||
Price float64 `json:"price,string"`
|
||||
SocketSequence int64 `json:"socket_sequence"`
|
||||
}
|
||||
|
||||
// WsOrderFilledResponse ws response
|
||||
type WsOrderFilledResponse struct {
|
||||
Type string `json:"type"`
|
||||
OrderID string `json:"order_id"`
|
||||
APISession string `json:"api_session"`
|
||||
Symbol currency.Pair `json:"symbol"`
|
||||
Side string `json:"side"`
|
||||
OrderType string `json:"order_type"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
Timestampms int64 `json:"timestampms"`
|
||||
IsLive bool `json:"is_live"`
|
||||
IsCancelled bool `json:"is_cancelled"`
|
||||
IsHidden bool `json:"is_hidden"`
|
||||
AvgExecutionPrice float64 `json:"avg_execution_price,string"`
|
||||
ExecutedAmount float64 `json:"executed_amount,string"`
|
||||
RemainingAmount float64 `json:"remaining_amount,string"`
|
||||
OriginalAmount float64 `json:"original_amount,string"`
|
||||
Price float64 `json:"price,string"`
|
||||
Fill WsOrderFilledData `json:"fill"`
|
||||
SocketSequence int64 `json:"socket_sequence"`
|
||||
}
|
||||
|
||||
// WsOrderFilledData ws response data
|
||||
type WsOrderFilledData struct {
|
||||
TradeID string `json:"trade_id"`
|
||||
Liquidity string `json:"liquidity"`
|
||||
Price float64 `json:"price,string"`
|
||||
Amount float64 `json:"amount,string"`
|
||||
Fee float64 `json:"fee,string"`
|
||||
FeeCurrency string `json:"fee_currency"`
|
||||
}
|
||||
|
||||
// WsOrderCancelledResponse ws response
|
||||
type WsOrderCancelledResponse struct {
|
||||
Type string `json:"type"`
|
||||
OrderID string `json:"order_id"`
|
||||
EventID string `json:"event_id"`
|
||||
CancelCommandID string `json:"cancel_command_id,omitempty"`
|
||||
Reason string `json:"reason"`
|
||||
APISession string `json:"api_session"`
|
||||
Symbol currency.Pair `json:"symbol"`
|
||||
Side string `json:"side"`
|
||||
OrderType string `json:"order_type"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
Timestampms int64 `json:"timestampms"`
|
||||
IsLive bool `json:"is_live"`
|
||||
IsCancelled bool `json:"is_cancelled"`
|
||||
IsHidden bool `json:"is_hidden"`
|
||||
AvgExecutionPrice float64 `json:"avg_execution_price,string"`
|
||||
ExecutedAmount float64 `json:"executed_amount,string"`
|
||||
RemainingAmount float64 `json:"remaining_amount,string"`
|
||||
OriginalAmount float64 `json:"original_amount,string"`
|
||||
Price float64 `json:"price,string"`
|
||||
SocketSequence int64 `json:"socket_sequence"`
|
||||
}
|
||||
|
||||
// WsOrderCancellationRejectedResponse ws response
|
||||
type WsOrderCancellationRejectedResponse struct {
|
||||
Type string `json:"type"`
|
||||
OrderID string `json:"order_id"`
|
||||
EventID string `json:"event_id"`
|
||||
CancelCommandID string `json:"cancel_command_id"`
|
||||
Reason string `json:"reason"`
|
||||
APISession string `json:"api_session"`
|
||||
Symbol currency.Pair `json:"symbol"`
|
||||
Side string `json:"side"`
|
||||
OrderType string `json:"order_type"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
Timestampms int64 `json:"timestampms"`
|
||||
IsLive bool `json:"is_live"`
|
||||
IsCancelled bool `json:"is_cancelled"`
|
||||
IsHidden bool `json:"is_hidden"`
|
||||
AvgExecutionPrice float64 `json:"avg_execution_price,string"`
|
||||
ExecutedAmount float64 `json:"executed_amount,string"`
|
||||
RemainingAmount float64 `json:"remaining_amount,string"`
|
||||
OriginalAmount float64 `json:"original_amount,string"`
|
||||
Price float64 `json:"price,string"`
|
||||
SocketSequence int64 `json:"socket_sequence"`
|
||||
}
|
||||
|
||||
// WsOrderClosedResponse ws response
|
||||
type WsOrderClosedResponse struct {
|
||||
Type string `json:"type"`
|
||||
OrderID string `json:"order_id"`
|
||||
EventID string `json:"event_id"`
|
||||
APISession string `json:"api_session"`
|
||||
Symbol currency.Pair `json:"symbol"`
|
||||
Side string `json:"side"`
|
||||
OrderType string `json:"order_type"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
Timestampms int64 `json:"timestampms"`
|
||||
IsLive bool `json:"is_live"`
|
||||
IsCancelled bool `json:"is_cancelled"`
|
||||
IsHidden bool `json:"is_hidden"`
|
||||
AvgExecutionPrice float64 `json:"avg_execution_price,string"`
|
||||
ExecutedAmount float64 `json:"executed_amount,string"`
|
||||
RemainingAmount float64 `json:"remaining_amount,string"`
|
||||
OriginalAmount float64 `json:"original_amount,string"`
|
||||
Price float64 `json:"price,string"`
|
||||
SocketSequence int64 `json:"socket_sequence"`
|
||||
}
|
||||
|
||||
@@ -14,12 +14,15 @@ import (
|
||||
"github.com/thrasher-/gocryptotrader/currency"
|
||||
exchange "github.com/thrasher-/gocryptotrader/exchanges"
|
||||
"github.com/thrasher-/gocryptotrader/exchanges/orderbook"
|
||||
log "github.com/thrasher-/gocryptotrader/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
geminiWebsocketEndpoint = "wss://api.gemini.com/v1/marketdata/%s?%s"
|
||||
geminiWsEvent = "event"
|
||||
geminiWsMarketData = "marketdata"
|
||||
geminiWebsocketEndpoint = "wss://api.gemini.com/v1/"
|
||||
geminiWebsocketSandboxEndpoint = "wss://api.sandbox.gemini.com/v1/"
|
||||
geminiWsEvent = "event"
|
||||
geminiWsMarketData = "marketdata"
|
||||
geminiWsOrderEvents = "order/events"
|
||||
)
|
||||
|
||||
// Instantiates a communications channel between websocket connections
|
||||
@@ -37,12 +40,14 @@ func (g *Gemini) WsConnect() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dialer.Proxy = http.ProxyURL(proxy)
|
||||
}
|
||||
|
||||
go g.WsHandleData()
|
||||
|
||||
err := g.WsSecureSubscribe(&dialer, geminiWsOrderEvents)
|
||||
if err != nil {
|
||||
log.Errorf("%v - authentication failed: %v", g.Name, err)
|
||||
}
|
||||
return g.WsSubscribe(&dialer)
|
||||
}
|
||||
|
||||
@@ -52,59 +57,75 @@ func (g *Gemini) WsSubscribe(dialer *websocket.Dialer) error {
|
||||
for i, c := range enabledCurrencies {
|
||||
val := url.Values{}
|
||||
val.Set("heartbeat", "true")
|
||||
|
||||
endpoint := fmt.Sprintf(g.Websocket.GetWebsocketURL(),
|
||||
endpoint := fmt.Sprintf("%s%s/%s?%s",
|
||||
g.WebsocketURL,
|
||||
geminiWsMarketData,
|
||||
c.String(),
|
||||
val.Encode())
|
||||
|
||||
conn, _, err := dialer.Dial(endpoint, http.Header{})
|
||||
conn, conStatus, err := dialer.Dial(endpoint, http.Header{})
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("%v %v %v Error: %v", endpoint, conStatus, conStatus.StatusCode, err)
|
||||
}
|
||||
|
||||
go g.WsReadData(conn, c, geminiWsMarketData)
|
||||
|
||||
go g.WsReadData(conn, c)
|
||||
if len(enabledCurrencies)-1 == i {
|
||||
return nil
|
||||
}
|
||||
|
||||
time.Sleep(5 * time.Second) // rate limiter, limit of 12 requests per
|
||||
// minute
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WsSecureSubscribe will connect to Gemini's secure endpoint
|
||||
func (g *Gemini) WsSecureSubscribe(dialer *websocket.Dialer, url string) error {
|
||||
if !g.GetAuthenticatedAPISupport(exchange.WebsocketAuthentication) {
|
||||
return fmt.Errorf("%v AuthenticatedWebsocketAPISupport not enabled", g.Name)
|
||||
}
|
||||
payload := WsRequestPayload{
|
||||
Request: fmt.Sprintf("/v1/%v", url),
|
||||
Nonce: time.Now().UnixNano(),
|
||||
}
|
||||
PayloadJSON, err := common.JSONEncode(payload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v sendAuthenticatedHTTPRequest: Unable to JSON request", g.Name)
|
||||
}
|
||||
|
||||
endpoint := fmt.Sprintf("%v%v", g.WebsocketURL, url)
|
||||
PayloadBase64 := common.Base64Encode(PayloadJSON)
|
||||
hmac := common.GetHMAC(common.HashSHA512_384, []byte(PayloadBase64), []byte(g.APISecret))
|
||||
headers := http.Header{}
|
||||
headers.Add("Content-Length", "0")
|
||||
headers.Add("Content-Type", "text/plain")
|
||||
headers.Add("X-GEMINI-PAYLOAD", PayloadBase64)
|
||||
headers.Add("X-GEMINI-APIKEY", g.APIKey)
|
||||
headers.Add("X-GEMINI-SIGNATURE", common.HexEncodeToString(hmac))
|
||||
headers.Add("Cache-Control", "no-cache")
|
||||
|
||||
conn, conStatus, err := dialer.Dial(endpoint, headers)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v %v %v Error: %v", endpoint, conStatus, conStatus.StatusCode, err)
|
||||
}
|
||||
go g.WsReadData(conn, currency.Pair{})
|
||||
return nil
|
||||
}
|
||||
|
||||
// WsReadData reads from the websocket connection and returns the websocket
|
||||
// response
|
||||
func (g *Gemini) WsReadData(ws *websocket.Conn, c currency.Pair, feedType string) {
|
||||
func (g *Gemini) WsReadData(ws *websocket.Conn, c currency.Pair) {
|
||||
g.Websocket.Wg.Add(1)
|
||||
|
||||
defer func() {
|
||||
err := ws.Close()
|
||||
if err != nil {
|
||||
g.Websocket.DataHandler <- fmt.Errorf("gemini_websocket.go - Unable to to close Websocket connection. Error: %s",
|
||||
err)
|
||||
}
|
||||
g.Websocket.Wg.Done()
|
||||
}()
|
||||
|
||||
defer g.Websocket.Wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-g.Websocket.ShutdownC:
|
||||
return
|
||||
|
||||
default:
|
||||
_, resp, err := ws.ReadMessage()
|
||||
if err != nil {
|
||||
g.Websocket.DataHandler <- err
|
||||
return
|
||||
}
|
||||
|
||||
g.Websocket.TrafficAlert <- struct{}{}
|
||||
comms <- ReadData{Raw: resp, Currency: c, FeedType: feedType}
|
||||
comms <- ReadData{Raw: resp, Currency: c}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// WsHandleData handles all the websocket data coming from the websocket
|
||||
@@ -112,119 +133,191 @@ func (g *Gemini) WsReadData(ws *websocket.Conn, c currency.Pair, feedType string
|
||||
func (g *Gemini) WsHandleData() {
|
||||
g.Websocket.Wg.Add(1)
|
||||
defer g.Websocket.Wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-g.Websocket.ShutdownC:
|
||||
return
|
||||
|
||||
case resp := <-comms:
|
||||
switch resp.FeedType {
|
||||
case geminiWsEvent:
|
||||
|
||||
case geminiWsMarketData:
|
||||
var result Response
|
||||
// Gemini likes to send empty arrays
|
||||
if string(resp.Raw) == "[]" {
|
||||
continue
|
||||
}
|
||||
var result map[string]interface{}
|
||||
err := common.JSONDecode(resp.Raw, &result)
|
||||
if err != nil {
|
||||
g.Websocket.DataHandler <- fmt.Errorf("%v Error: %v, Raw: %v", g.Name, err, string(resp.Raw))
|
||||
continue
|
||||
}
|
||||
switch result["type"] {
|
||||
case "subscription_ack":
|
||||
var result WsSubscriptionAcknowledgementResponse
|
||||
err := common.JSONDecode(resp.Raw, &result)
|
||||
if err != nil {
|
||||
g.Websocket.DataHandler <- err
|
||||
continue
|
||||
}
|
||||
|
||||
switch result.Type {
|
||||
case "update":
|
||||
if result.Timestamp == 0 && result.TimestampMS == 0 {
|
||||
var bids, asks []orderbook.Item
|
||||
for _, event := range result.Events {
|
||||
if event.Reason != "initial" {
|
||||
g.Websocket.DataHandler <- errors.New("gemini_websocket.go orderbook should be snapshot only")
|
||||
continue
|
||||
}
|
||||
|
||||
if event.Side == "ask" {
|
||||
asks = append(asks, orderbook.Item{
|
||||
Amount: event.Remaining,
|
||||
Price: event.Price,
|
||||
})
|
||||
} else {
|
||||
bids = append(bids, orderbook.Item{
|
||||
Amount: event.Remaining,
|
||||
Price: event.Price,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
var newOrderBook orderbook.Base
|
||||
newOrderBook.Asks = asks
|
||||
newOrderBook.Bids = bids
|
||||
newOrderBook.AssetType = "SPOT"
|
||||
newOrderBook.Pair = resp.Currency
|
||||
|
||||
err := g.Websocket.Orderbook.LoadSnapshot(&newOrderBook,
|
||||
g.GetName(),
|
||||
false)
|
||||
if err != nil {
|
||||
g.Websocket.DataHandler <- err
|
||||
break
|
||||
}
|
||||
|
||||
g.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{Pair: resp.Currency,
|
||||
Asset: "SPOT",
|
||||
Exchange: g.GetName()}
|
||||
|
||||
} else {
|
||||
for _, event := range result.Events {
|
||||
if event.Type == "trade" {
|
||||
g.Websocket.DataHandler <- exchange.TradeData{
|
||||
Timestamp: time.Now(),
|
||||
CurrencyPair: resp.Currency,
|
||||
AssetType: "SPOT",
|
||||
Exchange: g.GetName(),
|
||||
EventTime: result.Timestamp,
|
||||
Price: event.Price,
|
||||
Amount: event.Amount,
|
||||
Side: event.MakerSide,
|
||||
}
|
||||
|
||||
} else {
|
||||
var i orderbook.Item
|
||||
i.Amount = event.Remaining
|
||||
i.Price = event.Price
|
||||
if event.Side == "ask" {
|
||||
err := g.Websocket.Orderbook.Update(nil,
|
||||
[]orderbook.Item{i},
|
||||
resp.Currency,
|
||||
time.Now(),
|
||||
g.GetName(),
|
||||
"SPOT")
|
||||
if err != nil {
|
||||
g.Websocket.DataHandler <- err
|
||||
}
|
||||
} else {
|
||||
err := g.Websocket.Orderbook.Update([]orderbook.Item{i},
|
||||
nil,
|
||||
resp.Currency,
|
||||
time.Now(),
|
||||
g.GetName(),
|
||||
"SPOT")
|
||||
if err != nil {
|
||||
g.Websocket.DataHandler <- err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
g.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{Pair: resp.Currency,
|
||||
Asset: "SPOT",
|
||||
Exchange: g.GetName()}
|
||||
}
|
||||
|
||||
case "heartbeat":
|
||||
|
||||
default:
|
||||
g.Websocket.DataHandler <- fmt.Errorf("gemini_websocket.go - unhandled data %s",
|
||||
resp.Raw)
|
||||
g.Websocket.DataHandler <- result
|
||||
case "initial":
|
||||
var result WsSubscriptionAcknowledgementResponse
|
||||
err := common.JSONDecode(resp.Raw, &result)
|
||||
if err != nil {
|
||||
g.Websocket.DataHandler <- err
|
||||
continue
|
||||
}
|
||||
g.Websocket.DataHandler <- result
|
||||
case "accepted":
|
||||
var result WsActiveOrdersResponse
|
||||
err := common.JSONDecode(resp.Raw, &result)
|
||||
if err != nil {
|
||||
g.Websocket.DataHandler <- err
|
||||
continue
|
||||
}
|
||||
g.Websocket.DataHandler <- result
|
||||
case "booked":
|
||||
var result WsOrderBookedResponse
|
||||
err := common.JSONDecode(resp.Raw, &result)
|
||||
if err != nil {
|
||||
g.Websocket.DataHandler <- err
|
||||
continue
|
||||
}
|
||||
g.Websocket.DataHandler <- result
|
||||
case "fill":
|
||||
var result WsOrderFilledResponse
|
||||
err := common.JSONDecode(resp.Raw, &result)
|
||||
if err != nil {
|
||||
g.Websocket.DataHandler <- err
|
||||
continue
|
||||
}
|
||||
g.Websocket.DataHandler <- result
|
||||
case "cancelled":
|
||||
var result WsOrderCancelledResponse
|
||||
err := common.JSONDecode(resp.Raw, &result)
|
||||
if err != nil {
|
||||
g.Websocket.DataHandler <- err
|
||||
continue
|
||||
}
|
||||
g.Websocket.DataHandler <- result
|
||||
case "closed":
|
||||
var result WsOrderClosedResponse
|
||||
err := common.JSONDecode(resp.Raw, &result)
|
||||
if err != nil {
|
||||
g.Websocket.DataHandler <- err
|
||||
continue
|
||||
}
|
||||
g.Websocket.DataHandler <- result
|
||||
case "heartbeat":
|
||||
var result WsHeartbeatResponse
|
||||
err := common.JSONDecode(resp.Raw, &result)
|
||||
if err != nil {
|
||||
g.Websocket.DataHandler <- err
|
||||
continue
|
||||
}
|
||||
g.Websocket.DataHandler <- result
|
||||
case "update":
|
||||
if resp.Currency.IsEmpty() {
|
||||
g.Websocket.DataHandler <- fmt.Errorf("%v - unhandled data %s",
|
||||
g.Name, resp.Raw)
|
||||
continue
|
||||
}
|
||||
var marketUpdate WsMarketUpdateResponse
|
||||
err := common.JSONDecode(resp.Raw, &marketUpdate)
|
||||
if err != nil {
|
||||
g.Websocket.DataHandler <- err
|
||||
continue
|
||||
}
|
||||
g.wsProcessUpdate(marketUpdate, resp.Currency)
|
||||
default:
|
||||
g.Websocket.DataHandler <- fmt.Errorf("%v - unhandled data %s",
|
||||
g.Name, resp.Raw)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// wsProcessUpdate handles order book data
|
||||
func (g *Gemini) wsProcessUpdate(result WsMarketUpdateResponse, pair currency.Pair) {
|
||||
if result.Timestamp == 0 && result.TimestampMS == 0 {
|
||||
var bids, asks []orderbook.Item
|
||||
for _, event := range result.Events {
|
||||
if event.Reason != "initial" {
|
||||
g.Websocket.DataHandler <- errors.New("gemini_websocket.go orderbook should be snapshot only")
|
||||
continue
|
||||
}
|
||||
|
||||
if event.Side == "ask" {
|
||||
asks = append(asks, orderbook.Item{
|
||||
Amount: event.Remaining,
|
||||
Price: event.Price,
|
||||
})
|
||||
} else {
|
||||
bids = append(bids, orderbook.Item{
|
||||
Amount: event.Remaining,
|
||||
Price: event.Price,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
var newOrderBook orderbook.Base
|
||||
newOrderBook.Asks = asks
|
||||
newOrderBook.Bids = bids
|
||||
newOrderBook.AssetType = "SPOT"
|
||||
newOrderBook.Pair = pair
|
||||
|
||||
err := g.Websocket.Orderbook.LoadSnapshot(&newOrderBook,
|
||||
g.GetName(),
|
||||
false)
|
||||
if err != nil {
|
||||
g.Websocket.DataHandler <- err
|
||||
return
|
||||
}
|
||||
|
||||
g.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{Pair: pair,
|
||||
Asset: "SPOT",
|
||||
Exchange: g.GetName()}
|
||||
} else {
|
||||
for _, event := range result.Events {
|
||||
if event.Type == "trade" {
|
||||
g.Websocket.DataHandler <- exchange.TradeData{
|
||||
Timestamp: time.Now(),
|
||||
CurrencyPair: pair,
|
||||
AssetType: "SPOT",
|
||||
Exchange: g.Name,
|
||||
EventTime: result.Timestamp,
|
||||
Price: event.Price,
|
||||
Amount: event.Amount,
|
||||
Side: event.MakerSide,
|
||||
}
|
||||
|
||||
} else {
|
||||
var i orderbook.Item
|
||||
i.Amount = event.Remaining
|
||||
i.Price = event.Price
|
||||
if event.Side == "ask" {
|
||||
err := g.Websocket.Orderbook.Update(nil,
|
||||
[]orderbook.Item{i},
|
||||
pair,
|
||||
time.Now(),
|
||||
g.GetName(),
|
||||
"SPOT")
|
||||
if err != nil {
|
||||
g.Websocket.DataHandler <- err
|
||||
}
|
||||
} else {
|
||||
err := g.Websocket.Orderbook.Update([]orderbook.Item{i},
|
||||
nil,
|
||||
pair,
|
||||
time.Now(),
|
||||
g.GetName(),
|
||||
"SPOT")
|
||||
if err != nil {
|
||||
g.Websocket.DataHandler <- err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
g.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{Pair: pair,
|
||||
Asset: "SPOT",
|
||||
Exchange: g.GetName()}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -369,3 +369,13 @@ func (g *Gemini) SubscribeToWebsocketChannels(channels []exchange.WebsocketChann
|
||||
func (g *Gemini) UnsubscribeToWebsocketChannels(channels []exchange.WebsocketChannelSubscription) error {
|
||||
return common.ErrFunctionNotSupported
|
||||
}
|
||||
|
||||
// GetSubscriptions returns a copied list of subscriptions
|
||||
func (g *Gemini) GetSubscriptions() ([]exchange.WebsocketChannelSubscription, error) {
|
||||
return nil, common.ErrFunctionNotSupported
|
||||
}
|
||||
|
||||
// AuthenticateWebsocket sends an authentication message to the websocket
|
||||
func (g *Gemini) AuthenticateWebsocket() error {
|
||||
return common.ErrFunctionNotSupported
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user