Files
gocryptotrader/exchanges/exchange_websocket.go
Scott 107cf76373 Kraken websocket support (#264)
* Initial commit. Adds ticker, candle and trade, subscription support

* Adds support for spread and orderbooks

* Adds new currency pair delimiter ("/"), Adds dedicated websocket Connected channel handler, Updates Kraken websocket capability definition, Refines websocket tests to connect and disconnect without freezing, separates WebsocketUnsubscribeEventRequest ChannelID into its own struct WebsocketUnsubscribeByChannelIDEventRequest to prevent bad json WS requests, Adds asset type to orderbook, Kraken WS handles connection better

* Removes duplicate type, reverts config value

* Addresses error returns and changes writeToWebsocket to use byte array. Removes deferred funcs in tests. Increases test listening limit for rare cases

* Fixes verbose log. Rearranges WS Connect async ordering. Fixes DATA RACE. Fixes random okex tests. Ensures Kraken WS tests only connect once
2019-04-04 10:21:44 +11:00

684 lines
17 KiB
Go

package exchange
import (
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/thrasher-/gocryptotrader/config"
"github.com/thrasher-/gocryptotrader/currency"
"github.com/thrasher-/gocryptotrader/exchanges/orderbook"
)
// Websocket functionality list and state consts
const (
NoWebsocketSupport uint32 = 0
WebsocketTickerSupported uint32 = 1 << (iota - 1)
WebsocketOrderbookSupported
WebsocketKlineSupported
WebsocketTradeDataSupported
WebsocketAccountSupported
WebsocketAllowsRequests
WebsocketTickerSupportedText = "TICKER STREAMING SUPPORTED"
WebsocketOrderbookSupportedText = "ORDERBOOK STREAMING SUPPORTED"
WebsocketKlineSupportedText = "KLINE STREAMING SUPPORTED"
WebsocketTradeDataSupportedText = "TRADE STREAMING SUPPORTED"
WebsocketAccountSupportedText = "ACCOUNT STREAMING SUPPORTED"
WebsocketAllowsRequestsText = "WEBSOCKET REQUESTS SUPPORTED"
NoWebsocketSupportText = "WEBSOCKET NOT SUPPORTED"
UnknownWebsocketFunctionality = "UNKNOWN FUNCTIONALITY BITMASK"
// WebsocketNotEnabled alerts of a disabled websocket
WebsocketNotEnabled = "exchange_websocket_not_enabled"
// WebsocketTrafficLimitTime defines a standard time for no traffic from the
// websocket connection
WebsocketTrafficLimitTime = 5 * time.Second
// WebsocketStateTimeout defines a const for when a websocket connection
// times out, will be handled by the routine management system
WebsocketStateTimeout = "TIMEOUT"
websocketRestablishConnection = 1 * time.Second
)
// WebsocketInit initialises the websocket struct
func (e *Base) WebsocketInit() {
e.Websocket = &Websocket{
defaultURL: "",
enabled: false,
proxyAddr: "",
runningURL: "",
init: true,
}
}
// WebsocketSetup sets main variables for websocket connection
func (e *Base) WebsocketSetup(connector func() error,
exchangeName string,
wsEnabled bool,
defaultURL,
runningURL string) error {
e.Websocket.DataHandler = make(chan interface{}, 1)
e.Websocket.Connected = make(chan struct{}, 1)
e.Websocket.Disconnected = make(chan struct{}, 1)
e.Websocket.TrafficAlert = make(chan struct{}, 1)
err := e.Websocket.SetWsStatusAndConnection(wsEnabled)
if err != nil {
return err
}
e.Websocket.SetDefaultURL(defaultURL)
e.Websocket.SetConnector(connector)
e.Websocket.SetWebsocketURL(runningURL)
e.Websocket.SetExchangeName(exchangeName)
e.Websocket.init = false
return nil
}
// Websocket defines a return type for websocket connections via the interface
// wrapper for routine processing in routines.go
type Websocket struct {
proxyAddr string
defaultURL string
runningURL string
exchangeName string
enabled bool
init bool
connected bool
connector func() error
m sync.Mutex
// Connected denotes a channel switch for diversion of request flow
Connected chan struct{}
// Disconnected denotes a channel switch for diversion of request flow
Disconnected chan struct{}
// DataHandler pipes websocket data to an exchange websocket data handler
DataHandler chan interface{}
// ShutdownC is the main shutdown channel used within an exchange package
// called by its own defined Shutdown function
ShutdownC chan struct{}
// Orderbook is a local cache of orderbooks
Orderbook WebsocketOrderbookLocal
// Wg defines a wait group for websocket routines for cleanly shutting down
// routines
Wg sync.WaitGroup
// TrafficAlert monitors if there is a halt in traffic throughput
TrafficAlert chan struct{}
// Functionality defines websocket stream capabilities
Functionality uint32
}
// trafficMonitor monitors traffic and switches connection modes for websocket
func (w *Websocket) trafficMonitor(wg *sync.WaitGroup) {
w.Wg.Add(1)
wg.Done() // Makes sure we are unlocking after we add to waitgroup
defer func() {
if w.connected {
w.Disconnected <- struct{}{}
}
w.Wg.Done()
}()
// Define an initial traffic timer which will be a delay then fall over to
// WebsocketTrafficLimitTime after first response
trafficTimer := time.NewTimer(5 * time.Second)
for {
select {
case <-w.ShutdownC: // Returns on shutdown channel close
return
case <-w.TrafficAlert: // Resets timer on traffic
if !w.connected {
w.Connected <- struct{}{}
w.connected = true
}
trafficTimer.Reset(WebsocketTrafficLimitTime)
case <-trafficTimer.C: // Falls through when timer runs out
newtimer := time.NewTimer(10 * time.Second) // New secondary timer set
if w.connected {
// If connected divert traffic to rest
w.Disconnected <- struct{}{}
w.connected = false
}
select {
case <-w.ShutdownC: // Returns on shutdown channel close
return
case <-newtimer.C: // If secondary timer runs state timeout is sent to the data handler
w.DataHandler <- WebsocketStateTimeout
return
case <-w.TrafficAlert: // If in this time response traffic comes through
trafficTimer.Reset(WebsocketTrafficLimitTime)
if !w.connected {
// If not connected divert traffic from REST to websocket
w.Connected <- struct{}{}
w.connected = true
}
}
}
}
}
// Connect intiates a websocket connection by using a package defined connection
// function
func (w *Websocket) Connect() error {
w.m.Lock()
defer w.m.Unlock()
if !w.IsEnabled() {
return errors.New(WebsocketNotEnabled)
}
if w.connected {
return errors.New("exchange_websocket.go error - already connected, cannot connect again")
}
w.ShutdownC = make(chan struct{}, 1)
err := w.connector()
if err != nil {
return fmt.Errorf("exchange_websocket.go connection error %s",
err)
}
if !w.connected {
w.Connected <- struct{}{}
w.connected = true
}
var anotherWG sync.WaitGroup
anotherWG.Add(1)
go w.trafficMonitor(&anotherWG)
anotherWG.Wait()
return nil
}
// IsConnected exposes websocket connection status
func (w *Websocket) IsConnected() bool {
return w.connected
}
// Shutdown attempts to shut down a websocket connection and associated routines
// by using a package defined shutdown function
func (w *Websocket) Shutdown() error {
w.m.Lock()
defer func() {
w.Orderbook.FlushCache()
w.m.Unlock()
}()
if !w.connected {
return errors.New("exchange_websocket.go error - System not connected to shut down")
}
timer := time.NewTimer(5 * time.Second)
c := make(chan struct{}, 1)
go func(c chan struct{}) {
close(w.ShutdownC)
w.Wg.Wait()
c <- struct{}{}
}(c)
select {
case <-c:
w.connected = false
return nil
case <-timer.C:
return fmt.Errorf("%s - Websocket routines failed to shutdown",
w.GetName())
}
}
// SetWebsocketURL sets websocket URL
func (w *Websocket) SetWebsocketURL(websocketURL string) {
if websocketURL == "" || websocketURL == config.WebsocketURLNonDefaultMessage {
w.runningURL = w.defaultURL
return
}
w.runningURL = websocketURL
}
// GetWebsocketURL returns the running websocket URL
func (w *Websocket) GetWebsocketURL() string {
return w.runningURL
}
// SetWsStatusAndConnection sets if websocket is enabled
// it will also connect/disconnect the websocket connection
func (w *Websocket) SetWsStatusAndConnection(enabled bool) error {
if w.enabled == enabled {
if w.init {
return nil
}
return fmt.Errorf("exchange_websocket.go error - already set as %t",
enabled)
}
w.enabled = enabled
if !w.init {
if enabled {
if w.connected {
return nil
}
return w.Connect()
}
if !w.connected {
return nil
}
return w.Shutdown()
}
return nil
}
// IsEnabled returns bool
func (w *Websocket) IsEnabled() bool {
return w.enabled
}
// SetProxyAddress sets websocket proxy address
func (w *Websocket) SetProxyAddress(proxyAddr string) error {
if w.proxyAddr == proxyAddr {
return errors.New("exchange_websocket.go error - Setting proxy address - same address")
}
w.proxyAddr = proxyAddr
if !w.init && w.enabled {
if w.connected {
err := w.Shutdown()
if err != nil {
return err
}
return w.Connect()
}
return w.Connect()
}
return nil
}
// GetProxyAddress returns the current websocket proxy
func (w *Websocket) GetProxyAddress() string {
return w.proxyAddr
}
// SetDefaultURL sets default websocket URL
func (w *Websocket) SetDefaultURL(defaultURL string) {
w.defaultURL = defaultURL
}
// GetDefaultURL returns the default websocket URL
func (w *Websocket) GetDefaultURL() string {
return w.defaultURL
}
// SetConnector sets connection function
func (w *Websocket) SetConnector(connector func() error) {
w.connector = connector
}
// SetExchangeName sets exchange name
func (w *Websocket) SetExchangeName(exchName string) {
w.exchangeName = exchName
}
// GetName returns exchange name
func (w *Websocket) GetName() string {
return w.exchangeName
}
// WebsocketOrderbookLocal defines a local cache of orderbooks for amending,
// appending and deleting changes and updates the main store in orderbook.go
type WebsocketOrderbookLocal struct {
ob []*orderbook.Base
lastUpdated time.Time
m sync.Mutex
}
// Update updates a local cache using bid targets and ask targets then updates
// main cache in orderbook.go
// Volume == 0; deletion at price target
// Price target not found; append of price target
// Price target found; amend volume of price target
func (w *WebsocketOrderbookLocal) Update(bidTargets, askTargets []orderbook.Item,
p currency.Pair,
updated time.Time,
exchName, assetType string) error {
if bidTargets == nil && askTargets == nil {
return errors.New("exchange.go websocket orderbook cache Update() error - cannot have bids and ask targets both nil")
}
if w.lastUpdated.After(updated) {
return errors.New("exchange.go WebsocketOrderbookLocal Update() - update is before last update time")
}
w.m.Lock()
defer w.m.Unlock()
var orderbookAddress *orderbook.Base
for i := range w.ob {
if w.ob[i].Pair == p && w.ob[i].AssetType == assetType {
orderbookAddress = w.ob[i]
}
}
if orderbookAddress == nil {
return fmt.Errorf("exchange.go WebsocketOrderbookLocal Update() - orderbook.Base could not be found for Exchange %s CurrencyPair: %s AssetType: %s",
exchName,
p.String(),
assetType)
}
if len(orderbookAddress.Asks) == 0 || len(orderbookAddress.Bids) == 0 {
return errors.New("exchange.go websocket orderbook cache Update() error - snapshot incorrectly loaded")
}
if orderbookAddress.Pair == (currency.Pair{}) {
return fmt.Errorf("exchange.go websocket orderbook cache Update() error - snapshot not found %v",
p)
}
for x := range bidTargets {
// bid targets
func() {
for y := range orderbookAddress.Bids {
if orderbookAddress.Bids[y].Price == bidTargets[x].Price {
if bidTargets[x].Amount == 0 {
// Delete
orderbookAddress.Bids = append(orderbookAddress.Bids[:y],
orderbookAddress.Bids[y+1:]...)
return
}
// Amend
orderbookAddress.Bids[y].Amount = bidTargets[x].Amount
return
}
}
if bidTargets[x].Amount == 0 {
// Makes sure we dont append things we missed
return
}
// Append
orderbookAddress.Bids = append(orderbookAddress.Bids, orderbook.Item{
Price: bidTargets[x].Price,
Amount: bidTargets[x].Amount,
})
}()
// bid targets
}
for x := range askTargets {
func() {
for y := range orderbookAddress.Asks {
if orderbookAddress.Asks[y].Price == askTargets[x].Price {
if askTargets[x].Amount == 0 {
// Delete
orderbookAddress.Asks = append(orderbookAddress.Asks[:y],
orderbookAddress.Asks[y+1:]...)
return
}
// Amend
orderbookAddress.Asks[y].Amount = askTargets[x].Amount
return
}
}
if askTargets[x].Amount == 0 {
// Makes sure we dont append things we missed
return
}
// Append
orderbookAddress.Asks = append(orderbookAddress.Asks, orderbook.Item{
Price: askTargets[x].Price,
Amount: askTargets[x].Amount,
})
}()
}
return orderbookAddress.Process()
}
// LoadSnapshot loads initial snapshot of orderbook data, overite allows full
// orderbook to be completely rewritten because the exchange is a doing a full
// update not an incremental one
func (w *WebsocketOrderbookLocal) LoadSnapshot(newOrderbook *orderbook.Base, exchName string, overwrite bool) error {
if len(newOrderbook.Asks) == 0 || len(newOrderbook.Bids) == 0 {
return errors.New("exchange.go websocket orderbook cache LoadSnapshot() error - snapshot ask and bids are nil")
}
w.m.Lock()
defer w.m.Unlock()
for i := range w.ob {
if w.ob[i].Pair.Equal(newOrderbook.Pair) && w.ob[i].AssetType == newOrderbook.AssetType {
if overwrite {
w.ob[i] = newOrderbook
return newOrderbook.Process()
}
return errors.New("exchange.go websocket orderbook cache LoadSnapshot() error - Snapshot instance already found")
}
}
w.ob = append(w.ob, newOrderbook)
return newOrderbook.Process()
}
// UpdateUsingID updates orderbooks using specified ID
func (w *WebsocketOrderbookLocal) UpdateUsingID(bidTargets, askTargets []orderbook.Item,
p currency.Pair,
exchName, assetType, action string) error {
w.m.Lock()
defer w.m.Unlock()
var orderbookAddress *orderbook.Base
for i := range w.ob {
if w.ob[i].Pair == p && w.ob[i].AssetType == assetType {
orderbookAddress = w.ob[i]
}
}
if orderbookAddress == nil {
return fmt.Errorf("exchange.go WebsocketOrderbookLocal Update() - orderbook.Base could not be found for Exchange %s CurrencyPair: %s AssetType: %s",
exchName,
assetType,
p.String())
}
switch action {
case "update":
for _, target := range bidTargets {
for i := range orderbookAddress.Bids {
if orderbookAddress.Bids[i].ID == target.ID {
orderbookAddress.Bids[i].Amount = target.Amount
break
}
}
}
for _, target := range askTargets {
for i := range orderbookAddress.Asks {
if orderbookAddress.Asks[i].ID == target.ID {
orderbookAddress.Asks[i].Amount = target.Amount
break
}
}
}
case "delete":
for _, target := range bidTargets {
for i := range orderbookAddress.Bids {
if orderbookAddress.Bids[i].ID == target.ID {
orderbookAddress.Bids = append(orderbookAddress.Bids[:i],
orderbookAddress.Bids[i+1:]...)
break
}
}
}
for _, target := range askTargets {
for i := range orderbookAddress.Asks {
if orderbookAddress.Asks[i].ID == target.ID {
orderbookAddress.Asks = append(orderbookAddress.Asks[:i],
orderbookAddress.Asks[i+1:]...)
break
}
}
}
case "insert":
orderbookAddress.Bids = append(orderbookAddress.Bids, bidTargets...)
orderbookAddress.Asks = append(orderbookAddress.Asks, askTargets...)
}
return orderbookAddress.Process()
}
// FlushCache flushes w.ob data to be garbage collected and refreshed when a
// connection is lost and reconnected
func (w *WebsocketOrderbookLocal) FlushCache() {
w.m.Lock()
w.ob = nil
w.m.Unlock()
}
// WebsocketResponse defines generalised data from the websocket connection
type WebsocketResponse struct {
Type int
Raw []byte
}
// WebsocketOrderbookUpdate defines a websocket event in which the orderbook
// has been updated in the orderbook package
type WebsocketOrderbookUpdate struct {
Pair currency.Pair
Asset string
Exchange string
}
// TradeData defines trade data
type TradeData struct {
Timestamp time.Time
CurrencyPair currency.Pair
AssetType string
Exchange string
EventType string
EventTime int64
Price float64
Amount float64
Side string
}
// TickerData defines ticker feed
type TickerData struct {
Timestamp time.Time
Pair currency.Pair
AssetType string
Exchange string
ClosePrice float64
Quantity float64
OpenPrice float64
HighPrice float64
LowPrice float64
}
// KlineData defines kline feed
type KlineData struct {
Timestamp time.Time
Pair currency.Pair
AssetType string
Exchange string
StartTime time.Time
CloseTime time.Time
Interval string
OpenPrice float64
ClosePrice float64
HighPrice float64
LowPrice float64
Volume float64
}
// WebsocketPositionUpdated reflects a change in orders/contracts on an exchange
type WebsocketPositionUpdated struct {
Timestamp time.Time
Pair currency.Pair
AssetType string
Exchange string
}
// GetFunctionality returns a functionality bitmask for the websocket
// connection
func (w *Websocket) GetFunctionality() uint32 {
return w.Functionality
}
// SupportsFunctionality returns if the functionality is supported as a boolean
func (w *Websocket) SupportsFunctionality(f uint32) bool {
return w.GetFunctionality()&f == f
}
// FormatFunctionality will return each of the websocket connection compatible
// stream methods as a string
func (w *Websocket) FormatFunctionality() string {
functionality := []string{}
for i := 0; i < 32; i++ {
var check uint32 = 1 << uint32(i)
if w.GetFunctionality()&check != 0 {
switch check {
case WebsocketTickerSupported:
functionality = append(functionality, WebsocketTickerSupportedText)
case WebsocketOrderbookSupported:
functionality = append(functionality, WebsocketOrderbookSupportedText)
case WebsocketKlineSupported:
functionality = append(functionality, WebsocketKlineSupportedText)
case WebsocketTradeDataSupported:
functionality = append(functionality, WebsocketTradeDataSupportedText)
case WebsocketAccountSupported:
functionality = append(functionality, WebsocketAccountSupportedText)
case WebsocketAllowsRequests:
functionality = append(functionality, WebsocketAllowsRequestsText)
default:
functionality = append(functionality,
fmt.Sprintf("%s[1<<%v]", UnknownWebsocketFunctionality, i))
}
}
}
if len(functionality) > 0 {
return strings.Join(functionality, " & ")
}
return NoWebsocketSupportText
}