Engine: Protocol Features, coverage, types, BTC markets websocket (#368)

* Attempts to update orderbook so it doesn't need to sort

* Reverts the ws ob stuff. Gets rid of sorting because it happens later. Adds some exchange features

* update existing feature lists. Expands list definition to match my emotions

* Adds bithumb bitmex and bitstamp. adds a couple more types

* Features for you, features for me, features for bittrex, btcmarkets, btse, coinbasepro, coinut, exmo, gateio and gemini

* Features for hitbtc, huobi, itbit, kraken, lakebtc, lbank, localbitcoins, okcoin, okex, poloniex, yobit, zb

* Who can forget good old alphapoint?

* Adds btcmarksets websocket :glitch_crab: fixes alphapoint features

* Adds extra data not in the documentation :/

* Replaces websocket features by using protocol features. However, it breaks it due to import cycles. I'm not sure what I'll do just yet

* Removes import cycle via duplicate structs.

* Increases coverage of config with `TestCheckCurrencyConfigValues`. Moves all currency pair package types into their own files or places it at the bottom of files if necessary

* Increase coverage in code.go

* One way of determining a test has failed, is when to it fails. Removed redundant explanation

* Increases code coverage of conversion

* Lint fixes

* Fixes orderbook tests

* Re-adds sorting because its important to still have the internal pre-processed orderbook to be representative of a real orderbook

* Secret lints that did not show up via Windows linting

* Adds protocol package to contain exchange features

* Fixes protocol implementation

* Fixes ws tests

* Addresses the following: Removes st-st-stutters in config types, changes GetAvailableForexProviders -> GetSupportedForexProviders, removes errors from tests where error is nil, removes orderbook setup when not necessary, removes import newlines, removes false bools from declaration, changes should of to should have

* imports and casing

* Fixes two more nil error checks
This commit is contained in:
Scott
2019-10-22 10:56:20 +11:00
committed by Adrian Gallagher
parent ec0ed1c1e5
commit ccfcdf26aa
156 changed files with 5228 additions and 4337 deletions

View File

@@ -45,6 +45,7 @@ func (w *Websocket) Setup(setupData *WebsocketSetup) error {
w.SetExchangeName(setupData.ExchangeName)
w.SetCanUseAuthenticatedEndpoints(setupData.AuthenticatedWebsocketAPISupport)
w.trafficTimeout = setupData.WebsocketTimeout
w.features = setupData.Features
err := w.Initialise()
if err != nil {
return err
@@ -91,7 +92,7 @@ func (w *Websocket) Connect() error {
if !w.IsConnectionMonitorRunning() {
go w.connectionMonitor()
}
if w.SupportsFunctionality(WebsocketSubscribeSupported) || w.SupportsFunctionality(WebsocketUnsubscribeSupported) {
if w.features.Subscribe || w.features.Unsubscribe {
w.Wg.Add(1)
go w.manageSubscriptions()
}
@@ -402,87 +403,6 @@ func (w *Websocket) GetName() string {
return w.exchangeName
}
// GetFunctionality returns a functionality bitmask for the websocket
// connection
func (w *Websocket) GetFunctionality() uint32 {
return w.Functionality
}
// SupportsFunctionality returns if the functionality is supported as a boolean
func (w *Websocket) SupportsFunctionality(f uint32) bool {
return w.GetFunctionality()&f == f
}
// FormatFunctionality will return each of the websocket connection compatible
// stream methods as a string
func (w *Websocket) FormatFunctionality() string {
var functionality []string
for i := 0; i < 32; i++ {
var check uint32 = 1 << uint32(i)
if w.GetFunctionality()&check != 0 {
switch check {
case WebsocketTickerSupported:
functionality = append(functionality, WebsocketTickerSupportedText)
case WebsocketOrderbookSupported:
functionality = append(functionality, WebsocketOrderbookSupportedText)
case WebsocketKlineSupported:
functionality = append(functionality, WebsocketKlineSupportedText)
case WebsocketTradeDataSupported:
functionality = append(functionality, WebsocketTradeDataSupportedText)
case WebsocketAccountSupported:
functionality = append(functionality, WebsocketAccountSupportedText)
case WebsocketAllowsRequests:
functionality = append(functionality, WebsocketAllowsRequestsText)
case WebsocketSubscribeSupported:
functionality = append(functionality, WebsocketSubscribeSupportedText)
case WebsocketUnsubscribeSupported:
functionality = append(functionality, WebsocketUnsubscribeSupportedText)
case WebsocketAuthenticatedEndpointsSupported:
functionality = append(functionality, WebsocketAuthenticatedEndpointsSupportedText)
case WebsocketAccountDataSupported:
functionality = append(functionality, WebsocketAccountDataSupportedText)
case WebsocketSubmitOrderSupported:
functionality = append(functionality, WebsocketSubmitOrderSupportedText)
case WebsocketCancelOrderSupported:
functionality = append(functionality, WebsocketCancelOrderSupportedText)
case WebsocketWithdrawSupported:
functionality = append(functionality, WebsocketWithdrawSupportedText)
case WebsocketMessageCorrelationSupported:
functionality = append(functionality, WebsocketMessageCorrelationSupportedText)
case WebsocketSequenceNumberSupported:
functionality = append(functionality, WebsocketSequenceNumberSupportedText)
case WebsocketDeadMansSwitchSupported:
functionality = append(functionality, WebsocketDeadMansSwitchSupportedText)
default:
functionality = append(functionality,
fmt.Sprintf("%s[1<<%v]", UnknownWebsocketFunctionality, i))
}
}
}
if len(functionality) > 0 {
return strings.Join(functionality, " & ")
}
return NoWebsocketSupportText
}
// SetChannelSubscriber sets the function to use the base subscribe func
func (w *Websocket) SetChannelSubscriber(subscriber func(channelToSubscribe WebsocketChannelSubscription) error) {
w.channelSubscriber = subscriber
@@ -495,7 +415,7 @@ func (w *Websocket) SetChannelUnsubscriber(unsubscriber func(channelToUnsubscrib
// ManageSubscriptions ensures the subscriptions specified continue to be subscribed to
func (w *Websocket) manageSubscriptions() {
if !w.SupportsFunctionality(WebsocketSubscribeSupported) && !w.SupportsFunctionality(WebsocketUnsubscribeSupported) {
if !w.features.Subscribe && !w.features.Unsubscribe {
w.DataHandler <- fmt.Errorf("%v does not support channel subscriptions, exiting ManageSubscriptions()", w.exchangeName)
return
}
@@ -528,13 +448,13 @@ func (w *Websocket) manageSubscriptions() {
log.Debugf(log.WebsocketMgr, "%v checking subscriptions", w.exchangeName)
}
// Subscribe to channels Pending a subscription
if w.SupportsFunctionality(WebsocketSubscribeSupported) {
if w.features.Subscribe {
err := w.appendSubscribedChannels()
if err != nil {
w.DataHandler <- err
}
}
if w.SupportsFunctionality(WebsocketUnsubscribeSupported) {
if w.features.Unsubscribe {
err := w.unsubscribeToChannels()
if err != nil {
w.DataHandler <- err

View File

@@ -16,6 +16,7 @@ import (
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/protocol"
)
func TestTrafficMonitorTimeout(t *testing.T) {
@@ -85,6 +86,7 @@ func TestConnectionMessageErrors(t *testing.T) {
ws.DataHandler = make(chan interface{})
ws.ShutdownC = make(chan struct{})
ws.connector = func() error { return nil }
ws.features = &protocol.Features{}
go ws.connectionMonitor()
timer := time.NewTimer(900 * time.Millisecond)
ws.ReadMessageErrors <- errors.New("errorText")
@@ -126,7 +128,7 @@ func TestWebsocket(t *testing.T) {
ws = *New()
err = ws.SetProxyAddress("testProxy")
if err != nil {
t.Error("test failed - SetProxyAddress", err)
t.Error("SetProxyAddress", err)
}
err = ws.Setup(
@@ -140,102 +142,60 @@ func TestWebsocket(t *testing.T) {
Connector: func() error { return nil },
Subscriber: func(test WebsocketChannelSubscription) error { return nil },
UnSubscriber: func(test WebsocketChannelSubscription) error { return nil },
Features: &protocol.Features{},
})
if err != nil {
t.Error(err)
}
if ws.GetName() != "exchangeName" {
t.Error("test failed - WebsocketSetup")
t.Error("WebsocketSetup")
}
if !ws.IsEnabled() {
t.Error("test failed - WebsocketSetup")
t.Error("WebsocketSetup")
}
if ws.GetProxyAddress() != "testProxy" {
t.Error("test failed - WebsocketSetup")
t.Error("WebsocketSetup")
}
if ws.GetDefaultURL() != "testDefaultURL" {
t.Error("test failed - WebsocketSetup")
t.Error("WebsocketSetup")
}
if ws.GetWebsocketURL() != "testRunningURL" {
t.Error("test failed - WebsocketSetup")
t.Error("WebsocketSetup")
}
if ws.trafficTimeout != time.Duration(2) {
t.Error("test failed - WebsocketSetup")
t.Error("WebsocketSetup")
}
// -- Not connected shutdown
err = ws.Shutdown()
if err == nil {
t.Fatal("test failed - should not be connected to able to shut down")
t.Fatal("should not be connected to able to shut down")
}
ws.Wg.Wait()
// -- Normal connect
err = ws.Connect()
if err != nil {
t.Fatal("test failed - WebsocketSetup", err)
t.Fatal("WebsocketSetup", err)
}
ws.SetWebsocketURL("ws://demos.kaazing.com/echo")
// -- Already connected connect
err = ws.Connect()
if err == nil {
t.Fatal("test failed - should not connect, already connected")
t.Fatal("should not connect, already connected")
}
// -- Normal shutdown
err = ws.Shutdown()
if err != nil {
t.Fatal("test failed - WebsocketSetup", err)
t.Fatal("WebsocketSetup", err)
}
ws.Wg.Wait()
}
func TestFunctionality(t *testing.T) {
ws := New()
if ws.FormatFunctionality() != NoWebsocketSupportText {
t.Fatalf("Test Failed - FormatFunctionality error expected %s but received %s",
NoWebsocketSupportText, ws.FormatFunctionality())
}
ws.Functionality = 1 << 31
if ws.FormatFunctionality() != UnknownWebsocketFunctionality+"[1<<31]" {
t.Fatal("Test Failed - GetFunctionality error incorrect error returned")
}
ws.Functionality = WebsocketOrderbookSupported
if ws.GetFunctionality() != WebsocketOrderbookSupported {
t.Fatal("Test Failed - GetFunctionality error incorrect bitmask returned")
}
if !ws.SupportsFunctionality(WebsocketOrderbookSupported) {
t.Fatal("Test Failed - SupportsFunctionality error should be true")
}
ws.Functionality = WebsocketTickerSupported | WebsocketOrderbookSupported | WebsocketKlineSupported |
WebsocketTradeDataSupported | WebsocketAccountSupported | WebsocketAllowsRequests |
WebsocketSubscribeSupported | WebsocketUnsubscribeSupported | WebsocketAuthenticatedEndpointsSupported |
WebsocketAccountDataSupported | WebsocketSubmitOrderSupported | WebsocketCancelOrderSupported |
WebsocketWithdrawSupported | WebsocketMessageCorrelationSupported | WebsocketSequenceNumberSupported |
WebsocketDeadMansSwitchSupported
formatted := ws.FormatFunctionality()
if !strings.Contains(formatted, WebsocketTickerSupportedText) || !strings.Contains(formatted, WebsocketOrderbookSupportedText) ||
!strings.Contains(formatted, WebsocketKlineSupportedText) || !strings.Contains(formatted, WebsocketTradeDataSupportedText) ||
!strings.Contains(formatted, WebsocketAccountSupportedText) || !strings.Contains(formatted, WebsocketAllowsRequestsText) ||
!strings.Contains(formatted, WebsocketSubscribeSupportedText) || !strings.Contains(formatted, WebsocketUnsubscribeSupportedText) ||
!strings.Contains(formatted, WebsocketAuthenticatedEndpointsSupportedText) || !strings.Contains(formatted, WebsocketAccountDataSupportedText) ||
!strings.Contains(formatted, WebsocketSubmitOrderSupportedText) || !strings.Contains(formatted, WebsocketCancelOrderSupportedText) ||
!strings.Contains(formatted, WebsocketWithdrawSupportedText) || !strings.Contains(formatted, WebsocketMessageCorrelationSupportedText) ||
!strings.Contains(formatted, WebsocketSequenceNumberSupportedText) || !strings.Contains(formatted, WebsocketDeadMansSwitchSupportedText) {
t.Error("Failed to format and include supported websocket features")
}
}
// placeholderSubscriber basic function to test subscriptions
func placeholderSubscriber(channelToSubscribe WebsocketChannelSubscription) error {
return nil
@@ -349,8 +309,8 @@ func TestUnsubscriptionWithExistingEntry(t *testing.T) {
// TestManageSubscriptionsStartStop logic test
func TestManageSubscriptionsStartStop(t *testing.T) {
w := Websocket{
ShutdownC: make(chan struct{}),
Functionality: WebsocketSubscribeSupported | WebsocketUnsubscribeSupported,
ShutdownC: make(chan struct{}),
features: &protocol.Features{Subscribe: true, Unsubscribe: true},
}
w.Wg.Add(1)
go w.manageSubscriptions()
@@ -361,8 +321,8 @@ func TestManageSubscriptionsStartStop(t *testing.T) {
// TestManageSubscriptions logic test
func TestManageSubscriptions(t *testing.T) {
w := Websocket{
ShutdownC: make(chan struct{}),
Functionality: WebsocketSubscribeSupported | WebsocketUnsubscribeSupported,
ShutdownC: make(chan struct{}),
features: &protocol.Features{Subscribe: true, Unsubscribe: true},
subscribedChannels: []WebsocketChannelSubscription{
{
Channel: "hello",

View File

@@ -7,47 +7,12 @@ import (
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/protocol"
"github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wsorderbook"
)
// Websocket functionality list and state consts
const (
NoWebsocketSupport uint32 = 0
WebsocketTickerSupported uint32 = 1 << (iota - 1)
WebsocketOrderbookSupported
WebsocketKlineSupported
WebsocketTradeDataSupported
WebsocketAccountSupported
WebsocketAllowsRequests
WebsocketSubscribeSupported
WebsocketUnsubscribeSupported
WebsocketAuthenticatedEndpointsSupported
WebsocketAccountDataSupported
WebsocketSubmitOrderSupported
WebsocketCancelOrderSupported
WebsocketWithdrawSupported
WebsocketMessageCorrelationSupported
WebsocketSequenceNumberSupported
WebsocketDeadMansSwitchSupported
WebsocketTickerSupportedText = "TICKER STREAMING SUPPORTED"
WebsocketOrderbookSupportedText = "ORDERBOOK STREAMING SUPPORTED"
WebsocketKlineSupportedText = "KLINE STREAMING SUPPORTED"
WebsocketTradeDataSupportedText = "TRADE STREAMING SUPPORTED"
WebsocketAccountSupportedText = "ACCOUNT STREAMING SUPPORTED"
WebsocketAllowsRequestsText = "WEBSOCKET REQUESTS SUPPORTED"
NoWebsocketSupportText = "WEBSOCKET NOT SUPPORTED"
UnknownWebsocketFunctionality = "UNKNOWN FUNCTIONALITY BITMASK"
WebsocketSubscribeSupportedText = "WEBSOCKET SUBSCRIBE SUPPORTED"
WebsocketUnsubscribeSupportedText = "WEBSOCKET UNSUBSCRIBE SUPPORTED"
WebsocketAuthenticatedEndpointsSupportedText = "WEBSOCKET AUTHENTICATED ENDPOINTS SUPPORTED"
WebsocketAccountDataSupportedText = "WEBSOCKET ACCOUNT DATA SUPPORTED"
WebsocketSubmitOrderSupportedText = "WEBSOCKET SUBMIT ORDER SUPPORTED"
WebsocketCancelOrderSupportedText = "WEBSOCKET CANCEL ORDER SUPPORTED"
WebsocketWithdrawSupportedText = "WEBSOCKET WITHDRAW SUPPORTED"
WebsocketMessageCorrelationSupportedText = "WEBSOCKET MESSAGE CORRELATION SUPPORTED"
WebsocketSequenceNumberSupportedText = "WEBSOCKET SEQUENCE NUMBER SUPPORTED"
WebsocketDeadMansSwitchSupportedText = "WEBSOCKET DEAD MANS SWITCH SUPPORTED"
// WebsocketNotEnabled alerts of a disabled websocket
WebsocketNotEnabled = "exchange_websocket_not_enabled"
manageSubscriptionsDelay = 5 * time.Second
@@ -58,8 +23,6 @@ const (
// Websocket defines a return type for websocket connections via the interface
// wrapper for routine processing in routines.go
type Websocket struct {
// Functionality defines websocket stream capabilities
Functionality uint32
canUseAuthenticatedEndpoints bool
enabled bool
init bool
@@ -93,6 +56,7 @@ type Websocket struct {
TrafficAlert chan struct{}
// ReadMessageErrors will received all errors from ws.ReadMessage() and verify if its a disconnection
ReadMessageErrors chan error
features *protocol.Features
}
type WebsocketSetup struct {
@@ -106,6 +70,7 @@ type WebsocketSetup struct {
Connector func() error
Subscriber func(channelToSubscribe WebsocketChannelSubscription) error
UnSubscriber func(channelToUnsubscribe WebsocketChannelSubscription) error
Features *protocol.Features
}
// WebsocketChannelSubscription container for websocket subscriptions

View File

@@ -242,6 +242,39 @@ func BenchmarkNoBufferPerformance(b *testing.B) {
}
}
func TestUpdates(t *testing.T) {
obl, curr, _, _, err := createSnapshot()
if err != nil {
t.Error(err)
}
obl.updateAsksByPrice(obl.ob[curr][asset.Spot], &WebsocketOrderbookUpdate{
Bids: itemArray[5],
Asks: itemArray[5],
CurrencyPair: curr,
UpdateTime: time.Now(),
AssetType: asset.Spot,
})
if err != nil {
t.Error(err)
}
obl.updateAsksByPrice(obl.ob[curr][asset.Spot], &WebsocketOrderbookUpdate{
Bids: itemArray[0],
Asks: itemArray[0],
CurrencyPair: curr,
UpdateTime: time.Now(),
AssetType: asset.Spot,
})
if err != nil {
t.Error(err)
}
if len(obl.ob[curr][asset.Spot].Asks) != 3 {
t.Error("Did not update")
}
}
// TestHittingTheBuffer logic test
func TestHittingTheBuffer(t *testing.T) {
obl, curr, _, _, err := createSnapshot()