From 506d6011c2a3ed4a2a2fd02031671cabab2ed328 Mon Sep 17 00:00:00 2001 From: Andrew Date: Wed, 22 May 2019 15:46:55 +1000 Subject: [PATCH] Bitstamp: Websocket API upgrade to v2 (#307) * Subscribe/Unsubscribe methods added * migration to v3 * removed orderbook from rest * WsUpdateOrderbook updated to reflect changes to v2 * Added comment for exported func * removed logging * unexported structs that are not used globally moved seed to own function * unexported functions not used outside package * Support reconnection message from bitstamp * moved from range key/val * using ticket.Spot instead of string * Seperated out WsReadData & WsHandleData to allow for better testing of websocket messages * ah should continue to next iteration and not break execution on json decode * code formatting clean up * reworded connection message * return out of method instead of just breaking loop * formatting changes and replaced SPOT with ticket.Spot type --- exchanges/bitstamp/bitstamp.go | 6 +- exchanges/bitstamp/bitstamp_types.go | 62 +++-- exchanges/bitstamp/bitstamp_websocket.go | 322 ++++++++++++----------- go.mod | 1 - go.sum | 2 - 5 files changed, 215 insertions(+), 178 deletions(-) diff --git a/exchanges/bitstamp/bitstamp.go b/exchanges/bitstamp/bitstamp.go index a12eac07..868e16bf 100644 --- a/exchanges/bitstamp/bitstamp.go +++ b/exchanges/bitstamp/bitstamp.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/gorilla/websocket" "github.com/thrasher-/gocryptotrader/common" "github.com/thrasher-/gocryptotrader/config" "github.com/thrasher-/gocryptotrader/currency" @@ -63,7 +64,7 @@ const ( type Bitstamp struct { exchange.Base Balance Balances - WebsocketConn WebsocketConn + WebsocketConn *websocket.Conn wsRequestMtx sync.Mutex } @@ -116,6 +117,7 @@ func (b *Bitstamp) Setup(exch *config.ExchangeConfig) { b.APISecret = exch.APISecret b.SetAPIKeys(exch.APIKey, exch.APISecret, b.ClientID, false) b.AuthenticatedAPISupport = true + b.WebsocketURL = bitstampWSURL err := b.SetCurrencyPairFormat() if err != nil { log.Fatal(err) @@ -142,7 +144,7 @@ func (b *Bitstamp) Setup(exch *config.ExchangeConfig) { exch.Name, exch.Websocket, exch.Verbose, - BitstampPusherKey, + bitstampWSURL, exch.WebsocketURL) if err != nil { log.Fatal(err) diff --git a/exchanges/bitstamp/bitstamp_types.go b/exchanges/bitstamp/bitstamp_types.go index fb2923a5..1cdc63a9 100644 --- a/exchanges/bitstamp/bitstamp_types.go +++ b/exchanges/bitstamp/bitstamp_types.go @@ -1,7 +1,5 @@ package bitstamp -import pusher "github.com/toorop/go-pusher" - // Ticker holds ticker information type Ticker struct { Last float64 `json:"last,string"` @@ -160,34 +158,46 @@ const ( errStr string = "error" ) -// WebsocketConn defines a pusher websocket connection -type WebsocketConn struct { - Client *pusher.Client - Data chan *pusher.Event - Trade chan *pusher.Event +type websocketEventRequest struct { + Event string `json:"event"` + Data websocketData `json:"data"` } -// PusherOrderbook holds order book information to be pushed -type PusherOrderbook struct { - Asks [][]string `json:"asks"` - Bids [][]string `json:"bids"` - Timestamp int64 `json:"timestamp,string"` +type websocketData struct { + Channel string `json:"channel"` } -// PusherTrade holds trade information to be pushed -type PusherTrade struct { - Price float64 `json:"price"` - Amount float64 `json:"amount"` - ID int64 `json:"id"` - Type int64 `json:"type"` - Timestamp int64 `json:"timestamp,string"` - BuyOrderID int64 `json:"buy_order_id"` - SellOrderID int64 `json:"sell_order_id"` +type websocketResponse struct { + Event string `json:"event"` + Channel string `json:"channel"` } -// PusherOrders defines order information -type PusherOrders struct { - ID int64 `json:"id"` - Amount float64 `json:"amount"` - Price float64 `json:""` +type websocketTradeResponse struct { + websocketResponse + Data websocketTradeData `json:"data"` +} + +type websocketTradeData struct { + Microtimestamp string `json:"microtimestamp"` + Amount float64 `json:"amount"` + BuyOrderID int64 `json:"buy_order_id"` + SellOrderID int64 `json:"sell_order_id"` + AmountStr string `json:"amount_str"` + PriceStr string `json:"price_str"` + Timestamp string `json:"timestamp"` + Price float64 `json:"price"` + Type int `json:"type"` + ID int `json:"id"` +} + +type websocketOrderBookResponse struct { + websocketResponse + Data websocketOrderBook `json:"data"` +} + +type websocketOrderBook struct { + Asks [][]string `json:"asks"` + Bids [][]string `json:"bids"` + Timestamp int64 `json:"timestamp,string"` + Microtimestamp string `json:"microtimestamp"` } diff --git a/exchanges/bitstamp/bitstamp_websocket.go b/exchanges/bitstamp/bitstamp_websocket.go index b0908fa2..58b4409f 100644 --- a/exchanges/bitstamp/bitstamp_websocket.go +++ b/exchanges/bitstamp/bitstamp_websocket.go @@ -3,126 +3,161 @@ package bitstamp import ( "errors" "fmt" + "net/http" + "net/url" "strconv" - "strings" "time" + "github.com/gorilla/websocket" "github.com/thrasher-/gocryptotrader/common" "github.com/thrasher-/gocryptotrader/currency" exchange "github.com/thrasher-/gocryptotrader/exchanges" "github.com/thrasher-/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-/gocryptotrader/exchanges/ticker" log "github.com/thrasher-/gocryptotrader/logger" - pusher "github.com/toorop/go-pusher" ) const ( - // BitstampPusherKey holds the current pusher key - BitstampPusherKey = "de504dc5763aeef9ff52" + bitstampWSURL = "wss://ws.bitstamp.net" ) -var tradingPairs map[string]string - -// findPairFromChannel extracts the capitalized trading pair from the channel and returns it only if enabled in the config -func (b *Bitstamp) findPairFromChannel(channelName string) (string, error) { - split := strings.Split(channelName, "_") - tradingPair := strings.ToUpper(split[len(split)-1]) - - for _, enabledPair := range b.EnabledPairs { - if enabledPair.String() == tradingPair { - return tradingPair, nil - } - } - - return "", errors.New("bistamp_websocket.go error - could not find trading pair") -} - // WsConnect connects to a websocket feed func (b *Bitstamp) WsConnect() error { if !b.Websocket.IsEnabled() || !b.IsEnabled() { return errors.New(exchange.WebsocketNotEnabled) } - tradingPairs = make(map[string]string) - var err error - + var dialer websocket.Dialer if b.Websocket.GetProxyAddress() != "" { - log.Warn("bitstamp_websocket.go warning - set proxy address error: proxy not supported") + proxy, err := url.Parse(b.Websocket.GetProxyAddress()) + if err != nil { + return err + } + dialer.Proxy = http.ProxyURL(proxy) } - b.WebsocketConn.Client, err = pusher.NewClient(BitstampPusherKey) + var err error + b.WebsocketConn, _, err = dialer.Dial(b.Websocket.GetWebsocketURL(), http.Header{}) if err != nil { return fmt.Errorf("%s Unable to connect to Websocket. Error: %s", - b.GetName(), + b.Name, err) } - b.WebsocketConn.Data, err = b.WebsocketConn.Client.Bind("data") + if b.Verbose { + log.Debugf("%s Connected to Websocket.\n", b.GetName()) + } + + err = b.seedOrderBook() if err != nil { - return fmt.Errorf("%s Websocket Bind error: %s", b.GetName(), err) - + b.Websocket.DataHandler <- err } - b.WebsocketConn.Trade, err = b.WebsocketConn.Client.Bind("trade") - if err != nil { - return fmt.Errorf("%s Websocket Bind error: %s", b.GetName(), err) - } - b.GenerateDefaultSubscriptions() - go b.WsReadData() + b.generateDefaultSubscriptions() + go b.WsHandleData() - for _, p := range b.GetEnabledCurrencies() { - orderbookSeed, err := b.GetOrderbook(p.String()) - if err != nil { - return err - } - - var newOrderBook orderbook.Base - - var asks []orderbook.Item - for _, ask := range orderbookSeed.Asks { - var item orderbook.Item - item.Amount = ask.Amount - item.Price = ask.Price - asks = append(asks, item) - } - - var bids []orderbook.Item - for _, bid := range orderbookSeed.Bids { - var item orderbook.Item - item.Amount = bid.Amount - item.Price = bid.Price - bids = append(bids, item) - } - - newOrderBook.Asks = asks - newOrderBook.Bids = bids - newOrderBook.Pair = p - newOrderBook.AssetType = "SPOT" - - err = b.Websocket.Orderbook.LoadSnapshot(&newOrderBook, b.GetName(), false) - if err != nil { - return err - } - - b.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{ - Pair: p, - Asset: "SPOT", - Exchange: b.GetName(), - } - - } return nil } -// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions() -func (b *Bitstamp) GenerateDefaultSubscriptions() { +// WsReadData reads data coming from bitstamp websocket connection +func (b *Bitstamp) WsReadData() (exchange.WebsocketResponse, error) { + msgType, resp, err := b.WebsocketConn.ReadMessage() + + if err != nil { + return exchange.WebsocketResponse{}, err + } + + if b.Verbose { + log.Debugf("%s websocket raw response: %s", b.GetName(), resp) + } + + b.Websocket.TrafficAlert <- struct{}{} + return exchange.WebsocketResponse{Type: msgType, Raw: resp}, nil +} + +// WsHandleData handles websocket data from WsReadData +func (b *Bitstamp) WsHandleData() { + b.Websocket.Wg.Add(1) + + defer func() { + b.Websocket.Wg.Done() + }() + + for { + select { + case <-b.Websocket.ShutdownC: + return + + default: + resp, err := b.WsReadData() + if err != nil { + b.Websocket.DataHandler <- err + return + } + + wsResponse := websocketResponse{} + err = common.JSONDecode(resp.Raw, &wsResponse) + if err != nil { + b.Websocket.DataHandler <- err + continue + } + + switch wsResponse.Event { + case "bts:request_reconnect": + if b.Verbose { + log.Debugf("%v - Websocket reconnection request received", b.GetName()) + } + go b.Websocket.WebsocketReset() + + case "data": + wsOrderBookTemp := websocketOrderBookResponse{} + err := common.JSONDecode(resp.Raw, &wsOrderBookTemp) + if err != nil { + b.Websocket.DataHandler <- err + continue + } + + currencyPair := common.SplitStrings(wsResponse.Channel, "_") + p := currency.NewPairFromString(common.StringToUpper(currencyPair[3])) + + err = b.wsUpdateOrderbook(wsOrderBookTemp.Data, p, ticker.Spot) + if err != nil { + b.Websocket.DataHandler <- err + continue + } + + case "trade": + wsTradeTemp := websocketTradeResponse{} + + err := common.JSONDecode(resp.Raw, &wsTradeTemp) + if err != nil { + b.Websocket.DataHandler <- err + continue + } + + currencyPair := common.SplitStrings(wsResponse.Channel, "_") + p := currency.NewPairFromString(common.StringToUpper(currencyPair[2])) + + b.Websocket.DataHandler <- exchange.TradeData{ + Price: wsTradeTemp.Data.Price, + Amount: wsTradeTemp.Data.Amount, + CurrencyPair: p, + Exchange: b.GetName(), + AssetType: ticker.Spot, + } + } + } + } +} + +func (b *Bitstamp) generateDefaultSubscriptions() { var channels = []string{"live_trades_", "diff_order_book_"} enabledCurrencies := b.GetEnabledCurrencies() subscriptions := []exchange.WebsocketChannelSubscription{} for i := range channels { for j := range enabledCurrencies { subscriptions = append(subscriptions, exchange.WebsocketChannelSubscription{ - Channel: fmt.Sprintf("%v%v", channels[i], enabledCurrencies[j].Lower().String()), - Currency: enabledCurrencies[j], + Channel: fmt.Sprintf("%v%v", channels[i], enabledCurrencies[j].Lower().String()), }) } } @@ -133,88 +168,37 @@ func (b *Bitstamp) GenerateDefaultSubscriptions() { func (b *Bitstamp) Subscribe(channelToSubscribe exchange.WebsocketChannelSubscription) error { b.wsRequestMtx.Lock() defer b.wsRequestMtx.Unlock() - if b.Verbose { - log.Debugf("%v sending message to websocket %v", b.Name, channelToSubscribe) + + req := websocketEventRequest{ + Event: "bts:subscribe", + Data: websocketData{ + Channel: channelToSubscribe.Channel, + }, } - return b.WebsocketConn.Client.Subscribe(channelToSubscribe.Channel) + return b.WebsocketConn.WriteJSON(req) } // Unsubscribe sends a websocket message to stop receiving data from the channel func (b *Bitstamp) Unsubscribe(channelToSubscribe exchange.WebsocketChannelSubscription) error { b.wsRequestMtx.Lock() defer b.wsRequestMtx.Unlock() - if b.Verbose { - log.Debugf("%v sending message to websocket %v", b.Name, channelToSubscribe) + + req := websocketEventRequest{ + Event: "bts:unsubscribe", + Data: websocketData{ + Channel: channelToSubscribe.Channel, + }, } - return b.WebsocketConn.Client.Unsubscribe(channelToSubscribe.Channel) + return b.WebsocketConn.WriteJSON(req) } -// WsReadData reads data coming from bitstamp websocket connection -func (b *Bitstamp) WsReadData() { - b.Websocket.Wg.Add(1) - defer func() { - err := b.WebsocketConn.Client.Close() - if err != nil { - b.Websocket.DataHandler <- fmt.Errorf("bitstamp_websocket.go - Unable to to close Websocket connection. Error: %s", - err) - } - b.Websocket.Wg.Done() - }() - - for { - select { - case <-b.Websocket.ShutdownC: - return - - case data := <-b.WebsocketConn.Data: - b.Websocket.TrafficAlert <- struct{}{} - - result := PusherOrderbook{} - err := common.JSONDecode([]byte(data.Data), &result) - if err != nil { - b.Websocket.DataHandler <- err - continue - } - - currencyPair := common.SplitStrings(data.Channel, "_") - p := currency.NewPairFromString(common.StringToUpper(currencyPair[3])) - - err = b.WsUpdateOrderbook(result, p, "SPOT") - if err != nil { - b.Websocket.DataHandler <- err - continue - } - - case trade := <-b.WebsocketConn.Trade: - b.Websocket.TrafficAlert <- struct{}{} - - result := PusherTrade{} - err := common.JSONDecode([]byte(trade.Data), &result) - if err != nil { - b.Websocket.DataHandler <- err - continue - } - - currencyPair := common.SplitStrings(trade.Channel, "_") - - b.Websocket.DataHandler <- exchange.TradeData{ - Price: result.Price, - Amount: result.Amount, - CurrencyPair: currency.NewPairFromString(currencyPair[2]), - Exchange: b.GetName(), - AssetType: "SPOT", - } - } - } -} - -// WsUpdateOrderbook updates local cache of orderbook information -func (b *Bitstamp) WsUpdateOrderbook(ob PusherOrderbook, p currency.Pair, assetType string) error { +func (b *Bitstamp) wsUpdateOrderbook(ob websocketOrderBook, p currency.Pair, assetType string) error { if len(ob.Asks) == 0 && len(ob.Bids) == 0 { return errors.New("bitstamp_websocket.go error - no orderbook data") } var asks, bids []orderbook.Item + if len(ob.Asks) > 0 { for _, ask := range ob.Asks { target, err := strconv.ParseFloat(ask[0], 64) @@ -264,3 +248,47 @@ func (b *Bitstamp) WsUpdateOrderbook(ob PusherOrderbook, p currency.Pair, assetT return nil } + +func (b *Bitstamp) seedOrderBook() error { + p := b.GetEnabledCurrencies() + for x := range p { + orderbookSeed, err := b.GetOrderbook(p[x].String()) + if err != nil { + return err + } + + var newOrderBook orderbook.Base + var asks, bids []orderbook.Item + + for _, ask := range orderbookSeed.Asks { + var item orderbook.Item + item.Amount = ask.Amount + item.Price = ask.Price + asks = append(asks, item) + } + + for _, bid := range orderbookSeed.Bids { + var item orderbook.Item + item.Amount = bid.Amount + item.Price = bid.Price + bids = append(bids, item) + } + + newOrderBook.Asks = asks + newOrderBook.Bids = bids + newOrderBook.Pair = p[x] + newOrderBook.AssetType = ticker.Spot + + err = b.Websocket.Orderbook.LoadSnapshot(&newOrderBook, b.GetName(), false) + if err != nil { + return err + } + + b.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{ + Pair: p[x], + Asset: ticker.Spot, + Exchange: b.GetName(), + } + } + return nil +} diff --git a/go.mod b/go.mod index 6f6e6fdc..edd6a53b 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,5 @@ require ( github.com/google/go-querystring v1.0.0 github.com/gorilla/mux v1.7.2 github.com/gorilla/websocket v1.4.0 - github.com/toorop/go-pusher v0.0.0-20180521062818-4521e2eb39fb golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f ) diff --git a/go.sum b/go.sum index eb88c36e..1d5e8821 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,6 @@ github.com/gorilla/mux v1.7.2 h1:zoNxOV7WjqXptQOVngLmcSQgXmgk4NMz1HibBchjl/I= github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= -github.com/toorop/go-pusher v0.0.0-20180521062818-4521e2eb39fb h1:9kcmLvQdiIecpgVEL3/+J5QIP/ElRBJDljOay0SvqnA= -github.com/toorop/go-pusher v0.0.0-20180521062818-4521e2eb39fb/go.mod h1:VTLqNCX1tXrur6pdIRCl8Q90FR7nw/mEBdyMkWMcsb0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f h1:R423Cnkcp5JABoeemiGEPlt9tHXFfw5kvc0yqlxRPWo= golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=