From 41415ca3b9256158382a85ce18a8bc4ad22cbba6 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Wed, 23 Jan 2019 14:23:11 +1100 Subject: [PATCH] Websocket update increasing exchange coverage and bug fixes (#233) Websocket update increasing exchange coverage and bug fixes --- CONTRIBUTORS | 6 +- LICENSE | 2 +- README.md | 28 +- config/config.go | 2 +- .../currencyconverterapi.go | 8 +- exchanges/alphapoint/alphapoint.go | 4 +- exchanges/alphapoint/alphapoint_wrapper.go | 2 +- exchanges/anx/anx.go | 6 +- exchanges/binance/binance.go | 7 +- exchanges/binance/binance_websocket.go | 44 +-- exchanges/bitfinex/bitfinex.go | 6 +- exchanges/bitfinex/bitfinex_websocket.go | 47 ++- exchanges/bitflyer/bitflyer.go | 3 +- exchanges/bithumb/bithumb.go | 3 +- exchanges/bithumb/bithumb_wrapper.go | 2 +- exchanges/bitmex/bitmex.go | 7 +- exchanges/bitmex/bitmex_websocket.go | 77 +++-- exchanges/bitstamp/bitstamp.go | 5 +- exchanges/bitstamp/bitstamp_websocket.go | 4 +- exchanges/bittrex/bittrex.go | 3 +- exchanges/btcc/btcc_websocket.go | 58 ++-- exchanges/btcmarkets/btcmarkets.go | 3 +- exchanges/coinbasepro/coinbasepro.go | 5 +- .../coinbasepro/coinbasepro_websocket.go | 37 +- exchanges/coinut/coinut.go | 6 +- exchanges/coinut/coinut_websocket.go | 37 +- exchanges/exchange_websocket.go | 97 +++++- exchanges/exchange_websocket_test.go | 31 +- exchanges/exmo/exmo_wrapper.go | 2 +- exchanges/gateio/gateio.go | 17 +- exchanges/gateio/gateio_types.go | 49 +++ exchanges/gateio/gateio_websocket.go | 327 ++++++++++++++++++ exchanges/gateio/gateio_wrapper.go | 2 +- exchanges/gemini/gemini.go | 16 +- exchanges/gemini/gemini_types.go | 31 ++ exchanges/gemini/gemini_websocket.go | 232 +++++++++++++ exchanges/gemini/gemini_wrapper.go | 2 +- exchanges/hitbtc/hitbtc.go | 5 +- exchanges/hitbtc/hitbtc_websocket.go | 61 ++-- exchanges/huobi/huobi.go | 7 +- exchanges/huobi/huobi_websocket.go | 78 ++--- exchanges/huobihadax/huobihadax.go | 16 +- exchanges/huobihadax/huobihadax_types.go | 71 ++++ exchanges/huobihadax/huobihadax_websocket.go | 275 +++++++++++++++ exchanges/huobihadax/huobihadax_wrapper.go | 2 +- exchanges/itbit/itbit.go | 3 +- exchanges/itbit/itbit_wrapper.go | 2 +- exchanges/kraken/kraken.go | 5 +- exchanges/kraken/kraken_wrapper.go | 2 +- exchanges/lakebtc/lakebtc.go | 3 +- exchanges/lakebtc/lakebtc_wrapper.go | 3 +- exchanges/liqui/liqui.go | 3 +- exchanges/liqui/liqui_wrapper.go | 2 +- exchanges/localbitcoins/localbitcoins.go | 3 +- .../localbitcoins/localbitcoins_wrapper.go | 2 +- exchanges/okcoin/okcoin.go | 6 +- exchanges/okcoin/okcoin_websocket.go | 53 ++- exchanges/okex/okex.go | 7 +- exchanges/okex/okex_websocket.go | 102 +++--- exchanges/poloniex/poloniex.go | 6 +- exchanges/poloniex/poloniex_websocket.go | 70 ++-- exchanges/wex/wex.go | 3 +- exchanges/yobit/yobit.go | 3 +- exchanges/yobit/yobit_wrapper.go | 2 +- exchanges/zb/zb.go | 16 +- exchanges/zb/{zb_type.go => zb_types.go} | 0 exchanges/zb/zb_websocket.go | 311 +++++++++++++++++ exchanges/zb/zb_websocket_types.go | 57 +++ exchanges/zb/zb_wrapper.go | 2 +- logger/logger.go | 4 +- routines.go | 2 + .../root_templates/root_readme.tmpl | 16 +- 72 files changed, 1982 insertions(+), 439 deletions(-) create mode 100644 exchanges/gateio/gateio_websocket.go create mode 100644 exchanges/gemini/gemini_websocket.go create mode 100644 exchanges/huobihadax/huobihadax_websocket.go rename exchanges/zb/{zb_type.go => zb_types.go} (100%) create mode 100644 exchanges/zb/zb_websocket.go create mode 100644 exchanges/zb/zb_websocket_types.go diff --git a/CONTRIBUTORS b/CONTRIBUTORS index 550c16bc..ab7aafa6 100644 --- a/CONTRIBUTORS +++ b/CONTRIBUTORS @@ -8,6 +8,7 @@ ermalguni | https://github.com/ermalguni marcofranssen | https://github.com/marcofranssen cranktakular | https://github.com/cranktakular crackcomm | https://github.com/crackcomm +xtda | https://github.com/xtda andreygrehov | https://github.com/andreygrehov bretep | https://github.com/bretep gam-phon | https://github.com/gam-phon @@ -15,11 +16,12 @@ cornelk | https://github.com/cornelk if1live | https://github.com/if1live soxipy | https://github.com/soxipy herenow | https://github.com/herenow -xtda | https://github.com/xtda blombard | https://github.com/blombard CodeLingoBot | https://github.com/CodeLingoBot +Daanikus | https://github.com/Daanikus daniel-cohen | https://github.com/daniel-cohen frankzougc | https://github.com/frankzougc +woshidama323 | https://github.com/woshidama323 starit | https://github.com/starit Jimexist | https://github.com/Jimexist lookfirst | https://github.com/lookfirst @@ -28,6 +30,4 @@ mattkanwisher | https://github.com/mattkanwisher mKurrels | https://github.com/mKurrels m1kola | https://github.com/m1kola cavapoo2 | https://github.com/cavapoo2 -tongxiaofeng | https://github.com/tongxiaofeng -idealhack | https://github.com/idealhack diff --git a/LICENSE b/LICENSE index f8b345f9..1843ba7f 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2014-2018 The GoCryptoTrader Developers +Copyright (c) 2014-2019 The GoCryptoTrader Developers Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index cb66e4f0..a6f1e0f5 100644 --- a/README.md +++ b/README.md @@ -24,19 +24,19 @@ Join our slack to discuss all things related to GoCryptoTrader! [GoCryptoTrader | Bitfinex | Yes | Yes | NA | | Bitflyer | Yes | No | NA | | Bithumb | Yes | NA | NA | -| BitMEX | Yes | No | NA | +| BitMEX | Yes | Yes | NA | | Bitstamp | Yes | Yes | No | | Bittrex | Yes | No | NA | | BTCC | Yes | Yes | No | | BTCMarkets | Yes | No | NA | -| COINUT | Yes | No | NA | +| COINUT | Yes | Yes | NA | | Exmo | Yes | NA | NA | | CoinbasePro | Yes | Yes | No| -| GateIO | Yes | No | NA | -| Gemini | Yes | No | No | +| GateIO | Yes | Yes | NA | +| Gemini | Yes | Yes | No | | HitBTC | Yes | Yes | No | -| Huobi.Pro | Yes | No | NA | -| Huobi.Hadax | Yes | No | NA | +| Huobi.Pro | Yes | Yes | NA | +| Huobi.Hadax | Yes | Yes | NA | | ItBit | Yes | NA | No | | Kraken | Yes | NA | NA | | LakeBTC | Yes | No | NA | @@ -44,11 +44,11 @@ Join our slack to discuss all things related to GoCryptoTrader! [GoCryptoTrader | LocalBitcoins | Yes | NA | NA | | OKCoin China | Yes | Yes | No | | OKCoin International | Yes | Yes | No | -| OKEX | Yes | No | No | +| OKEX | Yes | Yes | No | | Poloniex | Yes | Yes | NA | | WEX | Yes | NA | NA | | Yobit | Yes | NA | NA | -| ZB.COM | Yes | No | NA | +| ZB.COM | Yes | Yes | NA | We are aiming to support the top 20 highest volume exchanges based off the [CoinMarketCap exchange data](https://coinmarketcap.com/exchanges/volume/24-hour/). @@ -133,14 +133,15 @@ Binaries will be published once the codebase reaches a stable condition. |User|Github|Contribution Amount| |--|--|--| -| thrasher- | https://github.com/thrasher- | 489 | -| shazbert | https://github.com/shazbert | 156 | -| gloriousCode | https://github.com/gloriousCode | 139 | +| thrasher- | https://github.com/thrasher- | 496 | +| shazbert | https://github.com/shazbert | 158 | +| gloriousCode | https://github.com/gloriousCode | 141 | | ermalguni | https://github.com/ermalguni | 14 | | 140am | https://github.com/140am | 8 | | marcofranssen | https://github.com/marcofranssen | 8 | | cranktakular | https://github.com/cranktakular | 5 | | crackcomm | https://github.com/crackcomm | 3 | +| xtda | https://github.com/xtda | 2 | | andreygrehov | https://github.com/andreygrehov | 2 | | bretep | https://github.com/bretep | 2 | | gam-phon | https://github.com/gam-phon | 2 | @@ -148,11 +149,12 @@ Binaries will be published once the codebase reaches a stable condition. | if1live | https://github.com/if1live | 2 | | soxipy | https://github.com/soxipy | 2 | | herenow | https://github.com/herenow | 2 | -| xtda | https://github.com/xtda | 1 | | blombard | https://github.com/blombard | 1 | | CodeLingoBot | https://github.com/CodeLingoBot | 1 | +| Daanikus | https://github.com/Daanikus | 1 | | daniel-cohen | https://github.com/daniel-cohen | 1 | | frankzougc | https://github.com/frankzougc | 1 | +| woshidama323 | https://github.com/woshidama323 | 1 | | starit | https://github.com/starit | 1 | | Jimexist | https://github.com/Jimexist | 1 | | lookfirst | https://github.com/lookfirst | 1 | @@ -161,8 +163,6 @@ Binaries will be published once the codebase reaches a stable condition. | mKurrels | https://github.com/mKurrels | 1 | | m1kola | https://github.com/m1kola | 1 | | cavapoo2 | https://github.com/cavapoo2 | 1 | -| tongxiaofeng | https://github.com/tongxiaofeng | 1 | -| idealhack | https://github.com/idealhack | 1 | diff --git a/config/config.go b/config/config.go index b1aaa233..9451b70d 100644 --- a/config/config.go +++ b/config/config.go @@ -746,7 +746,7 @@ func (c *Config) CheckExchangeConfigValues() error { lastUpdated := common.UnixTimestampToTime(exch.PairsLastUpdated) lastUpdated = lastUpdated.AddDate(0, 0, configPairsLastUpdatedWarningThreshold) if lastUpdated.Unix() <= time.Now().Unix() { - log.Warn(WarningPairsLastUpdatedThresholdExceeded, exch.Name, configPairsLastUpdatedWarningThreshold) + log.Warnf(WarningPairsLastUpdatedThresholdExceeded, exch.Name, configPairsLastUpdatedWarningThreshold) } } diff --git a/currency/forexprovider/currencyconverterapi/currencyconverterapi.go b/currency/forexprovider/currencyconverterapi/currencyconverterapi.go index 309a6e04..97c5af8a 100644 --- a/currency/forexprovider/currencyconverterapi/currencyconverterapi.go +++ b/currency/forexprovider/currencyconverterapi/currencyconverterapi.go @@ -164,5 +164,11 @@ func (c *CurrencyConverter) SendHTTPRequest(endPoint string, values url.Values, } path = path + values.Encode() - return common.SendHTTPGetRequest(path, true, c.Verbose, &result) + err := common.SendHTTPGetRequest(path, true, c.Verbose, &result) + if err != nil { + return fmt.Errorf("Currency converter API SendHTTPRequest error %s with path %s", + err, + path) + } + return nil } diff --git a/exchanges/alphapoint/alphapoint.go b/exchanges/alphapoint/alphapoint.go index 92e15302..8a949d71 100644 --- a/exchanges/alphapoint/alphapoint.go +++ b/exchanges/alphapoint/alphapoint.go @@ -54,7 +54,9 @@ func (a *Alphapoint) SetDefaults() { a.AssetTypes = []string{ticker.Spot} a.SupportsAutoPairUpdating = false a.SupportsRESTTickerBatching = false - a.APIWithdrawPermissions = exchange.WithdrawCryptoWith2FA | exchange.AutoWithdrawCryptoWithAPIPermission | exchange.NoFiatWithdrawals + a.APIWithdrawPermissions = exchange.WithdrawCryptoWith2FA | + exchange.AutoWithdrawCryptoWithAPIPermission | + exchange.NoFiatWithdrawals a.Requester = request.New(a.Name, request.NewRateLimit(time.Minute*10, alphapointAuthRate), request.NewRateLimit(time.Minute*10, alphapointUnauthRate), diff --git a/exchanges/alphapoint/alphapoint_wrapper.go b/exchanges/alphapoint/alphapoint_wrapper.go index 1efcd088..7bb38bab 100644 --- a/exchanges/alphapoint/alphapoint_wrapper.go +++ b/exchanges/alphapoint/alphapoint_wrapper.go @@ -209,7 +209,7 @@ func (a *Alphapoint) GetWebsocket() (*exchange.Websocket, error) { // GetFeeByType returns an estimate of fee based on type of transaction func (a *Alphapoint) GetFeeByType(feeBuilder exchange.FeeBuilder) (float64, error) { - return 0, common.ErrNotYetImplemented + return 0, common.ErrFunctionNotSupported } // GetWithdrawCapabilities returns the types of withdrawal methods permitted by the exchange diff --git a/exchanges/anx/anx.go b/exchanges/anx/anx.go index 4ce4744f..6ac8326e 100644 --- a/exchanges/anx/anx.go +++ b/exchanges/anx/anx.go @@ -58,8 +58,10 @@ func (a *ANX) SetDefaults() { a.ConfigCurrencyPairFormat.Delimiter = "_" a.ConfigCurrencyPairFormat.Uppercase = true a.ConfigCurrencyPairFormat.Index = "" - a.APIWithdrawPermissions = exchange.WithdrawCryptoWithEmail | exchange.AutoWithdrawCryptoWithSetup | - exchange.WithdrawCryptoWith2FA | exchange.WithdrawFiatViaWebsiteOnly + a.APIWithdrawPermissions = exchange.WithdrawCryptoWithEmail | + exchange.AutoWithdrawCryptoWithSetup | + exchange.WithdrawCryptoWith2FA | + exchange.WithdrawFiatViaWebsiteOnly a.AssetTypes = []string{ticker.Spot} a.SupportsAutoPairUpdating = true a.SupportsRESTTickerBatching = false diff --git a/exchanges/binance/binance.go b/exchanges/binance/binance.go index 8ed455fa..3d38e7a0 100644 --- a/exchanges/binance/binance.go +++ b/exchanges/binance/binance.go @@ -82,7 +82,8 @@ func (b *Binance) SetDefaults() { b.AssetTypes = []string{ticker.Spot} b.SupportsAutoPairUpdating = true b.SupportsRESTTickerBatching = true - b.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | exchange.NoFiatWithdrawals + b.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | + exchange.NoFiatWithdrawals b.SetValues() b.Requester = request.New(b.Name, request.NewRateLimit(time.Second, binanceAuthRate), @@ -91,6 +92,10 @@ func (b *Binance) SetDefaults() { b.APIUrlDefault = apiURL b.APIUrl = b.APIUrlDefault b.WebsocketInit() + b.Websocket.Functionality = exchange.WebsocketTradeDataSupported | + exchange.WebsocketTickerSupported | + exchange.WebsocketKlineSupported | + exchange.WebsocketOrderbookSupported } // Setup takes in the supplied exchange configuration details and sets params diff --git a/exchanges/binance/binance_websocket.go b/exchanges/binance/binance_websocket.go index 9b3b4f48..648033c2 100644 --- a/exchanges/binance/binance_websocket.go +++ b/exchanges/binance/binance_websocket.go @@ -62,7 +62,7 @@ func (b *Binance) SeedLocalCache(p pair.CurrencyPair) error { newOrderBook.LastUpdated = time.Now() newOrderBook.AssetType = "SPOT" - return b.Websocket.Orderbook.LoadSnapshot(newOrderBook, b.GetName()) + return b.Websocket.Orderbook.LoadSnapshot(newOrderBook, b.GetName(), false) } // UpdateLocalCache updates and returns the most recent iteration of the orderbook @@ -182,8 +182,19 @@ func (b *Binance) WSConnect() error { return nil } -// WSReadData reads from the websocket connection -func (b *Binance) WSReadData() { +// WSReadData reads from the websocket connection and returns the response +func (b *Binance) WSReadData() (exchange.WebsocketResponse, error) { + msgType, resp, err := b.WebsocketConn.ReadMessage() + if err != nil { + return exchange.WebsocketResponse{}, err + } + + b.Websocket.TrafficAlert <- struct{}{} + return exchange.WebsocketResponse{Type: msgType, Raw: resp}, nil +} + +// WsHandleData handles websocket data from WsReadData +func (b *Binance) WsHandleData() { b.Websocket.Wg.Add(1) defer func() { @@ -201,37 +212,16 @@ func (b *Binance) WSReadData() { return default: - msgType, resp, err := b.WebsocketConn.ReadMessage() + read, err := b.WSReadData() if err != nil { - b.Websocket.DataHandler <- fmt.Errorf("binance_websocket.go - Websocket Read Data. Error: %s", - err) + b.Websocket.DataHandler <- err return } - b.Websocket.TrafficAlert <- struct{}{} - b.Websocket.Intercomm <- exchange.WebsocketResponse{Type: msgType, Raw: resp} - } - } -} - -// WsHandleData handles websocket data from WsReadData -func (b *Binance) WsHandleData() { - b.Websocket.Wg.Add(1) - defer b.Websocket.Wg.Done() - - go b.WSReadData() - - for { - select { - case <-b.Websocket.ShutdownC: - return - - case read := <-b.Websocket.Intercomm: switch read.Type { case websocket.TextMessage: multiStreamData := MultiStreamData{} - - err := common.JSONDecode(read.Raw, &multiStreamData) + err = common.JSONDecode(read.Raw, &multiStreamData) if err != nil { b.Websocket.DataHandler <- fmt.Errorf("binance_websocket.go - Could not load multi stream data: %s", string(read.Raw)) diff --git a/exchanges/bitfinex/bitfinex.go b/exchanges/bitfinex/bitfinex.go index 6065fa20..8384cb99 100644 --- a/exchanges/bitfinex/bitfinex.go +++ b/exchanges/bitfinex/bitfinex.go @@ -94,7 +94,8 @@ func (b *Bitfinex) SetDefaults() { b.Verbose = false b.RESTPollingDelay = 10 b.WebsocketSubdChannels = make(map[int]WebsocketChanInfo) - b.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithAPIPermission | exchange.AutoWithdrawFiatWithAPIPermission + b.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithAPIPermission | + exchange.AutoWithdrawFiatWithAPIPermission b.RequestCurrencyPairFormat.Delimiter = "" b.RequestCurrencyPairFormat.Uppercase = true b.ConfigCurrencyPairFormat.Delimiter = "" @@ -109,6 +110,9 @@ func (b *Bitfinex) SetDefaults() { b.APIUrlDefault = bitfinexAPIURLBase b.APIUrl = b.APIUrlDefault b.WebsocketInit() + b.Websocket.Functionality = exchange.WebsocketTickerSupported | + exchange.WebsocketTradeDataSupported | + exchange.WebsocketOrderbookSupported } // Setup takes in the supplied exchange configuration details and sets params diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index e7cb509a..866a037f 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -187,14 +187,29 @@ func (b *Bitfinex) WsConnect() error { pongReceive = make(chan struct{}, 1) - go b.WsReadData() go b.WsDataHandler() return nil } // WsReadData reads and handles websocket stream data -func (b *Bitfinex) WsReadData() { +func (b *Bitfinex) WsReadData() (exchange.WebsocketResponse, error) { + msgType, resp, err := b.WebsocketConn.ReadMessage() + if err != nil { + return exchange.WebsocketResponse{}, err + } + + b.Websocket.TrafficAlert <- struct{}{} + + return exchange.WebsocketResponse{ + Type: msgType, + Raw: resp, + }, nil + +} + +// WsDataHandler handles data from WsReadData +func (b *Bitfinex) WsDataHandler() { b.Websocket.Wg.Add(1) defer func() { @@ -210,35 +225,14 @@ func (b *Bitfinex) WsReadData() { select { case <-b.Websocket.ShutdownC: return + default: - msgType, resp, err := b.WebsocketConn.ReadMessage() + stream, err := b.WsReadData() if err != nil { b.Websocket.DataHandler <- err return } - b.Websocket.TrafficAlert <- struct{}{} - - b.Websocket.Intercomm <- exchange.WebsocketResponse{ - Type: msgType, - Raw: resp, - } - } - } -} - -// WsDataHandler handles data from WsReadData -func (b *Bitfinex) WsDataHandler() { - b.Websocket.Wg.Add(1) - defer b.Websocket.Wg.Done() - - for { - select { - case <-b.Websocket.ShutdownC: - return - - case stream := <-b.Websocket.Intercomm: - switch stream.Type { case websocket.TextMessage: var result interface{} @@ -320,7 +314,6 @@ func (b *Bitfinex) WsDataHandler() { b.Websocket.DataHandler <- fmt.Errorf("bitfinex_websocket.go inserting snapshot error: %s", err) } - continue } @@ -537,7 +530,7 @@ func (b *Bitfinex) WsInsertSnapshot(p pair.CurrencyPair, assetType string, books newOrderbook.LastUpdated = time.Now() newOrderbook.Pair = p - err := b.Websocket.Orderbook.LoadSnapshot(newOrderbook, b.GetName()) + err := b.Websocket.Orderbook.LoadSnapshot(newOrderbook, b.GetName(), false) if err != nil { return fmt.Errorf("bitfinex.go error - %s", err) } diff --git a/exchanges/bitflyer/bitflyer.go b/exchanges/bitflyer/bitflyer.go index 9876a798..604077d8 100644 --- a/exchanges/bitflyer/bitflyer.go +++ b/exchanges/bitflyer/bitflyer.go @@ -81,7 +81,8 @@ func (b *Bitflyer) SetDefaults() { b.Enabled = false b.Verbose = false b.RESTPollingDelay = 10 - b.APIWithdrawPermissions = exchange.WithdrawCryptoViaWebsiteOnly | exchange.AutoWithdrawFiat + b.APIWithdrawPermissions = exchange.WithdrawCryptoViaWebsiteOnly | + exchange.AutoWithdrawFiat b.RequestCurrencyPairFormat.Delimiter = "_" b.RequestCurrencyPairFormat.Uppercase = true b.ConfigCurrencyPairFormat.Delimiter = "_" diff --git a/exchanges/bithumb/bithumb.go b/exchanges/bithumb/bithumb.go index 83973eaf..5d25e138 100644 --- a/exchanges/bithumb/bithumb.go +++ b/exchanges/bithumb/bithumb.go @@ -64,7 +64,8 @@ func (b *Bithumb) SetDefaults() { b.Enabled = false b.Verbose = false b.RESTPollingDelay = 10 - b.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | exchange.AutoWithdrawFiat + b.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | + exchange.AutoWithdrawFiat b.RequestCurrencyPairFormat.Delimiter = "" b.RequestCurrencyPairFormat.Uppercase = true b.ConfigCurrencyPairFormat.Delimiter = "" diff --git a/exchanges/bithumb/bithumb_wrapper.go b/exchanges/bithumb/bithumb_wrapper.go index f4237dcb..837d666f 100644 --- a/exchanges/bithumb/bithumb_wrapper.go +++ b/exchanges/bithumb/bithumb_wrapper.go @@ -297,7 +297,7 @@ func (b *Bithumb) WithdrawFiatFundsToInternationalBank(withdrawRequest exchange. // GetWebsocket returns a pointer to the exchange websocket func (b *Bithumb) GetWebsocket() (*exchange.Websocket, error) { - return nil, common.ErrNotYetImplemented + return nil, common.ErrFunctionNotSupported } // GetFeeByType returns an estimate of fee based on type of transaction diff --git a/exchanges/bitmex/bitmex.go b/exchanges/bitmex/bitmex.go index 35097ce8..70cb7042 100644 --- a/exchanges/bitmex/bitmex.go +++ b/exchanges/bitmex/bitmex.go @@ -116,7 +116,10 @@ func (b *Bitmex) SetDefaults() { b.Enabled = false b.Verbose = false b.RESTPollingDelay = 10 - b.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithAPIPermission | exchange.WithdrawCryptoWithEmail | exchange.WithdrawCryptoWith2FA | exchange.NoFiatWithdrawals + b.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithAPIPermission | + exchange.WithdrawCryptoWithEmail | + exchange.WithdrawCryptoWith2FA | + exchange.NoFiatWithdrawals b.RequestCurrencyPairFormat.Delimiter = "" b.RequestCurrencyPairFormat.Uppercase = true b.ConfigCurrencyPairFormat.Delimiter = "" @@ -130,6 +133,8 @@ func (b *Bitmex) SetDefaults() { b.APIUrl = b.APIUrlDefault b.SupportsAutoPairUpdating = true b.WebsocketInit() + b.Websocket.Functionality = exchange.WebsocketTradeDataSupported | + exchange.WebsocketOrderbookSupported } // Setup takes in the supplied exchange configuration details and sets params diff --git a/exchanges/bitmex/bitmex_websocket.go b/exchanges/bitmex/bitmex_websocket.go index fb6e871a..57ba026e 100644 --- a/exchanges/bitmex/bitmex_websocket.go +++ b/exchanges/bitmex/bitmex_websocket.go @@ -104,7 +104,6 @@ func (b *Bitmex) WsConnector() error { } go b.wsHandleIncomingData() - go b.wsReadData() err = b.websocketSubscribe() if err != nil { @@ -125,7 +124,21 @@ func (b *Bitmex) WsConnector() error { return nil } -func (b *Bitmex) wsReadData() { +func (b *Bitmex) wsReadData() (exchange.WebsocketResponse, error) { + _, resp, err := b.WebsocketConn.ReadMessage() + if err != nil { + return exchange.WebsocketResponse{}, err + } + + b.Websocket.TrafficAlert <- struct{}{} + + return exchange.WebsocketResponse{ + Raw: resp, + }, nil +} + +// wsHandleIncomingData services incoming data from the websocket connection +func (b *Bitmex) wsHandleIncomingData() { b.Websocket.Wg.Add(1) defer func() { @@ -143,33 +156,12 @@ func (b *Bitmex) wsReadData() { return default: - _, resp, err := b.WebsocketConn.ReadMessage() + resp, err := b.wsReadData() if err != nil { - b.Websocket.DataHandler <- fmt.Errorf("bitmex_websocket.go - websocket connection Error: %s", - err) + b.Websocket.DataHandler <- err return } - b.Websocket.TrafficAlert <- struct{}{} - - b.Websocket.Intercomm <- exchange.WebsocketResponse{ - Raw: resp, - } - } - } -} - -// wsHandleIncomingData services incoming data from the websocket connection -func (b *Bitmex) wsHandleIncomingData() { - b.Websocket.Wg.Add(1) - defer b.Websocket.Wg.Done() - - for { - select { - case <-b.Websocket.ShutdownC: - return - - case resp := <-b.Websocket.Intercomm: message := string(resp.Raw) if common.StringContains(message, "pong") { pongChan <- 1 @@ -180,20 +172,23 @@ func (b *Bitmex) wsHandleIncomingData() { err := b.WebsocketConn.WriteJSON("pong") if err != nil { b.Websocket.DataHandler <- err + continue } } quickCapture := make(map[string]interface{}) - err := common.JSONDecode(resp.Raw, &quickCapture) + err = common.JSONDecode(resp.Raw, &quickCapture) if err != nil { - log.Error(err) + b.Websocket.DataHandler <- err + continue } var respError WebsocketErrorResponse if _, ok := quickCapture["status"]; ok { err = common.JSONDecode(resp.Raw, &respError) if err != nil { - log.Error(err) + b.Websocket.DataHandler <- err + continue } b.Websocket.DataHandler <- errors.New(respError.Error) continue @@ -203,7 +198,8 @@ func (b *Bitmex) wsHandleIncomingData() { var decodedResp WebsocketSubscribeResp err := common.JSONDecode(resp.Raw, &decodedResp) if err != nil { - log.Error(err) + b.Websocket.DataHandler <- err + continue } if decodedResp.Success { @@ -225,7 +221,8 @@ func (b *Bitmex) wsHandleIncomingData() { var decodedResp WebsocketMainResponse err := common.JSONDecode(resp.Raw, &decodedResp) if err != nil { - log.Error(err) + b.Websocket.DataHandler <- err + continue } switch decodedResp.Table { @@ -233,20 +230,23 @@ func (b *Bitmex) wsHandleIncomingData() { var orderbooks OrderBookData err = common.JSONDecode(resp.Raw, &orderbooks) if err != nil { - log.Error(err) + b.Websocket.DataHandler <- err + continue } p := pair.NewCurrencyPairFromString(orderbooks.Data[0].Symbol) err = b.processOrderbook(orderbooks.Data, orderbooks.Action, p, "CONTRACT") if err != nil { - log.Error(err) + b.Websocket.DataHandler <- err + continue } case bitmexWSTrade: var trades TradeData err = common.JSONDecode(resp.Raw, &trades) if err != nil { - log.Error(err) + b.Websocket.DataHandler <- err + continue } if trades.Action == bitmexActionInitialData { @@ -256,7 +256,8 @@ func (b *Bitmex) wsHandleIncomingData() { for _, trade := range trades.Data { timestamp, err := time.Parse(time.RFC3339, trade.Timestamp) if err != nil { - log.Error(err) + b.Websocket.DataHandler <- err + continue } b.Websocket.DataHandler <- exchange.TradeData{ @@ -275,7 +276,8 @@ func (b *Bitmex) wsHandleIncomingData() { err = common.JSONDecode(resp.Raw, &announcement) if err != nil { - log.Error(err) + b.Websocket.DataHandler <- err + continue } if announcement.Action == bitmexActionInitialData { @@ -285,7 +287,8 @@ func (b *Bitmex) wsHandleIncomingData() { b.Websocket.DataHandler <- announcement.Data default: - log.Errorf("Bitmex websocket error: Table unknown - %s", decodedResp.Table) + b.Websocket.DataHandler <- fmt.Errorf("Bitmex websocket error: Table unknown - %s", + decodedResp.Table) } } } @@ -341,7 +344,7 @@ func (b *Bitmex) processOrderbook(data []OrderBookL2, action string, currencyPai newOrderbook.LastUpdated = time.Now() newOrderbook.Pair = currencyPair - err := b.Websocket.Orderbook.LoadSnapshot(newOrderbook, b.GetName()) + err := b.Websocket.Orderbook.LoadSnapshot(newOrderbook, b.GetName(), false) if err != nil { return fmt.Errorf("bitmex_websocket.go process orderbook error - %s", err) diff --git a/exchanges/bitstamp/bitstamp.go b/exchanges/bitstamp/bitstamp.go index e2730e1c..33822160 100644 --- a/exchanges/bitstamp/bitstamp.go +++ b/exchanges/bitstamp/bitstamp.go @@ -70,7 +70,8 @@ func (b *Bitstamp) SetDefaults() { b.Enabled = false b.Verbose = false b.RESTPollingDelay = 10 - b.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | exchange.AutoWithdrawFiat + b.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | + exchange.AutoWithdrawFiat b.RequestCurrencyPairFormat.Delimiter = "" b.RequestCurrencyPairFormat.Uppercase = true b.ConfigCurrencyPairFormat.Delimiter = "" @@ -85,6 +86,8 @@ func (b *Bitstamp) SetDefaults() { b.APIUrlDefault = bitstampAPIURL b.APIUrl = b.APIUrlDefault b.WebsocketInit() + b.Websocket.Functionality = exchange.WebsocketOrderbookSupported | + exchange.WebsocketTradeDataSupported } // Setup sets configuration values to bitstamp diff --git a/exchanges/bitstamp/bitstamp_websocket.go b/exchanges/bitstamp/bitstamp_websocket.go index 919e4c1d..7815867a 100644 --- a/exchanges/bitstamp/bitstamp_websocket.go +++ b/exchanges/bitstamp/bitstamp_websocket.go @@ -132,7 +132,7 @@ func (b *Bitstamp) WsConnect() error { newOrderbook.LastUpdated = time.Unix(0, orderbookSeed.Timestamp) newOrderbook.AssetType = "SPOT" - err = b.Websocket.Orderbook.LoadSnapshot(newOrderbook, b.GetName()) + err = b.Websocket.Orderbook.LoadSnapshot(newOrderbook, b.GetName(), false) if err != nil { return err } @@ -147,7 +147,6 @@ func (b *Bitstamp) WsConnect() error { strings.ToLower(p.Pair().String()))) if err != nil { - log.Error(err) return fmt.Errorf("%s Websocket Trade subscription error: %s", b.GetName(), err) @@ -157,7 +156,6 @@ func (b *Bitstamp) WsConnect() error { strings.ToLower(p.Pair().String()))) if err != nil { - log.Error(err) return fmt.Errorf("%s Websocket Trade subscription error: %s", b.GetName(), err) diff --git a/exchanges/bittrex/bittrex.go b/exchanges/bittrex/bittrex.go index 7ef28429..eebd3af9 100644 --- a/exchanges/bittrex/bittrex.go +++ b/exchanges/bittrex/bittrex.go @@ -68,7 +68,8 @@ func (b *Bittrex) SetDefaults() { b.Enabled = false b.Verbose = false b.RESTPollingDelay = 10 - b.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithAPIPermission | exchange.NoFiatWithdrawals + b.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithAPIPermission | + exchange.NoFiatWithdrawals b.RequestCurrencyPairFormat.Delimiter = "-" b.RequestCurrencyPairFormat.Uppercase = true b.ConfigCurrencyPairFormat.Delimiter = "-" diff --git a/exchanges/btcc/btcc_websocket.go b/exchanges/btcc/btcc_websocket.go index 526a06f7..76f46af3 100644 --- a/exchanges/btcc/btcc_websocket.go +++ b/exchanges/btcc/btcc_websocket.go @@ -68,7 +68,6 @@ func (b *BTCC) WsConnect() error { return err } - go b.WsReadData() go b.WsHandleData() err = b.WsSubscribeToOrderbook() @@ -85,9 +84,33 @@ func (b *BTCC) WsConnect() error { } // WsReadData reads data from the websocket connection -func (b *BTCC) WsReadData() { +func (b *BTCC) WsReadData() (exchange.WebsocketResponse, error) { + mtx.Lock() + _, resp, err := b.Conn.ReadMessage() + mtx.Unlock() + if err != nil { + return exchange.WebsocketResponse{}, err + } + + b.Websocket.TrafficAlert <- struct{}{} + + return exchange.WebsocketResponse{ + Raw: resp, + }, nil +} + +// WsHandleData handles read data +func (b *BTCC) WsHandleData() { b.Websocket.Wg.Add(1) - defer b.Websocket.Wg.Done() + + defer func() { + err := b.Conn.Close() + if err != nil { + b.Websocket.DataHandler <- fmt.Errorf("btcc_websocket.go - Unable to close Websocket connection. Error: %s", + err) + } + b.Websocket.Wg.Done() + }() for { select { @@ -95,35 +118,14 @@ func (b *BTCC) WsReadData() { return default: - mtx.Lock() - _, resp, err := b.Conn.ReadMessage() - mtx.Unlock() + resp, err := b.WsReadData() if err != nil { b.Websocket.DataHandler <- err + return } - b.Websocket.TrafficAlert <- struct{}{} - - b.Websocket.Intercomm <- exchange.WebsocketResponse{ - Raw: resp, - } - } - } -} - -// WsHandleData handles read data -func (b *BTCC) WsHandleData() { - b.Websocket.Wg.Add(1) - defer b.Websocket.Wg.Done() - - for { - select { - case <-b.Websocket.ShutdownC: - return - - case resp := <-b.Websocket.Intercomm: var Result WsResponseMain - err := common.JSONDecode(resp.Raw, &Result) + err = common.JSONDecode(resp.Raw, &Result) if err != nil { b.Websocket.DataHandler <- err continue @@ -408,7 +410,7 @@ func (b *BTCC) WsProcessOrderbookSnapshot(ob WsOrderbookSnapshot) error { newOrderbook.LastUpdated = time.Now() newOrderbook.Pair = pair.NewCurrencyPairFromString(ob.Symbol) - err := b.Websocket.Orderbook.LoadSnapshot(newOrderbook, b.GetName()) + err := b.Websocket.Orderbook.LoadSnapshot(newOrderbook, b.GetName(), false) if err != nil { return err } diff --git a/exchanges/btcmarkets/btcmarkets.go b/exchanges/btcmarkets/btcmarkets.go index 2bbb9e0a..b7b07690 100644 --- a/exchanges/btcmarkets/btcmarkets.go +++ b/exchanges/btcmarkets/btcmarkets.go @@ -58,7 +58,8 @@ func (b *BTCMarkets) SetDefaults() { b.Verbose = false b.RESTPollingDelay = 10 b.Ticker = make(map[string]Ticker) - b.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | exchange.AutoWithdrawFiat + b.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | + exchange.AutoWithdrawFiat b.RequestCurrencyPairFormat.Delimiter = "" b.RequestCurrencyPairFormat.Uppercase = true b.ConfigCurrencyPairFormat.Delimiter = "-" diff --git a/exchanges/coinbasepro/coinbasepro.go b/exchanges/coinbasepro/coinbasepro.go index ffceb0fc..be1e581d 100644 --- a/exchanges/coinbasepro/coinbasepro.go +++ b/exchanges/coinbasepro/coinbasepro.go @@ -70,7 +70,8 @@ func (c *CoinbasePro) SetDefaults() { c.TakerFee = 0.25 c.MakerFee = 0 c.RESTPollingDelay = 10 - c.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithAPIPermission | exchange.AutoWithdrawFiatWithAPIPermission + c.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithAPIPermission | + exchange.AutoWithdrawFiatWithAPIPermission c.RequestCurrencyPairFormat.Delimiter = "-" c.RequestCurrencyPairFormat.Uppercase = true c.ConfigCurrencyPairFormat.Delimiter = "" @@ -85,6 +86,8 @@ func (c *CoinbasePro) SetDefaults() { c.APIUrlDefault = coinbaseproAPIURL c.APIUrl = c.APIUrlDefault c.WebsocketInit() + c.Websocket.Functionality = exchange.WebsocketTickerSupported | + exchange.WebsocketOrderbookSupported } // Setup initialises the exchange parameters with the current configuration diff --git a/exchanges/coinbasepro/coinbasepro_websocket.go b/exchanges/coinbasepro/coinbasepro_websocket.go index 3ca5ad5c..8a75faba 100644 --- a/exchanges/coinbasepro/coinbasepro_websocket.go +++ b/exchanges/coinbasepro/coinbasepro_websocket.go @@ -85,14 +85,24 @@ func (c *CoinbasePro) WsConnect() error { return err } - go c.WsReadData() go c.WsHandleData() return nil } // WsReadData reads data from the websocket connection -func (c *CoinbasePro) WsReadData() { +func (c *CoinbasePro) WsReadData() (exchange.WebsocketResponse, error) { + _, resp, err := c.WebsocketConn.ReadMessage() + if err != nil { + return exchange.WebsocketResponse{}, err + } + + c.Websocket.TrafficAlert <- struct{}{} + return exchange.WebsocketResponse{Raw: resp}, nil +} + +// WsHandleData handles read data from websocket connection +func (c *CoinbasePro) WsHandleData() { c.Websocket.Wg.Add(1) defer func() { @@ -110,29 +120,12 @@ func (c *CoinbasePro) WsReadData() { return default: - _, resp, err := c.WebsocketConn.ReadMessage() + resp, err := c.WsReadData() if err != nil { c.Websocket.DataHandler <- err return } - c.Websocket.TrafficAlert <- struct{}{} - c.Websocket.Intercomm <- exchange.WebsocketResponse{Raw: resp} - } - } -} - -// WsHandleData handles read data from websocket connection -func (c *CoinbasePro) WsHandleData() { - c.Websocket.Wg.Add(1) - defer c.Websocket.Wg.Done() - - for { - select { - case <-c.Websocket.ShutdownC: - return - - case resp := <-c.Websocket.Intercomm: type MsgType struct { Type string `json:"type"` Sequence int64 `json:"sequence"` @@ -140,7 +133,7 @@ func (c *CoinbasePro) WsHandleData() { } msgType := MsgType{} - err := common.JSONDecode(resp.Raw, &msgType) + err = common.JSONDecode(resp.Raw, &msgType) if err != nil { c.Websocket.DataHandler <- err continue @@ -245,7 +238,7 @@ func (c *CoinbasePro) ProcessSnapshot(snapshot WebsocketOrderbookSnapshot) error base.CurrencyPair = snapshot.ProductID base.LastUpdated = time.Now() - err := c.Websocket.Orderbook.LoadSnapshot(base, c.GetName()) + err := c.Websocket.Orderbook.LoadSnapshot(base, c.GetName(), false) if err != nil { return err } diff --git a/exchanges/coinut/coinut.go b/exchanges/coinut/coinut.go index 01a6956f..fe52ca93 100644 --- a/exchanges/coinut/coinut.go +++ b/exchanges/coinut/coinut.go @@ -56,7 +56,8 @@ func (c *COINUT) SetDefaults() { c.MakerFee = 0 c.Verbose = false c.RESTPollingDelay = 10 - c.APIWithdrawPermissions = exchange.WithdrawCryptoViaWebsiteOnly | exchange.WithdrawFiatViaWebsiteOnly + c.APIWithdrawPermissions = exchange.WithdrawCryptoViaWebsiteOnly | + exchange.WithdrawFiatViaWebsiteOnly c.RequestCurrencyPairFormat.Delimiter = "" c.RequestCurrencyPairFormat.Uppercase = true c.ConfigCurrencyPairFormat.Delimiter = "" @@ -71,6 +72,9 @@ func (c *COINUT) SetDefaults() { c.APIUrlDefault = coinutAPIURL c.APIUrl = c.APIUrlDefault c.WebsocketInit() + c.Websocket.Functionality = exchange.WebsocketTickerSupported | + exchange.WebsocketOrderbookSupported | + exchange.WebsocketTradeDataSupported } // Setup sets the current exchange configuration diff --git a/exchanges/coinut/coinut_websocket.go b/exchanges/coinut/coinut_websocket.go index a2bfe288..4b5e2594 100644 --- a/exchanges/coinut/coinut_websocket.go +++ b/exchanges/coinut/coinut_websocket.go @@ -28,7 +28,18 @@ var populatedList bool // wss://wsapi-eu.coinut.com // WsReadData reads data from the websocket connection -func (c *COINUT) WsReadData() { +func (c *COINUT) WsReadData() (exchange.WebsocketResponse, error) { + _, resp, err := c.WebsocketConn.ReadMessage() + if err != nil { + return exchange.WebsocketResponse{}, err + } + + c.Websocket.TrafficAlert <- struct{}{} + return exchange.WebsocketResponse{Raw: resp}, nil +} + +// WsHandleData handles read data +func (c *COINUT) WsHandleData() { c.Websocket.Wg.Add(1) defer func() { @@ -46,31 +57,14 @@ func (c *COINUT) WsReadData() { return default: - _, resp, err := c.WebsocketConn.ReadMessage() + resp, err := c.WsReadData() if err != nil { c.Websocket.DataHandler <- err return } - c.Websocket.TrafficAlert <- struct{}{} - c.Websocket.Intercomm <- exchange.WebsocketResponse{Raw: resp} - } - } -} - -// WsHandleData handles read data -func (c *COINUT) WsHandleData() { - c.Websocket.Wg.Add(1) - defer c.Websocket.Wg.Done() - - for { - select { - case <-c.Websocket.ShutdownC: - return - - case resp := <-c.Websocket.Intercomm: var incoming wsResponse - err := common.JSONDecode(resp.Raw, &incoming) + err = common.JSONDecode(resp.Raw, &incoming) if err != nil { c.Websocket.DataHandler <- err continue @@ -218,7 +212,6 @@ func (c *COINUT) WsConnect() error { channels = make(map[string]chan []byte) channels["hb"] = make(chan []byte, 1) - go c.WsReadData() go c.WsHandleData() return nil @@ -345,7 +338,7 @@ func (c *COINUT) WsProcessOrderbookSnapshot(ob WsOrderbookSnapshot) error { newOrderbook.AssetType = "SPOT" newOrderbook.LastUpdated = time.Now() - return c.Websocket.Orderbook.LoadSnapshot(newOrderbook, c.GetName()) + return c.Websocket.Orderbook.LoadSnapshot(newOrderbook, c.GetName(), false) } // WsProcessOrderbookUpdate process an orderbook update diff --git a/exchanges/exchange_websocket.go b/exchanges/exchange_websocket.go index 73f76ab5..93755f31 100644 --- a/exchanges/exchange_websocket.go +++ b/exchanges/exchange_websocket.go @@ -3,6 +3,7 @@ package exchange import ( "errors" "fmt" + "strings" "sync" "time" @@ -11,7 +12,25 @@ import ( "github.com/thrasher-/gocryptotrader/exchanges/orderbook" ) +// Websocket functionality list and state consts const ( + NoWebsocketSupport uint32 = 0 + WebsocketTickerSupported uint32 = 1 << (iota - 1) + WebsocketOrderbookSupported + WebsocketKlineSupported + WebsocketTradeDataSupported + WebsocketAccountSupported + WebsocketAllowsRequests + + WebsocketTickerSupportedText = "TICKER STREAMING SUPPORTED" + WebsocketOrderbookSupportedText = "ORDERBOOK STREAMING SUPPORTED" + WebsocketKlineSupportedText = "KLINE STREAMING SUPPORTED" + WebsocketTradeDataSupportedText = "TRADE STREAMING SUPPORTED" + WebsocketAccountSupportedText = "ACCOUNT STREAMING SUPPORTED" + WebsocketAllowsRequestsText = "WEBSOCKET REQUESTS SUPPORTED" + NoWebsocketSupportText = "WEBSOCKET NOT SUPPORTED" + UnknownWebsocketFunctionality = "UNKNOWN FUNCTIONALITY BITMASK" + // WebsocketNotEnabled alerts of a disabled websocket WebsocketNotEnabled = "exchange_websocket_not_enabled" // WebsocketTrafficLimitTime defines a standard time for no traffic from the @@ -45,7 +64,6 @@ func (e *Base) WebsocketSetup(connector func() error, e.Websocket.DataHandler = make(chan interface{}, 1) e.Websocket.Connected = make(chan struct{}, 1) e.Websocket.Disconnected = make(chan struct{}, 1) - e.Websocket.Intercomm = make(chan WebsocketResponse, 1) e.Websocket.TrafficAlert = make(chan struct{}, 1) err := e.Websocket.SetEnabled(wsEnabled) @@ -82,9 +100,6 @@ type Websocket struct { // Disconnected denotes a channel switch for diversion of request flow Disconnected chan struct{} - // Intercomm denotes a channel from read data routine to handle data routine - Intercomm chan WebsocketResponse - // DataHandler pipes websocket data to an exchange websocket data handler DataHandler chan interface{} @@ -101,6 +116,9 @@ type Websocket struct { // TrafficAlert monitors if there is a halt in traffic throughput TrafficAlert chan struct{} + + // Functionality defines websocket stream capabilities + Functionality uint32 } // trafficMonitor monitors traffic and switches connection modes for websocket @@ -440,8 +458,10 @@ func (w *WebsocketOrderbookLocal) Update(bidTargets, askTargets []orderbook.Item return nil } -// LoadSnapshot loads initial snapshot of orderbook data -func (w *WebsocketOrderbookLocal) LoadSnapshot(newOrderbook orderbook.Base, exchName string) error { +// LoadSnapshot loads initial snapshot of orderbook data, overite allows full +// orderbook to be completely rewritten because the exchange is a doing a full +// update not an incremental one +func (w *WebsocketOrderbookLocal) LoadSnapshot(newOrderbook orderbook.Base, exchName string, overwrite bool) error { if len(newOrderbook.Asks) == 0 || len(newOrderbook.Bids) == 0 { return errors.New("exchange.go websocket orderbook cache LoadSnapshot() error - snapshot ask and bids are nil") } @@ -451,6 +471,17 @@ func (w *WebsocketOrderbookLocal) LoadSnapshot(newOrderbook orderbook.Base, exch for i := range w.ob { if w.ob[i].Pair == newOrderbook.Pair && w.ob[i].AssetType == newOrderbook.AssetType { + if overwrite { + w.ob[i] = newOrderbook + w.lastUpdated = newOrderbook.LastUpdated + + orderbook.ProcessOrderbook(exchName, + newOrderbook.Pair, + newOrderbook, + newOrderbook.AssetType) + + return nil + } return errors.New("exchange.go websocket orderbook cache LoadSnapshot() error - Snapshot instance already found") } } @@ -615,3 +646,57 @@ type WebsocketPositionUpdated struct { AssetType string Exchange string } + +// GetFunctionality returns a functionality bitmask for the websocket +// connection +func (w *Websocket) GetFunctionality() uint32 { + return w.Functionality +} + +// SupportsFunctionality returns if the functionality is supported as a boolean +func (w *Websocket) SupportsFunctionality(f uint32) bool { + if w.GetFunctionality()&f == f { + return true + } + return false +} + +// FormatFunctionality will return each of the websocket connection compatible +// stream methods as a string +func (w *Websocket) FormatFunctionality() string { + functionality := []string{} + for i := 0; i < 32; i++ { + var check uint32 = 1 << uint32(i) + if w.GetFunctionality()&check != 0 { + switch check { + case WebsocketTickerSupported: + functionality = append(functionality, WebsocketTickerSupportedText) + + case WebsocketOrderbookSupported: + functionality = append(functionality, WebsocketOrderbookSupportedText) + + case WebsocketKlineSupported: + functionality = append(functionality, WebsocketKlineSupportedText) + + case WebsocketTradeDataSupported: + functionality = append(functionality, WebsocketTradeDataSupportedText) + + case WebsocketAccountSupported: + functionality = append(functionality, WebsocketAccountSupportedText) + + case WebsocketAllowsRequests: + functionality = append(functionality, WebsocketAllowsRequestsText) + + default: + functionality = append(functionality, + fmt.Sprintf("%s[1<<%v]", UnknownWebsocketFunctionality, i)) + } + } + } + + if len(functionality) > 0 { + return strings.Join(functionality, " & ") + } + + return NoWebsocketSupportText +} diff --git a/exchanges/exchange_websocket_test.go b/exchanges/exchange_websocket_test.go index d6705ea4..4d27f0f3 100644 --- a/exchanges/exchange_websocket_test.go +++ b/exchanges/exchange_websocket_test.go @@ -161,7 +161,7 @@ func TestInsertingSnapShots(t *testing.T) { snapShot1.LastUpdated = time.Now() snapShot1.Pair = pair.NewCurrencyPairFromString("BTCUSD") - wsTest.Websocket.Orderbook.LoadSnapshot(snapShot1, "ExchangeTest") + wsTest.Websocket.Orderbook.LoadSnapshot(snapShot1, "ExchangeTest", false) var snapShot2 orderbook.Base asks = []orderbook.Item{ @@ -199,7 +199,7 @@ func TestInsertingSnapShots(t *testing.T) { snapShot2.LastUpdated = time.Now() snapShot2.Pair = pair.NewCurrencyPairFromString("LTCUSD") - wsTest.Websocket.Orderbook.LoadSnapshot(snapShot2, "ExchangeTest") + wsTest.Websocket.Orderbook.LoadSnapshot(snapShot2, "ExchangeTest", false) var snapShot3 orderbook.Base asks = []orderbook.Item{ @@ -237,7 +237,7 @@ func TestInsertingSnapShots(t *testing.T) { snapShot3.LastUpdated = time.Now() snapShot3.Pair = pair.NewCurrencyPairFromString("LTCUSD") - wsTest.Websocket.Orderbook.LoadSnapshot(snapShot3, "ExchangeTest") + wsTest.Websocket.Orderbook.LoadSnapshot(snapShot3, "ExchangeTest", false) if len(wsTest.Websocket.Orderbook.ob) != 3 { t.Error("test failed - inserting orderbook data") @@ -309,3 +309,28 @@ func TestUpdate(t *testing.T) { t.Error("test failed - OrderbookUpdate error", err) } } + +func TestFunctionality(t *testing.T) { + var w Websocket + + if w.FormatFunctionality() != NoWebsocketSupportText { + t.Fatalf("Test Failed - FormatFunctionality error expected %s but recieved %s", + NoWebsocketSupportText, w.FormatFunctionality()) + } + + w.Functionality = 1 << 31 + + if w.FormatFunctionality() != UnknownWebsocketFunctionality+"[1<<31]" { + t.Fatal("Test Failed - GetFunctionality error incorrect error returned") + } + + w.Functionality = WebsocketOrderbookSupported + + if w.GetFunctionality() != WebsocketOrderbookSupported { + t.Fatal("Test Failed - GetFunctionality error incorrect bitmask returned") + } + + if !w.SupportsFunctionality(WebsocketOrderbookSupported) { + t.Fatal("Test Failed - SupportsFunctionality error should be true") + } +} diff --git a/exchanges/exmo/exmo_wrapper.go b/exchanges/exmo/exmo_wrapper.go index 2a4cf64d..a0d0bd08 100644 --- a/exchanges/exmo/exmo_wrapper.go +++ b/exchanges/exmo/exmo_wrapper.go @@ -291,7 +291,7 @@ func (e *EXMO) WithdrawFiatFundsToInternationalBank(withdrawRequest exchange.Wit // GetWebsocket returns a pointer to the exchange websocket func (e *EXMO) GetWebsocket() (*exchange.Websocket, error) { - return nil, common.ErrNotYetImplemented + return nil, common.ErrFunctionNotSupported } // GetFeeByType returns an estimate of fee based on type of transaction diff --git a/exchanges/gateio/gateio.go b/exchanges/gateio/gateio.go index bc923b25..15359180 100644 --- a/exchanges/gateio/gateio.go +++ b/exchanges/gateio/gateio.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/gorilla/websocket" "github.com/thrasher-/gocryptotrader/common" "github.com/thrasher-/gocryptotrader/config" exchange "github.com/thrasher-/gocryptotrader/exchanges" @@ -43,6 +44,7 @@ const ( // Gateio is the overarching type across this package type Gateio struct { + WebsocketConn *websocket.Conn exchange.Base } @@ -52,7 +54,8 @@ func (g *Gateio) SetDefaults() { g.Enabled = false g.Verbose = false g.RESTPollingDelay = 10 - g.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | exchange.NoFiatWithdrawals + g.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | + exchange.NoFiatWithdrawals g.RequestCurrencyPairFormat.Delimiter = "_" g.RequestCurrencyPairFormat.Uppercase = false g.ConfigCurrencyPairFormat.Delimiter = "_" @@ -69,6 +72,10 @@ func (g *Gateio) SetDefaults() { g.APIUrlSecondaryDefault = gateioMarketURL g.APIUrlSecondary = g.APIUrlSecondaryDefault g.WebsocketInit() + g.Websocket.Functionality = exchange.WebsocketTickerSupported | + exchange.WebsocketTradeDataSupported | + exchange.WebsocketOrderbookSupported | + exchange.WebsocketKlineSupported } // Setup sets user configuration @@ -107,6 +114,14 @@ func (g *Gateio) Setup(exch config.ExchangeConfig) { if err != nil { log.Fatal(err) } + err = g.WebsocketSetup(g.WsConnect, + exch.Name, + exch.Websocket, + gateioWebsocketEndpoint, + exch.WebsocketURL) + if err != nil { + log.Fatal(err) + } } } diff --git a/exchanges/gateio/gateio_types.go b/exchanges/gateio/gateio_types.go index 7a30e975..9f7a8207 100644 --- a/exchanges/gateio/gateio_types.go +++ b/exchanges/gateio/gateio_types.go @@ -1,6 +1,7 @@ package gateio import ( + "encoding/json" "time" "github.com/thrasher-/gocryptotrader/currency/symbol" @@ -363,3 +364,51 @@ var WithdrawalFees = map[string]float64{ symbol.TCT: 20, symbol.EXC: 10, } + +// WebsocketRequest defines the initial request in JSON +type WebsocketRequest struct { + ID int64 `json:"id"` + Method string `json:"method"` + Params []interface{} `json:"params"` +} + +// WebsocketResponse defines a websocket response from gateio +type WebsocketResponse struct { + Time int64 `json:"time"` + Channel string `json:"channel"` + Event string `json:""` + Error WebsocketError `json:"error"` + Result struct { + Status string `json:"status"` + } `json:"result"` + Method string `json:"method"` + Params []json.RawMessage `json:"params"` +} + +// WebsocketError defines a websocket error type +type WebsocketError struct { + Code int64 `json:"code"` + Message string `json:"message"` +} + +// WebsocketTicker defines ticker data +type WebsocketTicker struct { + Period int64 `json:"period"` + Open float64 `json:"open,string"` + Close float64 `json:"close,string"` + High float64 `json:"high,string"` + Low float64 `json:"Low,string"` + Last float64 `json:"last,string"` + Change float64 `json:"change,string"` + QuoteVolume float64 `json:"quoteVolume,string"` + BaseVolume float64 `json:"baseVolume,string"` +} + +// WebsocketTrade defines trade data +type WebsocketTrade struct { + ID int64 `json:"id"` + Time float64 `json:"time"` + Price float64 `json:"price,string"` + Amount float64 `json:"amount,string"` + Type string `json:"type"` +} diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go new file mode 100644 index 00000000..7463790a --- /dev/null +++ b/exchanges/gateio/gateio_websocket.go @@ -0,0 +1,327 @@ +package gateio + +import ( + "errors" + "fmt" + "net/http" + "net/url" + "strconv" + "time" + + "github.com/gorilla/websocket" + "github.com/thrasher-/gocryptotrader/common" + "github.com/thrasher-/gocryptotrader/currency/pair" + exchange "github.com/thrasher-/gocryptotrader/exchanges" + "github.com/thrasher-/gocryptotrader/exchanges/orderbook" +) + +const ( + gateioWebsocketEndpoint = "wss://ws.gate.io/v3/" + gatioWsMethodPing = "ping" +) + +// WsConnect initiates a websocket connection +func (g *Gateio) WsConnect() error { + if !g.Websocket.IsEnabled() || !g.IsEnabled() { + return errors.New(exchange.WebsocketNotEnabled) + } + + var dialer websocket.Dialer + if g.Websocket.GetProxyAddress() != "" { + proxy, err := url.Parse(g.Websocket.GetProxyAddress()) + if err != nil { + return err + } + + dialer.Proxy = http.ProxyURL(proxy) + } + + var err error + g.WebsocketConn, _, err = dialer.Dial(g.Websocket.GetWebsocketURL(), + http.Header{}) + if err != nil { + return err + } + + go g.WsHandleData() + + return g.WsSubscribe() +} + +// WsSubscribe subscribes to the full websocket suite on ZB exchange +func (g *Gateio) WsSubscribe() error { + enabled := g.GetEnabledCurrencies() + + for _, c := range enabled { + ticker := WebsocketRequest{ + ID: 1337, + Method: "ticker.subscribe", + Params: []interface{}{c.Pair().String()}, + } + + err := g.WebsocketConn.WriteJSON(ticker) + if err != nil { + return err + } + + trade := WebsocketRequest{ + ID: 1337, + Method: "trades.subscribe", + Params: []interface{}{c.Pair().String()}, + } + + err = g.WebsocketConn.WriteJSON(trade) + if err != nil { + return err + } + + depth := WebsocketRequest{ + ID: 1337, + Method: "depth.subscribe", + Params: []interface{}{c.Pair().String(), 30, "0.1"}, + } + + err = g.WebsocketConn.WriteJSON(depth) + if err != nil { + return err + } + + kline := WebsocketRequest{ + ID: 1337, + Method: "kline.subscribe", + Params: []interface{}{c.Pair().String(), 1800}, + } + + err = g.WebsocketConn.WriteJSON(kline) + if err != nil { + return err + } + } + + return nil +} + +// WsReadData reads from the websocket connection and returns the websocket +// response +func (g *Gateio) WsReadData() (exchange.WebsocketResponse, error) { + _, resp, err := g.WebsocketConn.ReadMessage() + if err != nil { + return exchange.WebsocketResponse{}, err + } + + g.Websocket.TrafficAlert <- struct{}{} + return exchange.WebsocketResponse{Raw: resp}, nil +} + +// WsHandleData handles all the websocket data coming from the websocket +// connection +func (g *Gateio) WsHandleData() { + g.Websocket.Wg.Add(1) + + defer func() { + err := g.WebsocketConn.Close() + if err != nil { + g.Websocket.DataHandler <- fmt.Errorf("gateio_websocket.go - Unable to to close Websocket connection. Error: %s", + err) + } + g.Websocket.Wg.Done() + }() + + for { + select { + case <-g.Websocket.ShutdownC: + return + + default: + resp, err := g.WsReadData() + if err != nil { + g.Websocket.DataHandler <- err + continue + } + + var result WebsocketResponse + err = common.JSONDecode(resp.Raw, &result) + if err != nil { + g.Websocket.DataHandler <- err + continue + } + + if result.Error.Code != 0 { + g.Websocket.DataHandler <- fmt.Errorf("gateio_websocket.go error %s", + result.Error.Message) + continue + } + + switch { + case common.StringContains(result.Method, "ticker"): + var ticker WebsocketTicker + var c string + err := common.JSONDecode(result.Params[1], &ticker) + if err != nil { + g.Websocket.DataHandler <- err + continue + } + + err = common.JSONDecode(result.Params[0], &c) + if err != nil { + g.Websocket.DataHandler <- err + continue + } + + g.Websocket.DataHandler <- exchange.TickerData{ + Timestamp: time.Now(), + Pair: pair.NewCurrencyPairFromString(c), + AssetType: "SPOT", + Exchange: g.GetName(), + ClosePrice: ticker.Close, + Quantity: ticker.BaseVolume, + OpenPrice: ticker.Open, + HighPrice: ticker.High, + LowPrice: ticker.Low, + } + + case common.StringContains(result.Method, "trades"): + var trades []WebsocketTrade + var c string + err := common.JSONDecode(result.Params[1], &trades) + if err != nil { + g.Websocket.DataHandler <- err + continue + } + + err = common.JSONDecode(result.Params[0], &c) + if err != nil { + g.Websocket.DataHandler <- err + continue + } + + for _, trade := range trades { + g.Websocket.DataHandler <- exchange.TradeData{ + Timestamp: time.Now(), + CurrencyPair: pair.NewCurrencyPairFromString(c), + AssetType: "SPOT", + Exchange: g.GetName(), + Price: trade.Price, + Amount: trade.Amount, + Side: trade.Type, + } + } + + case common.StringContains(result.Method, "depth"): + var IsSnapshot bool + var c string + var data = make(map[string][][]string) + err = common.JSONDecode(result.Params[0], &IsSnapshot) + if err != nil { + g.Websocket.DataHandler <- err + continue + } + + err = common.JSONDecode(result.Params[2], &c) + if err != nil { + g.Websocket.DataHandler <- err + continue + } + + err = common.JSONDecode(result.Params[1], &data) + if err != nil { + g.Websocket.DataHandler <- err + continue + } + + var asks, bids []orderbook.Item + + askData, askOk := data["asks"] + for _, ask := range askData { + amount, _ := strconv.ParseFloat(ask[1], 64) + price, _ := strconv.ParseFloat(ask[0], 64) + asks = append(asks, orderbook.Item{ + Amount: amount, + Price: price, + }) + } + + bidData, bidOk := data["bids"] + for _, bid := range bidData { + amount, _ := strconv.ParseFloat(bid[1], 64) + price, _ := strconv.ParseFloat(bid[0], 64) + bids = append(bids, orderbook.Item{ + Amount: amount, + Price: price, + }) + } + + if !askOk && !bidOk { + g.Websocket.DataHandler <- errors.New("gatio websocket error - cannot access ask or bid data") + } + + if IsSnapshot { + if !askOk { + g.Websocket.DataHandler <- errors.New("gatio websocket error - cannot access ask data") + } + + if !bidOk { + g.Websocket.DataHandler <- errors.New("gatio websocket error - cannot access bid data") + } + + var newOrderbook orderbook.Base + newOrderbook.Asks = asks + newOrderbook.Bids = bids + newOrderbook.AssetType = "SPOT" + newOrderbook.CurrencyPair = c + newOrderbook.LastUpdated = time.Now() + newOrderbook.Pair = pair.NewCurrencyPairFromString(c) + + err = g.Websocket.Orderbook.LoadSnapshot(newOrderbook, + g.GetName(), + false) + if err != nil { + g.Websocket.DataHandler <- err + } + } else { + err = g.Websocket.Orderbook.Update(asks, + bids, + pair.NewCurrencyPairFromString(c), + time.Now(), + g.GetName(), + "SPOT") + if err != nil { + g.Websocket.DataHandler <- err + } + } + + g.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{ + Pair: pair.NewCurrencyPairFromString(c), + Asset: "SPOT", + Exchange: g.GetName(), + } + + case common.StringContains(result.Method, "kline"): + var data []interface{} + err = common.JSONDecode(result.Params[0], &data) + if err != nil { + g.Websocket.DataHandler <- err + continue + } + + open, _ := strconv.ParseFloat(data[1].(string), 64) + close, _ := strconv.ParseFloat(data[2].(string), 64) + high, _ := strconv.ParseFloat(data[3].(string), 64) + low, _ := strconv.ParseFloat(data[4].(string), 64) + volume, _ := strconv.ParseFloat(data[5].(string), 64) + + g.Websocket.DataHandler <- exchange.KlineData{ + Timestamp: time.Now(), + Pair: pair.NewCurrencyPairFromString(data[7].(string)), + AssetType: "SPOT", + Exchange: g.GetName(), + OpenPrice: open, + ClosePrice: close, + HighPrice: high, + LowPrice: low, + Volume: volume, + } + } + } + } +} diff --git a/exchanges/gateio/gateio_wrapper.go b/exchanges/gateio/gateio_wrapper.go index 2ce7cfaf..d64d16a7 100644 --- a/exchanges/gateio/gateio_wrapper.go +++ b/exchanges/gateio/gateio_wrapper.go @@ -307,7 +307,7 @@ func (g *Gateio) WithdrawFiatFundsToInternationalBank(withdrawRequest exchange.W // GetWebsocket returns a pointer to the exchange websocket func (g *Gateio) GetWebsocket() (*exchange.Websocket, error) { - return nil, common.ErrNotYetImplemented + return g.Websocket, nil } // GetFeeByType returns an estimate of fee based on type of transaction diff --git a/exchanges/gemini/gemini.go b/exchanges/gemini/gemini.go index f23c4b29..bbea5aae 100644 --- a/exchanges/gemini/gemini.go +++ b/exchanges/gemini/gemini.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/gorilla/websocket" "github.com/thrasher-/gocryptotrader/common" "github.com/thrasher-/gocryptotrader/config" exchange "github.com/thrasher-/gocryptotrader/exchanges" @@ -65,6 +66,7 @@ var ( // AddSession, if sandbox test is needed append a new session with with the same // API keys and change the IsSandbox variable to true. type Gemini struct { + WebsocketConn *websocket.Conn exchange.Base Role string RequiresHeartBeat bool @@ -102,7 +104,9 @@ func (g *Gemini) SetDefaults() { g.Enabled = false g.Verbose = false g.RESTPollingDelay = 10 - g.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithAPIPermission | exchange.AutoWithdrawCryptoWithSetup | exchange.WithdrawFiatViaWebsiteOnly + g.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithAPIPermission | + exchange.AutoWithdrawCryptoWithSetup | + exchange.WithdrawFiatViaWebsiteOnly g.RequestCurrencyPairFormat.Delimiter = "" g.RequestCurrencyPairFormat.Uppercase = true g.ConfigCurrencyPairFormat.Delimiter = "" @@ -117,6 +121,8 @@ func (g *Gemini) SetDefaults() { g.APIUrlDefault = geminiAPIURL g.APIUrl = g.APIUrlDefault g.WebsocketInit() + g.Websocket.Functionality = exchange.WebsocketOrderbookSupported | + exchange.WebsocketTradeDataSupported } // Setup sets exchange configuration parameters @@ -158,6 +164,14 @@ func (g *Gemini) Setup(exch config.ExchangeConfig) { if err != nil { log.Fatal(err) } + err = g.WebsocketSetup(g.WsConnect, + exch.Name, + exch.Websocket, + geminiWebsocketEndpoint, + exch.WebsocketURL) + if err != nil { + log.Fatal(err) + } } } diff --git a/exchanges/gemini/gemini_types.go b/exchanges/gemini/gemini_types.go index f31d20ea..371b0a80 100644 --- a/exchanges/gemini/gemini_types.go +++ b/exchanges/gemini/gemini_types.go @@ -1,5 +1,7 @@ package gemini +import "github.com/thrasher-/gocryptotrader/currency/pair" + // Ticker holds returned ticker data from the exchange type Ticker struct { Ask float64 `json:"ask,string"` @@ -189,3 +191,32 @@ type ErrorCapture struct { Reason string `json:"reason"` Message string `json:"message"` } + +// Response defines the main response type +type Response struct { + Type string `json:"type"` + EventID int64 `json:"eventId"` + Timestamp int64 `json:"timestamp"` + TimestampMS int64 `json:"timestampms"` + SocketSequence int64 `json:"socket_sequence"` + Events []Event `json:"events"` +} + +// Event defines orderbook and trade data +type Event struct { + Type string `json:"change"` + Reason string `json:"reason"` + Price float64 `json:"price,string"` + Delta float64 `json:"delta,string"` + Remaining float64 `json:"remaining,string"` + Side string `json:"side"` + MakerSide string `json:"makerSide"` + Amount float64 `json:"amount"` +} + +// ReadData defines read data from the websocket connection +type ReadData struct { + Raw []byte + Currency pair.CurrencyPair + FeedType string +} diff --git a/exchanges/gemini/gemini_websocket.go b/exchanges/gemini/gemini_websocket.go new file mode 100644 index 00000000..c0ba013b --- /dev/null +++ b/exchanges/gemini/gemini_websocket.go @@ -0,0 +1,232 @@ +// Package gemini exchange documentation can be found at +// https://docs.sandbox.gemini.com +package gemini + +import ( + "errors" + "fmt" + "net/http" + "net/url" + "time" + + "github.com/gorilla/websocket" + "github.com/thrasher-/gocryptotrader/common" + "github.com/thrasher-/gocryptotrader/currency/pair" + exchange "github.com/thrasher-/gocryptotrader/exchanges" + "github.com/thrasher-/gocryptotrader/exchanges/orderbook" +) + +const ( + geminiWebsocketEndpoint = "wss://api.gemini.com/v1/marketdata/%s?%s" + geminiWsEvent = "event" + geminiWsMarketData = "marketdata" +) + +// Instantiates a communications channel between websocket connections +var comms = make(chan ReadData, 1) + +// WsConnect initiates a websocket connection +func (g *Gemini) WsConnect() error { + if !g.Websocket.IsEnabled() || !g.IsEnabled() { + return errors.New(exchange.WebsocketNotEnabled) + } + + var dialer websocket.Dialer + if g.Websocket.GetProxyAddress() != "" { + proxy, err := url.Parse(g.Websocket.GetProxyAddress()) + if err != nil { + return err + } + + dialer.Proxy = http.ProxyURL(proxy) + } + + go g.WsHandleData() + + return g.WsSubscribe(dialer) +} + +// WsSubscribe subscribes to the full websocket suite on gemini exchange +func (g *Gemini) WsSubscribe(dialer websocket.Dialer) error { + enabledCurrencies := g.GetEnabledCurrencies() + for i, c := range enabledCurrencies { + val := url.Values{} + val.Set("heartbeat", "true") + + endpoint := fmt.Sprintf(g.Websocket.GetWebsocketURL(), + c.Pair().String(), + val.Encode()) + + conn, _, err := dialer.Dial(endpoint, http.Header{}) + if err != nil { + return err + } + + go g.WsReadData(conn, c, geminiWsMarketData) + + if len(enabledCurrencies)-1 == i { + return nil + } + + time.Sleep(5 * time.Second) // rate limiter, limit of 12 requests per + // minute + } + return nil +} + +// WsReadData reads from the websocket connection and returns the websocket +// response +func (g *Gemini) WsReadData(ws *websocket.Conn, c pair.CurrencyPair, feedType string) { + g.Websocket.Wg.Add(1) + + defer func() { + err := ws.Close() + if err != nil { + g.Websocket.DataHandler <- fmt.Errorf("gemini_websocket.go - Unable to to close Websocket connection. Error: %s", + err) + } + g.Websocket.Wg.Done() + }() + + for { + select { + case <-g.Websocket.ShutdownC: + return + + default: + _, resp, err := ws.ReadMessage() + if err != nil { + g.Websocket.DataHandler <- err + return + } + + g.Websocket.TrafficAlert <- struct{}{} + comms <- ReadData{Raw: resp, Currency: c, FeedType: feedType} + } + } + +} + +// WsHandleData handles all the websocket data coming from the websocket +// connection +func (g *Gemini) WsHandleData() { + g.Websocket.Wg.Add(1) + defer g.Websocket.Wg.Done() + + for { + select { + case <-g.Websocket.ShutdownC: + return + + case resp := <-comms: + switch resp.FeedType { + case geminiWsEvent: + + case geminiWsMarketData: + var result Response + err := common.JSONDecode(resp.Raw, &result) + if err != nil { + g.Websocket.DataHandler <- err + continue + } + + switch result.Type { + case "update": + if result.Timestamp == 0 && result.TimestampMS == 0 { + var bids, asks []orderbook.Item + for _, event := range result.Events { + if event.Reason != "initial" { + g.Websocket.DataHandler <- errors.New("gemini_websocket.go orderbook should be snapshot only") + continue + } + + if event.Side == "ask" { + asks = append(asks, orderbook.Item{ + Amount: event.Remaining, + Price: event.Price, + }) + } else { + bids = append(bids, orderbook.Item{ + Amount: event.Remaining, + Price: event.Price, + }) + } + } + + var newOrderbook orderbook.Base + newOrderbook.Asks = asks + newOrderbook.Bids = bids + newOrderbook.AssetType = "SPOT" + newOrderbook.CurrencyPair = resp.Currency.Pair().String() + newOrderbook.LastUpdated = time.Now() + newOrderbook.Pair = resp.Currency + + err := g.Websocket.Orderbook.LoadSnapshot(newOrderbook, + g.GetName(), + false) + if err != nil { + g.Websocket.DataHandler <- err + break + } + + g.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{Pair: resp.Currency, + Asset: "SPOT", + Exchange: g.GetName()} + + } else { + for _, event := range result.Events { + if event.Type == "trade" { + g.Websocket.DataHandler <- exchange.TradeData{ + Timestamp: time.Now(), + CurrencyPair: resp.Currency, + AssetType: "SPOT", + Exchange: g.GetName(), + EventTime: result.Timestamp, + Price: event.Price, + Amount: event.Amount, + Side: event.MakerSide, + } + + } else { + var i orderbook.Item + i.Amount = event.Remaining + i.Price = event.Price + if event.Side == "ask" { + err := g.Websocket.Orderbook.Update(nil, + []orderbook.Item{i}, + resp.Currency, + time.Now(), + g.GetName(), + "SPOT") + if err != nil { + g.Websocket.DataHandler <- err + } + } else { + err := g.Websocket.Orderbook.Update([]orderbook.Item{i}, + nil, + resp.Currency, + time.Now(), + g.GetName(), + "SPOT") + if err != nil { + g.Websocket.DataHandler <- err + } + } + } + } + + g.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{Pair: resp.Currency, + Asset: "SPOT", + Exchange: g.GetName()} + } + + case "heartbeat": + + default: + g.Websocket.DataHandler <- fmt.Errorf("gemini_websocket.go - unhandled data %s", + resp.Raw) + } + } + } + } +} diff --git a/exchanges/gemini/gemini_wrapper.go b/exchanges/gemini/gemini_wrapper.go index cd5c0698..734d533e 100644 --- a/exchanges/gemini/gemini_wrapper.go +++ b/exchanges/gemini/gemini_wrapper.go @@ -229,7 +229,7 @@ func (g *Gemini) WithdrawFiatFundsToInternationalBank(withdrawRequest exchange.W // GetWebsocket returns a pointer to the exchange websocket func (g *Gemini) GetWebsocket() (*exchange.Websocket, error) { - return nil, common.ErrNotYetImplemented + return g.Websocket, nil } // GetFeeByType returns an estimate of fee based on type of transaction diff --git a/exchanges/hitbtc/hitbtc.go b/exchanges/hitbtc/hitbtc.go index 02cdd486..cd8108ae 100644 --- a/exchanges/hitbtc/hitbtc.go +++ b/exchanges/hitbtc/hitbtc.go @@ -60,7 +60,8 @@ func (h *HitBTC) SetDefaults() { h.Fee = 0 h.Verbose = false h.RESTPollingDelay = 10 - h.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | exchange.NoFiatWithdrawals + h.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | + exchange.NoFiatWithdrawals h.RequestCurrencyPairFormat.Delimiter = "" h.RequestCurrencyPairFormat.Uppercase = true h.ConfigCurrencyPairFormat.Delimiter = "-" @@ -75,6 +76,8 @@ func (h *HitBTC) SetDefaults() { h.APIUrlDefault = apiURL h.APIUrl = h.APIUrlDefault h.WebsocketInit() + h.Websocket.Functionality = exchange.WebsocketTickerSupported | + exchange.WebsocketOrderbookSupported } // Setup sets user exchange configuration settings diff --git a/exchanges/hitbtc/hitbtc_websocket.go b/exchanges/hitbtc/hitbtc_websocket.go index 33c1309d..1992a727 100644 --- a/exchanges/hitbtc/hitbtc_websocket.go +++ b/exchanges/hitbtc/hitbtc_websocket.go @@ -12,7 +12,6 @@ import ( "github.com/thrasher-/gocryptotrader/currency/pair" exchange "github.com/thrasher-/gocryptotrader/exchanges" "github.com/thrasher-/gocryptotrader/exchanges/orderbook" - log "github.com/thrasher-/gocryptotrader/logger" ) const ( @@ -43,7 +42,6 @@ func (h *HitBTC) WsConnect() error { return err } - go h.WsReadData() go h.WsHandleData() err = h.WsSubscribe() @@ -106,7 +104,18 @@ func (h *HitBTC) WsSubscribe() error { } // WsReadData reads from the websocket connection -func (h *HitBTC) WsReadData() { +func (h *HitBTC) WsReadData() (exchange.WebsocketResponse, error) { + _, resp, err := h.WebsocketConn.ReadMessage() + if err != nil { + return exchange.WebsocketResponse{}, err + } + + h.Websocket.TrafficAlert <- struct{}{} + return exchange.WebsocketResponse{Raw: resp}, nil +} + +// WsHandleData handles websocket data +func (h *HitBTC) WsHandleData() { h.Websocket.Wg.Add(1) defer func() { @@ -124,32 +133,17 @@ func (h *HitBTC) WsReadData() { return default: - _, resp, err := h.WebsocketConn.ReadMessage() + resp, err := h.WsReadData() if err != nil { h.Websocket.DataHandler <- err return } - h.Websocket.TrafficAlert <- struct{}{} - h.Websocket.Intercomm <- exchange.WebsocketResponse{Raw: resp} - } - } -} - -// WsHandleData handles websocket data -func (h *HitBTC) WsHandleData() { - h.Websocket.Wg.Add(1) - defer h.Websocket.Wg.Done() - - for { - select { - case <-h.Websocket.ShutdownC: - - case resp := <-h.Websocket.Intercomm: var init capture - err := common.JSONDecode(resp.Raw, &init) + err = common.JSONDecode(resp.Raw, &init) if err != nil { - log.Error(err) + h.Websocket.DataHandler <- err + continue } if init.Error.Message != "" || init.Error.Code != 0 { @@ -168,12 +162,14 @@ func (h *HitBTC) WsHandleData() { var ticker WsTicker err := common.JSONDecode(resp.Raw, &ticker) if err != nil { - log.Error(err) + h.Websocket.DataHandler <- err + continue } ts, err := time.Parse(time.RFC3339, ticker.Params.Timestamp) if err != nil { - log.Error(err) + h.Websocket.DataHandler <- err + continue } h.Websocket.DataHandler <- exchange.TickerData{ @@ -191,19 +187,22 @@ func (h *HitBTC) WsHandleData() { var obSnapshot WsOrderbook err := common.JSONDecode(resp.Raw, &obSnapshot) if err != nil { - log.Error(err) + h.Websocket.DataHandler <- err + continue } err = h.WsProcessOrderbookSnapshot(obSnapshot) if err != nil { - log.Error(err) + h.Websocket.DataHandler <- err + continue } case "updateOrderbook": var obUpdate WsOrderbook err := common.JSONDecode(resp.Raw, &obUpdate) if err != nil { - log.Error(err) + h.Websocket.DataHandler <- err + continue } h.WsProcessOrderbookUpdate(obUpdate) @@ -212,14 +211,16 @@ func (h *HitBTC) WsHandleData() { var tradeSnapshot WsTrade err := common.JSONDecode(resp.Raw, &tradeSnapshot) if err != nil { - log.Error(err) + h.Websocket.DataHandler <- err + continue } case "updateTrades": var tradeUpdates WsTrade err := common.JSONDecode(resp.Raw, &tradeUpdates) if err != nil { - log.Error(err) + h.Websocket.DataHandler <- err + continue } } } @@ -252,7 +253,7 @@ func (h *HitBTC) WsProcessOrderbookSnapshot(ob WsOrderbook) error { newOrderbook.LastUpdated = time.Now() newOrderbook.Pair = p - err := h.Websocket.Orderbook.LoadSnapshot(newOrderbook, h.GetName()) + err := h.Websocket.Orderbook.LoadSnapshot(newOrderbook, h.GetName(), false) if err != nil { return err } diff --git a/exchanges/huobi/huobi.go b/exchanges/huobi/huobi.go index c9132248..06b2a96e 100644 --- a/exchanges/huobi/huobi.go +++ b/exchanges/huobi/huobi.go @@ -77,7 +77,8 @@ func (h *HUOBI) SetDefaults() { h.Fee = 0 h.Verbose = false h.RESTPollingDelay = 10 - h.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithSetup | exchange.NoFiatWithdrawals + h.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithSetup | + exchange.NoFiatWithdrawals h.RequestCurrencyPairFormat.Delimiter = "" h.RequestCurrencyPairFormat.Uppercase = false h.ConfigCurrencyPairFormat.Delimiter = "-" @@ -92,6 +93,9 @@ func (h *HUOBI) SetDefaults() { h.APIUrlDefault = huobiAPIURL h.APIUrl = h.APIUrlDefault h.WebsocketInit() + h.Websocket.Functionality = exchange.WebsocketKlineSupported | + exchange.WebsocketOrderbookSupported | + exchange.WebsocketTradeDataSupported } // Setup sets user configuration @@ -132,7 +136,6 @@ func (h *HUOBI) Setup(exch config.ExchangeConfig) { if err != nil { log.Fatal(err) } - err = h.WebsocketSetup(h.WsConnect, exch.Name, exch.Websocket, diff --git a/exchanges/huobi/huobi_websocket.go b/exchanges/huobi/huobi_websocket.go index 11d6c6c7..12489c7a 100644 --- a/exchanges/huobi/huobi_websocket.go +++ b/exchanges/huobi/huobi_websocket.go @@ -20,7 +20,7 @@ import ( ) const ( - huobiSocketIOAddress = "wss://api.huobi.pro/ws" + huobiSocketIOAddress = "wss://api.huobi.pro/hbus/ws" wsMarketKline = "market.%s.kline.1min" wsMarketDepth = "market.%s.depth.step0" wsMarketTrade = "market.%s.trade.detail" @@ -50,7 +50,6 @@ func (h *HUOBI) WsConnect() error { } go h.WsHandleData() - go h.WsReadData() err = h.WsSubscribe() if err != nil { @@ -61,7 +60,31 @@ func (h *HUOBI) WsConnect() error { } // WsReadData reads data from the websocket connection -func (h *HUOBI) WsReadData() { +func (h *HUOBI) WsReadData() (exchange.WebsocketResponse, error) { + _, resp, err := h.WebsocketConn.ReadMessage() + if err != nil { + return exchange.WebsocketResponse{}, err + } + + h.Websocket.TrafficAlert <- struct{}{} + + b := bytes.NewReader(resp) + gReader, err := gzip.NewReader(b) + if err != nil { + return exchange.WebsocketResponse{}, err + } + + unzipped, err := ioutil.ReadAll(gReader) + if err != nil { + return exchange.WebsocketResponse{}, err + } + gReader.Close() + + return exchange.WebsocketResponse{Raw: unzipped}, nil +} + +// WsHandleData handles data read from the websocket connection +func (h *HUOBI) WsHandleData() { h.Websocket.Wg.Add(1) defer func() { @@ -79,43 +102,17 @@ func (h *HUOBI) WsReadData() { return default: - _, resp, err := h.WebsocketConn.ReadMessage() + resp, err := h.WsReadData() if err != nil { - log.Error(err) + h.Websocket.DataHandler <- err + return } - h.Websocket.TrafficAlert <- struct{}{} - - b := bytes.NewReader(resp) - gReader, err := gzip.NewReader(b) - if err != nil { - log.Error(err) - } - - unzipped, err := ioutil.ReadAll(gReader) - if err != nil { - log.Error(err) - } - gReader.Close() - - h.Websocket.Intercomm <- exchange.WebsocketResponse{Raw: unzipped} - } - } -} - -// WsHandleData handles data read from the websocket connection -func (h *HUOBI) WsHandleData() { - h.Websocket.Wg.Add(1) - defer h.Websocket.Wg.Done() - - for { - select { - case <-h.Websocket.ShutdownC: - case resp := <-h.Websocket.Intercomm: var init WsResponse - err := common.JSONDecode(resp.Raw, &init) + err = common.JSONDecode(resp.Raw, &init) if err != nil { - log.Error(err) + h.Websocket.DataHandler <- err + continue } if init.Status == "error" { @@ -142,7 +139,8 @@ func (h *HUOBI) WsHandleData() { var depth WsDepth err := common.JSONDecode(resp.Raw, &depth) if err != nil { - log.Error(err) + h.Websocket.DataHandler <- err + continue } data := common.SplitStrings(depth.Channel, ".") @@ -153,7 +151,8 @@ func (h *HUOBI) WsHandleData() { var kline WsKline err := common.JSONDecode(resp.Raw, &kline) if err != nil { - log.Error(err) + h.Websocket.DataHandler <- err + continue } data := common.SplitStrings(kline.Channel, ".") @@ -174,7 +173,8 @@ func (h *HUOBI) WsHandleData() { var trade WsTrade err := common.JSONDecode(resp.Raw, &trade) if err != nil { - log.Error(err) + h.Websocket.DataHandler <- err + continue } data := common.SplitStrings(trade.Channel, ".") @@ -215,7 +215,7 @@ func (h *HUOBI) WsProcessOrderbook(ob WsDepth, symbol string) error { newOrderbook.LastUpdated = time.Now() newOrderbook.Pair = p - err := h.Websocket.Orderbook.LoadSnapshot(newOrderbook, h.GetName()) + err := h.Websocket.Orderbook.LoadSnapshot(newOrderbook, h.GetName(), false) if err != nil { return err } diff --git a/exchanges/huobihadax/huobihadax.go b/exchanges/huobihadax/huobihadax.go index ee9391a3..cce2bfeb 100644 --- a/exchanges/huobihadax/huobihadax.go +++ b/exchanges/huobihadax/huobihadax.go @@ -9,6 +9,7 @@ import ( "strconv" "time" + "github.com/gorilla/websocket" "github.com/thrasher-/gocryptotrader/common" "github.com/thrasher-/gocryptotrader/config" "github.com/thrasher-/gocryptotrader/currency/symbol" @@ -59,6 +60,7 @@ const ( // HUOBIHADAX is the overarching type across this package type HUOBIHADAX struct { + WebsocketConn *websocket.Conn exchange.Base } @@ -69,7 +71,8 @@ func (h *HUOBIHADAX) SetDefaults() { h.Fee = 0 h.Verbose = false h.RESTPollingDelay = 10 - h.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithSetup | exchange.NoFiatWithdrawals + h.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithSetup | + exchange.NoFiatWithdrawals h.RequestCurrencyPairFormat.Delimiter = "" h.RequestCurrencyPairFormat.Uppercase = false h.ConfigCurrencyPairFormat.Delimiter = "-" @@ -84,6 +87,9 @@ func (h *HUOBIHADAX) SetDefaults() { h.APIUrlDefault = huobihadaxAPIURL h.APIUrl = h.APIUrlDefault h.WebsocketInit() + h.Websocket.Functionality = exchange.WebsocketKlineSupported | + exchange.WebsocketTradeDataSupported | + exchange.WebsocketOrderbookSupported } // Setup sets user configuration @@ -123,6 +129,14 @@ func (h *HUOBIHADAX) Setup(exch config.ExchangeConfig) { if err != nil { log.Fatal(err) } + err = h.WebsocketSetup(h.WsConnect, + exch.Name, + exch.Websocket, + huobiGlobalWebsocketEndpoint, + exch.WebsocketURL) + if err != nil { + log.Fatal(err) + } } } diff --git a/exchanges/huobihadax/huobihadax_types.go b/exchanges/huobihadax/huobihadax_types.go index 3c7f09cc..85a24d6f 100644 --- a/exchanges/huobihadax/huobihadax_types.go +++ b/exchanges/huobihadax/huobihadax_types.go @@ -1,5 +1,7 @@ package huobihadax +import "math/big" + // Response stores the Huobi response information type Response struct { Status string `json:"status"` @@ -248,3 +250,72 @@ type History struct { CreatedAt int64 `json:"created-at"` UpdatedAt int64 `json:"Updated-at"` } + +// WsRequest defines a request data structure +type WsRequest struct { + Topic string `json:"req,omitempty"` + Subscribe string `json:"sub,omitempty"` + ClientGeneratedID string `json:"id,omitempty"` +} + +// WsResponse defines a response from the websocket connection when there +// is an error +type WsResponse struct { + TS int64 `json:"ts"` + Status string `json:"status"` + ErrorCode string `json:"err-code"` + ErrorMessage string `json:"err-msg"` + Ping int64 `json:"ping"` + Channel string `json:"ch"` + Subscribed string `json:"subbed"` +} + +// WsHeartBeat defines a heartbeat request +type WsHeartBeat struct { + ClientNonce int64 `json:"ping"` +} + +// WsDepth defines market depth websocket response +type WsDepth struct { + Channel string `json:"ch"` + Timestamp int64 `json:"ts"` + Tick struct { + Bids []interface{} `json:"bids"` + Asks []interface{} `json:"asks"` + Timestamp int64 `json:"ts"` + Version int64 `json:"version"` + } `json:"tick"` +} + +// WsKline defines market kline websocket response +type WsKline struct { + Channel string `json:"ch"` + Timestamp int64 `json:"ts"` + Tick struct { + ID int64 `json:"id"` + Open float64 `json:"open"` + Close float64 `json:"close"` + Low float64 `json:"low"` + High float64 `json:"high"` + Amount float64 `json:"amount"` + Volume float64 `json:"vol"` + Count int64 `json:"count"` + } +} + +// WsTrade defines market trade websocket response +type WsTrade struct { + Channel string `json:"ch"` + Timestamp int64 `json:"ts"` + Tick struct { + ID int64 `json:"id"` + Timestamp int64 `json:"ts"` + Data []struct { + Amount float64 `json:"amount"` + Timestamp int64 `json:"ts"` + ID big.Int `json:"id,number"` + Price float64 `json:"price"` + Direction string `json:"direction"` + } `json:"data"` + } +} diff --git a/exchanges/huobihadax/huobihadax_websocket.go b/exchanges/huobihadax/huobihadax_websocket.go new file mode 100644 index 00000000..8090d153 --- /dev/null +++ b/exchanges/huobihadax/huobihadax_websocket.go @@ -0,0 +1,275 @@ +package huobihadax + +import ( + "bytes" + "compress/gzip" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "time" + + "github.com/gorilla/websocket" + "github.com/thrasher-/gocryptotrader/common" + "github.com/thrasher-/gocryptotrader/currency/pair" + exchange "github.com/thrasher-/gocryptotrader/exchanges" + "github.com/thrasher-/gocryptotrader/exchanges/orderbook" +) + +const ( + huobiGlobalWebsocketEndpoint = "wss://api.huobi.pro/ws" + huobiGlobalAssetWebsocketEndpoint = "wss://api.huobi.pro/ws/v1" + huobiGlobalContractWebsocketEndpoint = "wss://www.hbdm.com/ws" + wsMarketKline = "market.%s.kline.1min" + wsMarketDepth = "market.%s.depth.step0" + wsMarketTrade = "market.%s.trade.detail" +) + +// WsConnect initiates a new websocket connection +func (h *HUOBIHADAX) WsConnect() error { + if !h.Websocket.IsEnabled() || !h.IsEnabled() { + return errors.New(exchange.WebsocketNotEnabled) + } + + var dialer websocket.Dialer + + if h.Websocket.GetProxyAddress() != "" { + proxy, err := url.Parse(h.Websocket.GetProxyAddress()) + if err != nil { + return err + } + + dialer.Proxy = http.ProxyURL(proxy) + } + + var err error + h.WebsocketConn, _, err = dialer.Dial(h.Websocket.GetWebsocketURL(), http.Header{}) + if err != nil { + return err + } + + go h.WsHandleData() + + err = h.WsSubscribe() + if err != nil { + return err + } + + return nil +} + +// WsReadData reads data from the websocket connection +func (h *HUOBIHADAX) WsReadData() (exchange.WebsocketResponse, error) { + _, resp, err := h.WebsocketConn.ReadMessage() + if err != nil { + return exchange.WebsocketResponse{}, err + } + + h.Websocket.TrafficAlert <- struct{}{} + + b := bytes.NewReader(resp) + gReader, err := gzip.NewReader(b) + if err != nil { + return exchange.WebsocketResponse{}, err + } + + unzipped, err := ioutil.ReadAll(gReader) + if err != nil { + return exchange.WebsocketResponse{}, err + } + gReader.Close() + + return exchange.WebsocketResponse{Raw: unzipped}, nil +} + +// WsHandleData handles data read from the websocket connection +func (h *HUOBIHADAX) WsHandleData() { + h.Websocket.Wg.Add(1) + + defer func() { + err := h.WebsocketConn.Close() + if err != nil { + h.Websocket.DataHandler <- fmt.Errorf("huobi_websocket.go - Unable to to close Websocket connection. Error: %s", + err) + } + h.Websocket.Wg.Done() + }() + + for { + select { + case <-h.Websocket.ShutdownC: + return + + default: + resp, err := h.WsReadData() + if err != nil { + h.Websocket.DataHandler <- err + return + } + + var init WsResponse + err = common.JSONDecode(resp.Raw, &init) + if err != nil { + h.Websocket.DataHandler <- err + continue + } + + if init.Status == "error" { + h.Websocket.DataHandler <- fmt.Errorf("huobi.go Websocker error %s %s", + init.ErrorCode, + init.ErrorMessage) + continue + } + + if init.Subscribed != "" { + continue + } + + if init.Ping != 0 { + err = h.WebsocketConn.WriteJSON(`{"pong":1337}`) + if err != nil { + h.Websocket.DataHandler <- err + continue + } + continue + } + + switch { + case common.StringContains(init.Channel, "depth"): + var depth WsDepth + err := common.JSONDecode(resp.Raw, &depth) + if err != nil { + h.Websocket.DataHandler <- err + continue + } + + data := common.SplitStrings(depth.Channel, ".") + + h.WsProcessOrderbook(depth, data[1]) + + case common.StringContains(init.Channel, "kline"): + var kline WsKline + err := common.JSONDecode(resp.Raw, &kline) + if err != nil { + h.Websocket.DataHandler <- err + continue + } + + data := common.SplitStrings(kline.Channel, ".") + + h.Websocket.DataHandler <- exchange.KlineData{ + Timestamp: time.Unix(0, kline.Timestamp), + Exchange: h.GetName(), + AssetType: "SPOT", + Pair: pair.NewCurrencyPairFromString(data[1]), + OpenPrice: kline.Tick.Open, + ClosePrice: kline.Tick.Close, + HighPrice: kline.Tick.High, + LowPrice: kline.Tick.Low, + Volume: kline.Tick.Volume, + } + + case common.StringContains(init.Channel, "trade"): + var trade WsTrade + err := common.JSONDecode(resp.Raw, &trade) + if err != nil { + h.Websocket.DataHandler <- err + continue + } + + data := common.SplitStrings(trade.Channel, ".") + + h.Websocket.DataHandler <- exchange.TradeData{ + Exchange: h.GetName(), + AssetType: "SPOT", + CurrencyPair: pair.NewCurrencyPairFromString(data[1]), + Timestamp: time.Unix(0, trade.Tick.Timestamp), + } + } + } + } +} + +// WsProcessOrderbook processes new orderbook data +func (h *HUOBIHADAX) WsProcessOrderbook(ob WsDepth, symbol string) error { + var bids []orderbook.Item + for _, data := range ob.Tick.Bids { + bidLevel := data.([]interface{}) + bids = append(bids, orderbook.Item{Price: bidLevel[0].(float64), + Amount: bidLevel[0].(float64)}) + } + + var asks []orderbook.Item + for _, data := range ob.Tick.Asks { + askLevel := data.([]interface{}) + asks = append(asks, orderbook.Item{Price: askLevel[0].(float64), + Amount: askLevel[0].(float64)}) + } + + p := pair.NewCurrencyPairFromString(symbol) + + var newOrderbook orderbook.Base + newOrderbook.Asks = asks + newOrderbook.Bids = bids + newOrderbook.CurrencyPair = symbol + newOrderbook.LastUpdated = time.Now() + newOrderbook.Pair = p + + err := h.Websocket.Orderbook.LoadSnapshot(newOrderbook, h.GetName(), false) + if err != nil { + return err + } + + h.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{ + Pair: p, + Exchange: h.GetName(), + Asset: "SPOT", + } + + return nil +} + +// WsSubscribe susbcribes to the current websocket streams based on the enabled +// pair +func (h *HUOBIHADAX) WsSubscribe() error { + pairs := h.GetEnabledCurrencies() + + for _, p := range pairs { + fPair := exchange.FormatExchangeCurrency(h.GetName(), p) + + depthTopic := fmt.Sprintf(wsMarketDepth, fPair.String()) + depthJSON, err := common.JSONEncode(WsRequest{Subscribe: depthTopic}) + if err != nil { + return err + } + + err = h.WebsocketConn.WriteMessage(websocket.TextMessage, depthJSON) + if err != nil { + return err + } + + klineTopic := fmt.Sprintf(wsMarketKline, fPair.String()) + KlineJSON, err := common.JSONEncode(WsRequest{Subscribe: klineTopic}) + if err != nil { + return err + } + + err = h.WebsocketConn.WriteMessage(websocket.TextMessage, KlineJSON) + if err != nil { + return err + } + + tradeTopic := fmt.Sprintf(wsMarketTrade, fPair.String()) + tradeJSON, err := common.JSONEncode(WsRequest{Subscribe: tradeTopic}) + if err != nil { + return err + } + + err = h.WebsocketConn.WriteMessage(websocket.TextMessage, tradeJSON) + if err != nil { + return err + } + } + return nil +} diff --git a/exchanges/huobihadax/huobihadax_wrapper.go b/exchanges/huobihadax/huobihadax_wrapper.go index 8c5665e9..808cd00b 100644 --- a/exchanges/huobihadax/huobihadax_wrapper.go +++ b/exchanges/huobihadax/huobihadax_wrapper.go @@ -327,7 +327,7 @@ func (h *HUOBIHADAX) WithdrawFiatFundsToInternationalBank(withdrawRequest exchan // GetWebsocket returns a pointer to the exchange websocket func (h *HUOBIHADAX) GetWebsocket() (*exchange.Websocket, error) { - return nil, common.ErrNotYetImplemented + return h.Websocket, nil } // GetFeeByType returns an estimate of fee based on type of transaction diff --git a/exchanges/itbit/itbit.go b/exchanges/itbit/itbit.go index 0ba68498..3848a0e2 100644 --- a/exchanges/itbit/itbit.go +++ b/exchanges/itbit/itbit.go @@ -49,7 +49,8 @@ func (i *ItBit) SetDefaults() { i.TakerFee = 0.50 i.Verbose = false i.RESTPollingDelay = 10 - i.APIWithdrawPermissions = exchange.WithdrawCryptoViaWebsiteOnly | exchange.WithdrawFiatViaWebsiteOnly + i.APIWithdrawPermissions = exchange.WithdrawCryptoViaWebsiteOnly | + exchange.WithdrawFiatViaWebsiteOnly i.RequestCurrencyPairFormat.Delimiter = "" i.RequestCurrencyPairFormat.Uppercase = true i.ConfigCurrencyPairFormat.Delimiter = "" diff --git a/exchanges/itbit/itbit_wrapper.go b/exchanges/itbit/itbit_wrapper.go index 7c82e2f5..18ecdfbc 100644 --- a/exchanges/itbit/itbit_wrapper.go +++ b/exchanges/itbit/itbit_wrapper.go @@ -268,7 +268,7 @@ func (i *ItBit) WithdrawFiatFundsToInternationalBank(withdrawRequest exchange.Wi // GetWebsocket returns a pointer to the exchange websocket func (i *ItBit) GetWebsocket() (*exchange.Websocket, error) { - return nil, common.ErrNotYetImplemented + return nil, common.ErrFunctionNotSupported } // GetFeeByType returns an estimate of fee based on type of transaction diff --git a/exchanges/kraken/kraken.go b/exchanges/kraken/kraken.go index 8178ddac..70dcc928 100644 --- a/exchanges/kraken/kraken.go +++ b/exchanges/kraken/kraken.go @@ -63,7 +63,10 @@ func (k *Kraken) SetDefaults() { k.CryptoFee = 0.10 k.Verbose = false k.RESTPollingDelay = 10 - k.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithSetup | exchange.WithdrawCryptoWith2FA | exchange.AutoWithdrawFiatWithSetup | exchange.WithdrawFiatWith2FA + k.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithSetup | + exchange.WithdrawCryptoWith2FA | + exchange.AutoWithdrawFiatWithSetup | + exchange.WithdrawFiatWith2FA k.RequestCurrencyPairFormat.Delimiter = "" k.RequestCurrencyPairFormat.Uppercase = true k.RequestCurrencyPairFormat.Separator = "," diff --git a/exchanges/kraken/kraken_wrapper.go b/exchanges/kraken/kraken_wrapper.go index ffe4c79f..4439095b 100644 --- a/exchanges/kraken/kraken_wrapper.go +++ b/exchanges/kraken/kraken_wrapper.go @@ -278,7 +278,7 @@ func (k *Kraken) WithdrawFiatFundsToInternationalBank(withdrawRequest exchange.W // GetWebsocket returns a pointer to the exchange websocket func (k *Kraken) GetWebsocket() (*exchange.Websocket, error) { - return nil, common.ErrNotYetImplemented + return nil, common.ErrFunctionNotSupported } // GetFeeByType returns an estimate of fee based on type of transaction diff --git a/exchanges/lakebtc/lakebtc.go b/exchanges/lakebtc/lakebtc.go index 59a79b0c..b4fda691 100644 --- a/exchanges/lakebtc/lakebtc.go +++ b/exchanges/lakebtc/lakebtc.go @@ -49,7 +49,8 @@ func (l *LakeBTC) SetDefaults() { l.MakerFee = 0.15 l.Verbose = false l.RESTPollingDelay = 10 - l.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | exchange.WithdrawFiatViaWebsiteOnly + l.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | + exchange.WithdrawFiatViaWebsiteOnly l.RequestCurrencyPairFormat.Delimiter = "" l.RequestCurrencyPairFormat.Uppercase = true l.ConfigCurrencyPairFormat.Delimiter = "" diff --git a/exchanges/lakebtc/lakebtc_wrapper.go b/exchanges/lakebtc/lakebtc_wrapper.go index a9dce828..ae3c4cc0 100644 --- a/exchanges/lakebtc/lakebtc_wrapper.go +++ b/exchanges/lakebtc/lakebtc_wrapper.go @@ -251,7 +251,8 @@ func (l *LakeBTC) WithdrawFiatFundsToInternationalBank(withdrawRequest exchange. // GetWebsocket returns a pointer to the exchange websocket func (l *LakeBTC) GetWebsocket() (*exchange.Websocket, error) { - return nil, common.ErrNotYetImplemented + // Documents are too vague to implement + return nil, common.ErrFunctionNotSupported } // GetFeeByType returns an estimate of fee based on type of transaction diff --git a/exchanges/liqui/liqui.go b/exchanges/liqui/liqui.go index 8ef70965..be7f6ddc 100644 --- a/exchanges/liqui/liqui.go +++ b/exchanges/liqui/liqui.go @@ -52,7 +52,8 @@ func (l *Liqui) SetDefaults() { l.Verbose = false l.RESTPollingDelay = 10 l.Ticker = make(map[string]Ticker) - l.APIWithdrawPermissions = exchange.WithdrawCryptoWithAPIPermission | exchange.NoFiatWithdrawals + l.APIWithdrawPermissions = exchange.WithdrawCryptoWithAPIPermission | + exchange.NoFiatWithdrawals l.RequestCurrencyPairFormat.Delimiter = "_" l.RequestCurrencyPairFormat.Uppercase = false l.RequestCurrencyPairFormat.Separator = "-" diff --git a/exchanges/liqui/liqui_wrapper.go b/exchanges/liqui/liqui_wrapper.go index 678bf21d..0ad62d86 100644 --- a/exchanges/liqui/liqui_wrapper.go +++ b/exchanges/liqui/liqui_wrapper.go @@ -248,7 +248,7 @@ func (l *Liqui) WithdrawFiatFundsToInternationalBank(withdrawRequest exchange.Wi // GetWebsocket returns a pointer to the exchange websocket func (l *Liqui) GetWebsocket() (*exchange.Websocket, error) { - return nil, common.ErrNotYetImplemented + return nil, common.ErrFunctionNotSupported } // GetFeeByType returns an estimate of fee based on type of transaction diff --git a/exchanges/localbitcoins/localbitcoins.go b/exchanges/localbitcoins/localbitcoins.go index 1e6cf32b..ff80f2e3 100644 --- a/exchanges/localbitcoins/localbitcoins.go +++ b/exchanges/localbitcoins/localbitcoins.go @@ -117,7 +117,8 @@ func (l *LocalBitcoins) SetDefaults() { l.Verbose = false l.Verbose = false l.RESTPollingDelay = 10 - l.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | exchange.WithdrawFiatViaWebsiteOnly + l.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | + exchange.WithdrawFiatViaWebsiteOnly l.RequestCurrencyPairFormat.Delimiter = "" l.RequestCurrencyPairFormat.Uppercase = true l.ConfigCurrencyPairFormat.Delimiter = "" diff --git a/exchanges/localbitcoins/localbitcoins_wrapper.go b/exchanges/localbitcoins/localbitcoins_wrapper.go index 8cef5811..cc31664a 100644 --- a/exchanges/localbitcoins/localbitcoins_wrapper.go +++ b/exchanges/localbitcoins/localbitcoins_wrapper.go @@ -281,7 +281,7 @@ func (l *LocalBitcoins) WithdrawFiatFundsToInternationalBank(withdrawRequest exc // GetWebsocket returns a pointer to the exchange websocket func (l *LocalBitcoins) GetWebsocket() (*exchange.Websocket, error) { - return nil, common.ErrNotYetImplemented + return nil, common.ErrFunctionNotSupported } // GetFeeByType returns an estimate of fee based on type of transaction diff --git a/exchanges/okcoin/okcoin.go b/exchanges/okcoin/okcoin.go index 83efd09d..074e1f5d 100644 --- a/exchanges/okcoin/okcoin.go +++ b/exchanges/okcoin/okcoin.go @@ -101,10 +101,14 @@ func (o *OKCoin) SetDefaults() { o.Verbose = false o.RESTPollingDelay = 10 o.AssetTypes = []string{ticker.Spot} - o.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | exchange.WithdrawFiatViaWebsiteOnly + o.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | + exchange.WithdrawFiatViaWebsiteOnly o.SupportsAutoPairUpdating = false o.SupportsRESTTickerBatching = false o.WebsocketInit() + o.Websocket.Functionality = exchange.WebsocketTickerSupported | + exchange.WebsocketOrderbookSupported | + exchange.WebsocketKlineSupported } // Setup sets exchange configuration parameters diff --git a/exchanges/okcoin/okcoin_websocket.go b/exchanges/okcoin/okcoin_websocket.go index c408919f..5cbc80eb 100644 --- a/exchanges/okcoin/okcoin_websocket.go +++ b/exchanges/okcoin/okcoin_websocket.go @@ -71,7 +71,6 @@ func (o *OKCoin) WsConnect() error { o.WebsocketConn.SetPingHandler(o.PingHandler) - go o.WsReadData() go o.WsHandleData() for _, p := range o.GetEnabledCurrencies() { @@ -87,7 +86,18 @@ func (o *OKCoin) WsConnect() error { } // WsReadData reads from the websocket connection -func (o *OKCoin) WsReadData() { +func (o *OKCoin) WsReadData() (exchange.WebsocketResponse, error) { + _, resp, err := o.WebsocketConn.ReadMessage() + if err != nil { + return exchange.WebsocketResponse{}, err + } + + o.Websocket.TrafficAlert <- struct{}{} + return exchange.WebsocketResponse{Raw: resp}, nil +} + +// WsHandleData handles stream data from the websocket connection +func (o *OKCoin) WsHandleData() { o.Websocket.Wg.Add(1) defer func() { @@ -105,34 +115,16 @@ func (o *OKCoin) WsReadData() { return default: - _, resp, err := o.WebsocketConn.ReadMessage() + resp, err := o.WsReadData() if err != nil { o.Websocket.DataHandler <- err - return } - o.Websocket.TrafficAlert <- struct{}{} - o.Websocket.Intercomm <- exchange.WebsocketResponse{Raw: resp} - } - } - -} - -// WsHandleData handles stream data from the websocket connection -func (o *OKCoin) WsHandleData() { - o.Websocket.Wg.Add(1) - defer o.Websocket.Wg.Done() - - for { - select { - case <-o.Websocket.ShutdownC: - return - - case resp := <-o.Websocket.Intercomm: var init []WsResponse - err := common.JSONDecode(resp.Raw, &init) + err = common.JSONDecode(resp.Raw, &init) if err != nil { - log.Error(err) + o.Websocket.DataHandler <- err + continue } if init[0].ErrorCode != "" { @@ -166,8 +158,8 @@ func (o *OKCoin) WsHandleData() { err = common.JSONDecode(init[0].Data, &ticker) if err != nil { - log.Error(err) - + o.Websocket.DataHandler <- err + continue } o.Websocket.DataHandler <- exchange.TickerData{ @@ -187,7 +179,8 @@ func (o *OKCoin) WsHandleData() { err = common.JSONDecode(init[0].Data, &orderbook) if err != nil { - log.Error(err) + o.Websocket.DataHandler <- err + continue } o.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{ @@ -201,7 +194,8 @@ func (o *OKCoin) WsHandleData() { err = common.JSONDecode(init[0].Data, &klineData) if err != nil { - log.Error(err) + o.Websocket.DataHandler <- err + continue } var klines []WsKlines @@ -237,7 +231,8 @@ func (o *OKCoin) WsHandleData() { var dealsData [][]interface{} err = common.JSONDecode(init[0].Data, &dealsData) if err != nil { - log.Error(err) + o.Websocket.DataHandler <- err + continue } var deals []WsDeals diff --git a/exchanges/okex/okex.go b/exchanges/okex/okex.go index 091ac613..1ab68282 100644 --- a/exchanges/okex/okex.go +++ b/exchanges/okex/okex.go @@ -106,7 +106,8 @@ func (o *OKEX) SetDefaults() { o.Enabled = false o.Verbose = false o.RESTPollingDelay = 10 - o.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | exchange.NoFiatWithdrawals + o.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | + exchange.NoFiatWithdrawals o.RequestCurrencyPairFormat.Delimiter = "_" o.RequestCurrencyPairFormat.Uppercase = false o.ConfigCurrencyPairFormat.Delimiter = "_" @@ -121,6 +122,10 @@ func (o *OKEX) SetDefaults() { o.APIUrl = o.APIUrlDefault o.AssetTypes = []string{ticker.Spot} o.WebsocketInit() + o.Websocket.Functionality = exchange.WebsocketTickerSupported | + exchange.WebsocketTradeDataSupported | + exchange.WebsocketKlineSupported | + exchange.WebsocketOrderbookSupported } // Setup method sets current configuration details if enabled diff --git a/exchanges/okex/okex_websocket.go b/exchanges/okex/okex_websocket.go index df44ef47..aa6210b3 100644 --- a/exchanges/okex/okex_websocket.go +++ b/exchanges/okex/okex_websocket.go @@ -57,7 +57,6 @@ func (o *OKEX) WsConnect() error { } go o.WsHandleData() - go o.WsReadData() go o.wsPingHandler() err = o.WsSubscribe() @@ -117,51 +116,30 @@ func (o *OKEX) WsSubscribe() error { } // WsReadData reads data from the websocket connection -func (o *OKEX) WsReadData() { - o.Websocket.Wg.Add(1) +func (o *OKEX) WsReadData() (exchange.WebsocketResponse, error) { + mType, resp, err := o.WebsocketConn.ReadMessage() + if err != nil { + return exchange.WebsocketResponse{}, err + } - defer func() { - err := o.WebsocketConn.Close() + o.Websocket.TrafficAlert <- struct{}{} + + var standardMessage []byte + + switch mType { + case websocket.TextMessage: + standardMessage = resp + + case websocket.BinaryMessage: + reader := flate.NewReader(bytes.NewReader(resp)) + standardMessage, err = ioutil.ReadAll(reader) + reader.Close() if err != nil { - o.Websocket.DataHandler <- fmt.Errorf("okex_websocket.go - Unable to to close Websocket connection. Error: %s", - err) - } - o.Websocket.Wg.Done() - }() - - for { - select { - case <-o.Websocket.ShutdownC: - return - - default: - mType, resp, err := o.WebsocketConn.ReadMessage() - if err != nil { - o.Websocket.DataHandler <- err - return - } - - o.Websocket.TrafficAlert <- struct{}{} - - var standardMessage []byte - - switch mType { - case websocket.TextMessage: - standardMessage = resp - - case websocket.BinaryMessage: - reader := flate.NewReader(bytes.NewReader(resp)) - standardMessage, err = ioutil.ReadAll(reader) - reader.Close() - if err != nil { - o.Websocket.DataHandler <- err - return - } - } - - o.Websocket.Intercomm <- exchange.WebsocketResponse{Raw: standardMessage} + return exchange.WebsocketResponse{}, err } } + + return exchange.WebsocketResponse{Raw: standardMessage}, nil } func (o *OKEX) wsPingHandler() { @@ -188,23 +166,37 @@ func (o *OKEX) wsPingHandler() { // WsHandleData handles the read data from the websocket connection func (o *OKEX) WsHandleData() { o.Websocket.Wg.Add(1) - defer o.Websocket.Wg.Done() + + defer func() { + err := o.WebsocketConn.Close() + if err != nil { + o.Websocket.DataHandler <- fmt.Errorf("okex_websocket.go - Unable to to close Websocket connection. Error: %s", + err) + } + o.Websocket.Wg.Done() + }() for { select { case <-o.Websocket.ShutdownC: return - case resp := <-o.Websocket.Intercomm: + default: + resp, err := o.WsReadData() + if err != nil { + o.Websocket.DataHandler <- err + return + } + multiStreamDataArr := []MultiStreamData{} - err := common.JSONDecode(resp.Raw, &multiStreamDataArr) + err = common.JSONDecode(resp.Raw, &multiStreamDataArr) if err != nil { if strings.Contains(string(resp.Raw), "pong") { continue } else { - log.Error(err) - return + o.Websocket.DataHandler <- err + continue } } @@ -234,8 +226,8 @@ func (o *OKEX) WsHandleData() { err = common.JSONDecode(multiStreamData.Data, &ticker) if err != nil { - log.Errorf("OKEX Ticker Decode Error: %s", err) - return + o.Websocket.DataHandler <- err + continue } o.Websocket.DataHandler <- exchange.TickerData{ @@ -249,8 +241,8 @@ func (o *OKEX) WsHandleData() { err = common.JSONDecode(multiStreamData.Data, &deals) if err != nil { - log.Errorf("OKEX Deals Decode Error: %s", err) - return + o.Websocket.DataHandler <- err + continue } for _, trade := range deals { @@ -274,8 +266,8 @@ func (o *OKEX) WsHandleData() { err := common.JSONDecode(multiStreamData.Data, &klines) if err != nil { - log.Errorf("OKEX Klines Decode Error: %s", err) - return + o.Websocket.DataHandler <- err + continue } for _, kline := range klines { @@ -304,8 +296,8 @@ func (o *OKEX) WsHandleData() { err := common.JSONDecode(multiStreamData.Data, &depth) if err != nil { - log.Errorf("OKEX Depth Decode Error: %s", err) - return + o.Websocket.DataHandler <- err + continue } o.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{ diff --git a/exchanges/poloniex/poloniex.go b/exchanges/poloniex/poloniex.go index 12686943..c329fdb4 100644 --- a/exchanges/poloniex/poloniex.go +++ b/exchanges/poloniex/poloniex.go @@ -66,7 +66,8 @@ func (p *Poloniex) SetDefaults() { p.Fee = 0 p.Verbose = false p.RESTPollingDelay = 10 - p.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithAPIPermission | exchange.NoFiatWithdrawals + p.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithAPIPermission | + exchange.NoFiatWithdrawals p.RequestCurrencyPairFormat.Delimiter = "_" p.RequestCurrencyPairFormat.Uppercase = true p.ConfigCurrencyPairFormat.Delimiter = "_" @@ -81,6 +82,9 @@ func (p *Poloniex) SetDefaults() { p.APIUrlDefault = poloniexAPIURL p.APIUrl = p.APIUrlDefault p.WebsocketInit() + p.Websocket.Functionality = exchange.WebsocketTradeDataSupported | + exchange.WebsocketOrderbookSupported | + exchange.WebsocketTickerSupported } // Setup sets user exchange configuration settings diff --git a/exchanges/poloniex/poloniex_websocket.go b/exchanges/poloniex/poloniex_websocket.go index 59eaae86..4c322284 100644 --- a/exchanges/poloniex/poloniex_websocket.go +++ b/exchanges/poloniex/poloniex_websocket.go @@ -64,7 +64,6 @@ func (p *Poloniex) WsConnect() error { } } - go p.WsReadData() go p.WsHandleData() return p.WsSubscribe() @@ -105,34 +104,14 @@ func (p *Poloniex) WsSubscribe() error { } // WsReadData reads data from the websocket connection -func (p *Poloniex) WsReadData() { - p.Websocket.Wg.Add(1) - - defer func() { - err := p.WebsocketConn.Close() - if err != nil { - p.Websocket.DataHandler <- fmt.Errorf("poloniex_websocket.go - Unable to to close Websocket connection. Error: %s", - err) - } - p.Websocket.Wg.Done() - }() - - for { - select { - case <-p.Websocket.ShutdownC: - return - - default: - _, resp, err := p.WebsocketConn.ReadMessage() - if err != nil { - p.Websocket.DataHandler <- err - return - } - - p.Websocket.TrafficAlert <- struct{}{} - p.Websocket.Intercomm <- exchange.WebsocketResponse{Raw: resp} - } +func (p *Poloniex) WsReadData() (exchange.WebsocketResponse, error) { + _, resp, err := p.WebsocketConn.ReadMessage() + if err != nil { + return exchange.WebsocketResponse{}, err } + + p.Websocket.TrafficAlert <- struct{}{} + return exchange.WebsocketResponse{Raw: resp}, nil } func getWSDataType(data interface{}) string { @@ -151,18 +130,33 @@ func checkSubscriptionSuccess(data []interface{}) bool { // WsHandleData handles data from the websocket connection func (p *Poloniex) WsHandleData() { p.Websocket.Wg.Add(1) - defer p.Websocket.Wg.Done() + + defer func() { + err := p.WebsocketConn.Close() + if err != nil { + p.Websocket.DataHandler <- fmt.Errorf("poloniex_websocket.go - Unable to to close Websocket connection. Error: %s", + err) + } + p.Websocket.Wg.Done() + }() for { select { case <-p.Websocket.ShutdownC: return - case resp := <-p.Websocket.Intercomm: - var result interface{} - err := common.JSONDecode(resp.Raw, &result) + default: + resp, err := p.WsReadData() if err != nil { - log.Errorf("poloniex websocket decode error - %s", err) + p.Websocket.DataHandler <- err + return + } + + var result interface{} + err = common.JSONDecode(resp.Raw, &result) + if err != nil { + p.Websocket.DataHandler <- err + continue } data := result.([]interface{}) @@ -222,19 +216,19 @@ func (p *Poloniex) WsHandleData() { dataL3map := dataL3[1].(map[string]interface{}) currencyPair, ok := dataL3map["currencyPair"].(string) if !ok { - log.Error("poloniex.go error - could not find currency pair in map") + p.Websocket.DataHandler <- errors.New("poloniex.go error - could not find currency pair in map") continue } orderbookData, ok := dataL3map["orderBook"].([]interface{}) if !ok { - log.Error("poloniex.go error - could not find orderbook data in map") + p.Websocket.DataHandler <- errors.New("poloniex.go error - could not find orderbook data in map") continue } err := p.WsProcessOrderbookSnapshot(orderbookData, currencyPair) if err != nil { - log.Error(err) + p.Websocket.DataHandler <- err continue } @@ -247,7 +241,7 @@ func (p *Poloniex) WsHandleData() { currencyPair := CurrencyPairID[chanID] err := p.WsProcessOrderbookUpdate(dataL3, currencyPair) if err != nil { - log.Error(err) + p.Websocket.DataHandler <- err continue } @@ -335,7 +329,7 @@ func (p *Poloniex) WsProcessOrderbookSnapshot(ob []interface{}, symbol string) e newOrderbook.LastUpdated = time.Now() newOrderbook.Pair = pair.NewCurrencyPairFromString(symbol) - return p.Websocket.Orderbook.LoadSnapshot(newOrderbook, p.GetName()) + return p.Websocket.Orderbook.LoadSnapshot(newOrderbook, p.GetName(), false) } // WsProcessOrderbookUpdate processses new orderbook updates diff --git a/exchanges/wex/wex.go b/exchanges/wex/wex.go index 7701ddc9..65f63ba6 100644 --- a/exchanges/wex/wex.go +++ b/exchanges/wex/wex.go @@ -56,7 +56,8 @@ func (w *WEX) SetDefaults() { w.Verbose = false w.RESTPollingDelay = 10 w.Ticker = make(map[string]Ticker) - w.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithAPIPermission | exchange.NoFiatWithdrawals + w.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithAPIPermission | + exchange.NoFiatWithdrawals w.RequestCurrencyPairFormat.Delimiter = "_" w.RequestCurrencyPairFormat.Uppercase = false w.RequestCurrencyPairFormat.Separator = "-" diff --git a/exchanges/yobit/yobit.go b/exchanges/yobit/yobit.go index c7b6639f..b493dc8c 100644 --- a/exchanges/yobit/yobit.go +++ b/exchanges/yobit/yobit.go @@ -55,7 +55,8 @@ func (y *Yobit) SetDefaults() { y.RESTPollingDelay = 10 y.AuthenticatedAPISupport = true y.Ticker = make(map[string]Ticker) - y.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithAPIPermission | exchange.WithdrawFiatViaWebsiteOnly + y.APIWithdrawPermissions = exchange.AutoWithdrawCryptoWithAPIPermission | + exchange.WithdrawFiatViaWebsiteOnly y.RequestCurrencyPairFormat.Delimiter = "_" y.RequestCurrencyPairFormat.Uppercase = false y.RequestCurrencyPairFormat.Separator = "-" diff --git a/exchanges/yobit/yobit_wrapper.go b/exchanges/yobit/yobit_wrapper.go index d9ce768b..ef63a3ce 100644 --- a/exchanges/yobit/yobit_wrapper.go +++ b/exchanges/yobit/yobit_wrapper.go @@ -255,7 +255,7 @@ func (y *Yobit) WithdrawFiatFundsToInternationalBank(withdrawRequest exchange.Wi // GetWebsocket returns a pointer to the exchange websocket func (y *Yobit) GetWebsocket() (*exchange.Websocket, error) { - return nil, common.ErrNotYetImplemented + return nil, common.ErrFunctionNotSupported } // GetFeeByType returns an estimate of fee based on type of transaction diff --git a/exchanges/zb/zb.go b/exchanges/zb/zb.go index ee2f59cf..af865c3c 100644 --- a/exchanges/zb/zb.go +++ b/exchanges/zb/zb.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/gorilla/websocket" "github.com/thrasher-/gocryptotrader/common" "github.com/thrasher-/gocryptotrader/config" "github.com/thrasher-/gocryptotrader/currency/pair" @@ -43,6 +44,7 @@ const ( // 47.91.169.147 api.zb.com // 47.52.55.212 trade.zb.com type ZB struct { + WebsocketConn *websocket.Conn exchange.Base } @@ -53,7 +55,8 @@ func (z *ZB) SetDefaults() { z.Fee = 0 z.Verbose = false z.RESTPollingDelay = 10 - z.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | exchange.NoFiatWithdrawals + z.APIWithdrawPermissions = exchange.AutoWithdrawCrypto | + exchange.NoFiatWithdrawals z.RequestCurrencyPairFormat.Delimiter = "_" z.RequestCurrencyPairFormat.Uppercase = false z.ConfigCurrencyPairFormat.Delimiter = "_" @@ -70,6 +73,9 @@ func (z *ZB) SetDefaults() { z.APIUrlSecondaryDefault = zbMarketURL z.APIUrlSecondary = z.APIUrlSecondaryDefault z.WebsocketInit() + z.Websocket.Functionality = exchange.WebsocketTickerSupported | + exchange.WebsocketOrderbookSupported | + exchange.WebsocketTradeDataSupported } // Setup sets user configuration @@ -109,6 +115,14 @@ func (z *ZB) Setup(exch config.ExchangeConfig) { if err != nil { log.Fatal(err) } + err = z.WebsocketSetup(z.WsConnect, + exch.Name, + exch.Websocket, + zbWebsocketAPI, + exch.WebsocketURL) + if err != nil { + log.Fatal(err) + } } } diff --git a/exchanges/zb/zb_type.go b/exchanges/zb/zb_types.go similarity index 100% rename from exchanges/zb/zb_type.go rename to exchanges/zb/zb_types.go diff --git a/exchanges/zb/zb_websocket.go b/exchanges/zb/zb_websocket.go new file mode 100644 index 00000000..323d8d77 --- /dev/null +++ b/exchanges/zb/zb_websocket.go @@ -0,0 +1,311 @@ +package zb + +import ( + "errors" + "fmt" + "net/http" + "net/url" + "time" + + "github.com/gorilla/websocket" + "github.com/thrasher-/gocryptotrader/common" + "github.com/thrasher-/gocryptotrader/currency/pair" + "github.com/thrasher-/gocryptotrader/exchanges" + "github.com/thrasher-/gocryptotrader/exchanges/orderbook" +) + +const ( + zbWebsocketAPI = "wss://api.zb.cn:9999/websocket" +) + +// WsConnect initiates a websocket connection +func (z *ZB) WsConnect() error { + if !z.Websocket.IsEnabled() || !z.IsEnabled() { + return errors.New(exchange.WebsocketNotEnabled) + } + + var dialer websocket.Dialer + if z.Websocket.GetProxyAddress() != "" { + proxy, err := url.Parse(z.Websocket.GetProxyAddress()) + if err != nil { + return err + } + + dialer.Proxy = http.ProxyURL(proxy) + } + + var err error + z.WebsocketConn, _, err = dialer.Dial(z.Websocket.GetWebsocketURL(), + http.Header{}) + if err != nil { + return err + } + + go z.WsHandleData() + + return z.WsSubscribe() +} + +// WsSubscribe subscribes to the full websocket suite on ZB exchange +func (z *ZB) WsSubscribe() error { + markets := Subscription{ + Event: "addChannel", + Channel: "markets", + } + + reqMarkets, err := common.JSONEncode(markets) + if err != nil { + return err + } + + err = z.WebsocketConn.WriteMessage(websocket.TextMessage, reqMarkets) + if err != nil { + return err + } + + for _, c := range z.GetEnabledCurrencies() { + cPair := c.FirstCurrency.Lower() + c.SecondCurrency.Lower() + + ticker := Subscription{ + Event: "addChannel", + Channel: fmt.Sprintf("%s_ticker", cPair), + } + + reqTicker, err := common.JSONEncode(ticker) + if err != nil { + return err + } + + err = z.WebsocketConn.WriteMessage(websocket.TextMessage, reqTicker) + if err != nil { + return err + } + + depth := Subscription{ + Event: "addChannel", + Channel: fmt.Sprintf("%s_depth", cPair), + } + + reqDepth, err := common.JSONEncode(depth) + if err != nil { + return err + } + + err = z.WebsocketConn.WriteMessage(websocket.TextMessage, reqDepth) + if err != nil { + return err + } + + trades := Subscription{ + Event: "addChannel", + Channel: fmt.Sprintf("%s_trades", cPair), + } + + reqTrades, err := common.JSONEncode(trades) + if err != nil { + return err + } + + err = z.WebsocketConn.WriteMessage(websocket.TextMessage, reqTrades) + if err != nil { + return err + } + } + + return nil +} + +// WsReadData reads from the websocket connection and returns the websocket +// response +func (z *ZB) WsReadData() (exchange.WebsocketResponse, error) { + _, resp, err := z.WebsocketConn.ReadMessage() + if err != nil { + return exchange.WebsocketResponse{}, err + } + + z.Websocket.TrafficAlert <- struct{}{} + return exchange.WebsocketResponse{Raw: resp}, nil +} + +// WsHandleData handles all the websocket data coming from the websocket +// connection +func (z *ZB) WsHandleData() { + z.Websocket.Wg.Add(1) + + defer func() { + err := z.WebsocketConn.Close() + if err != nil { + z.Websocket.DataHandler <- fmt.Errorf("zb_websocket.go - Unable to to close Websocket connection. Error: %s", + err) + } + z.Websocket.Wg.Done() + }() + + for { + select { + case <-z.Websocket.ShutdownC: + + default: + resp, err := z.WsReadData() + if err != nil { + z.Websocket.DataHandler <- err + continue + } + + var result Generic + err = common.JSONDecode(resp.Raw, &result) + if err != nil { + z.Websocket.DataHandler <- err + continue + } + + switch { + case common.StringContains(result.Channel, "markets"): + if !result.Success { + z.Websocket.DataHandler <- fmt.Errorf("zb_websocket.go error - unsuccessful market response %s", wsErrCodes[result.Code]) + continue + } + + var markets Markets + err := common.JSONDecode(result.Data, &markets) + if err != nil { + z.Websocket.DataHandler <- err + continue + } + + case common.StringContains(result.Channel, "ticker"): + cPair := common.SplitStrings(result.Channel, "_") + + var ticker WsTicker + + err := common.JSONDecode(resp.Raw, &ticker) + if err != nil { + z.Websocket.DataHandler <- err + continue + } + + z.Websocket.DataHandler <- exchange.TickerData{ + Timestamp: time.Unix(0, ticker.Date), + Pair: pair.NewCurrencyPairFromString(cPair[0]), + AssetType: "SPOT", + Exchange: z.GetName(), + ClosePrice: ticker.Data.Last, + HighPrice: ticker.Data.High, + LowPrice: ticker.Data.Low, + } + + case common.StringContains(result.Channel, "depth"): + var depth WsDepth + err := common.JSONDecode(resp.Raw, &depth) + if err != nil { + z.Websocket.DataHandler <- err + continue + } + + var asks []orderbook.Item + for _, askDepth := range depth.Asks { + ask := askDepth.([]interface{}) + asks = append(asks, orderbook.Item{ + Amount: ask[1].(float64), + Price: ask[0].(float64), + }) + } + + var bids []orderbook.Item + for _, bidDepth := range depth.Bids { + bid := bidDepth.([]interface{}) + bids = append(bids, orderbook.Item{ + Amount: bid[1].(float64), + Price: bid[0].(float64), + }) + } + + channelInfo := common.SplitStrings(result.Channel, "_") + cPair := pair.NewCurrencyPairFromString(channelInfo[0]) + + var newOrderbook orderbook.Base + newOrderbook.Asks = asks + newOrderbook.Bids = bids + newOrderbook.AssetType = "SPOT" + newOrderbook.Pair = cPair + newOrderbook.CurrencyPair = channelInfo[0] + newOrderbook.LastUpdated = time.Now() + + err = z.Websocket.Orderbook.LoadSnapshot(newOrderbook, z.GetName(), true) + if err != nil { + z.Websocket.DataHandler <- err + continue + } + + z.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{ + Pair: cPair, + Asset: "SPOT", + Exchange: z.GetName(), + } + + case common.StringContains(result.Channel, "trades"): + var trades WsTrades + err := common.JSONDecode(resp.Raw, &trades) + if err != nil { + z.Websocket.DataHandler <- err + continue + } + + // Most up to date trade + t := trades.Data[len(trades.Data)-1] + + channelInfo := common.SplitStrings(result.Channel, "_") + cPair := pair.NewCurrencyPairFromString(channelInfo[0]) + + z.Websocket.DataHandler <- exchange.TradeData{ + Timestamp: time.Unix(0, t.Date), + CurrencyPair: cPair, + AssetType: "SPOT", + Exchange: z.GetName(), + EventTime: t.Date, + Price: t.Price, + Amount: t.Amount, + Side: t.TradeType, + } + + default: + z.Websocket.DataHandler <- errors.New("zb_websocket.go error - unhandled websocket response") + continue + } + } + } +} + +var wsErrCodes = map[int64]string{ + 1000: "Successful call", + 1001: "General error message", + 1002: "internal error", + 1003: "Verification failed", + 1004: "Financial security password lock", + 1005: "The fund security password is incorrect. Please confirm and re-enter.", + 1006: "Real-name certification is awaiting review or review", + 1007: "Channel is empty", + 1008: "Event is empty", + 1009: "This interface is being maintained", + 1011: "Not open yet", + 1012: "Insufficient permissions", + 1013: "Can not trade, if you have any questions, please contact online customer service", + 1014: "Cannot be sold during the pre-sale period", + 2002: "Insufficient balance in Bitcoin account", + 2003: "Insufficient balance of Litecoin account", + 2005: "Insufficient balance in Ethereum account", + 2006: "Insufficient balance in ETC currency account", + 2007: "Insufficient balance of BTS currency account", + 2008: "Insufficient balance in EOS currency account", + 2009: "Insufficient account balance", + 3001: "Pending order not found", + 3002: "Invalid amount", + 3003: "Invalid quantity", + 3004: "User does not exist", + 3005: "Invalid parameter", + 3006: "Invalid IP or inconsistent with the bound IP", + 3007: "Request time has expired", + 3008: "Transaction history not found", + 4001: "API interface is locked", + 4002: "Request too frequently", +} diff --git a/exchanges/zb/zb_websocket_types.go b/exchanges/zb/zb_websocket_types.go new file mode 100644 index 00000000..be406f20 --- /dev/null +++ b/exchanges/zb/zb_websocket_types.go @@ -0,0 +1,57 @@ +package zb + +import "encoding/json" + +// Subscription defines an intial subscription type to be sent +type Subscription struct { + Event string `json:"event"` + Channel string `json:"channel"` +} + +// Generic defines a generic fields associated with many return types +type Generic struct { + Code int64 `json:"code"` + Success bool `json:"success"` + Channel string `json:"channel"` + Message string `json:"message"` + No int64 `json:"no"` + Data json.RawMessage `json:"data"` +} + +// Markets defines market data +type Markets map[string]struct { + AmountScale int64 `json:"amountScale"` + PriceScale int64 `json:"priceScale"` +} + +// WsTicker defines websocket ticker data +type WsTicker struct { + Date int64 `json:"date,string"` + Data struct { + Volume24Hr float64 `json:"vol,string"` + High float64 `json:"high,string"` + Low float64 `json:"low,string"` + Last float64 `json:"last,string"` + Buy float64 `json:"buy,string"` + Sell float64 `json:"sell,string"` + } `json:"ticker"` +} + +// WsDepth defines websocket orderbook data +type WsDepth struct { + Timestamp int64 `json:"timestamp"` + Asks []interface{} `json:"asks"` + Bids []interface{} `json:"bids"` +} + +// WsTrades defines websocket trade data +type WsTrades struct { + Data []struct { + Amount float64 `json:"amount,string"` + Price float64 `json:"price,string"` + TID int64 `json:"tid"` + Date int64 `json:"date"` + Type string `json:"type"` + TradeType string `json:"trade_type"` + } `json:"data"` +} diff --git a/exchanges/zb/zb_wrapper.go b/exchanges/zb/zb_wrapper.go index 669e2504..ce52c23b 100644 --- a/exchanges/zb/zb_wrapper.go +++ b/exchanges/zb/zb_wrapper.go @@ -275,7 +275,7 @@ func (z *ZB) WithdrawFiatFundsToInternationalBank(withdrawRequest exchange.Withd // GetWebsocket returns a pointer to the exchange websocket func (z *ZB) GetWebsocket() (*exchange.Websocket, error) { - return nil, common.ErrNotYetImplemented + return z.Websocket, nil } // GetFeeByType returns an estimate of fee based on type of transaction diff --git a/logger/logger.go b/logger/logger.go index 42d753ca..49841f20 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -40,11 +40,11 @@ func setDefaultOutputs() { log.Ldate|log.Ltime) infoLogger = log.New(os.Stdout, - "[INFO]: ", + "[INFO]: ", log.Ldate|log.Ltime) warnLogger = log.New(os.Stdout, - "[WARN]: ", + "[WARN]: ", log.Ldate|log.Ltime) errorLogger = log.New(os.Stdout, diff --git a/routines.go b/routines.go index bd97a55b..8e676b3d 100644 --- a/routines.go +++ b/routines.go @@ -293,6 +293,8 @@ func WebsocketRoutine(verbose bool) { ws, err := bot.exchanges[i].GetWebsocket() if err != nil { + log.Debugf("Websocket not enabled for %s", + bot.exchanges[i].GetName()) return } diff --git a/tools/documentation/root_templates/root_readme.tmpl b/tools/documentation/root_templates/root_readme.tmpl index 09a5ac2d..6a6a316e 100644 --- a/tools/documentation/root_templates/root_readme.tmpl +++ b/tools/documentation/root_templates/root_readme.tmpl @@ -25,19 +25,19 @@ Join our slack to discuss all things related to GoCryptoTrader! [GoCryptoTrader | Bitfinex | Yes | Yes | NA | | Bitflyer | Yes | No | NA | | Bithumb | Yes | NA | NA | -| BitMEX | Yes | No | NA | +| BitMEX | Yes | Yes | NA | | Bitstamp | Yes | Yes | No | | Bittrex | Yes | No | NA | | BTCC | Yes | Yes | No | | BTCMarkets | Yes | No | NA | -| COINUT | Yes | No | NA | +| COINUT | Yes | Yes | NA | | Exmo | Yes | NA | NA | | CoinbasePro | Yes | Yes | No| -| GateIO | Yes | No | NA | -| Gemini | Yes | No | No | +| GateIO | Yes | Yes | NA | +| Gemini | Yes | Yes | No | | HitBTC | Yes | Yes | No | -| Huobi.Pro | Yes | No | NA | -| Huobi.Hadax | Yes | No | NA | +| Huobi.Pro | Yes | Yes | NA | +| Huobi.Hadax | Yes | Yes | NA | | ItBit | Yes | NA | No | | Kraken | Yes | NA | NA | | LakeBTC | Yes | No | NA | @@ -45,11 +45,11 @@ Join our slack to discuss all things related to GoCryptoTrader! [GoCryptoTrader | LocalBitcoins | Yes | NA | NA | | OKCoin China | Yes | Yes | No | | OKCoin International | Yes | Yes | No | -| OKEX | Yes | No | No | +| OKEX | Yes | Yes | No | | Poloniex | Yes | Yes | NA | | WEX | Yes | NA | NA | | Yobit | Yes | NA | NA | -| ZB.COM | Yes | No | NA | +| ZB.COM | Yes | Yes | NA | We are aiming to support the top 20 highest volume exchanges based off the [CoinMarketCap exchange data](https://coinmarketcap.com/exchanges/volume/24-hour/).