Initial overhaul of websocket connection and feeds (#189)

* Initial overhaul of websocket connection and feeds
* Added proxy support
* Piped to routines.go

* Added new websocket file in exchanges
Refactored orderbook handling into exchange_websocket.go
Added better error responses for binance_websocket.go
General clean for binance_websocket.go

* General fixes - bitfinex_websocket.go
Refactored orderbook cache code - bitfinex_websocket.go
Removed fatal error with unhandled type - routines.go

* Added general improvements to bitmex_websocket.go
Refactored orderbook handling to exchange_websocket.go
Added variable in Item struct in orderbook.go for looking up orders by ID

* Fix issue when routines are blocked due to Data Handler not started
Updated traffic handler
General fixes for bitstamp_websocket.go

* General fixes for coinbasepro_websocket.go

* General fixes for coinut_websocket.go
Fixed error return in exchange_websocket.go

* Removed comments in coinut_wrapper.go
Refactor orderbook logic from hitbtc_websocket.go to exchange_websocket.go

* General fixes

* Removed comments
General fixes

* Updated routines.go

* After rebase fix

* Fixed update config pairs in okcoin.go

* fixed config currency issue in okcoin.go for okcoin China

* exchange_websocket.go
*Removed unused const dec
*Removed state change routine
*Improved trafficMonitor routine
*Increased verbosity for error returns
*Removed uneeded mutex locks

exchange_websocket_test.go
*Added new tests for websocket and orderbook updating

routines.go
*Removed string cased

* Fixed race conditions on sync.waitgroup in exchanges_websocket.go

* Changes variable name in config.go

* Removes unnecessary comment

* Removes indefinite lock on error return

* Removes unnecessary comment

* Adds support for BTCC websocket
Drops support for BTCC REST

* Rewords comment in exchange_websocket.go
Moves types to poloniex_types.go

* Moves types to coinut_types.go

* Removes uneeded range for accessing array variables for coinbase_websocket.go
Removes comments in coinut_types.go

* Adds verbosity flag to GCT
Suppresses verbose output from routines.go

* Fixes setting proxy for REST and Websocket per exchange
Upgrades error handling
Drops unused *url.Url variable in exchange type

* Adds test for setting proxy

* Fixes bug that closes connection due to incorrect timeout time through a proxy connection

* Clarify verbose flag message
This commit is contained in:
Ryan O'Hara-Reid
2018-10-24 14:22:41 +11:00
committed by Adrian Gallagher
parent 7315e6604c
commit d3c2800fe0
99 changed files with 6515 additions and 3031 deletions

View File

@@ -56,7 +56,8 @@ const (
// Bitstamp is the overarching type across the bitstamp package
type Bitstamp struct {
exchange.Base
Balance Balances
Balance Balances
WebsocketConn WebsocketConn
}
// SetDefaults sets default for Bitstamp
@@ -64,7 +65,6 @@ func (b *Bitstamp) SetDefaults() {
b.Name = "Bitstamp"
b.Enabled = false
b.Verbose = false
b.Websocket = false
b.RESTPollingDelay = 10
b.RequestCurrencyPairFormat.Delimiter = ""
b.RequestCurrencyPairFormat.Uppercase = true
@@ -79,6 +79,7 @@ func (b *Bitstamp) SetDefaults() {
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout))
b.APIUrlDefault = bitstampAPIURL
b.APIUrl = b.APIUrlDefault
b.WebsocketInit()
}
// Setup sets configuration values to bitstamp
@@ -93,7 +94,7 @@ func (b *Bitstamp) Setup(exch config.ExchangeConfig) {
b.SetHTTPClientUserAgent(exch.HTTPUserAgent)
b.RESTPollingDelay = exch.RESTPollingDelay
b.Verbose = exch.Verbose
b.Websocket = exch.Websocket
b.Websocket.SetEnabled(exch.Websocket)
b.BaseCurrencies = common.SplitStrings(exch.BaseCurrencies, ",")
b.AvailablePairs = common.SplitStrings(exch.AvailablePairs, ",")
b.EnabledPairs = common.SplitStrings(exch.EnabledPairs, ",")
@@ -113,6 +114,18 @@ func (b *Bitstamp) Setup(exch config.ExchangeConfig) {
if err != nil {
log.Fatal(err)
}
err = b.SetClientProxyAddress(exch.ProxyAddress)
if err != nil {
log.Fatal(err)
}
err = b.WebsocketSetup(b.WsConnect,
exch.Name,
exch.Websocket,
BitstampPusherKey,
exch.WebsocketURL)
if err != nil {
log.Fatal(err)
}
}
}

View File

@@ -29,7 +29,7 @@ func TestSetDefaults(t *testing.T) {
if b.Verbose != false {
t.Error("Test Failed - SetDefaults() error")
}
if b.Websocket != false {
if b.Websocket.IsEnabled() != false {
t.Error("Test Failed - SetDefaults() error")
}
if b.RESTPollingDelay != 10 {
@@ -47,7 +47,7 @@ func TestSetup(t *testing.T) {
b.Setup(bConfig)
if !b.IsEnabled() || b.AuthenticatedAPISupport || b.RESTPollingDelay != time.Duration(10) ||
b.Verbose || b.Websocket || len(b.BaseCurrencies) < 1 ||
b.Verbose || b.Websocket.IsEnabled() || len(b.BaseCurrencies) < 1 ||
len(b.AvailablePairs) < 1 || len(b.EnabledPairs) < 1 {
t.Error("Test Failed - Bitstamp Setup values not set correctly")
}

View File

@@ -4,23 +4,47 @@ import (
"errors"
"fmt"
"log"
"strconv"
"strings"
"time"
"github.com/thrasher-/gocryptotrader/common"
"github.com/thrasher-/gocryptotrader/currency/pair"
"github.com/thrasher-/gocryptotrader/exchanges"
"github.com/thrasher-/gocryptotrader/exchanges/orderbook"
"github.com/toorop/go-pusher"
)
// WebsocketConn defins a pusher websocket connection
type WebsocketConn struct {
Client *pusher.Client
Data chan *pusher.Event
Trade chan *pusher.Event
}
// PusherOrderbook holds order book information to be pushed
type PusherOrderbook struct {
Asks [][]string `json:"asks"`
Bids [][]string `json:"bids"`
Asks [][]string `json:"asks"`
Bids [][]string `json:"bids"`
Timestamp int64 `json:"timestamp,string"`
}
// PusherTrade holds trade information to be pushed
type PusherTrade struct {
Price float64 `json:"price"`
Amount float64 `json:"amount"`
Price float64 `json:"price"`
Amount float64 `json:"amount"`
ID int64 `json:"id"`
Type int64 `json:"type"`
Timestamp int64 `json:"timestamp,string"`
BuyOrderID int64 `json:"buy_order_id"`
SellOrderID int64 `json:"sell_order_id"`
}
// PusherOrders defines order information
type PusherOrders struct {
ID int64 `json:"id"`
Amount float64 `json:"amount"`
Price float64 `json:""`
}
const (
@@ -28,6 +52,8 @@ const (
BitstampPusherKey = "de504dc5763aeef9ff52"
)
var tradingPairs map[string]string
// findPairFromChannel extracts the capitalized trading pair from the channel and returns it only if enabled in the config
func (b *Bitstamp) findPairFromChannel(channelName string) (string, error) {
split := strings.Split(channelName, "_")
@@ -39,92 +65,214 @@ func (b *Bitstamp) findPairFromChannel(channelName string) (string, error) {
}
}
return "", errors.New("Could not find trading pair")
return "", errors.New("bistamp_websocket.go error - could not find trading pair")
}
// PusherClient starts the push mechanism
func (b *Bitstamp) PusherClient() {
for b.Enabled && b.Websocket {
// hold the mapping of channel:tradingPair in order not to always compute it
seenTradingPairs := map[string]string{}
// WsConnect connects to a websocket feed
func (b *Bitstamp) WsConnect() error {
if !b.Websocket.IsEnabled() || !b.IsEnabled() {
return errors.New(exchange.WebsocketNotEnabled)
}
pusherClient, err := pusher.NewClient(BitstampPusherKey)
tradingPairs = make(map[string]string)
var err error
if b.Websocket.GetProxyAddress() != "" {
log.Println("bistamp_websocket.go warning - set proxy address error: proxy not supported")
}
b.WebsocketConn.Client, err = pusher.NewClient(BitstampPusherKey)
if err != nil {
return fmt.Errorf("%s Unable to connect to Websocket. Error: %s",
b.GetName(),
err)
}
b.WebsocketConn.Data, err = b.WebsocketConn.Client.Bind("data")
if err != nil {
return fmt.Errorf("%s Websocket Bind error: %s", b.GetName(), err)
}
b.WebsocketConn.Trade, err = b.WebsocketConn.Client.Bind("trade")
if err != nil {
return fmt.Errorf("%s Websocket Bind error: %s", b.GetName(), err)
}
go b.WsReadData()
for _, p := range b.GetEnabledCurrencies() {
orderbookSeed, err := b.GetOrderbook(p.Pair().String())
if err != nil {
log.Printf("%s Unable to connect to Websocket. Error: %s\n", b.GetName(), err)
continue
return err
}
for _, pair := range b.EnabledPairs {
err = pusherClient.Subscribe(fmt.Sprintf("live_trades_%s", strings.ToLower(pair)))
var newOrderbook orderbook.Base
var asks []orderbook.Item
for _, ask := range orderbookSeed.Asks {
var item orderbook.Item
item.Amount = ask.Amount
item.Price = ask.Price
asks = append(asks, item)
}
var bids []orderbook.Item
for _, bid := range orderbookSeed.Bids {
var item orderbook.Item
item.Amount = bid.Amount
item.Price = bid.Price
bids = append(bids, item)
}
newOrderbook.Asks = asks
newOrderbook.Bids = bids
newOrderbook.CurrencyPair = p.Pair().String()
newOrderbook.Pair = p
newOrderbook.LastUpdated = time.Unix(0, orderbookSeed.Timestamp)
newOrderbook.AssetType = "SPOT"
err = b.Websocket.Orderbook.LoadSnapshot(newOrderbook, b.GetName())
if err != nil {
return err
}
b.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{
Pair: p,
Asset: "SPOT",
Exchange: b.GetName(),
}
err = b.WebsocketConn.Client.Subscribe(fmt.Sprintf("live_trades_%s",
strings.ToLower(p.Pair().String())))
if err != nil {
log.Println(err)
return fmt.Errorf("%s Websocket Trade subscription error: %s",
b.GetName(),
err)
}
err = b.WebsocketConn.Client.Subscribe(fmt.Sprintf("diff_order_book_%s",
strings.ToLower(p.Pair().String())))
if err != nil {
log.Println(err)
return fmt.Errorf("%s Websocket Trade subscription error: %s",
b.GetName(),
err)
}
}
return nil
}
// WsReadData reads data coming from bitstamp websocket connection
func (b *Bitstamp) WsReadData() {
b.Websocket.Wg.Add(1)
defer func() {
err := b.WebsocketConn.Client.Close()
if err != nil {
b.Websocket.DataHandler <- fmt.Errorf("bitstamp_websocket.go - Unable to to close Websocket connection. Error: %s",
err)
}
b.Websocket.Wg.Done()
}()
for {
select {
case <-b.Websocket.ShutdownC:
return
case data := <-b.WebsocketConn.Data:
b.Websocket.TrafficAlert <- struct{}{}
result := PusherOrderbook{}
err := common.JSONDecode([]byte(data.Data), &result)
if err != nil {
log.Printf("%s Websocket Trade subscription error: %s\n", b.GetName(), err)
log.Fatal(err)
}
err = pusherClient.Subscribe(fmt.Sprintf("order_book_%s", strings.ToLower(pair)))
currencyPair := common.SplitStrings(data.Channel, "_")
p := pair.NewCurrencyPairFromString(common.StringToUpper(currencyPair[3]))
err = b.WsUpdateOrderbook(result, p, "SPOT")
if err != nil {
log.Printf("%s Websocket Trade subscription error: %s\n", b.GetName(), err)
b.Websocket.DataHandler <- err
}
}
dataChannelTrade, err := pusherClient.Bind("data")
if err != nil {
log.Printf("%s Websocket Bind error: %s\n", b.GetName(), err)
continue
}
case trade := <-b.WebsocketConn.Trade:
b.Websocket.TrafficAlert <- struct{}{}
tradeChannelTrade, err := pusherClient.Bind("trade")
if err != nil {
log.Printf("%s Websocket Bind error: %s\n", b.GetName(), err)
continue
}
result := PusherTrade{}
err := common.JSONDecode([]byte(trade.Data), &result)
if err != nil {
log.Fatal(err)
}
log.Printf("%s Pusher client connected.\n", b.GetName())
currencyPair := common.SplitStrings(trade.Channel, "_")
for b.Websocket {
select {
case data := <-dataChannelTrade:
result := PusherOrderbook{}
err := common.JSONDecode([]byte(data.Data), &result)
var channelTradingPair string
var ok bool
if channelTradingPair, ok = seenTradingPairs[data.Channel]; !ok {
if foundTradingPair, noPair := b.findPairFromChannel(data.Channel); noPair == nil {
seenTradingPairs[data.Channel] = foundTradingPair
} else {
log.Printf("%s Pair from Channel: %s does not seem to be enabled or found", b.GetName(), data.Channel)
continue
}
}
log.Printf("%s Pusher: received ticker for Pair: %s\n", b.GetName(), channelTradingPair)
if err != nil {
log.Println(err)
}
case trade := <-tradeChannelTrade:
result := PusherTrade{}
err := common.JSONDecode([]byte(trade.Data), &result)
if err != nil {
log.Println(err)
}
var channelTradingPair string
var ok bool
if channelTradingPair, ok = seenTradingPairs[trade.Channel]; !ok {
if foundTradingPair, noPair := b.findPairFromChannel(trade.Channel); noPair == nil {
seenTradingPairs[trade.Channel] = foundTradingPair
} else {
log.Printf("%s LiveTrade Pair from Channel: %s does not seem to be enabled or found", b.GetName(), trade.Channel)
continue
}
}
log.Println(trade.Channel)
log.Printf("%s Pusher trade: Pair: %s Price: %f Amount: %f\n", b.GetName(), channelTradingPair, result.Price, result.Amount)
b.Websocket.DataHandler <- exchange.TradeData{
Price: result.Price,
Amount: result.Amount,
CurrencyPair: pair.NewCurrencyPairFromString(currencyPair[2]),
Exchange: b.GetName(),
AssetType: "SPOT",
}
}
}
}
// WsUpdateOrderbook updates local cache of orderbook information
func (b *Bitstamp) WsUpdateOrderbook(ob PusherOrderbook, p pair.CurrencyPair, assetType string) error {
if len(ob.Asks) == 0 && len(ob.Bids) == 0 {
return errors.New("bitstamp_websocket.go error - no orderbook data")
}
var asks, bids []orderbook.Item
if len(ob.Asks) > 0 {
for _, ask := range ob.Asks {
target, err := strconv.ParseFloat(ask[0], 64)
if err != nil {
log.Fatal(err)
}
amount, err := strconv.ParseFloat(ask[1], 64)
if err != nil {
log.Fatal(err)
}
asks = append(asks, orderbook.Item{Price: target, Amount: amount})
}
}
if len(ob.Bids) > 0 {
for _, bid := range ob.Bids {
target, err := strconv.ParseFloat(bid[0], 64)
if err != nil {
log.Fatal(err)
}
amount, err := strconv.ParseFloat(bid[1], 64)
if err != nil {
log.Fatal(err)
}
bids = append(bids, orderbook.Item{Price: target, Amount: amount})
}
}
err := b.Websocket.Orderbook.Update(bids, asks, p, time.Now(), b.GetName(), assetType)
if err != nil {
return err
}
b.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{
Pair: p,
Asset: assetType,
Exchange: b.GetName(),
}
return nil
}

View File

@@ -25,15 +25,11 @@ func (b *Bitstamp) Start(wg *sync.WaitGroup) {
// Run implements the Bitstamp wrapper
func (b *Bitstamp) Run() {
if b.Verbose {
log.Printf("%s Websocket: %s.", b.GetName(), common.IsEnabled(b.Websocket))
log.Printf("%s Websocket: %s.", b.GetName(), common.IsEnabled(b.Websocket.IsEnabled()))
log.Printf("%s polling delay: %ds.\n", b.GetName(), b.RESTPollingDelay)
log.Printf("%s %d currencies enabled: %s.\n", b.GetName(), len(b.EnabledPairs), b.EnabledPairs)
}
if b.Websocket {
go b.PusherClient()
}
pairs, err := b.GetTradingPairs()
if err != nil {
log.Printf("%s failed to get trading pairs. Err: %s", b.Name, err)
@@ -211,3 +207,8 @@ func (b *Bitstamp) WithdrawFiatExchangeFunds(currency pair.CurrencyItem, amount
func (b *Bitstamp) WithdrawFiatExchangeFundsToInternationalBank(currency pair.CurrencyItem, amount float64) (string, error) {
return "", errors.New("not yet implemented")
}
// GetWebsocket returns a pointer to the exchange websocket
func (b *Bitstamp) GetWebsocket() (*exchange.Websocket, error) {
return b.Websocket, nil
}