Files
gocryptotrader/exchanges/bitstamp/bitstamp_websocket.go
Ryan O'Hara-Reid d3c2800fe0 Initial overhaul of websocket connection and feeds (#189)
* Initial overhaul of websocket connection and feeds
* Added proxy support
* Piped to routines.go

* Added new websocket file in exchanges
Refactored orderbook handling into exchange_websocket.go
Added better error responses for binance_websocket.go
General clean for binance_websocket.go

* General fixes - bitfinex_websocket.go
Refactored orderbook cache code - bitfinex_websocket.go
Removed fatal error with unhandled type - routines.go

* Added general improvements to bitmex_websocket.go
Refactored orderbook handling to exchange_websocket.go
Added variable in Item struct in orderbook.go for looking up orders by ID

* Fix issue when routines are blocked due to Data Handler not started
Updated traffic handler
General fixes for bitstamp_websocket.go

* General fixes for coinbasepro_websocket.go

* General fixes for coinut_websocket.go
Fixed error return in exchange_websocket.go

* Removed comments in coinut_wrapper.go
Refactor orderbook logic from hitbtc_websocket.go to exchange_websocket.go

* General fixes

* Removed comments
General fixes

* Updated routines.go

* After rebase fix

* Fixed update config pairs in okcoin.go

* fixed config currency issue in okcoin.go for okcoin China

* exchange_websocket.go
*Removed unused const dec
*Removed state change routine
*Improved trafficMonitor routine
*Increased verbosity for error returns
*Removed unneeded mutex locks

exchange_websocket_test.go
*Added new tests for websocket and orderbook updating

routines.go
*Removed string cased

* Fixed race conditions on sync.waitgroup in exchanges_websocket.go

* Changes variable name in config.go

* Removes unnecessary comment

* Removes indefinite lock on error return

* Removes unnecessary comment

* Adds support for BTCC websocket
Drops support for BTCC REST

* Rewords comment in exchange_websocket.go
Moves types to poloniex_types.go

* Moves types to coinut_types.go

* Removes unneeded range for accessing array variables for coinbase_websocket.go
Removes comments in coinut_types.go

* Adds verbosity flag to GCT
Suppresses verbose output from routines.go

* Fixes setting proxy for REST and Websocket per exchange
Upgrades error handling
Drops unused *url.Url variable in exchange type

* Adds test for setting proxy

* Fixes bug that closes connection due to incorrect timeout time through a proxy connection

* Clarify verbose flag message
2018-10-24 14:22:40 +11:00

279 lines
6.7 KiB
Go

