mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-05-18 07:26:50 +00:00
Bitstamp: Websocket API upgrade to v2 (#307)
* Subscribe/Unsubscribe methods added * migration to v3 * removed orderbook from rest * WsUpdateOrderbook updated to reflect changes to v2 * Added comment for exported func * removed logging * unexported structs that are not used globally moved seed to own function * unexported functions not used outside package * Support reconnection message from bitstamp * moved from range key/val * using ticket.Spot instead of string * Seperated out WsReadData & WsHandleData to allow for better testing of websocket messages * ah should continue to next iteration and not break execution on json decode * code formatting clean up * reworded connection message * return out of method instead of just breaking loop * formatting changes and replaced SPOT with ticket.Spot type
This commit is contained in:
@@ -12,6 +12,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/thrasher-/gocryptotrader/common"
|
||||
"github.com/thrasher-/gocryptotrader/config"
|
||||
"github.com/thrasher-/gocryptotrader/currency"
|
||||
@@ -63,7 +64,7 @@ const (
|
||||
type Bitstamp struct {
|
||||
exchange.Base
|
||||
Balance Balances
|
||||
WebsocketConn WebsocketConn
|
||||
WebsocketConn *websocket.Conn
|
||||
wsRequestMtx sync.Mutex
|
||||
}
|
||||
|
||||
@@ -116,6 +117,7 @@ func (b *Bitstamp) Setup(exch *config.ExchangeConfig) {
|
||||
b.APISecret = exch.APISecret
|
||||
b.SetAPIKeys(exch.APIKey, exch.APISecret, b.ClientID, false)
|
||||
b.AuthenticatedAPISupport = true
|
||||
b.WebsocketURL = bitstampWSURL
|
||||
err := b.SetCurrencyPairFormat()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
@@ -142,7 +144,7 @@ func (b *Bitstamp) Setup(exch *config.ExchangeConfig) {
|
||||
exch.Name,
|
||||
exch.Websocket,
|
||||
exch.Verbose,
|
||||
BitstampPusherKey,
|
||||
bitstampWSURL,
|
||||
exch.WebsocketURL)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
package bitstamp
|
||||
|
||||
import pusher "github.com/toorop/go-pusher"
|
||||
|
||||
// Ticker holds ticker information
|
||||
type Ticker struct {
|
||||
Last float64 `json:"last,string"`
|
||||
@@ -160,34 +158,46 @@ const (
|
||||
errStr string = "error"
|
||||
)
|
||||
|
||||
// WebsocketConn defines a pusher websocket connection
|
||||
type WebsocketConn struct {
|
||||
Client *pusher.Client
|
||||
Data chan *pusher.Event
|
||||
Trade chan *pusher.Event
|
||||
type websocketEventRequest struct {
|
||||
Event string `json:"event"`
|
||||
Data websocketData `json:"data"`
|
||||
}
|
||||
|
||||
// PusherOrderbook holds order book information to be pushed
|
||||
type PusherOrderbook struct {
|
||||
Asks [][]string `json:"asks"`
|
||||
Bids [][]string `json:"bids"`
|
||||
Timestamp int64 `json:"timestamp,string"`
|
||||
type websocketData struct {
|
||||
Channel string `json:"channel"`
|
||||
}
|
||||
|
||||
// PusherTrade holds trade information to be pushed
|
||||
type PusherTrade struct {
|
||||
Price float64 `json:"price"`
|
||||
Amount float64 `json:"amount"`
|
||||
ID int64 `json:"id"`
|
||||
Type int64 `json:"type"`
|
||||
Timestamp int64 `json:"timestamp,string"`
|
||||
BuyOrderID int64 `json:"buy_order_id"`
|
||||
SellOrderID int64 `json:"sell_order_id"`
|
||||
type websocketResponse struct {
|
||||
Event string `json:"event"`
|
||||
Channel string `json:"channel"`
|
||||
}
|
||||
|
||||
// PusherOrders defines order information
|
||||
type PusherOrders struct {
|
||||
ID int64 `json:"id"`
|
||||
Amount float64 `json:"amount"`
|
||||
Price float64 `json:""`
|
||||
type websocketTradeResponse struct {
|
||||
websocketResponse
|
||||
Data websocketTradeData `json:"data"`
|
||||
}
|
||||
|
||||
type websocketTradeData struct {
|
||||
Microtimestamp string `json:"microtimestamp"`
|
||||
Amount float64 `json:"amount"`
|
||||
BuyOrderID int64 `json:"buy_order_id"`
|
||||
SellOrderID int64 `json:"sell_order_id"`
|
||||
AmountStr string `json:"amount_str"`
|
||||
PriceStr string `json:"price_str"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
Price float64 `json:"price"`
|
||||
Type int `json:"type"`
|
||||
ID int `json:"id"`
|
||||
}
|
||||
|
||||
type websocketOrderBookResponse struct {
|
||||
websocketResponse
|
||||
Data websocketOrderBook `json:"data"`
|
||||
}
|
||||
|
||||
type websocketOrderBook struct {
|
||||
Asks [][]string `json:"asks"`
|
||||
Bids [][]string `json:"bids"`
|
||||
Timestamp int64 `json:"timestamp,string"`
|
||||
Microtimestamp string `json:"microtimestamp"`
|
||||
}
|
||||
|
||||
@@ -3,126 +3,161 @@ package bitstamp
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/thrasher-/gocryptotrader/common"
|
||||
"github.com/thrasher-/gocryptotrader/currency"
|
||||
exchange "github.com/thrasher-/gocryptotrader/exchanges"
|
||||
"github.com/thrasher-/gocryptotrader/exchanges/orderbook"
|
||||
"github.com/thrasher-/gocryptotrader/exchanges/ticker"
|
||||
log "github.com/thrasher-/gocryptotrader/logger"
|
||||
pusher "github.com/toorop/go-pusher"
|
||||
)
|
||||
|
||||
const (
|
||||
// BitstampPusherKey holds the current pusher key
|
||||
BitstampPusherKey = "de504dc5763aeef9ff52"
|
||||
bitstampWSURL = "wss://ws.bitstamp.net"
|
||||
)
|
||||
|
||||
var tradingPairs map[string]string
|
||||
|
||||
// findPairFromChannel extracts the capitalized trading pair from the channel and returns it only if enabled in the config
|
||||
func (b *Bitstamp) findPairFromChannel(channelName string) (string, error) {
|
||||
split := strings.Split(channelName, "_")
|
||||
tradingPair := strings.ToUpper(split[len(split)-1])
|
||||
|
||||
for _, enabledPair := range b.EnabledPairs {
|
||||
if enabledPair.String() == tradingPair {
|
||||
return tradingPair, nil
|
||||
}
|
||||
}
|
||||
|
||||
return "", errors.New("bistamp_websocket.go error - could not find trading pair")
|
||||
}
|
||||
|
||||
// WsConnect connects to a websocket feed
|
||||
func (b *Bitstamp) WsConnect() error {
|
||||
if !b.Websocket.IsEnabled() || !b.IsEnabled() {
|
||||
return errors.New(exchange.WebsocketNotEnabled)
|
||||
}
|
||||
|
||||
tradingPairs = make(map[string]string)
|
||||
var err error
|
||||
|
||||
var dialer websocket.Dialer
|
||||
if b.Websocket.GetProxyAddress() != "" {
|
||||
log.Warn("bitstamp_websocket.go warning - set proxy address error: proxy not supported")
|
||||
proxy, err := url.Parse(b.Websocket.GetProxyAddress())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dialer.Proxy = http.ProxyURL(proxy)
|
||||
}
|
||||
|
||||
b.WebsocketConn.Client, err = pusher.NewClient(BitstampPusherKey)
|
||||
var err error
|
||||
b.WebsocketConn, _, err = dialer.Dial(b.Websocket.GetWebsocketURL(), http.Header{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s Unable to connect to Websocket. Error: %s",
|
||||
b.GetName(),
|
||||
b.Name,
|
||||
err)
|
||||
}
|
||||
|
||||
b.WebsocketConn.Data, err = b.WebsocketConn.Client.Bind("data")
|
||||
if b.Verbose {
|
||||
log.Debugf("%s Connected to Websocket.\n", b.GetName())
|
||||
}
|
||||
|
||||
err = b.seedOrderBook()
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s Websocket Bind error: %s", b.GetName(), err)
|
||||
|
||||
b.Websocket.DataHandler <- err
|
||||
}
|
||||
|
||||
b.WebsocketConn.Trade, err = b.WebsocketConn.Client.Bind("trade")
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s Websocket Bind error: %s", b.GetName(), err)
|
||||
}
|
||||
b.GenerateDefaultSubscriptions()
|
||||
go b.WsReadData()
|
||||
b.generateDefaultSubscriptions()
|
||||
go b.WsHandleData()
|
||||
|
||||
for _, p := range b.GetEnabledCurrencies() {
|
||||
orderbookSeed, err := b.GetOrderbook(p.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var newOrderBook orderbook.Base
|
||||
|
||||
var asks []orderbook.Item
|
||||
for _, ask := range orderbookSeed.Asks {
|
||||
var item orderbook.Item
|
||||
item.Amount = ask.Amount
|
||||
item.Price = ask.Price
|
||||
asks = append(asks, item)
|
||||
}
|
||||
|
||||
var bids []orderbook.Item
|
||||
for _, bid := range orderbookSeed.Bids {
|
||||
var item orderbook.Item
|
||||
item.Amount = bid.Amount
|
||||
item.Price = bid.Price
|
||||
bids = append(bids, item)
|
||||
}
|
||||
|
||||
newOrderBook.Asks = asks
|
||||
newOrderBook.Bids = bids
|
||||
newOrderBook.Pair = p
|
||||
newOrderBook.AssetType = "SPOT"
|
||||
|
||||
err = b.Websocket.Orderbook.LoadSnapshot(&newOrderBook, b.GetName(), false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{
|
||||
Pair: p,
|
||||
Asset: "SPOT",
|
||||
Exchange: b.GetName(),
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
|
||||
func (b *Bitstamp) GenerateDefaultSubscriptions() {
|
||||
// WsReadData reads data coming from bitstamp websocket connection
|
||||
func (b *Bitstamp) WsReadData() (exchange.WebsocketResponse, error) {
|
||||
msgType, resp, err := b.WebsocketConn.ReadMessage()
|
||||
|
||||
if err != nil {
|
||||
return exchange.WebsocketResponse{}, err
|
||||
}
|
||||
|
||||
if b.Verbose {
|
||||
log.Debugf("%s websocket raw response: %s", b.GetName(), resp)
|
||||
}
|
||||
|
||||
b.Websocket.TrafficAlert <- struct{}{}
|
||||
return exchange.WebsocketResponse{Type: msgType, Raw: resp}, nil
|
||||
}
|
||||
|
||||
// WsHandleData handles websocket data from WsReadData
|
||||
func (b *Bitstamp) WsHandleData() {
|
||||
b.Websocket.Wg.Add(1)
|
||||
|
||||
defer func() {
|
||||
b.Websocket.Wg.Done()
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-b.Websocket.ShutdownC:
|
||||
return
|
||||
|
||||
default:
|
||||
resp, err := b.WsReadData()
|
||||
if err != nil {
|
||||
b.Websocket.DataHandler <- err
|
||||
return
|
||||
}
|
||||
|
||||
wsResponse := websocketResponse{}
|
||||
err = common.JSONDecode(resp.Raw, &wsResponse)
|
||||
if err != nil {
|
||||
b.Websocket.DataHandler <- err
|
||||
continue
|
||||
}
|
||||
|
||||
switch wsResponse.Event {
|
||||
case "bts:request_reconnect":
|
||||
if b.Verbose {
|
||||
log.Debugf("%v - Websocket reconnection request received", b.GetName())
|
||||
}
|
||||
go b.Websocket.WebsocketReset()
|
||||
|
||||
case "data":
|
||||
wsOrderBookTemp := websocketOrderBookResponse{}
|
||||
err := common.JSONDecode(resp.Raw, &wsOrderBookTemp)
|
||||
if err != nil {
|
||||
b.Websocket.DataHandler <- err
|
||||
continue
|
||||
}
|
||||
|
||||
currencyPair := common.SplitStrings(wsResponse.Channel, "_")
|
||||
p := currency.NewPairFromString(common.StringToUpper(currencyPair[3]))
|
||||
|
||||
err = b.wsUpdateOrderbook(wsOrderBookTemp.Data, p, ticker.Spot)
|
||||
if err != nil {
|
||||
b.Websocket.DataHandler <- err
|
||||
continue
|
||||
}
|
||||
|
||||
case "trade":
|
||||
wsTradeTemp := websocketTradeResponse{}
|
||||
|
||||
err := common.JSONDecode(resp.Raw, &wsTradeTemp)
|
||||
if err != nil {
|
||||
b.Websocket.DataHandler <- err
|
||||
continue
|
||||
}
|
||||
|
||||
currencyPair := common.SplitStrings(wsResponse.Channel, "_")
|
||||
p := currency.NewPairFromString(common.StringToUpper(currencyPair[2]))
|
||||
|
||||
b.Websocket.DataHandler <- exchange.TradeData{
|
||||
Price: wsTradeTemp.Data.Price,
|
||||
Amount: wsTradeTemp.Data.Amount,
|
||||
CurrencyPair: p,
|
||||
Exchange: b.GetName(),
|
||||
AssetType: ticker.Spot,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Bitstamp) generateDefaultSubscriptions() {
|
||||
var channels = []string{"live_trades_", "diff_order_book_"}
|
||||
enabledCurrencies := b.GetEnabledCurrencies()
|
||||
subscriptions := []exchange.WebsocketChannelSubscription{}
|
||||
for i := range channels {
|
||||
for j := range enabledCurrencies {
|
||||
subscriptions = append(subscriptions, exchange.WebsocketChannelSubscription{
|
||||
Channel: fmt.Sprintf("%v%v", channels[i], enabledCurrencies[j].Lower().String()),
|
||||
Currency: enabledCurrencies[j],
|
||||
Channel: fmt.Sprintf("%v%v", channels[i], enabledCurrencies[j].Lower().String()),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -133,88 +168,37 @@ func (b *Bitstamp) GenerateDefaultSubscriptions() {
|
||||
func (b *Bitstamp) Subscribe(channelToSubscribe exchange.WebsocketChannelSubscription) error {
|
||||
b.wsRequestMtx.Lock()
|
||||
defer b.wsRequestMtx.Unlock()
|
||||
if b.Verbose {
|
||||
log.Debugf("%v sending message to websocket %v", b.Name, channelToSubscribe)
|
||||
|
||||
req := websocketEventRequest{
|
||||
Event: "bts:subscribe",
|
||||
Data: websocketData{
|
||||
Channel: channelToSubscribe.Channel,
|
||||
},
|
||||
}
|
||||
return b.WebsocketConn.Client.Subscribe(channelToSubscribe.Channel)
|
||||
return b.WebsocketConn.WriteJSON(req)
|
||||
}
|
||||
|
||||
// Unsubscribe sends a websocket message to stop receiving data from the channel
|
||||
func (b *Bitstamp) Unsubscribe(channelToSubscribe exchange.WebsocketChannelSubscription) error {
|
||||
b.wsRequestMtx.Lock()
|
||||
defer b.wsRequestMtx.Unlock()
|
||||
if b.Verbose {
|
||||
log.Debugf("%v sending message to websocket %v", b.Name, channelToSubscribe)
|
||||
|
||||
req := websocketEventRequest{
|
||||
Event: "bts:unsubscribe",
|
||||
Data: websocketData{
|
||||
Channel: channelToSubscribe.Channel,
|
||||
},
|
||||
}
|
||||
return b.WebsocketConn.Client.Unsubscribe(channelToSubscribe.Channel)
|
||||
return b.WebsocketConn.WriteJSON(req)
|
||||
}
|
||||
|
||||
// WsReadData reads data coming from bitstamp websocket connection
|
||||
func (b *Bitstamp) WsReadData() {
|
||||
b.Websocket.Wg.Add(1)
|
||||
defer func() {
|
||||
err := b.WebsocketConn.Client.Close()
|
||||
if err != nil {
|
||||
b.Websocket.DataHandler <- fmt.Errorf("bitstamp_websocket.go - Unable to to close Websocket connection. Error: %s",
|
||||
err)
|
||||
}
|
||||
b.Websocket.Wg.Done()
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-b.Websocket.ShutdownC:
|
||||
return
|
||||
|
||||
case data := <-b.WebsocketConn.Data:
|
||||
b.Websocket.TrafficAlert <- struct{}{}
|
||||
|
||||
result := PusherOrderbook{}
|
||||
err := common.JSONDecode([]byte(data.Data), &result)
|
||||
if err != nil {
|
||||
b.Websocket.DataHandler <- err
|
||||
continue
|
||||
}
|
||||
|
||||
currencyPair := common.SplitStrings(data.Channel, "_")
|
||||
p := currency.NewPairFromString(common.StringToUpper(currencyPair[3]))
|
||||
|
||||
err = b.WsUpdateOrderbook(result, p, "SPOT")
|
||||
if err != nil {
|
||||
b.Websocket.DataHandler <- err
|
||||
continue
|
||||
}
|
||||
|
||||
case trade := <-b.WebsocketConn.Trade:
|
||||
b.Websocket.TrafficAlert <- struct{}{}
|
||||
|
||||
result := PusherTrade{}
|
||||
err := common.JSONDecode([]byte(trade.Data), &result)
|
||||
if err != nil {
|
||||
b.Websocket.DataHandler <- err
|
||||
continue
|
||||
}
|
||||
|
||||
currencyPair := common.SplitStrings(trade.Channel, "_")
|
||||
|
||||
b.Websocket.DataHandler <- exchange.TradeData{
|
||||
Price: result.Price,
|
||||
Amount: result.Amount,
|
||||
CurrencyPair: currency.NewPairFromString(currencyPair[2]),
|
||||
Exchange: b.GetName(),
|
||||
AssetType: "SPOT",
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WsUpdateOrderbook updates local cache of orderbook information
|
||||
func (b *Bitstamp) WsUpdateOrderbook(ob PusherOrderbook, p currency.Pair, assetType string) error {
|
||||
func (b *Bitstamp) wsUpdateOrderbook(ob websocketOrderBook, p currency.Pair, assetType string) error {
|
||||
if len(ob.Asks) == 0 && len(ob.Bids) == 0 {
|
||||
return errors.New("bitstamp_websocket.go error - no orderbook data")
|
||||
}
|
||||
|
||||
var asks, bids []orderbook.Item
|
||||
|
||||
if len(ob.Asks) > 0 {
|
||||
for _, ask := range ob.Asks {
|
||||
target, err := strconv.ParseFloat(ask[0], 64)
|
||||
@@ -264,3 +248,47 @@ func (b *Bitstamp) WsUpdateOrderbook(ob PusherOrderbook, p currency.Pair, assetT
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Bitstamp) seedOrderBook() error {
|
||||
p := b.GetEnabledCurrencies()
|
||||
for x := range p {
|
||||
orderbookSeed, err := b.GetOrderbook(p[x].String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var newOrderBook orderbook.Base
|
||||
var asks, bids []orderbook.Item
|
||||
|
||||
for _, ask := range orderbookSeed.Asks {
|
||||
var item orderbook.Item
|
||||
item.Amount = ask.Amount
|
||||
item.Price = ask.Price
|
||||
asks = append(asks, item)
|
||||
}
|
||||
|
||||
for _, bid := range orderbookSeed.Bids {
|
||||
var item orderbook.Item
|
||||
item.Amount = bid.Amount
|
||||
item.Price = bid.Price
|
||||
bids = append(bids, item)
|
||||
}
|
||||
|
||||
newOrderBook.Asks = asks
|
||||
newOrderBook.Bids = bids
|
||||
newOrderBook.Pair = p[x]
|
||||
newOrderBook.AssetType = ticker.Spot
|
||||
|
||||
err = b.Websocket.Orderbook.LoadSnapshot(&newOrderBook, b.GetName(), false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{
|
||||
Pair: p[x],
|
||||
Asset: ticker.Spot,
|
||||
Exchange: b.GetName(),
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
1
go.mod
1
go.mod
@@ -6,6 +6,5 @@ require (
|
||||
github.com/google/go-querystring v1.0.0
|
||||
github.com/gorilla/mux v1.7.2
|
||||
github.com/gorilla/websocket v1.4.0
|
||||
github.com/toorop/go-pusher v0.0.0-20180521062818-4521e2eb39fb
|
||||
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f
|
||||
)
|
||||
|
||||
2
go.sum
2
go.sum
@@ -4,8 +4,6 @@ github.com/gorilla/mux v1.7.2 h1:zoNxOV7WjqXptQOVngLmcSQgXmgk4NMz1HibBchjl/I=
|
||||
github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
|
||||
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
|
||||
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
|
||||
github.com/toorop/go-pusher v0.0.0-20180521062818-4521e2eb39fb h1:9kcmLvQdiIecpgVEL3/+J5QIP/ElRBJDljOay0SvqnA=
|
||||
github.com/toorop/go-pusher v0.0.0-20180521062818-4521e2eb39fb/go.mod h1:VTLqNCX1tXrur6pdIRCl8Q90FR7nw/mEBdyMkWMcsb0=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f h1:R423Cnkcp5JABoeemiGEPlt9tHXFfw5kvc0yqlxRPWo=
|
||||
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
|
||||
Reference in New Issue
Block a user