diff --git a/currency/code_types.go b/currency/code_types.go index c689601f..d3906579 100644 --- a/currency/code_types.go +++ b/currency/code_types.go @@ -335,6 +335,7 @@ var ( PHX = NewCode("PHX") GO = NewCode("GO") PAX = NewCode("PAX") + PAXG = NewCode("PAXG") EDO = NewCode("EDO") WINGS = NewCode("WINGS") NAV = NewCode("NAV") @@ -1652,4 +1653,11 @@ var ( ZWD = NewCode("ZWD") XETH = NewCode("XETH") FX_BTC = NewCode("FX_BTC") // nolint // Cryptocurrency code + AAVE = NewCode("AAVE") + YFI = NewCode("YFI") + BAL = NewCode("BAL") + UMA = NewCode("UMA") + SNX = NewCode("SNX") + CRV = NewCode("CRV") + OXT = NewCode("OXT") ) diff --git a/exchanges/gemini/gemini.go b/exchanges/gemini/gemini.go index fce369c8..e314b380 100644 --- a/exchanges/gemini/gemini.go +++ b/exchanges/gemini/gemini.go @@ -14,7 +14,6 @@ import ( "github.com/thrasher-corp/gocryptotrader/common/crypto" exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/request" - "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/log" ) @@ -59,9 +58,6 @@ const ( // API keys and change the IsSandbox variable to true. type Gemini struct { exchange.Base - Role string - RequiresHeartBeat bool - connections []stream.Connection } // GetSymbols returns all available symbols for trading diff --git a/exchanges/gemini/gemini_test.go b/exchanges/gemini/gemini_test.go index 0b7e476e..a4ed98e4 100644 --- a/exchanges/gemini/gemini_test.go +++ b/exchanges/gemini/gemini_test.go @@ -2,6 +2,7 @@ package gemini import ( "net/url" + "strings" "testing" "time" @@ -38,6 +39,27 @@ func TestGetSymbols(t *testing.T) { } } +func TestFetchTradablePairs(t *testing.T) { + t.Parallel() + r, err := g.FetchTradablePairs(asset.Spot) + if err != nil { + t.Fatal(err) + } + pairs, err := currency.NewPairsFromStrings(r) + if err != nil { + t.Fatal(err) + } + if !pairs.Contains(currency.NewPair(currency.STORJ, currency.USD), false) { + t.Error("expected pair STORJ-USD") + } + if !pairs.Contains(currency.NewPair(currency.BTC, currency.USD), false) { + t.Error("expected pair BTC-USD") + } + if !pairs.Contains(currency.NewPair(currency.AAVE, currency.USD), false) { + t.Error("expected pair AAVE-BTC") + } +} + func TestGetTicker(t *testing.T) { t.Parallel() _, err := g.GetTicker("BTCUSD") @@ -563,7 +585,7 @@ func TestWsAuth(t *testing.T) { } var dialer websocket.Dialer go g.wsReadData() - err = g.WsSecureSubscribe(&dialer, geminiWsOrderEvents) + err = g.WsAuth(&dialer) if err != nil { t.Error(err) } @@ -580,27 +602,17 @@ func TestWsAuth(t *testing.T) { } func TestWsMissingRole(t *testing.T) { - pair, err := currency.NewPairFromString("BTCUSD") - if err != nil { - t.Fatal(err) - } - pressXToJSON := []byte(`{ "result":"error", "reason":"MissingRole", "message":"To access this endpoint, you need to log in to the website and go to the settings page to assign one of these roles [FundManager] to API key wujB3szN54gtJ4QDhqRJ which currently has roles [Trader]" }`) - err = g.wsHandleData(pressXToJSON, pair) - if err == nil { + if err := g.wsHandleData(pressXToJSON); err == nil { t.Error("Expected error") } } func TestWsOrderEventSubscriptionResponse(t *testing.T) { - pair, err := currency.NewPairFromString("BTCUSD") - if err != nil { - t.Fatal(err) - } pressXToJSON := []byte(`[ { "type" : "accepted", "order_id" : "372456298", @@ -619,7 +631,7 @@ func TestWsOrderEventSubscriptionResponse(t *testing.T) { "original_amount" : "14.0296", "price" : "1059.54" } ]`) - err = g.wsHandleData(pressXToJSON, pair) + err := g.wsHandleData(pressXToJSON) if err != nil { t.Error(err) } @@ -641,7 +653,7 @@ func TestWsOrderEventSubscriptionResponse(t *testing.T) { "price": "3592.00", "socket_sequence": 13 }]`) - err = g.wsHandleData(pressXToJSON, pair) + err = g.wsHandleData(pressXToJSON) if err != nil { t.Error(err) } @@ -662,7 +674,7 @@ func TestWsOrderEventSubscriptionResponse(t *testing.T) { "total_spend": "200.00", "socket_sequence": 29 }]`) - err = g.wsHandleData(pressXToJSON, pair) + err = g.wsHandleData(pressXToJSON) if err != nil { t.Error(err) } @@ -683,7 +695,7 @@ func TestWsOrderEventSubscriptionResponse(t *testing.T) { "original_amount": "25", "socket_sequence": 26 }]`) - err = g.wsHandleData(pressXToJSON, pair) + err = g.wsHandleData(pressXToJSON) if err != nil { t.Error(err) } @@ -705,17 +717,13 @@ func TestWsOrderEventSubscriptionResponse(t *testing.T) { "original_amount" : "500", "socket_sequence" : 32307 } ]`) - err = g.wsHandleData(pressXToJSON, pair) + err = g.wsHandleData(pressXToJSON) if err != nil { t.Error(err) } } func TestWsSubAck(t *testing.T) { - pair, err := currency.NewPairFromString("BTCUSD") - if err != nil { - t.Fatal(err) - } pressXToJSON := []byte(`{ "type": "subscription_ack", "accountId": 5365, @@ -731,17 +739,12 @@ func TestWsSubAck(t *testing.T) { "closed" ] }`) - err = g.wsHandleData(pressXToJSON, pair) - if err != nil { + if err := g.wsHandleData(pressXToJSON); err != nil { t.Error(err) } } func TestWsHeartbeat(t *testing.T) { - pair, err := currency.NewPairFromString("BTCUSD") - if err != nil { - t.Fatal(err) - } pressXToJSON := []byte(`{ "type": "heartbeat", "timestampms": 1547742998508, @@ -749,17 +752,12 @@ func TestWsHeartbeat(t *testing.T) { "trace_id": "b8biknoqppr32kc7gfgg", "socket_sequence": 37 }`) - err = g.wsHandleData(pressXToJSON, pair) - if err != nil { + if err := g.wsHandleData(pressXToJSON); err != nil { t.Error(err) } } func TestWsUnsubscribe(t *testing.T) { - pair, err := currency.NewPairFromString("BTCUSD") - if err != nil { - t.Fatal(err) - } pressXToJSON := []byte(`{ "type": "unsubscribe", "subscriptions": [{ @@ -775,17 +773,13 @@ func TestWsUnsubscribe(t *testing.T) { ]} ] }`) - err = g.wsHandleData(pressXToJSON, pair) + err := g.wsHandleData(pressXToJSON) if err != nil { t.Error(err) } } func TestWsTradeData(t *testing.T) { - pair, err := currency.NewPairFromString("BTCUSD") - if err != nil { - t.Fatal(err) - } pressXToJSON := []byte(`{ "type": "update", "eventId": 5375547515, @@ -802,17 +796,12 @@ func TestWsTradeData(t *testing.T) { } ] }`) - err = g.wsHandleData(pressXToJSON, pair) - if err != nil { + if err := g.wsHandleData(pressXToJSON); err != nil { t.Error(err) } } func TestWsAuctionData(t *testing.T) { - pair, err := currency.NewPairFromString("BTCUSD") - if err != nil { - t.Fatal(err) - } pressXToJSON := []byte(`{ "eventId": 371469414, "socket_sequence":4009, @@ -839,17 +828,12 @@ func TestWsAuctionData(t *testing.T) { ], "type": "update" }`) - err = g.wsHandleData(pressXToJSON, pair) - if err != nil { + if err := g.wsHandleData(pressXToJSON); err != nil { t.Error(err) } } func TestWsBlockTrade(t *testing.T) { - pair, err := currency.NewPairFromString("BTCUSD") - if err != nil { - t.Fatal(err) - } pressXToJSON := []byte(`{ "type":"update", "eventId":1111597035, @@ -865,17 +849,27 @@ func TestWsBlockTrade(t *testing.T) { } ] }`) - err = g.wsHandleData(pressXToJSON, pair) - if err != nil { + if err := g.wsHandleData(pressXToJSON); err != nil { + t.Error(err) + } +} + +func TestWSTrade(t *testing.T) { + pressXToJSON := []byte(`{ + "type": "trade", + "symbol": "BTCUSD", + "event_id": 3575573053, + "timestamp": 151231241, + "price": "9004.21000000", + "quantity": "0.09110000", + "side": "buy" + }`) + if err := g.wsHandleData(pressXToJSON); err != nil { t.Error(err) } } func TestWsCandles(t *testing.T) { - pair, err := currency.NewPairFromString("BTCUSD") - if err != nil { - t.Fatal(err) - } pressXToJSON := []byte(`{ "type": "candles_15m_updates", "symbol": "BTCUSD", @@ -898,17 +892,12 @@ func TestWsCandles(t *testing.T) { ] ] }`) - err = g.wsHandleData(pressXToJSON, pair) - if err != nil { + if err := g.wsHandleData(pressXToJSON); err != nil { t.Error(err) } } func TestWsAuctions(t *testing.T) { - pair, err := currency.NewPairFromString("BTCUSD") - if err != nil { - t.Fatal(err) - } pressXToJSON := []byte(`{ "eventId": 372481811, "socket_sequence":23, @@ -925,8 +914,7 @@ func TestWsAuctions(t *testing.T) { ], "type": "update" }`) - err = g.wsHandleData(pressXToJSON, pair) - if err != nil { + if err := g.wsHandleData(pressXToJSON); err != nil { t.Error(err) } @@ -950,8 +938,7 @@ func TestWsAuctions(t *testing.T) { } ] }`) - err = g.wsHandleData(pressXToJSON, pair) - if err != nil { + if err := g.wsHandleData(pressXToJSON); err != nil { t.Error(err) } @@ -982,17 +969,12 @@ func TestWsAuctions(t *testing.T) { } ] }`) - err = g.wsHandleData(pressXToJSON, pair) - if err != nil { + if err := g.wsHandleData(pressXToJSON); err != nil { t.Error(err) } } func TestWsMarketData(t *testing.T) { - pair, err := currency.NewPairFromString("BTCUSD") - if err != nil { - t.Fatal(err) - } pressXToJSON := []byte(`{ "type": "update", "eventId": 5375461993, @@ -1016,7 +998,7 @@ func TestWsMarketData(t *testing.T) { } ] } `) - err = g.wsHandleData(pressXToJSON, pair) + err := g.wsHandleData(pressXToJSON) if err != nil { t.Error(err) } @@ -1044,7 +1026,7 @@ func TestWsMarketData(t *testing.T) { } ] } `) - err = g.wsHandleData(pressXToJSON, pair) + err = g.wsHandleData(pressXToJSON) if err != nil { t.Error(err) } @@ -1066,12 +1048,109 @@ func TestWsMarketData(t *testing.T) { } ] } `) - err = g.wsHandleData(pressXToJSON, pair) + err = g.wsHandleData(pressXToJSON) if err != nil { t.Error(err) } } +func TestWsError(t *testing.T) { + tt := []struct { + Data []byte + ErrorExpected bool + ErrorShouldContain string + }{ + { + Data: []byte(`{"type": "test"}`), + ErrorExpected: false, + }, + { + Data: []byte(`{"result": "bla"}`), + ErrorExpected: false, + }, + { + Data: []byte(`{"result": "error"}`), + ErrorExpected: true, + ErrorShouldContain: "Unhandled websocket error", + }, + { + Data: []byte(`{"result": "error","reason": "InvalidJson"}`), + ErrorExpected: true, + ErrorShouldContain: "InvalidJson", + }, + { + Data: []byte(`{"result": "error","reason": "InvalidJson", "message": "WeAreGoingToTheMoonKirby"}`), + ErrorExpected: true, + ErrorShouldContain: "InvalidJson - WeAreGoingToTheMoonKirby", + }, + } + + for x := range tt { + err := g.wsHandleData(tt[x].Data) + if tt[x].ErrorExpected && err != nil && !strings.Contains(err.Error(), tt[x].ErrorShouldContain) { + t.Errorf("expected error to contain: %s, got: %s", + tt[x].ErrorShouldContain, err.Error(), + ) + } else if !tt[x].ErrorExpected && err != nil { + t.Errorf("unexpected error: %s", err) + } + } +} + +func TestWsLevel2Update(t *testing.T) { + pressXToJSON := []byte(`{ + "type": "l2_updates", + "symbol": "BTCUSD", + "changes": [ + [ + "buy", + "9122.04", + "0.00121425" + ], + [ + "sell", + "9122.07", + "0.98942292" + ] + ], + "trades": [{ + "type": "trade", + "symbol": "BTCUSD", + "event_id": 169841458, + "timestamp": 1560976400428, + "price": "9122.04", + "quantity": "0.0073173", + "side": "sell" + }], + "auction_events": [{ + "type": "auction_result", + "symbol": "BTCUSD", + "time_ms": 1560974400000, + "result": "success", + "highest_bid_price": "9150.80", + "lowest_ask_price": "9150.81", + "collar_price": "9146.93", + "auction_price": "9145.00", + "auction_quantity": "470.10390845" + }, + { + "type": "auction_indicative", + "symbol": "BTCUSD", + "time_ms": 1560974385000, + "result": "success", + "highest_bid_price": "9150.80", + "lowest_ask_price": "9150.81", + "collar_price": "9146.84", + "auction_price": "9134.04", + "auction_quantity": "389.3094317" + } + ] + }`) + if err := g.wsHandleData(pressXToJSON); err != nil { + t.Error(err) + } +} + func TestResponseToStatus(t *testing.T) { type TestCases struct { Case string diff --git a/exchanges/gemini/gemini_types.go b/exchanges/gemini/gemini_types.go index d0cc07cd..db13b085 100644 --- a/exchanges/gemini/gemini_types.go +++ b/exchanges/gemini/gemini_types.go @@ -2,6 +2,17 @@ package gemini import "github.com/thrasher-corp/gocryptotrader/currency" +const ( + marketDataLevel2 = "l2" + candles1m = "candles_1m" + candles5m = "candles_5m" + candles15m = "candles_15m" + candles30m = "candles_30m" + candles1hr = "candles_1h" + candles6hr = "candles_6h" + candles1d = "candles_1d" +) + // Ticker holds returned ticker data from the exchange type Ticker struct { Ask float64 `json:"ask,string"` @@ -217,40 +228,6 @@ type ErrorCapture struct { Message string `json:"message"` } -// WsResponse generic response -type WsResponse struct { - Type string `json:"type"` -} - -// WsMarketUpdateResponse defines the main response type -type WsMarketUpdateResponse struct { - Type string `json:"type"` - EventID int64 `json:"eventId"` - Timestamp int64 `json:"timestamp"` - TimestampMS int64 `json:"timestampms"` - SocketSequence int64 `json:"socket_sequence"` - Events []Event `json:"events"` -} - -// Event defines orderbook and trade data -type Event struct { - Type string `json:"type"` - Reason string `json:"reason"` - Price float64 `json:"price,string"` - Delta float64 `json:"delta,string"` - Remaining float64 `json:"remaining,string"` - Side string `json:"side"` - MakerSide string `json:"makerSide"` - ID int64 `json:"tid"` - Amount float64 `json:"amount,string"` -} - -// ReadData defines read data from the websocket connection -type ReadData struct { - Raw []byte - Currency currency.Pair -} - // WsRequestPayload Request info to subscribe to a WS enpoint type WsRequestPayload struct { Request string `json:"request"` @@ -311,16 +288,48 @@ type WsOrderFilledData struct { FeeCurrency string `json:"fee_currency"` } -type wsUnsubscribeResponse struct { - Type string `json:"type"` - Subscriptions []struct { - Name string `json:"name"` - Symbols []string `json:"symbols"` - } `json:"subscriptions"` -} - type wsCandleResponse struct { Type string `json:"type"` Symbol string `json:"symbol"` Changes [][]float64 `json:"changes"` } + +type wsSubscriptions struct { + Name string `json:"name"` + Symbols []string `json:"symbols"` +} + +type wsSubscribeRequest struct { + Type string `json:"type"` + Subscriptions []wsSubscriptions `json:"subscriptions"` +} + +type wsTrade struct { + Type string `json:"type"` + Symbol string `json:"symbol"` + EventID int64 `json:"event_id"` + Timestamp int64 `json:"timestamp"` + Price float64 `json:"price,string"` + Quantity float64 `json:"quantity,string"` + Side string `json:"side"` +} + +type wsAuctionResult struct { + Type string `json:"type"` + Symbol string `json:"symbol"` + Result string `json:"result"` + TimeMilliseconds int64 `json:"time_ms"` + HighestBidPrice float64 `json:"highest_bid_price,string"` + LowestBidPrice float64 `json:"lowest_ask_price,string"` + CollarPrice float64 `json:"collar_price,string"` + AuctionPrice float64 `json:"auction_price,string"` + AuctionQuantity float64 `json:"auction_quantity,string"` +} + +type wsL2MarketData struct { + Type string `json:"type"` + Symbol string `json:"symbol"` + Changes [][3]string `json:"changes"` + Trades []wsTrade `json:"trades"` + AuctionEvents []wsAuctionResult `json:"auction_events"` +} diff --git a/exchanges/gemini/gemini_websocket.go b/exchanges/gemini/gemini_websocket.go index bf302647..446e697c 100644 --- a/exchanges/gemini/gemini_websocket.go +++ b/exchanges/gemini/gemini_websocket.go @@ -7,12 +7,12 @@ import ( "errors" "fmt" "net/http" - "net/url" "strconv" "strings" "time" "github.com/gorilla/websocket" + "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/crypto" "github.com/thrasher-corp/gocryptotrader/currency" exchange "github.com/thrasher-corp/gocryptotrader/exchanges" @@ -26,16 +26,14 @@ import ( ) const ( - geminiWebsocketEndpoint = "wss://api.gemini.com/v1/" + geminiWebsocketEndpoint = "wss://api.gemini.com" geminiWebsocketSandboxEndpoint = "wss://api.sandbox.gemini.com/v1/" geminiWsMarketData = "marketdata" geminiWsOrderEvents = "order/events" ) // Instantiates a communications channel between websocket connections -var comms = make(chan ReadData) -var responseMaxLimit time.Duration -var responseCheckTimeout time.Duration +var comms = make(chan stream.Response) // WsConnect initiates a websocket connection func (g *Gemini) WsConnect() error { @@ -44,69 +42,145 @@ func (g *Gemini) WsConnect() error { } var dialer websocket.Dialer - if g.Websocket.GetProxyAddress() != "" { - proxy, err := url.Parse(g.Websocket.GetProxyAddress()) - if err != nil { - return err - } - dialer.Proxy = http.ProxyURL(proxy) - } - - go g.wsReadData() - err := g.WsSecureSubscribe(&dialer, geminiWsOrderEvents) - if err != nil { - log.Errorf(log.ExchangeSys, "%v - authentication failed: %v\n", g.Name, err) - } - return g.WsSubscribe(&dialer) -} - -// WsSubscribe subscribes to the full websocket suite on gemini exchange -func (g *Gemini) WsSubscribe(dialer *websocket.Dialer) error { - enabledCurrencies, err := g.GetEnabledPairs(asset.Spot) + err := g.Websocket.Conn.Dial(&dialer, http.Header{}) if err != nil { return err } - for i := range enabledCurrencies { - val := url.Values{} - val.Set("heartbeat", "true") - val.Set("bids", "true") - val.Set("offers", "true") - val.Set("trades", "true") - wsEndpoint, err := g.API.Endpoints.GetURL(exchange.WebsocketSpot) + + go g.wsReadData() + go g.wsFunnelConnectionData(g.Websocket.Conn) + + if g.Websocket.CanUseAuthenticatedEndpoints() { + err := g.WsAuth(&dialer) if err != nil { - return err + log.Errorf(log.ExchangeSys, "%v - websocket authentication failed: %v\n", g.Name, err) + g.Websocket.SetCanUseAuthenticatedEndpoints(false) } - endpoint := fmt.Sprintf("%s%s/%s?%s", - wsEndpoint, - geminiWsMarketData, - enabledCurrencies[i].String(), - val.Encode()) - connection := &stream.WebsocketConnection{ - ExchangeName: g.Name, - URL: endpoint, - Verbose: g.Verbose, - ResponseMaxLimit: responseMaxLimit, - Traffic: g.Websocket.TrafficAlert, - Match: g.Websocket.Match, - } - err = connection.Dial(dialer, http.Header{}) - if err != nil { - return fmt.Errorf("%v Websocket connection %v error. Error %v", - g.Name, endpoint, err) - } - g.connections = append(g.connections, connection) - go g.wsFunnelConnectionData(connection, enabledCurrencies[i]) } return nil } -// WsSecureSubscribe will connect to Gemini's secure endpoint -func (g *Gemini) WsSecureSubscribe(dialer *websocket.Dialer, url string) error { +// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions() +func (g *Gemini) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, error) { + // See gemini_types.go for more subscription/candle vars + var channels = []string{ + marketDataLevel2, + candles1d, + } + + pairs, err := g.GetEnabledPairs(asset.Spot) + if err != nil { + return nil, err + } + + var subscriptions []stream.ChannelSubscription + for x := range channels { + for y := range pairs { + subscriptions = append(subscriptions, stream.ChannelSubscription{ + Channel: channels[x], + Currency: pairs[y], + Asset: asset.Spot, + }) + } + } + return subscriptions, nil +} + +// Subscribe sends a websocket message to receive data from the channel +func (g *Gemini) Subscribe(channelsToSubscribe []stream.ChannelSubscription) error { + var channels []string + for x := range channelsToSubscribe { + if common.StringDataCompareInsensitive(channels, channelsToSubscribe[x].Channel) { + continue + } + channels = append(channels, channelsToSubscribe[x].Channel) + } + + var pairs currency.Pairs + for x := range channelsToSubscribe { + if pairs.Contains(channelsToSubscribe[x].Currency, true) { + continue + } + pairs = append(pairs, channelsToSubscribe[x].Currency) + } + + fmtPairs, err := g.FormatExchangeCurrencies(pairs, asset.Spot) + if err != nil { + return err + } + + var subs []wsSubscriptions + for x := range channels { + subs = append(subs, wsSubscriptions{ + Name: channels[x], + Symbols: strings.Split(fmtPairs, ","), + }) + } + + wsSub := wsSubscribeRequest{ + Type: "subscribe", + Subscriptions: subs, + } + err = g.Websocket.Conn.SendJSONMessage(wsSub) + if err != nil { + return err + } + + g.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe...) + return nil +} + +// Unsubscribe sends a websocket message to stop receiving data from the channel +func (g *Gemini) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) error { + var channels []string + for x := range channelsToUnsubscribe { + if common.StringDataCompareInsensitive(channels, channelsToUnsubscribe[x].Channel) { + continue + } + channels = append(channels, channelsToUnsubscribe[x].Channel) + } + + var pairs currency.Pairs + for x := range channelsToUnsubscribe { + if pairs.Contains(channelsToUnsubscribe[x].Currency, true) { + continue + } + pairs = append(pairs, channelsToUnsubscribe[x].Currency) + } + + fmtPairs, err := g.FormatExchangeCurrencies(pairs, asset.Spot) + if err != nil { + return err + } + + var subs []wsSubscriptions + for x := range channels { + subs = append(subs, wsSubscriptions{ + Name: channels[x], + Symbols: strings.Split(fmtPairs, ","), + }) + } + + wsSub := wsSubscribeRequest{ + Type: "unsubscribe", + Subscriptions: subs, + } + err = g.Websocket.Conn.SendJSONMessage(wsSub) + if err != nil { + return err + } + + g.Websocket.RemoveSuccessfulUnsubscriptions(channelsToUnsubscribe...) + return nil +} + +// WsAuth will connect to Gemini's secure endpoint +func (g *Gemini) WsAuth(dialer *websocket.Dialer) error { if !g.GetAuthenticatedAPISupport(exchange.WebsocketAuthentication) { return fmt.Errorf("%v AuthenticatedWebsocketAPISupport not enabled", g.Name) } payload := WsRequestPayload{ - Request: fmt.Sprintf("/v1/%v", url), + Request: "/v1/" + geminiWsOrderEvents, Nonce: time.Now().UnixNano(), } PayloadJSON, err := json.Marshal(payload) @@ -117,7 +191,7 @@ func (g *Gemini) WsSecureSubscribe(dialer *websocket.Dialer, url string) error { if err != nil { return err } - endpoint := wsEndpoint + url + endpoint := wsEndpoint + geminiWsOrderEvents PayloadBase64 := crypto.Base64Encode(PayloadJSON) hmac := crypto.GetHMAC(crypto.HashSHA512_384, []byte(PayloadBase64), []byte(g.API.Credentials.Secret)) headers := http.Header{} @@ -128,23 +202,16 @@ func (g *Gemini) WsSecureSubscribe(dialer *websocket.Dialer, url string) error { headers.Add("X-GEMINI-SIGNATURE", crypto.HexEncodeToString(hmac)) headers.Add("Cache-Control", "no-cache") - g.Websocket.AuthConn = &stream.WebsocketConnection{ - ExchangeName: g.Name, - URL: endpoint, - Verbose: g.Verbose, - ResponseMaxLimit: responseMaxLimit, - Match: g.Websocket.Match, - } err = g.Websocket.AuthConn.Dial(dialer, headers) if err != nil { return fmt.Errorf("%v Websocket connection %v error. Error %v", g.Name, endpoint, err) } - go g.wsFunnelConnectionData(g.Websocket.AuthConn, currency.Pair{}) + go g.wsFunnelConnectionData(g.Websocket.AuthConn) return nil } // wsFunnelConnectionData receives data from multiple connections and passes it to wsReadData -func (g *Gemini) wsFunnelConnectionData(ws stream.Connection, c currency.Pair) { +func (g *Gemini) wsFunnelConnectionData(ws stream.Connection) { g.Websocket.Wg.Add(1) defer g.Websocket.Wg.Done() for { @@ -152,7 +219,7 @@ func (g *Gemini) wsFunnelConnectionData(ws stream.Connection, c currency.Pair) { if resp.Raw == nil { return } - comms <- ReadData{Raw: resp.Raw, Currency: c} + comms <- stream.Response{Raw: resp.Raw} } } @@ -163,21 +230,9 @@ func (g *Gemini) wsReadData() { for { select { case <-g.Websocket.ShutdownC: - for i := range g.connections { - err := g.connections[i].Shutdown() - if err != nil { - log.Errorln(log.ExchangeSys, err) - } - g.connections[i] = nil - } - g.connections = nil return case resp := <-comms: - // Gemini likes to send empty arrays - if string(resp.Raw) == "[]" { - continue - } - err := g.wsHandleData(resp.Raw, resp.Currency) + err := g.wsHandleData(resp.Raw) if err != nil { g.Websocket.DataHandler <- err } @@ -185,7 +240,7 @@ func (g *Gemini) wsReadData() { } } -func (g *Gemini) wsHandleData(respRaw []byte, curr currency.Pair) error { +func (g *Gemini) wsHandleData(respRaw []byte) error { // only order details are sent in arrays if strings.HasPrefix(string(respRaw), "[") { var result []WsOrderResponse @@ -222,20 +277,21 @@ func (g *Gemini) wsHandleData(respRaw []byte, curr currency.Pair) error { } } - p, err := currency.NewPairFromString(result[i].Symbol) - if err != nil { - g.Websocket.DataHandler <- order.ClassificationError{ - Exchange: g.Name, - OrderID: result[i].OrderID, - Err: err, - } - } - - var a asset.Item - a, err = g.GetPairAssetType(p) + enabledPairs, err := g.GetAvailablePairs(asset.Spot) if err != nil { return err } + + format, err := g.GetPairFormat(asset.Spot, true) + if err != nil { + return err + } + + pair, err := currency.NewPairFromFormattedPairs(result[i].Symbol, enabledPairs, format) + if err != nil { + return err + } + g.Websocket.DataHandler <- &order.Detail{ HiddenOrder: result[i].IsHidden, Price: result[i].Price, @@ -247,9 +303,9 @@ func (g *Gemini) wsHandleData(respRaw []byte, curr currency.Pair) error { Type: oType, Side: oSide, Status: oStatus, - AssetType: a, + AssetType: asset.Spot, Date: time.Unix(0, result[i].Timestampms*int64(time.Millisecond)), - Pair: p, + Pair: pair, } } return nil @@ -261,15 +317,61 @@ func (g *Gemini) wsHandleData(respRaw []byte, curr currency.Pair) error { } if _, ok := result["type"]; ok { switch result["type"] { - case "subscription_ack": - var result WsSubscriptionAcknowledgementResponse + case "l2_updates": + var l2MarketData *wsL2MarketData + err := json.Unmarshal(respRaw, &l2MarketData) + if err != nil { + return err + } + return g.wsProcessUpdate(l2MarketData) + case "trade": + if !g.IsSaveTradeDataEnabled() { + return nil + } + + var result wsTrade err := json.Unmarshal(respRaw, &result) if err != nil { return err } - g.Websocket.DataHandler <- result - case "unsubscribe": - var result wsUnsubscribeResponse + + tSide, err := order.StringToOrderSide(result.Side) + if err != nil { + g.Websocket.DataHandler <- order.ClassificationError{ + Exchange: g.Name, + Err: err, + } + } + + enabledPairs, err := g.GetEnabledPairs(asset.Spot) + if err != nil { + return err + } + + format, err := g.GetPairFormat(asset.Spot, true) + if err != nil { + return err + } + + pair, err := currency.NewPairFromFormattedPairs(result.Symbol, enabledPairs, format) + if err != nil { + return err + } + + tradeEvent := trade.Data{ + Timestamp: time.Unix(result.Timestamp/1000, 0), + CurrencyPair: pair, + AssetType: asset.Spot, + Exchange: g.Name, + Price: result.Price, + Amount: result.Quantity, + Side: tSide, + TID: strconv.FormatInt(result.EventID, 10), + } + + return trade.AddTradesToBuffer(g.Name, tradeEvent) + case "subscription_ack": + var result WsSubscriptionAcknowledgementResponse err := json.Unmarshal(respRaw, &result) if err != nil { return err @@ -284,17 +386,6 @@ func (g *Gemini) wsHandleData(respRaw []byte, curr currency.Pair) error { g.Websocket.DataHandler <- result case "heartbeat": return nil - case "update": - if curr.IsEmpty() { - return fmt.Errorf("%v - `update` response error. Currency is empty %s", - g.Name, respRaw) - } - var marketUpdate WsMarketUpdateResponse - err := json.Unmarshal(respRaw, &marketUpdate) - if err != nil { - return err - } - g.wsProcessUpdate(marketUpdate, curr) case "candles_1m_updates", "candles_5m_updates", "candles_15m_updates", @@ -303,36 +394,54 @@ func (g *Gemini) wsHandleData(respRaw []byte, curr currency.Pair) error { "candles_6h_updates", "candles_1d_updates": var candle wsCandleResponse - err := json.Unmarshal(respRaw, &result) + err := json.Unmarshal(respRaw, &candle) if err != nil { return err } + enabledPairs, err := g.GetEnabledPairs(asset.Spot) + if err != nil { + return err + } + + format, err := g.GetPairFormat(asset.Spot, true) + if err != nil { + return err + } + + pair, err := currency.NewPairFromFormattedPairs(candle.Symbol, enabledPairs, format) + if err != nil { + return err + } + for i := range candle.Changes { + if len(candle.Changes[i]) != 6 { + continue + } g.Websocket.DataHandler <- stream.KlineData{ - Timestamp: time.Unix(int64(candle.Changes[i][0])*1000, 0), - Pair: curr, + Timestamp: time.Unix(int64(candle.Changes[i][0])/1000, 0), + Pair: pair, AssetType: asset.Spot, Exchange: g.Name, Interval: result["type"].(string), OpenPrice: candle.Changes[i][1], - ClosePrice: candle.Changes[i][4], HighPrice: candle.Changes[i][2], LowPrice: candle.Changes[i][3], + ClosePrice: candle.Changes[i][4], Volume: candle.Changes[i][5], } } - default: g.Websocket.DataHandler <- stream.UnhandledMessageWarning{Message: g.Name + stream.UnhandledMessage + string(respRaw)} return nil } - } else if _, ok := result["result"]; ok { - switch result["result"].(string) { + } else if r, ok := result["result"].(string); ok { + switch r { case "error": - if _, ok := result["reason"]; ok { - if _, ok := result["message"]; ok { - return errors.New(result["reason"].(string) + " - " + result["message"].(string)) + if reason, ok := result["reason"].(string); ok { + if msg, ok := result["message"].(string); ok { + reason += " - " + msg } + return errors.New(reason) } return fmt.Errorf("%v Unhandled websocket error %s", g.Name, respRaw) default: @@ -375,29 +484,45 @@ func stringToOrderType(oType string) (order.Type, error) { } } -// wsProcessUpdate handles order book data -func (g *Gemini) wsProcessUpdate(result WsMarketUpdateResponse, pair currency.Pair) { - if result.Timestamp == 0 && result.TimestampMS == 0 { - var bids, asks []orderbook.Item - for i := range result.Events { - if result.Events[i].Reason != "initial" { - g.Websocket.DataHandler <- errors.New("gemini_websocket.go orderbook should be snapshot only") - continue - } - if result.Events[i].Side == "ask" { - asks = append(asks, orderbook.Item{ - Amount: result.Events[i].Remaining, - Price: result.Events[i].Price, - }) - } else { - bids = append(bids, orderbook.Item{ - Amount: result.Events[i].Remaining, - Price: result.Events[i].Price, - }) - } - } +func (g *Gemini) wsProcessUpdate(result *wsL2MarketData) error { + isInitial := len(result.Changes) > 0 && len(result.Trades) > 0 + enabledPairs, err := g.GetEnabledPairs(asset.Spot) + if err != nil { + return err + } - orderbook.Reverse(bids) // Correct bid alignment + format, err := g.GetPairFormat(asset.Spot, true) + if err != nil { + return err + } + + pair, err := currency.NewPairFromFormattedPairs(result.Symbol, enabledPairs, format) + if err != nil { + return err + } + + var bids, asks []orderbook.Item + for x := range result.Changes { + price, err := strconv.ParseFloat(result.Changes[x][1], 64) + if err != nil { + return err + } + amount, err := strconv.ParseFloat(result.Changes[x][2], 64) + if err != nil { + return err + } + obItem := orderbook.Item{ + Amount: amount, + Price: price, + } + if result.Changes[x][0] == "buy" { + bids = append(bids, obItem) + continue + } + asks = append(asks, obItem) + } + + if isInitial { var newOrderBook orderbook.Base newOrderBook.Asks = asks newOrderBook.Bids = bids @@ -407,64 +532,51 @@ func (g *Gemini) wsProcessUpdate(result WsMarketUpdateResponse, pair currency.Pa newOrderBook.VerificationBypass = g.OrderbookVerificationBypass err := g.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { - g.Websocket.DataHandler <- err - return + return err } } else { - var asks, bids []orderbook.Item - var trades []trade.Data - for i := range result.Events { - switch result.Events[i].Type { - case "trade": - tSide, err := order.StringToOrderSide(result.Events[i].MakerSide) - if err != nil { - g.Websocket.DataHandler <- order.ClassificationError{ - Exchange: g.Name, - Err: err, - } - } - trades = append(trades, trade.Data{ - Timestamp: time.Unix(0, result.TimestampMS*int64(time.Millisecond)), - CurrencyPair: pair, - AssetType: asset.Spot, - Exchange: g.Name, - Price: result.Events[i].Price, - Amount: result.Events[i].Amount, - Side: tSide, - TID: strconv.FormatInt(result.Events[i].ID, 10), - }) - case "change": - item := orderbook.Item{ - Amount: result.Events[i].Remaining, - Price: result.Events[i].Price, - } - if strings.EqualFold(result.Events[i].Side, order.Ask.String()) { - asks = append(asks, item) - } else { - bids = append(bids, item) - } - default: - g.Websocket.DataHandler <- fmt.Errorf("%s - Unhandled websocket update: %+v", g.Name, result) - } - } - if len(trades) > 0 && g.IsSaveTradeDataEnabled() { - err := trade.AddTradesToBuffer(g.Name, trades...) - if err != nil { - g.Websocket.DataHandler <- err - } - } if len(asks) == 0 && len(bids) == 0 { - return + return nil } err := g.Websocket.Orderbook.Update(&buffer.Update{ - Asks: asks, - Bids: bids, - Pair: pair, - UpdateTime: time.Unix(0, result.TimestampMS*int64(time.Millisecond)), - Asset: asset.Spot, + Asks: asks, + Bids: bids, + Pair: pair, + Asset: asset.Spot, }) if err != nil { - g.Websocket.DataHandler <- fmt.Errorf("%v %v", g.Name, err) + return err } } + + if len(result.AuctionEvents) > 0 { + g.Websocket.DataHandler <- result.AuctionEvents + } + + if !g.IsSaveTradeDataEnabled() { + return nil + } + + var trades []trade.Data + for x := range result.Trades { + tSide, err := order.StringToOrderSide(result.Trades[x].Side) + if err != nil { + g.Websocket.DataHandler <- order.ClassificationError{ + Exchange: g.Name, + Err: err, + } + } + trades = append(trades, trade.Data{ + Timestamp: time.Unix(result.Trades[x].Timestamp/1000, 0), + CurrencyPair: pair, + AssetType: asset.Spot, + Exchange: g.Name, + Price: result.Trades[x].Price, + Amount: result.Trades[x].Quantity, + Side: tSide, + TID: strconv.FormatInt(result.Trades[x].EventID, 10), + }) + } + + return trade.AddTradesToBuffer(g.Name, trades...) } diff --git a/exchanges/gemini/gemini_wrapper.go b/exchanges/gemini/gemini_wrapper.go index 95e1a66b..22c9aa6a 100644 --- a/exchanges/gemini/gemini_wrapper.go +++ b/exchanges/gemini/gemini_wrapper.go @@ -59,8 +59,14 @@ func (g *Gemini) SetDefaults() { g.API.CredentialsValidator.RequiresKey = true g.API.CredentialsValidator.RequiresSecret = true - requestFmt := ¤cy.PairFormat{Uppercase: true} - configFmt := ¤cy.PairFormat{Uppercase: true} + requestFmt := ¤cy.PairFormat{ + Uppercase: true, + Separator: ",", + } + configFmt := ¤cy.PairFormat{ + Uppercase: true, + Delimiter: currency.DashDelimiter, + } err := g.SetGlobalPairsManager(requestFmt, configFmt, asset.Spot) if err != nil { log.Errorln(log.ExchangeSys, err) @@ -93,6 +99,8 @@ func (g *Gemini) SetDefaults() { AuthenticatedEndpoints: true, MessageSequenceNumbers: true, KlineFetching: true, + Subscribe: true, + Unsubscribe: true, }, WithdrawPermissions: exchange.AutoWithdrawCryptoWithAPIPermission | exchange.AutoWithdrawCryptoWithSetup | @@ -144,7 +152,7 @@ func (g *Gemini) Setup(exch *config.ExchangeConfig) error { return err } - return g.Websocket.Setup(&stream.WebsocketSetup{ + err = g.Websocket.Setup(&stream.WebsocketSetup{ Enabled: exch.Features.Enabled.Websocket, Verbose: exch.Verbose, AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, @@ -153,10 +161,31 @@ func (g *Gemini) Setup(exch *config.ExchangeConfig) error { ExchangeName: exch.Name, RunningURL: wsRunningURL, Connector: g.WsConnect, + Subscriber: g.Subscribe, + UnSubscriber: g.Unsubscribe, + GenerateSubscriptions: g.GenerateDefaultSubscriptions, Features: &g.Features.Supports.WebsocketCapabilities, OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit, BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled, - SortBuffer: true, + }) + if err != nil { + return err + } + + err = g.Websocket.SetupNewConnection(stream.ConnectionSetup{ + ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, + ResponseMaxLimit: exch.WebsocketResponseMaxLimit, + URL: geminiWebsocketEndpoint + "/v2/" + geminiWsMarketData, + }) + if err != nil { + return err + } + + return g.Websocket.SetupNewConnection(stream.ConnectionSetup{ + ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout, + ResponseMaxLimit: exch.WebsocketResponseMaxLimit, + URL: geminiWebsocketEndpoint + "/v1/" + geminiWsOrderEvents, + Authenticated: true, }) } @@ -175,25 +204,92 @@ func (g *Gemini) Run() { g.PrintEnabledPairs() } - if !g.GetEnabledFeatures().AutoPairUpdates { + forceUpdate := false + format, err := g.GetPairFormat(asset.Spot, false) + if err != nil { + log.Errorf(log.ExchangeSys, "%s failed to get enabled currencies. Err %s\n", + g.Name, + err) return } - err := g.UpdateTradablePairs(false) + enabled, err := g.CurrencyPairs.GetPairs(asset.Spot, true) if err != nil { - log.Errorf(log.ExchangeSys, "%s failed to update tradable pairs. Err: %s", g.Name, err) + log.Errorf(log.ExchangeSys, "%s failed to get enabled currencies. Err %s\n", + g.Name, + err) + return + } + + avail, err := g.CurrencyPairs.GetPairs(asset.Spot, false) + if err != nil { + log.Errorf(log.ExchangeSys, "%s failed to get available currencies. Err %s\n", + g.Name, + err) + return + } + + if !common.StringDataContains(enabled.Strings(), format.Delimiter) || + !common.StringDataContains(avail.Strings(), format.Delimiter) { + var enabledPairs currency.Pairs + enabledPairs, err = currency.NewPairsFromStrings([]string{ + currency.BTC.String() + format.Delimiter + currency.USD.String()}) + if err != nil { + log.Errorf(log.ExchangeSys, "%s failed to update currencies. Err %s\n", + g.Name, + err) + } else { + log.Warn(log.ExchangeSys, + "Available pairs for Gemini reset due to config upgrade, please enable the ones you would like to use again") + forceUpdate = true + + err = g.UpdatePairs(enabledPairs, asset.Spot, true, true) + if err != nil { + log.Errorf(log.ExchangeSys, + "%s failed to update currencies. Err: %s\n", + g.Name, + err) + } + } + } + + if !g.GetEnabledFeatures().AutoPairUpdates && !forceUpdate { + return + } + err = g.UpdateTradablePairs(forceUpdate) + if err != nil { + log.Errorf(log.ExchangeSys, + "%s failed to update tradable pairs. Err: %s", + g.Name, + err) } } // FetchTradablePairs returns a list of the exchanges tradable pairs func (g *Gemini) FetchTradablePairs(asset asset.Item) ([]string, error) { - return g.GetSymbols() + pairs, err := g.GetSymbols() + if err != nil { + return nil, err + } + + var tradablePairs []string + for x := range pairs { + switch len(pairs[x]) { + case 8: + tradablePairs = append(tradablePairs, pairs[x][0:5]+currency.DashDelimiter+pairs[x][5:]) + case 7: + tradablePairs = append(tradablePairs, pairs[x][0:4]+currency.DashDelimiter+pairs[x][4:]) + default: + tradablePairs = append(tradablePairs, pairs[x][0:3]+currency.DashDelimiter+pairs[x][3:]) + } + } + return tradablePairs, nil } // UpdateTradablePairs updates the exchanges available pairs and stores // them in the exchanges config func (g *Gemini) UpdateTradablePairs(forceUpdate bool) error { - pairs, err := g.GetSymbols() + pairs, err := g.FetchTradablePairs(asset.Spot) if err != nil { return err } @@ -563,7 +659,12 @@ func (g *Gemini) GetActiveOrders(req *order.GetOrdersRequest) ([]order.Detail, e return nil, err } - format, err := g.GetPairFormat(asset.Spot, false) + availPairs, err := g.GetAvailablePairs(asset.Spot) + if err != nil { + return nil, err + } + + format, err := g.GetPairFormat(asset.Spot, true) if err != nil { return nil, err } @@ -571,7 +672,7 @@ func (g *Gemini) GetActiveOrders(req *order.GetOrdersRequest) ([]order.Detail, e var orders []order.Detail for i := range resp { var symbol currency.Pair - symbol, err = currency.NewPairDelimiter(resp[i].Symbol, format.Delimiter) + symbol, err = currency.NewPairFromFormattedPairs(resp[i].Symbol, availPairs, format) if err != nil { return nil, err } diff --git a/go.sum b/go.sum index 62131640..ed5e0a6d 100644 --- a/go.sum +++ b/go.sum @@ -236,8 +236,7 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/lib/pq v1.9.0 h1:L8nSXQQzAYByakOFMTwpjRoHsMJklur4Gi59b6VivR8= -github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lib/pq v1.10.0 h1:Zx5DJFEYQXio93kgXnQ09fXNiUKsqv4OUEu2UtGcB1E= github.com/lib/pq v1.10.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= diff --git a/testdata/http_mock/gemini/gemini.json b/testdata/http_mock/gemini/gemini.json index d7854ec0..97e06e59 100644 --- a/testdata/http_mock/gemini/gemini.json +++ b/testdata/http_mock/gemini/gemini.json @@ -2135,8 +2135,16 @@ { "data": [ "btcusd", + "btcdai", + "btcgbp", + "btceur", + "btcsgd", "ethbtc", "ethusd", + "ethgbp", + "etheur", + "ethsgd", + "ethdai", "bchusd", "bchbtc", "bcheth", @@ -2148,7 +2156,34 @@ "zecbtc", "zeceth", "zecbch", - "zecltc" + "zecltc", + "batusd", + "batbtc", + "bateth", + "linkusd", + "linkbtc", + "linketh", + "daiusd", + "oxtusd", + "oxtbtc", + "oxteth", + "filusd", + "ampusd", + "paxgusd", + "compusd", + "mkrusd", + "zrxusd", + "kncusd", + "storjusd", + "manausd", + "aaveusd", + "snxusd", + "yfiusd", + "umausd", + "balusd", + "crvusd", + "renusd", + "uniusd" ], "queryString": "", "bodyParams": "",