package bitstamp
import (
"errors"
"fmt"
"log"
"strconv"
"strings"
"time"
"github.com/thrasher-/gocryptotrader/common"
"github.com/thrasher-/gocryptotrader/currency/pair"
"github.com/thrasher-/gocryptotrader/exchanges"
"github.com/thrasher-/gocryptotrader/exchanges/orderbook"
"github.com/toorop/go-pusher"
)
// WebsocketConn bundles the Pusher client used for the Bitstamp websocket feed
// with the event channels that are bound on it.
type WebsocketConn struct {
	// Client is the Pusher client created in WsConnect.
	Client *pusher.Client
	// Data is the channel returned by binding the "data" event.
	Data chan *pusher.Event
	// Trade is the channel returned by binding the "trade" event.
	Trade chan *pusher.Event
}
// PusherOrderbook is the decoded payload of an order book event received over
// the Pusher feed.
type PusherOrderbook struct {
	// Asks holds one string slice per ask level; WsUpdateOrderbook reads
	// element 0 as the price and element 1 as the amount.
	Asks [][]string `json:"asks"`
	// Bids holds one string slice per bid level, laid out like Asks.
	Bids [][]string `json:"bids"`
	// Timestamp is decoded from a quoted number in the JSON payload.
	Timestamp int64 `json:"timestamp,string"`
}
// PusherTrade is the decoded payload of a trade event received over the Pusher
// feed.
type PusherTrade struct {
	// Price and Amount are forwarded to the data handler by WsReadData.
	Price  float64 `json:"price"`
	Amount float64 `json:"amount"`
	// ID is decoded from the "id" key.
	ID int64 `json:"id"`
	// Type is decoded from the "type" key.
	// NOTE(review): presumably encodes the trade side - confirm against the
	// exchange documentation.
	Type int64 `json:"type"`
	// Timestamp is decoded from a quoted number in the JSON payload.
	Timestamp int64 `json:"timestamp,string"`
	// BuyOrderID and SellOrderID are decoded from "buy_order_id" and
	// "sell_order_id".
	BuyOrderID  int64 `json:"buy_order_id"`
	SellOrderID int64 `json:"sell_order_id"`
}
// PusherOrders defines order information.
type PusherOrders struct {
	// ID is decoded from the "id" key.
	ID int64 `json:"id"`
	// Amount is decoded from the "amount" key.
	Amount float64 `json:"amount"`
	// Price uses the explicit lower-case "price" key like its sibling fields.
	// An empty json tag falls back to the Go field name, which would encode
	// the field as "Price".
	Price float64 `json:"price"`
}
// BitstampPusherKey is the Pusher application key handed to pusher.NewClient
// when WsConnect opens the websocket feed.
const BitstampPusherKey = "de504dc5763aeef9ff52"
// tradingPairs is re-initialised to an empty map on every WsConnect call.
// NOTE(review): nothing visible in this file reads it or adds entries to it -
// confirm whether other files in the package depend on it before removing.
var tradingPairs map[string]string
// findPairFromChannel extracts the trading pair from a Pusher channel name and
// returns it upper-cased, provided it is one of the exchange's enabled pairs.
//
// The pair is the text after the final underscore of channelName, so
// "live_trades_btcusd" yields "BTCUSD". An error is returned when that pair is
// not present in b.EnabledPairs.
func (b *Bitstamp) findPairFromChannel(channelName string) (string, error) {
	// LastIndex returns -1 when there is no underscore; the slice then covers
	// the whole name, which matches splitting on "_" and taking the last part.
	suffix := channelName[strings.LastIndex(channelName, "_")+1:]
	tradingPair := strings.ToUpper(suffix)

	for _, enabledPair := range b.EnabledPairs {
		if enabledPair == tradingPair {
			return tradingPair, nil
		}
	}
	return "", errors.New("bitstamp_websocket.go error - could not find trading pair")
}
// WsConnect connects to the Bitstamp Pusher websocket feed.
//
// It creates the Pusher client, binds the "data" and "trade" events and starts
// WsReadData in its own goroutine. Then, for every enabled currency pair, it
// seeds the websocket order book from a REST snapshot, announces the snapshot
// on the DataHandler channel and subscribes to the pair's live trade and order
// book diff channels.
//
// An error is returned when the websocket or the exchange is disabled, or when
// any connection, bind, snapshot or subscription step fails. Errors are
// returned to the caller rather than also being logged here.
//
// NOTE(review): the reader goroutine is started before the per-pair setup, so
// if a later step fails this function returns while that goroutine keeps
// running until the shutdown channel fires.
func (b *Bitstamp) WsConnect() error {
	if !b.Websocket.IsEnabled() || !b.IsEnabled() {
		return errors.New(exchange.WebsocketNotEnabled)
	}

	tradingPairs = make(map[string]string)

	// The Pusher client is created without any proxy configuration, so a
	// configured proxy is only warned about, not applied.
	if b.Websocket.GetProxyAddress() != "" {
		log.Println("bitstamp_websocket.go warning - set proxy address error: proxy not supported")
	}

	var err error
	b.WebsocketConn.Client, err = pusher.NewClient(BitstampPusherKey)
	if err != nil {
		return fmt.Errorf("%s Unable to connect to Websocket. Error: %s",
			b.GetName(),
			err)
	}

	b.WebsocketConn.Data, err = b.WebsocketConn.Client.Bind("data")
	if err != nil {
		return fmt.Errorf("%s Websocket Bind error: %s", b.GetName(), err)
	}

	b.WebsocketConn.Trade, err = b.WebsocketConn.Client.Bind("trade")
	if err != nil {
		return fmt.Errorf("%s Websocket Bind error: %s", b.GetName(), err)
	}

	go b.WsReadData()

	for _, p := range b.GetEnabledCurrencies() {
		pairName := p.Pair().String()

		orderbookSeed, err := b.GetOrderbook(pairName)
		if err != nil {
			return err
		}

		var asks []orderbook.Item
		for _, ask := range orderbookSeed.Asks {
			asks = append(asks, orderbook.Item{Amount: ask.Amount, Price: ask.Price})
		}

		var bids []orderbook.Item
		for _, bid := range orderbookSeed.Bids {
			bids = append(bids, orderbook.Item{Amount: bid.Amount, Price: bid.Price})
		}

		var newOrderbook orderbook.Base
		newOrderbook.Asks = asks
		newOrderbook.Bids = bids
		newOrderbook.CurrencyPair = pairName
		newOrderbook.Pair = p
		// The snapshot timestamp goes in the seconds argument; passing it as
		// nanoseconds would place LastUpdated within seconds of 1970-01-01.
		// NOTE(review): assumes GetOrderbook returns the exchange's Unix-seconds
		// timestamp unscaled - confirm against the REST types.
		newOrderbook.LastUpdated = time.Unix(orderbookSeed.Timestamp, 0)
		newOrderbook.AssetType = "SPOT"

		err = b.Websocket.Orderbook.LoadSnapshot(newOrderbook, b.GetName())
		if err != nil {
			return err
		}

		b.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{
			Pair:     p,
			Asset:    "SPOT",
			Exchange: b.GetName(),
		}

		channelSuffix := strings.ToLower(pairName)

		err = b.WebsocketConn.Client.Subscribe(fmt.Sprintf("live_trades_%s", channelSuffix))
		if err != nil {
			return fmt.Errorf("%s Websocket Trade subscription error: %s",
				b.GetName(),
				err)
		}

		err = b.WebsocketConn.Client.Subscribe(fmt.Sprintf("diff_order_book_%s", channelSuffix))
		if err != nil {
			return fmt.Errorf("%s Websocket Orderbook subscription error: %s",
				b.GetName(),
				err)
		}
	}

	return nil
}
// WsReadData reads events from the bound Pusher channels until the websocket
// shutdown channel fires.
//
// Order book events are decoded and applied through WsUpdateOrderbook; trade
// events are decoded and forwarded to the DataHandler channel as
// exchange.TradeData. Every received event also signals TrafficAlert.
//
// A malformed event (undecodable JSON or an unexpected channel name) is
// reported on the DataHandler channel and skipped, so one bad message cannot
// terminate the process or panic the reader.
//
// On return the Pusher client is closed and the wait group is released.
//
// NOTE(review): Wg.Add is called from inside this function, and WsConnect
// launches it with `go`, so a concurrent Wg.Wait could run before the Add.
func (b *Bitstamp) WsReadData() {
	b.Websocket.Wg.Add(1)

	defer func() {
		err := b.WebsocketConn.Client.Close()
		if err != nil {
			b.Websocket.DataHandler <- fmt.Errorf("bitstamp_websocket.go - Unable to close Websocket connection. Error: %s",
				err)
		}
		b.Websocket.Wg.Done()
	}()

	for {
		select {
		case <-b.Websocket.ShutdownC:
			return

		case data := <-b.WebsocketConn.Data:
			b.Websocket.TrafficAlert <- struct{}{}

			result := PusherOrderbook{}
			err := common.JSONDecode([]byte(data.Data), &result)
			if err != nil {
				b.Websocket.DataHandler <- fmt.Errorf("bitstamp_websocket.go - unable to decode orderbook event on channel %s. Error: %s",
					data.Channel,
					err)
				continue
			}

			// Order book channels are subscribed as "diff_order_book_<pair>",
			// so the pair is the fourth underscore-separated element.
			currencyPair := common.SplitStrings(data.Channel, "_")
			if len(currencyPair) < 4 {
				b.Websocket.DataHandler <- fmt.Errorf("bitstamp_websocket.go - unexpected orderbook channel name %s",
					data.Channel)
				continue
			}
			p := pair.NewCurrencyPairFromString(common.StringToUpper(currencyPair[3]))

			err = b.WsUpdateOrderbook(result, p, "SPOT")
			if err != nil {
				b.Websocket.DataHandler <- err
			}

		case trade := <-b.WebsocketConn.Trade:
			b.Websocket.TrafficAlert <- struct{}{}

			result := PusherTrade{}
			err := common.JSONDecode([]byte(trade.Data), &result)
			if err != nil {
				b.Websocket.DataHandler <- fmt.Errorf("bitstamp_websocket.go - unable to decode trade event on channel %s. Error: %s",
					trade.Channel,
					err)
				continue
			}

			// Trade channels are subscribed as "live_trades_<pair>", so the
			// pair is the third underscore-separated element.
			currencyPair := common.SplitStrings(trade.Channel, "_")
			if len(currencyPair) < 3 {
				b.Websocket.DataHandler <- fmt.Errorf("bitstamp_websocket.go - unexpected trade channel name %s",
					trade.Channel)
				continue
			}

			b.Websocket.DataHandler <- exchange.TradeData{
				Price:        result.Price,
				Amount:       result.Amount,
				CurrencyPair: pair.NewCurrencyPairFromString(currencyPair[2]),
				Exchange:     b.GetName(),
				AssetType:    "SPOT",
			}
		}
	}
}
// WsUpdateOrderbook updates the local cache of orderbook information.
//
// The ask and bid levels in ob are parsed from their string form, applied to
// the websocket order book for pair p and asset type assetType, and an
// exchange.WebsocketOrderbookUpdate is then sent on the DataHandler channel.
//
// An error is returned when ob carries neither asks nor bids, when a level is
// malformed (fewer than two elements, or a price/amount that is not a valid
// float), or when the order book update itself fails. Malformed input is
// reported to the caller instead of terminating the process.
func (b *Bitstamp) WsUpdateOrderbook(ob PusherOrderbook, p pair.CurrencyPair, assetType string) error {
	if len(ob.Asks) == 0 && len(ob.Bids) == 0 {
		return errors.New("bitstamp_websocket.go error - no orderbook data")
	}

	asks, err := wsParseOrderbookLevels(ob.Asks)
	if err != nil {
		return fmt.Errorf("bitstamp_websocket.go error - invalid ask level: %s", err)
	}

	bids, err := wsParseOrderbookLevels(ob.Bids)
	if err != nil {
		return fmt.Errorf("bitstamp_websocket.go error - invalid bid level: %s", err)
	}

	err = b.Websocket.Orderbook.Update(bids, asks, p, time.Now(), b.GetName(), assetType)
	if err != nil {
		return err
	}

	b.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{
		Pair:     p,
		Asset:    assetType,
		Exchange: b.GetName(),
	}

	return nil
}

// wsParseOrderbookLevels converts [price, amount] string pairs into orderbook
// items, returning an error for the first level that is too short or does not
// parse as floats. An empty input yields a nil slice.
func wsParseOrderbookLevels(levels [][]string) ([]orderbook.Item, error) {
	var items []orderbook.Item
	for _, level := range levels {
		if len(level) < 2 {
			return nil, fmt.Errorf("expected price and amount, got %d element(s)", len(level))
		}

		price, err := strconv.ParseFloat(level[0], 64)
		if err != nil {
			return nil, err
		}

		amount, err := strconv.ParseFloat(level[1], 64)
		if err != nil {
			return nil, err
		}

		items = append(items, orderbook.Item{Price: price, Amount: amount})
	}
	return items, nil
}