Merge branch 'master' into engine

This commit is contained in:
Adrian Gallagher
2019-06-21 18:10:55 +10:00
87 changed files with 5669 additions and 992 deletions

View File

@@ -3,11 +3,14 @@ package gemini
import (
"net/url"
"testing"
"time"
"github.com/gorilla/websocket"
"github.com/thrasher-/gocryptotrader/common"
"github.com/thrasher-/gocryptotrader/config"
"github.com/thrasher-/gocryptotrader/currency"
exchange "github.com/thrasher-/gocryptotrader/exchanges"
"github.com/thrasher-/gocryptotrader/exchanges/sharedtestvalues"
)
// Please enter sandbox API keys & assigned roles for better testing procedures
@@ -62,6 +65,7 @@ func TestSetup(t *testing.T) {
}
geminiConfig.API.AuthenticatedSupport = true
geminiConfig.API.AuthenticatedWebsocketSupport = true
Session[1].Setup(geminiConfig)
Session[2].Setup(geminiConfig)
@@ -559,3 +563,32 @@ func TestGetDepositAddress(t *testing.T) {
t.Error("Test Failed - GetDepositAddress error cannot be nil")
}
}
// TestWsAuth dials websocket, sends login request.
func TestWsAuth(t *testing.T) {
TestAddSession(t)
TestSetDefaults(t)
TestSetup(t)
g := Session[1]
g.API.Endpoints.WebsocketURL = geminiWebsocketSandboxEndpoint
if !g.Websocket.IsEnabled() && !g.API.AuthenticatedWebsocketSupport || !areTestAPIKeysSet() {
t.Skip(exchange.WebsocketNotEnabled)
}
var dialer websocket.Dialer
go g.WsHandleData()
err := g.WsSecureSubscribe(&dialer, geminiWsOrderEvents)
if err != nil {
t.Error(err)
}
timer := time.NewTimer(sharedtestvalues.WebsocketResponseDefaultTimeout)
select {
case resp := <-g.Websocket.DataHandler:
if resp.(WsSubscriptionAcknowledgementResponse).Type != "subscription_ack" {
t.Error("Login failed")
}
case <-timer.C:
t.Error("Expected response")
}
timer.Stop()
}

View File

