Files
gocryptotrader/exchanges/gemini/gemini_websocket.go
Adam 504c2fad6d Feature: Implement funding rates, futures and coin margin (exchange API coverage) (#530)
* ALMOST THERE

* more api wips

* more api thingz

* testing n more api wipz

* more apiz

* more wips

* what is goin on

* more wips

* whip n testing

* testing

* testing

no keys

* remove log

* kraken is broken

ugh

* still broken

* fixing auth funcs + usdtm api docs

* wip

* api stuffs

* whip

* more wips

* whip

* more wip

* api wip n testing

* wip

* wip

* unsaved

* wip n testing

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* whip

* wrapper authenticated functions

* adding asset type and fixing dependencies

* wip

* binance auth wrapper start

* wrapper functionality

* wip

* wip

* wip

* wrapper cancel functions

* order submission for wrappers

* wip

* more error fixing and nits

* websocket beginning n error fix

* wip

* WOW

* glorious n shazzy nits

* useless nits

* wip

* fixing things

* merge stuffs

* crapveyor

* crapveyor rebuild

* probably broke more things than he fixed

* rm lns n other thangs

* hope

* please

* stop it

* done

* ofcourse

* rm vb

* fix lbank

* appveyor please

* float lev

* DONT ASK RYAN FOR HELP EVER

* wip

* wip

* endpoint upgrades continued

* path upgrade

* NeeeNeeeNeeeNeeeNING

* fix stuffs

* fixing time issue

* fixing broken funcs

* glorious nits

* shaz changes

* fixing errors for fundmon

* more error fixing for fundmon

* test running past 30s

* basic changes

* THX AGAIN SHAZBERT

* path system upgrade

* config upgrade

* unsaved stuffs

* broken wip config upgrade

* path system upgrade contd.

* path system upgrade contd

* path upgrade ready for review

* testing verbose removed

* linter stuffs

* appveyor stuffs

* appveyor stuff

* fixed?

* bugfix

* wip

* broken stuff

* fix test

* wierd hack fix

* appveyor pls stop

* error found

* more useless nits

* bitmex err

* broken wip

* broken wip path upgrade change to uint32

* changed url lookups to uint

* WOW

* ready4review

* config fixed HOPEFULLY

* config fix and glorious changes

* efficient way of getting orders and open orders

* binance wrapper logic fixing

* testing, adding tests and fixing lot of errrrrs

* merge master

* appveyor stuffs

* appveyor stuffs

* fmt

* test

* octalLiteral issue fix?

* octalLiteral fix?

* rm vb

* prnt ln to restart

* adding testz

* test fixzzz

* READY FOR REVIEW

* Actually ready now

* FORMATTING

* addressing shazzy n glorious nits

* crapveyor

* rm vb

* small change

* fixing err

* shazbert nits

* review changes

* requested changes

* more requested changes

* noo

* last nit fixes

* restart appveyor

* improving test cov

* Update .golangci.yml

* shazbert changes

* moving pair formatting

* format pair update wip

* path upgrade complete

* error fix

* appveyor linters

* more linters

* remove testexch

* more formatting changes

* changes

* shazbert changes

* checking older requested changes to ensure completion

* wip

* fixing broken code

* error fix

* all fixed

* additional changes

* more changes

* remove commented code

* ftx margin api

* appveyor fixes

* more appveyor issues + test addition

* more appveyor issues + test addition

* remove unnecessary

* testing

* testing, fixing okex api, error fix

* git merge fix

* go sum

* glorious changes and error fix

* rm vb

* more glorious changes and go mod tidy

* fixed now

* okex testing upgrade

* old config migration and batch fetching fix

* added test

* glorious requested changes WIP

* tested and fixed

* go fmted

* go fmt and test fix

* additional funcs and tests for fundingRates

* OKEX tested and fixed

* appveyor fixes

* ineff assign

* 1 glorious change

* error fix

* typo

* shazbert changes

* glorious code changes and path fixing huobi WIP

* adding assetType to accountinfo functions

* fixing panic

* panic fix and updating account info wrappers WIP

* updateaccountinfo updated

* testing WIP binance USDT n Coin Margined and Kraken Futures

* auth functions tested and fixed

* added test

* config reverted

* shazbert and glorious changes

* shazbert and glorious changes

* latest changes and portfolio update

* go fmt change:

* remove commented codes

* improved error checking

* index out of range fix

* rm ln

* critical nit

* glorious latest changes

* appveyor changes

* shazbert change

* easier readability

* latest glorious changes

* shadow dec

* assetstore updated

* last change

* another last change

* merge changes

* go mod tidy

* thrasher requested changes wip

* improving struct layouts

* appveyor go fmt

* remove unnecessary code

* shazbert changes

* small change

* oopsie

* tidy

* configtest reverted

* error fix

* oopsie

* for what

* test patch fix

* insecurities

* fixing tests

* fix config
2021-02-12 16:19:18 +11:00

471 lines
14 KiB
Go

// Package gemini exchange documentation can be found at
// https://docs.sandbox.gemini.com
package gemini
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common/crypto"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
"github.com/thrasher-corp/gocryptotrader/log"
)
// Websocket endpoint bases and channel paths for the Gemini v1 API.
const (
// geminiWebsocketEndpoint is the production websocket base URL.
// NOTE(review): not referenced in the visible part of this file; the
// functions here resolve the base URL via g.API.Endpoints.GetURL instead.
geminiWebsocketEndpoint = "wss://api.gemini.com/v1/"
// geminiWebsocketSandboxEndpoint is the sandbox websocket base URL.
// NOTE(review): likewise not referenced in the visible part of this file.
geminiWebsocketSandboxEndpoint = "wss://api.sandbox.gemini.com/v1/"
// geminiWsMarketData is the path segment WsSubscribe appends to the
// websocket base URL, followed by "/<pair>?<query>", for public market data.
geminiWsMarketData = "marketdata"
// geminiWsOrderEvents is the path WsConnect passes to WsSecureSubscribe for
// the authenticated order events feed.
geminiWsOrderEvents = "order/events"
)
// comms is the unbuffered channel that funnels messages from every websocket
// connection into a single reader: wsFunnelConnectionData sends on it and
// wsReadData receives from it.
var comms = make(chan ReadData)
// responseMaxLimit is passed as ResponseMaxLimit to every connection created
// in this file. It is not assigned in the visible code, so unless it is set
// elsewhere in the package it holds the zero duration.
var responseMaxLimit time.Duration
// responseCheckTimeout is declared here but not used in the visible code.
// NOTE(review): presumably configured and consumed elsewhere in the package -
// confirm before removing.
var responseCheckTimeout time.Duration
// WsConnect starts the websocket reader and opens the authenticated and
// public market data connections.
//
// It returns an error when the websocket or the exchange is disabled, when the
// configured proxy address cannot be parsed, or when the public subscription
// fails. A failure of the authenticated subscription is only logged; the
// public feeds are still attempted.
func (g *Gemini) WsConnect() error {
	if !(g.Websocket.IsEnabled() && g.IsEnabled()) {
		return errors.New(stream.WebsocketNotEnabled)
	}

	dialer := new(websocket.Dialer)
	if proxyAddr := g.Websocket.GetProxyAddress(); proxyAddr != "" {
		proxyURL, err := url.Parse(proxyAddr)
		if err != nil {
			return err
		}
		dialer.Proxy = http.ProxyURL(proxyURL)
	}

	// The reader must be running before any connection starts funnelling
	// messages into the shared comms channel.
	go g.wsReadData()

	if authErr := g.WsSecureSubscribe(dialer, geminiWsOrderEvents); authErr != nil {
		log.Errorf(log.ExchangeSys, "%v - authentication failed: %v\n", g.Name, authErr)
	}
	return g.WsSubscribe(dialer)
}
// WsSubscribe dials one market data websocket connection per enabled spot
// pair, requesting heartbeat, bids, offers and trades on each.
//
// Every successfully dialled connection is appended to g.connections and gets
// its own wsFunnelConnectionData goroutine. The first failure aborts the loop
// and is returned; connections opened before it remain registered.
func (g *Gemini) WsSubscribe(dialer *websocket.Dialer) error {
	pairs, err := g.GetEnabledPairs(asset.Spot)
	if err != nil {
		return err
	}

	// The query string is the same for every pair, so encode it once.
	query := url.Values{
		"heartbeat": {"true"},
		"bids":      {"true"},
		"offers":    {"true"},
		"trades":    {"true"},
	}.Encode()

	for i := range pairs {
		base, err := g.API.Endpoints.GetURL(exchange.WebsocketSpot)
		if err != nil {
			return err
		}
		endpoint := fmt.Sprintf("%s%s/%s?%s",
			base,
			geminiWsMarketData,
			pairs[i].String(),
			query)

		conn := &stream.WebsocketConnection{
			ExchangeName:     g.Name,
			URL:              endpoint,
			Verbose:          g.Verbose,
			ResponseMaxLimit: responseMaxLimit,
			Traffic:          g.Websocket.TrafficAlert,
			Match:            g.Websocket.Match,
		}
		if err = conn.Dial(dialer, http.Header{}); err != nil {
			return fmt.Errorf("%v Websocket connection %v error. Error %v",
				g.Name, endpoint, err)
		}

		g.connections = append(g.connections, conn)
		go g.wsFunnelConnectionData(conn, pairs[i])
	}
	return nil
}
// WsSecureSubscribe dials Gemini's authenticated websocket endpoint.
//
// url is the path below "/v1/" (for example geminiWsOrderEvents). The request
// payload, holding that path and a nanosecond nonce, is JSON encoded, base64
// encoded and signed with HMAC SHA-384 using the API secret. The payload, API
// key and hex signature travel as X-GEMINI-* headers on the handshake.
//
// On success the connection is stored in g.Websocket.AuthConn and a
// wsFunnelConnectionData goroutine is started for it with an empty pair.
// An error is returned when authenticated websocket support is disabled, the
// payload cannot be marshalled, the endpoint cannot be resolved, or the dial
// fails.
func (g *Gemini) WsSecureSubscribe(dialer *websocket.Dialer, url string) error {
	if !g.GetAuthenticatedAPISupport(exchange.WebsocketAuthentication) {
		return fmt.Errorf("%v AuthenticatedWebsocketAPISupport not enabled", g.Name)
	}

	payloadJSON, err := json.Marshal(WsRequestPayload{
		Request: "/v1/" + url,
		Nonce:   time.Now().UnixNano(),
	})
	if err != nil {
		// Keep the underlying cause; the previous message dropped it and
		// named an unrelated function.
		return fmt.Errorf("%v WsSecureSubscribe: unable to marshal JSON request: %v",
			g.Name, err)
	}

	wsEndpoint, err := g.API.Endpoints.GetURL(exchange.WebsocketSpot)
	if err != nil {
		return err
	}
	endpoint := wsEndpoint + url

	payloadBase64 := crypto.Base64Encode(payloadJSON)
	signature := crypto.GetHMAC(crypto.HashSHA512_384,
		[]byte(payloadBase64),
		[]byte(g.API.Credentials.Secret))

	headers := http.Header{}
	headers.Add("Content-Length", "0")
	headers.Add("Content-Type", "text/plain")
	headers.Add("X-GEMINI-PAYLOAD", payloadBase64)
	headers.Add("X-GEMINI-APIKEY", g.API.Credentials.Key)
	headers.Add("X-GEMINI-SIGNATURE", crypto.HexEncodeToString(signature))
	headers.Add("Cache-Control", "no-cache")

	g.Websocket.AuthConn = &stream.WebsocketConnection{
		ExchangeName:     g.Name,
		URL:              endpoint,
		Verbose:          g.Verbose,
		ResponseMaxLimit: responseMaxLimit,
		Match:            g.Websocket.Match,
	}
	err = g.Websocket.AuthConn.Dial(dialer, headers)
	if err != nil {
		return fmt.Errorf("%v Websocket connection %v error. Error %v", g.Name, endpoint, err)
	}

	// Authenticated messages are not tied to a single pair.
	go g.wsFunnelConnectionData(g.Websocket.AuthConn, currency.Pair{})
	return nil
}
// wsFunnelConnectionData forwards every message read from ws onto the shared
// comms channel, tagged with the pair c the connection was opened for.
//
// It registers itself on g.Websocket.Wg for its lifetime and returns as soon
// as ReadMessage yields a response whose Raw payload is nil.
func (g *Gemini) wsFunnelConnectionData(ws stream.Connection, c currency.Pair) {
	g.Websocket.Wg.Add(1)
	defer g.Websocket.Wg.Done()

	for msg := ws.ReadMessage(); msg.Raw != nil; msg = ws.ReadMessage() {
		comms <- ReadData{Raw: msg.Raw, Currency: c}
	}
}
// wsReadData is the single consumer of the comms channel. Each message is
// passed to wsHandleData and any resulting error is sent to the data handler.
//
// When g.Websocket.ShutdownC fires, every connection in g.connections is shut
// down (failures are logged), the slice is cleared, and the function returns.
// It registers itself on g.Websocket.Wg for its lifetime.
func (g *Gemini) wsReadData() {
	g.Websocket.Wg.Add(1)
	defer g.Websocket.Wg.Done()

	for {
		select {
		case msg := <-comms:
			// Gemini likes to send empty arrays
			if string(msg.Raw) == "[]" {
				continue
			}
			if err := g.wsHandleData(msg.Raw, msg.Currency); err != nil {
				g.Websocket.DataHandler <- err
			}
		case <-g.Websocket.ShutdownC:
			for i, conn := range g.connections {
				if err := conn.Shutdown(); err != nil {
					log.Errorln(log.ExchangeSys, err)
				}
				g.connections[i] = nil
			}
			g.connections = nil
			return
		}
	}
}
// wsHandleData decodes one raw websocket message and routes the result to
// g.Websocket.DataHandler.
//
// respRaw is the raw JSON payload. curr is the pair of the connection it
// arrived on; it is empty for the authenticated connection.
//
// Payloads starting with '[' are decoded as a list of WsOrderResponse and
// emitted as *order.Detail values. Failures to classify side, type, status or
// pair are reported as order.ClassificationError without aborting; a failed
// asset type lookup aborts and is returned.
//
// Other payloads are decoded as JSON objects and dispatched on "type"
// (acknowledgements, heartbeats, market updates, candles) or, when "type" is
// absent, on "result" (errors). Anything unrecognised is emitted as a
// stream.UnhandledMessageWarning.
//
// The returned error covers JSON decoding failures, an "update" message with
// no pair, a malformed candle row, or an error reported by the exchange.
func (g *Gemini) wsHandleData(respRaw []byte, curr currency.Pair) error {
	// only order details are sent in arrays
	if strings.HasPrefix(string(respRaw), "[") {
		var orders []WsOrderResponse
		if err := json.Unmarshal(respRaw, &orders); err != nil {
			return err
		}
		for i := range orders {
			o := &orders[i]

			oSide, err := order.StringToOrderSide(o.Side)
			if err != nil {
				g.Websocket.DataHandler <- order.ClassificationError{
					Exchange: g.Name,
					OrderID:  o.OrderID,
					Err:      err,
				}
			}
			oType, err := stringToOrderType(o.OrderType)
			if err != nil {
				g.Websocket.DataHandler <- order.ClassificationError{
					Exchange: g.Name,
					OrderID:  o.OrderID,
					Err:      err,
				}
			}
			// The event type of an order message carries its status.
			oStatus, err := stringToOrderStatus(o.Type)
			if err != nil {
				g.Websocket.DataHandler <- order.ClassificationError{
					Exchange: g.Name,
					OrderID:  o.OrderID,
					Err:      err,
				}
			}
			p, err := currency.NewPairFromString(o.Symbol)
			if err != nil {
				g.Websocket.DataHandler <- order.ClassificationError{
					Exchange: g.Name,
					OrderID:  o.OrderID,
					Err:      err,
				}
			}
			a, err := g.GetPairAssetType(p)
			if err != nil {
				return err
			}
			g.Websocket.DataHandler <- &order.Detail{
				HiddenOrder:     o.IsHidden,
				Price:           o.Price,
				Amount:          o.OriginalAmount,
				ExecutedAmount:  o.ExecutedAmount,
				RemainingAmount: o.RemainingAmount,
				Exchange:        g.Name,
				ID:              o.OrderID,
				Type:            oType,
				Side:            oSide,
				Status:          oStatus,
				AssetType:       a,
				Date:            time.Unix(0, o.Timestampms*int64(time.Millisecond)),
				Pair:            p,
			}
		}
		return nil
	}

	var result map[string]interface{}
	if err := json.Unmarshal(respRaw, &result); err != nil {
		return fmt.Errorf("%v Error: %v, Raw: %v", g.Name, err, string(respRaw))
	}

	if rawType, ok := result["type"]; ok {
		// A non-string "type" leaves msgType empty and is reported as an
		// unhandled message by the default case.
		msgType, _ := rawType.(string)
		switch msgType {
		case "subscription_ack":
			var ack WsSubscriptionAcknowledgementResponse
			if err := json.Unmarshal(respRaw, &ack); err != nil {
				return err
			}
			g.Websocket.DataHandler <- ack
		case "unsubscribe":
			var unsub wsUnsubscribeResponse
			if err := json.Unmarshal(respRaw, &unsub); err != nil {
				return err
			}
			g.Websocket.DataHandler <- unsub
		case "initial":
			var initial WsSubscriptionAcknowledgementResponse
			if err := json.Unmarshal(respRaw, &initial); err != nil {
				return err
			}
			g.Websocket.DataHandler <- initial
		case "heartbeat":
			return nil
		case "update":
			if curr.IsEmpty() {
				return fmt.Errorf("%v - `update` response error. Currency is empty %s",
					g.Name, respRaw)
			}
			var marketUpdate WsMarketUpdateResponse
			if err := json.Unmarshal(respRaw, &marketUpdate); err != nil {
				return err
			}
			g.wsProcessUpdate(marketUpdate, curr)
		case "candles_1m_updates",
			"candles_5m_updates",
			"candles_15m_updates",
			"candles_30m_updates",
			"candles_1h_updates",
			"candles_6h_updates",
			"candles_1d_updates":
			var candle wsCandleResponse
			// Decode into candle, not the generic map: otherwise
			// candle.Changes stays empty and no kline is ever emitted.
			if err := json.Unmarshal(respRaw, &candle); err != nil {
				return err
			}
			for i := range candle.Changes {
				// Each row is indexed as time, open, high, low, close, volume.
				if len(candle.Changes[i]) < 6 {
					return fmt.Errorf("%v - malformed candle update, expected 6 values per change: %s",
						g.Name, respRaw)
				}
				g.Websocket.DataHandler <- stream.KlineData{
					// The candle time is in milliseconds since the epoch.
					Timestamp:  time.Unix(0, int64(candle.Changes[i][0])*int64(time.Millisecond)),
					Pair:       curr,
					AssetType:  asset.Spot,
					Exchange:   g.Name,
					Interval:   msgType,
					OpenPrice:  candle.Changes[i][1],
					ClosePrice: candle.Changes[i][4],
					HighPrice:  candle.Changes[i][2],
					LowPrice:   candle.Changes[i][3],
					Volume:     candle.Changes[i][5],
				}
			}
		default:
			g.Websocket.DataHandler <- stream.UnhandledMessageWarning{Message: g.Name + stream.UnhandledMessage + string(respRaw)}
			return nil
		}
	} else if rawResult, ok := result["result"]; ok {
		// Checked assertions: an unexpected JSON type must not panic the
		// reader goroutine.
		outcome, _ := rawResult.(string)
		switch outcome {
		case "error":
			reason, hasReason := result["reason"].(string)
			message, hasMessage := result["message"].(string)
			if hasReason && hasMessage {
				return errors.New(reason + " - " + message)
			}
			return fmt.Errorf("%v Unhandled websocket error %s", g.Name, respRaw)
		default:
			g.Websocket.DataHandler <- stream.UnhandledMessageWarning{Message: g.Name + stream.UnhandledMessage + string(respRaw)}
			return nil
		}
	}
	return nil
}
// stringToOrderStatus maps a Gemini order event type to an order.Status.
//
// Both "fill" and "closed" map to order.Filled, and both "rejected" and
// "cancel_rejected" map to order.Rejected. Any other value yields
// order.UnknownStatus together with an error naming the unrecognised input.
func stringToOrderStatus(status string) (order.Status, error) {
	switch status {
	case "accepted":
		return order.New, nil
	case "booked":
		return order.Active, nil
	case "fill", "closed":
		return order.Filled, nil
	case "cancelled":
		return order.Cancelled, nil
	case "rejected", "cancel_rejected":
		// "rejected" is the event for an order refused on submission; it was
		// previously reported as an unrecognised status.
		return order.Rejected, nil
	default:
		return order.UnknownStatus, errors.New(status + " not recognised as order status")
	}
}
// stringToOrderType maps a Gemini order type string to an order.Type.
//
// Unrecognised values yield order.UnknownType together with an error naming
// the input.
func stringToOrderType(oType string) (order.Type, error) {
	switch oType {
	case "market buy", "market sell", "block_trade":
		// Block trades happen off the order book, so they are classed as
		// market orders even though they would be considered hidden trades.
		return order.Market, nil
	case "exchange limit", "auction-only limit", "indication-of-interest limit":
		return order.Limit, nil
	}
	return order.UnknownType, errors.New(oType + " not recognised as order type")
}
// wsProcessUpdate applies a market data update for pair to the websocket
// orderbook and records any trades it contains.
//
// A message with both Timestamp and TimestampMS equal to zero is treated as
// the initial snapshot: events whose reason is "initial" are loaded as a fresh
// book, and any other event is reported to the data handler and skipped.
//
// Every other message is incremental: "trade" events are collected and, when
// trade saving is enabled, added to the trade buffer; "change" events are
// applied as an orderbook update. All failures go to the data handler.
func (g *Gemini) wsProcessUpdate(result WsMarketUpdateResponse, pair currency.Pair) {
	isSnapshot := result.Timestamp == 0 && result.TimestampMS == 0
	if isSnapshot {
		var bidLevels, askLevels []orderbook.Item
		for i := range result.Events {
			ev := &result.Events[i]
			if ev.Reason != "initial" {
				g.Websocket.DataHandler <- errors.New("gemini_websocket.go orderbook should be snapshot only")
				continue
			}
			level := orderbook.Item{
				Amount: ev.Remaining,
				Price:  ev.Price,
			}
			if ev.Side == "ask" {
				askLevels = append(askLevels, level)
			} else {
				bidLevels = append(bidLevels, level)
			}
		}

		orderbook.Reverse(bidLevels) // Correct bid alignment
		snapshot := orderbook.Base{
			Asks:               askLevels,
			Bids:               bidLevels,
			AssetType:          asset.Spot,
			Pair:               pair,
			ExchangeName:       g.Name,
			VerificationBypass: g.OrderbookVerificationBypass,
		}
		if err := g.Websocket.Orderbook.LoadSnapshot(&snapshot); err != nil {
			g.Websocket.DataHandler <- err
		}
		return
	}

	// Trades and the orderbook update share the message level timestamp.
	eventTime := time.Unix(0, result.TimestampMS*int64(time.Millisecond))

	var (
		askLevels, bidLevels []orderbook.Item
		trades               []trade.Data
	)
	for i := range result.Events {
		ev := &result.Events[i]
		switch ev.Type {
		case "trade":
			side, err := order.StringToOrderSide(ev.MakerSide)
			if err != nil {
				// Reported, but the trade is still recorded with whatever
				// side value was returned.
				g.Websocket.DataHandler <- order.ClassificationError{
					Exchange: g.Name,
					Err:      err,
				}
			}
			trades = append(trades, trade.Data{
				Timestamp:    eventTime,
				CurrencyPair: pair,
				AssetType:    asset.Spot,
				Exchange:     g.Name,
				Price:        ev.Price,
				Amount:       ev.Amount,
				Side:         side,
				TID:          strconv.FormatInt(ev.ID, 10),
			})
		case "change":
			level := orderbook.Item{
				Amount: ev.Remaining,
				Price:  ev.Price,
			}
			if strings.EqualFold(ev.Side, order.Ask.String()) {
				askLevels = append(askLevels, level)
			} else {
				bidLevels = append(bidLevels, level)
			}
		default:
			g.Websocket.DataHandler <- fmt.Errorf("%s - Unhandled websocket update: %+v", g.Name, result)
		}
	}

	if len(trades) > 0 && g.IsSaveTradeDataEnabled() {
		if err := trade.AddTradesToBuffer(g.Name, trades...); err != nil {
			g.Websocket.DataHandler <- err
		}
	}

	// Nothing to apply to the book when the message held no changes.
	if len(askLevels)+len(bidLevels) == 0 {
		return
	}

	update := buffer.Update{
		Asks:       askLevels,
		Bids:       bidLevels,
		Pair:       pair,
		UpdateTime: eventTime,
		Asset:      asset.Spot,
	}
	if err := g.Websocket.Orderbook.Update(&update); err != nil {
		g.Websocket.DataHandler <- fmt.Errorf("%v %v", g.Name, err)
	}
}