@@ -195,8 +195,13 @@ type ErrorCapture struct {
Message string `json:"message"`
}
// Response defines the main response type
type Response struct {
// WsResponse generic response
type WsResponse struct {
Type string `json:"type"`
}
// WsMarketUpdateResponse defines the main response type
type WsMarketUpdateResponse struct {
Type string `json:"type"`
EventID int64 `json:"eventId"`
Timestamp int64 `json:"timestamp"`
@@ -221,5 +226,192 @@ type Event struct {
type ReadData struct {
Raw []byte
Currency currency.Pair
FeedType string
}
// WsRequestPayload Request info to subscribe to a WS enpoint
type WsRequestPayload struct {
Request string `json:"request"`
Nonce int64 `json:"nonce"`
}
// WsSubscriptionAcknowledgementResponse The first message you receive acknowledges your subscription
type WsSubscriptionAcknowledgementResponse struct {
Type string `json:"type"`
AccountID int64 `json:"accountId"`
SubscriptionID string `json:"subscriptionId"`
SymbolFilter []string `json:"symbolFilter"`
APISessionFilter []string `json:"apiSessionFilter"`
EventTypeFilter []string `json:"eventTypeFilter"`
}
// WsHeartbeatResponse Gemini will send a heartbeat every five seconds so you'll know your WebSocket connection is active.
type WsHeartbeatResponse struct {
Type string `json:"type"`
Timestampms int64 `json:"timestampms"`
Sequence int64 `json:"sequence"`
TraceID string `json:"trace_id"`
SocketSequence int64 `json:"socket_sequence"`
}
// WsActiveOrdersResponse contains active orders
type WsActiveOrdersResponse struct {
Type string `json:"type"`
OrderID string `json:"order_id"`
APISession string `json:"api_session"`
Symbol currency.Pair `json:"symbol"`
Side string `json:"side"`
OrderType string `json:"order_type"`
Timestamp string `json:"timestamp"`
Timestampms int64 `json:"timestampms"`
IsLive bool `json:"is_live"`
IsCancelled bool `json:"is_cancelled"`
IsHidden bool `json:"is_hidden"`
AvgExecutionPrice float64 `json:"avg_execution_price,string"`
ExecutedAmount float64 `json:"executed_amount,string"`
RemainingAmount float64 `json:"remaining_amount,string"`
OriginalAmount float64 `json:"original_amount,string"`
Price float64 `json:"price,string"`
SocketSequence int64 `json:"socket_sequence"`
}
// WsOrderRejectedResponse ws response
type WsOrderRejectedResponse struct {
Type string `json:"type"`
OrderID string `json:"order_id"`
EventID string `json:"event_id"`
Reason string `json:"reason"`
APISession string `json:"api_session"`
Symbol currency.Pair `json:"symbol"`
Side string `json:"side"`
OrderType string `json:"order_type"`
Timestamp string `json:"timestamp"`
Timestampms int64 `json:"timestampms"`
IsLive bool `json:"is_live"`
OriginalAmount float64 `json:"original_amount,string"`
Price float64 `json:"price,string"`
SocketSequence int64 `json:"socket_sequence"`
}
// WsOrderBookedResponse ws response
type WsOrderBookedResponse struct {
Type string `json:"type"`
OrderID string `json:"order_id"`
EventID string `json:"event_id"`
APISession string `json:"api_session"`
Symbol currency.Pair `json:"symbol"`
Side string `json:"side"`
OrderType string `json:"order_type"`
Timestamp string `json:"timestamp"`
Timestampms int64 `json:"timestampms"`
IsLive bool `json:"is_live"`
IsCancelled bool `json:"is_cancelled"`
IsHidden bool `json:"is_hidden"`
AvgExecutionPrice float64 `json:"avg_execution_price,string"`
ExecutedAmount float64 `json:"executed_amount,string"`
RemainingAmount float64 `json:"remaining_amount,string"`
OriginalAmount float64 `json:"original_amount,string"`
Price float64 `json:"price,string"`
SocketSequence int64 `json:"socket_sequence"`
}
// WsOrderFilledResponse ws response
type WsOrderFilledResponse struct {
Type string `json:"type"`
OrderID string `json:"order_id"`
APISession string `json:"api_session"`
Symbol currency.Pair `json:"symbol"`
Side string `json:"side"`
OrderType string `json:"order_type"`
Timestamp string `json:"timestamp"`
Timestampms int64 `json:"timestampms"`
IsLive bool `json:"is_live"`
IsCancelled bool `json:"is_cancelled"`
IsHidden bool `json:"is_hidden"`
AvgExecutionPrice float64 `json:"avg_execution_price,string"`
ExecutedAmount float64 `json:"executed_amount,string"`
RemainingAmount float64 `json:"remaining_amount,string"`
OriginalAmount float64 `json:"original_amount,string"`
Price float64 `json:"price,string"`
Fill WsOrderFilledData `json:"fill"`
SocketSequence int64 `json:"socket_sequence"`
}
// WsOrderFilledData ws response data
type WsOrderFilledData struct {
TradeID string `json:"trade_id"`
Liquidity string `json:"liquidity"`
Price float64 `json:"price,string"`
Amount float64 `json:"amount,string"`
Fee float64 `json:"fee,string"`
FeeCurrency string `json:"fee_currency"`
}
// WsOrderCancelledResponse ws response
type WsOrderCancelledResponse struct {
Type string `json:"type"`
OrderID string `json:"order_id"`
EventID string `json:"event_id"`
CancelCommandID string `json:"cancel_command_id,omitempty"`
Reason string `json:"reason"`
APISession string `json:"api_session"`
Symbol currency.Pair `json:"symbol"`
Side string `json:"side"`
OrderType string `json:"order_type"`
Timestamp string `json:"timestamp"`
Timestampms int64 `json:"timestampms"`
IsLive bool `json:"is_live"`
IsCancelled bool `json:"is_cancelled"`
IsHidden bool `json:"is_hidden"`
AvgExecutionPrice float64 `json:"avg_execution_price,string"`
ExecutedAmount float64 `json:"executed_amount,string"`
RemainingAmount float64 `json:"remaining_amount,string"`
OriginalAmount float64 `json:"original_amount,string"`
Price float64 `json:"price,string"`
SocketSequence int64 `json:"socket_sequence"`
}
// WsOrderCancellationRejectedResponse ws response
type WsOrderCancellationRejectedResponse struct {
Type string `json:"type"`
OrderID string `json:"order_id"`
EventID string `json:"event_id"`
CancelCommandID string `json:"cancel_command_id"`
Reason string `json:"reason"`
APISession string `json:"api_session"`
Symbol currency.Pair `json:"symbol"`
Side string `json:"side"`
OrderType string `json:"order_type"`
Timestamp string `json:"timestamp"`
Timestampms int64 `json:"timestampms"`
IsLive bool `json:"is_live"`
IsCancelled bool `json:"is_cancelled"`
IsHidden bool `json:"is_hidden"`
AvgExecutionPrice float64 `json:"avg_execution_price,string"`
ExecutedAmount float64 `json:"executed_amount,string"`
RemainingAmount float64 `json:"remaining_amount,string"`
OriginalAmount float64 `json:"original_amount,string"`
Price float64 `json:"price,string"`
SocketSequence int64 `json:"socket_sequence"`
}
// WsOrderClosedResponse ws response
type WsOrderClosedResponse struct {
Type string `json:"type"`
OrderID string `json:"order_id"`
EventID string `json:"event_id"`
APISession string `json:"api_session"`
Symbol currency.Pair `json:"symbol"`
Side string `json:"side"`
OrderType string `json:"order_type"`
Timestamp string `json:"timestamp"`
Timestampms int64 `json:"timestampms"`
IsLive bool `json:"is_live"`
IsCancelled bool `json:"is_cancelled"`
IsHidden bool `json:"is_hidden"`
AvgExecutionPrice float64 `json:"avg_execution_price,string"`
ExecutedAmount float64 `json:"executed_amount,string"`
RemainingAmount float64 `json:"remaining_amount,string"`
OriginalAmount float64 `json:"original_amount,string"`
Price float64 `json:"price,string"`
SocketSequence int64 `json:"socket_sequence"`
}

View File

@@ -11,16 +11,20 @@ import (
"github.com/gorilla/websocket"
"github.com/thrasher-/gocryptotrader/common"
"github.com/thrasher-/gocryptotrader/common/crypto"
"github.com/thrasher-/gocryptotrader/currency"
exchange "github.com/thrasher-/gocryptotrader/exchanges"
"github.com/thrasher-/gocryptotrader/exchanges/asset"
"github.com/thrasher-/gocryptotrader/exchanges/orderbook"
log "github.com/thrasher-/gocryptotrader/logger"
)
const (
geminiWebsocketEndpoint = "wss://api.gemini.com/v1/marketdata/%s?%s"
geminiWsEvent = "event"
geminiWsMarketData = "marketdata"
geminiWebsocketEndpoint = "wss://api.gemini.com/v1/"
geminiWebsocketSandboxEndpoint = "wss://api.sandbox.gemini.com/v1/"
geminiWsEvent = "event"
geminiWsMarketData = "marketdata"
geminiWsOrderEvents = "order/events"
)
// Instantiates a communications channel between websocket connections
@@ -38,12 +42,14 @@ func (g *Gemini) WsConnect() error {
if err != nil {
return err
}
dialer.Proxy = http.ProxyURL(proxy)
}
go g.WsHandleData()
err := g.WsSecureSubscribe(&dialer, geminiWsOrderEvents)
if err != nil {
log.Errorf("%v - authentication failed: %v", g.Name, err)
}
return g.WsSubscribe(&dialer)
}
@@ -53,59 +59,76 @@ func (g *Gemini) WsSubscribe(dialer *websocket.Dialer) error {
for i, c := range enabledCurrencies {
val := url.Values{}
val.Set("heartbeat", "true")
endpoint := fmt.Sprintf(g.Websocket.GetWebsocketURL(),
endpoint := fmt.Sprintf("%s%s/%s?%s",
g.API.Endpoints.WebsocketURL,
geminiWsMarketData,
c.String(),
val.Encode())
conn, _, err := dialer.Dial(endpoint, http.Header{})
conn, conStatus, err := dialer.Dial(endpoint, http.Header{})
if err != nil {
return err
return fmt.Errorf("%s websocket endpoint: %v Status: %v Error: %v", g.Name,
endpoint, conStatus, err)
}
go g.WsReadData(conn, c, geminiWsMarketData)
go g.WsReadData(conn, c)
if len(enabledCurrencies)-1 == i {
return nil
}
time.Sleep(5 * time.Second) // rate limiter, limit of 12 requests per
// minute
}
return nil
}
// WsSecureSubscribe will connect to Gemini's secure endpoint
func (g *Gemini) WsSecureSubscribe(dialer *websocket.Dialer, url string) error {
if !g.GetAuthenticatedAPISupport(exchange.WebsocketAuthentication) {
return fmt.Errorf("%v AuthenticatedWebsocketAPISupport not enabled", g.Name)
}
payload := WsRequestPayload{
Request: fmt.Sprintf("/v1/%v", url),
Nonce: time.Now().UnixNano(),
}
PayloadJSON, err := common.JSONEncode(payload)
if err != nil {
return fmt.Errorf("%v sendAuthenticatedHTTPRequest: Unable to JSON request", g.Name)
}
endpoint := fmt.Sprintf("%v%v", g.API.Endpoints.WebsocketURL, url)
PayloadBase64 := crypto.Base64Encode(PayloadJSON)
hmac := crypto.GetHMAC(crypto.HashSHA512_384, []byte(PayloadBase64), []byte(g.API.Credentials.Secret))
headers := http.Header{}
headers.Add("Content-Length", "0")
headers.Add("Content-Type", "text/plain")
headers.Add("X-GEMINI-PAYLOAD", PayloadBase64)
headers.Add("X-GEMINI-APIKEY", g.API.Credentials.Key)
headers.Add("X-GEMINI-SIGNATURE", crypto.HexEncodeToString(hmac))
headers.Add("Cache-Control", "no-cache")
conn, conStatus, err := dialer.Dial(endpoint, headers)
if err != nil {
return fmt.Errorf("%v %v %v Error: %v", endpoint, conStatus, conStatus.StatusCode, err)
}
go g.WsReadData(conn, currency.Pair{})
return nil
}
// WsReadData reads from the websocket connection and returns the websocket
// response
func (g *Gemini) WsReadData(ws *websocket.Conn, c currency.Pair, feedType string) {
func (g *Gemini) WsReadData(ws *websocket.Conn, c currency.Pair) {
g.Websocket.Wg.Add(1)
defer func() {
err := ws.Close()
if err != nil {
g.Websocket.DataHandler <- fmt.Errorf("gemini_websocket.go - Unable to to close Websocket connection. Error: %s",
err)
}
g.Websocket.Wg.Done()
}()
defer g.Websocket.Wg.Done()
for {
select {
case <-g.Websocket.ShutdownC:
return
default:
_, resp, err := ws.ReadMessage()
if err != nil {
g.Websocket.DataHandler <- err
return
}
g.Websocket.TrafficAlert <- struct{}{}
comms <- ReadData{Raw: resp, Currency: c, FeedType: feedType}
comms <- ReadData{Raw: resp, Currency: c}
}
}
}
// WsHandleData handles all the websocket data coming from the websocket
@@ -113,120 +136,191 @@ func (g *Gemini) WsReadData(ws *websocket.Conn, c currency.Pair, feedType string
func (g *Gemini) WsHandleData() {
g.Websocket.Wg.Add(1)
defer g.Websocket.Wg.Done()
for {
select {
case <-g.Websocket.ShutdownC:
return
case resp := <-comms:
switch resp.FeedType {
case geminiWsEvent:
case geminiWsMarketData:
var result Response
// Gemini likes to send empty arrays
if string(resp.Raw) == "[]" {
continue
}
var result map[string]interface{}
err := common.JSONDecode(resp.Raw, &result)
if err != nil {
g.Websocket.DataHandler <- fmt.Errorf("%v Error: %v, Raw: %v", g.Name, err, string(resp.Raw))
continue
}
switch result["type"] {
case "subscription_ack":
var result WsSubscriptionAcknowledgementResponse
err := common.JSONDecode(resp.Raw, &result)
if err != nil {
g.Websocket.DataHandler <- err
continue
}
switch result.Type {
case "update":
if result.Timestamp == 0 && result.TimestampMS == 0 {
var bids, asks []orderbook.Item
for _, event := range result.Events {
if event.Reason != "initial" {
g.Websocket.DataHandler <- errors.New("gemini_websocket.go orderbook should be snapshot only")
continue
}
if event.Side == "ask" {
asks = append(asks, orderbook.Item{
Amount: event.Remaining,
Price: event.Price,
})
} else {
bids = append(bids, orderbook.Item{
Amount: event.Remaining,
Price: event.Price,
})
}
}
var newOrderBook orderbook.Base
newOrderBook.Asks = asks
newOrderBook.Bids = bids
newOrderBook.AssetType = asset.Spot
newOrderBook.LastUpdated = time.Now()
newOrderBook.Pair = resp.Currency
err := g.Websocket.Orderbook.LoadSnapshot(&newOrderBook,
g.GetName(),
false)
if err != nil {
g.Websocket.DataHandler <- err
break
}
g.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{Pair: resp.Currency,
Asset: asset.Spot,
Exchange: g.GetName()}
} else {
for _, event := range result.Events {
if event.Type == "trade" {
g.Websocket.DataHandler <- exchange.TradeData{
Timestamp: time.Now(),
CurrencyPair: resp.Currency,
AssetType: asset.Spot,
Exchange: g.GetName(),
EventTime: result.Timestamp,
Price: event.Price,
Amount: event.Amount,
Side: event.MakerSide,
}
} else {
var i orderbook.Item
i.Amount = event.Remaining
i.Price = event.Price
if event.Side == "ask" {
err := g.Websocket.Orderbook.Update(nil,
[]orderbook.Item{i},
resp.Currency,
time.Now(),
g.GetName(),
asset.Spot)
if err != nil {
g.Websocket.DataHandler <- err
}
} else {
err := g.Websocket.Orderbook.Update([]orderbook.Item{i},
nil,
resp.Currency,
time.Now(),
g.GetName(),
asset.Spot)
if err != nil {
g.Websocket.DataHandler <- err
}
}
}
}
g.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{Pair: resp.Currency,
Asset: asset.Spot,
Exchange: g.GetName()}
}
case "heartbeat":
default:
g.Websocket.DataHandler <- fmt.Errorf("gemini_websocket.go - unhandled data %s",
resp.Raw)
g.Websocket.DataHandler <- result
case "initial":
var result WsSubscriptionAcknowledgementResponse
err := common.JSONDecode(resp.Raw, &result)
if err != nil {
g.Websocket.DataHandler <- err
continue
}
g.Websocket.DataHandler <- result
case "accepted":
var result WsActiveOrdersResponse
err := common.JSONDecode(resp.Raw, &result)
if err != nil {
g.Websocket.DataHandler <- err
continue
}
g.Websocket.DataHandler <- result
case "booked":
var result WsOrderBookedResponse
err := common.JSONDecode(resp.Raw, &result)
if err != nil {
g.Websocket.DataHandler <- err
continue
}
g.Websocket.DataHandler <- result
case "fill":
var result WsOrderFilledResponse
err := common.JSONDecode(resp.Raw, &result)
if err != nil {
g.Websocket.DataHandler <- err
continue
}
g.Websocket.DataHandler <- result
case "cancelled":
var result WsOrderCancelledResponse
err := common.JSONDecode(resp.Raw, &result)
if err != nil {
g.Websocket.DataHandler <- err
continue
}
g.Websocket.DataHandler <- result
case "closed":
var result WsOrderClosedResponse
err := common.JSONDecode(resp.Raw, &result)
if err != nil {
g.Websocket.DataHandler <- err
continue
}
g.Websocket.DataHandler <- result
case "heartbeat":
var result WsHeartbeatResponse
err := common.JSONDecode(resp.Raw, &result)
if err != nil {
g.Websocket.DataHandler <- err
continue
}
g.Websocket.DataHandler <- result
case "update":
if resp.Currency.IsEmpty() {
g.Websocket.DataHandler <- fmt.Errorf("%v - unhandled data %s",
g.Name, resp.Raw)
continue
}
var marketUpdate WsMarketUpdateResponse
err := common.JSONDecode(resp.Raw, &marketUpdate)
if err != nil {
g.Websocket.DataHandler <- err
continue
}
g.wsProcessUpdate(marketUpdate, resp.Currency)
default:
g.Websocket.DataHandler <- fmt.Errorf("%v - unhandled data %s",
g.Name, resp.Raw)
}
}
}
}
// wsProcessUpdate handles order book data
func (g *Gemini) wsProcessUpdate(result WsMarketUpdateResponse, pair currency.Pair) {
if result.Timestamp == 0 && result.TimestampMS == 0 {
var bids, asks []orderbook.Item
for _, event := range result.Events {
if event.Reason != "initial" {
g.Websocket.DataHandler <- errors.New("gemini_websocket.go orderbook should be snapshot only")
continue
}
if event.Side == "ask" {
asks = append(asks, orderbook.Item{
Amount: event.Remaining,
Price: event.Price,
})
} else {
bids = append(bids, orderbook.Item{
Amount: event.Remaining,
Price: event.Price,
})
}
}
var newOrderBook orderbook.Base
newOrderBook.Asks = asks
newOrderBook.Bids = bids
newOrderBook.AssetType = asset.Spot
newOrderBook.Pair = pair
err := g.Websocket.Orderbook.LoadSnapshot(&newOrderBook,
g.GetName(),
false)
if err != nil {
g.Websocket.DataHandler <- err
return
}
g.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{Pair: pair,
Asset: asset.Spot,
Exchange: g.GetName()}
} else {
for _, event := range result.Events {
if event.Type == "trade" {
g.Websocket.DataHandler <- exchange.TradeData{
Timestamp: time.Now(),
CurrencyPair: pair,
AssetType: asset.Spot,
Exchange: g.Name,
EventTime: result.Timestamp,
Price: event.Price,
Amount: event.Amount,
Side: event.MakerSide,
}
} else {
var i orderbook.Item
i.Amount = event.Remaining
i.Price = event.Price
if event.Side == "ask" {
err := g.Websocket.Orderbook.Update(nil,
[]orderbook.Item{i},
pair,
time.Now(),
g.GetName(),
asset.Spot)
if err != nil {
g.Websocket.DataHandler <- err
}
} else {
err := g.Websocket.Orderbook.Update([]orderbook.Item{i},
nil,
pair,
time.Now(),
g.GetName(),
asset.Spot)
if err != nil {
g.Websocket.DataHandler <- err
}
}
}
}
g.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{Pair: pair,
Asset: asset.Spot,
Exchange: g.GetName()}
}
}

View File

@@ -480,3 +480,13 @@ func (g *Gemini) SubscribeToWebsocketChannels(channels []exchange.WebsocketChann
func (g *Gemini) UnsubscribeToWebsocketChannels(channels []exchange.WebsocketChannelSubscription) error {
return common.ErrFunctionNotSupported
}
// GetSubscriptions returns a copied list of subscriptions
func (g *Gemini) GetSubscriptions() ([]exchange.WebsocketChannelSubscription, error) {
return nil, common.ErrFunctionNotSupported
}
// AuthenticateWebsocket sends an authentication message to the websocket
func (g *Gemini) AuthenticateWebsocket() error {
return common.ErrFunctionNotSupported
}