Files
gocryptotrader/exchanges/hitbtc/hitbtc_websocket.go
Andrew 3de1d94e5f New logging system (#319)
* First pass at adding new logging system

* NewLogger

* NewLogger

* WIP

* silly bug fix

* :D removed files

* removed old logging interface

* added tests

* added tests

* Started to add new lines to all f calls

* Added subsystem log types

* Logger improvements

* Further performance improvements

* changes to logger and sublogger creation

* Renamed Logging types

* removed old print statement

* changes based on feedback

* moved sublogger types to own file

* :)

* added console as output type

* added get level command

* added get/set log level via grpc command

* added check for output being empty for migration support

* first pass at log rotation

* added log rotation

* :D derp fixed

* added tests

* changes based on feedback

* changed log type

* comments

* renamed file -> fileSettings

* typo fix

* changes based on feedback

* gofmt ran on additional files

* gofmt ran on additional files
2019-07-07 05:20:31 +10:00

502 lines
14 KiB
Go

package hitbtc
import (
"errors"
"fmt"
"net/http"
"net/url"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/thrasher-/gocryptotrader/common"
"github.com/thrasher-/gocryptotrader/common/crypto"
"github.com/thrasher-/gocryptotrader/currency"
exchange "github.com/thrasher-/gocryptotrader/exchanges"
"github.com/thrasher-/gocryptotrader/exchanges/asset"
"github.com/thrasher-/gocryptotrader/exchanges/nonce"
"github.com/thrasher-/gocryptotrader/exchanges/orderbook"
log "github.com/thrasher-/gocryptotrader/logger"
)
const (
hitbtcWebsocketAddress = "wss://api.hitbtc.com/api/2/ws"
rpcVersion = "2.0"
)
var requestID nonce.Nonce
// WsConnect starts a new connection with the websocket API
func (h *HitBTC) WsConnect() error {
if !h.Websocket.IsEnabled() || !h.IsEnabled() {
return errors.New(exchange.WebsocketNotEnabled)
}
var dialer websocket.Dialer
if h.Websocket.GetProxyAddress() != "" {
proxy, err := url.Parse(h.Websocket.GetProxyAddress())
if err != nil {
return err
}
dialer.Proxy = http.ProxyURL(proxy)
}
var err error
h.WebsocketConn, _, err = dialer.Dial(hitbtcWebsocketAddress, http.Header{})
if err != nil {
return err
}
go h.WsHandleData()
err = h.wsLogin()
if err != nil {
log.Errorf(log.ExchangeSys, "%v - authentication failed: %v\n", h.Name, err)
}
h.GenerateDefaultSubscriptions()
return nil
}
// WsReadData reads from the websocket connection
func (h *HitBTC) WsReadData() (exchange.WebsocketResponse, error) {
_, resp, err := h.WebsocketConn.ReadMessage()
if err != nil {
return exchange.WebsocketResponse{}, err
}
h.Websocket.TrafficAlert <- struct{}{}
return exchange.WebsocketResponse{Raw: resp}, nil
}
// WsHandleData handles websocket data
func (h *HitBTC) WsHandleData() {
h.Websocket.Wg.Add(1)
defer func() {
h.Websocket.Wg.Done()
}()
for {
select {
case <-h.Websocket.ShutdownC:
return
default:
resp, err := h.WsReadData()
if err != nil {
h.Websocket.DataHandler <- err
return
}
var init capture
err = common.JSONDecode(resp.Raw, &init)
if err != nil {
h.Websocket.DataHandler <- err
continue
}
if init.Error.Message != "" || init.Error.Code != 0 {
if init.Error.Code == 1002 {
h.Websocket.SetCanUseAuthenticatedEndpoints(false)
}
h.Websocket.DataHandler <- fmt.Errorf("hitbtc.go error - Code: %d, Message: %s",
init.Error.Code,
init.Error.Message)
continue
}
if _, ok := init.Result.(bool); ok {
continue
}
if init.Method != "" {
h.handleSubscriptionUpdates(resp, init)
} else {
h.handleCommandResponses(resp, init)
}
}
}
}
func (h *HitBTC) handleSubscriptionUpdates(resp exchange.WebsocketResponse, init capture) {
switch init.Method {
case "ticker":
var ticker WsTicker
err := common.JSONDecode(resp.Raw, &ticker)
if err != nil {
h.Websocket.DataHandler <- err
return
}
ts, err := time.Parse(time.RFC3339, ticker.Params.Timestamp)
if err != nil {
h.Websocket.DataHandler <- err
return
}
h.Websocket.DataHandler <- exchange.TickerData{
Exchange: h.GetName(),
AssetType: asset.Spot,
Pair: currency.NewPairFromString(ticker.Params.Symbol),
Quantity: ticker.Params.Volume,
Timestamp: ts,
OpenPrice: ticker.Params.Open,
HighPrice: ticker.Params.High,
LowPrice: ticker.Params.Low,
}
case "snapshotOrderbook":
var obSnapshot WsOrderbook
err := common.JSONDecode(resp.Raw, &obSnapshot)
if err != nil {
h.Websocket.DataHandler <- err
}
err = h.WsProcessOrderbookSnapshot(obSnapshot)
if err != nil {
h.Websocket.DataHandler <- err
}
case "updateOrderbook":
var obUpdate WsOrderbook
err := common.JSONDecode(resp.Raw, &obUpdate)
if err != nil {
h.Websocket.DataHandler <- err
}
h.WsProcessOrderbookUpdate(obUpdate)
case "snapshotTrades":
var tradeSnapshot WsTrade
err := common.JSONDecode(resp.Raw, &tradeSnapshot)
if err != nil {
h.Websocket.DataHandler <- err
}
case "updateTrades":
var tradeUpdates WsTrade
err := common.JSONDecode(resp.Raw, &tradeUpdates)
if err != nil {
h.Websocket.DataHandler <- err
}
case "activeOrders":
var activeOrders WsActiveOrdersResponse
err := common.JSONDecode(resp.Raw, &activeOrders)
if err != nil {
h.Websocket.DataHandler <- err
}
h.Websocket.DataHandler <- activeOrders
case "report":
var reportData WsReportResponse
err := common.JSONDecode(resp.Raw, &reportData)
if err != nil {
h.Websocket.DataHandler <- err
}
h.Websocket.DataHandler <- reportData
}
}
func (h *HitBTC) handleCommandResponses(resp exchange.WebsocketResponse, init capture) {
switch resultType := init.Result.(type) {
case map[string]interface{}:
switch resultType["reportType"].(string) {
case "new":
var response WsSubmitOrderSuccessResponse
err := common.JSONDecode(resp.Raw, &response)
if err != nil {
h.Websocket.DataHandler <- err
}
h.Websocket.DataHandler <- response
case "canceled":
var response WsCancelOrderResponse
err := common.JSONDecode(resp.Raw, &response)
if err != nil {
h.Websocket.DataHandler <- err
}
h.Websocket.DataHandler <- response
case "replaced":
var response WsReplaceOrderResponse
err := common.JSONDecode(resp.Raw, &response)
if err != nil {
h.Websocket.DataHandler <- err
}
h.Websocket.DataHandler <- response
}
case []interface{}:
if len(resultType) == 0 {
h.Websocket.DataHandler <- fmt.Sprintf("No data returned. ID: %v", init.ID)
return
}
data := resultType[0].(map[string]interface{})
if _, ok := data["clientOrderId"]; ok {
var response WsActiveOrdersResponse
err := common.JSONDecode(resp.Raw, &response)
if err != nil {
h.Websocket.DataHandler <- err
}
h.Websocket.DataHandler <- response
} else if _, ok := data["available"]; ok {
var response WsGetTradingBalanceResponse
err := common.JSONDecode(resp.Raw, &response)
if err != nil {
h.Websocket.DataHandler <- err
}
h.Websocket.DataHandler <- response
}
}
}
// WsProcessOrderbookSnapshot processes a full orderbook snapshot to a local cache
func (h *HitBTC) WsProcessOrderbookSnapshot(ob WsOrderbook) error {
if len(ob.Params.Bid) == 0 || len(ob.Params.Ask) == 0 {
return errors.New("hitbtc.go error - no orderbooks to process")
}
var bids []orderbook.Item
for _, bid := range ob.Params.Bid {
bids = append(bids, orderbook.Item{Amount: bid.Size, Price: bid.Price})
}
var asks []orderbook.Item
for _, ask := range ob.Params.Ask {
asks = append(asks, orderbook.Item{Amount: ask.Size, Price: ask.Price})
}
p := currency.NewPairFromString(ob.Params.Symbol)
var newOrderBook orderbook.Base
newOrderBook.Asks = asks
newOrderBook.Bids = bids
newOrderBook.AssetType = asset.Spot
newOrderBook.LastUpdated = time.Now()
newOrderBook.Pair = p
err := h.Websocket.Orderbook.LoadSnapshot(&newOrderBook, h.GetName(), false)
if err != nil {
return err
}
h.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{
Exchange: h.GetName(),
Asset: asset.Spot,
Pair: p,
}
return nil
}
// WsProcessOrderbookUpdate updates a local cache
func (h *HitBTC) WsProcessOrderbookUpdate(ob WsOrderbook) error {
if len(ob.Params.Bid) == 0 && len(ob.Params.Ask) == 0 {
return errors.New("hitbtc_websocket.go error - no data")
}
var bids, asks []orderbook.Item
for _, bid := range ob.Params.Bid {
bids = append(bids, orderbook.Item{Price: bid.Price, Amount: bid.Size})
}
for _, ask := range ob.Params.Ask {
asks = append(asks, orderbook.Item{Price: ask.Price, Amount: ask.Size})
}
p := currency.NewPairFromString(ob.Params.Symbol)
err := h.Websocket.Orderbook.Update(bids, asks, p, time.Now(), h.GetName(), asset.Spot)
if err != nil {
return err
}
h.Websocket.DataHandler <- exchange.WebsocketOrderbookUpdate{
Exchange: h.GetName(),
Asset: asset.Spot,
Pair: p,
}
return nil
}
// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (h *HitBTC) GenerateDefaultSubscriptions() {
var channels = []string{"subscribeTicker", "subscribeOrderbook", "subscribeTrades", "subscribeCandles"}
var subscriptions []exchange.WebsocketChannelSubscription
if h.Websocket.CanUseAuthenticatedEndpoints() {
subscriptions = append(subscriptions, exchange.WebsocketChannelSubscription{
Channel: "subscribeReports",
})
}
enabledCurrencies := h.GetEnabledPairs(asset.Spot)
for i := range channels {
for j := range enabledCurrencies {
enabledCurrencies[j].Delimiter = ""
subscriptions = append(subscriptions, exchange.WebsocketChannelSubscription{
Channel: channels[i],
Currency: enabledCurrencies[j],
})
}
}
h.Websocket.SubscribeToChannels(subscriptions)
}
// Subscribe sends a websocket message to receive data from the channel
func (h *HitBTC) Subscribe(channelToSubscribe exchange.WebsocketChannelSubscription) error {
subscribe := WsNotification{
Method: channelToSubscribe.Channel,
}
if channelToSubscribe.Currency.String() != "" {
subscribe.Params = params{
Symbol: channelToSubscribe.Currency.String(),
}
}
if strings.EqualFold(channelToSubscribe.Channel, "subscribeTrades") {
subscribe.Params = params{
Symbol: channelToSubscribe.Currency.String(),
Limit: 100,
}
} else if strings.EqualFold(channelToSubscribe.Channel, "subscribeCandles") {
subscribe.Params = params{
Symbol: channelToSubscribe.Currency.String(),
Period: "M30",
Limit: 100,
}
}
return h.wsSend(subscribe)
}
// Unsubscribe sends a websocket message to stop receiving data from the channel
func (h *HitBTC) Unsubscribe(channelToSubscribe exchange.WebsocketChannelSubscription) error {
unsubscribeChannel := strings.Replace(channelToSubscribe.Channel, "subscribe", "unsubscribe", 1)
subscribe := WsNotification{
JSONRPCVersion: rpcVersion,
Method: unsubscribeChannel,
Params: params{
Symbol: channelToSubscribe.Currency.String(),
},
}
if strings.EqualFold(unsubscribeChannel, "unsubscribeTrades") {
subscribe.Params = params{
Symbol: channelToSubscribe.Currency.String(),
Limit: 100,
}
} else if strings.EqualFold(unsubscribeChannel, "unsubscribeCandles") {
subscribe.Params = params{
Symbol: channelToSubscribe.Currency.String(),
Period: "M30",
Limit: 100,
}
}
return h.wsSend(subscribe)
}
// WsSend sends data to the websocket server
func (h *HitBTC) wsSend(data interface{}) error {
h.wsRequestMtx.Lock()
defer h.wsRequestMtx.Unlock()
json, err := common.JSONEncode(data)
if err != nil {
return err
}
if h.Verbose {
log.Debugf(log.ExchangeSys, "%v sending message to websocket %v", h.Name, string(json))
}
return h.WebsocketConn.WriteMessage(websocket.TextMessage, json)
}
// Unsubscribe sends a websocket message to stop receiving data from the channel
func (h *HitBTC) wsLogin() error {
if !h.GetAuthenticatedAPISupport(exchange.WebsocketAuthentication) {
return fmt.Errorf("%v AuthenticatedWebsocketAPISupport not enabled", h.Name)
}
h.Websocket.SetCanUseAuthenticatedEndpoints(true)
nonce := fmt.Sprintf("%v", time.Now().Unix())
hmac := crypto.GetHMAC(crypto.HashSHA256, []byte(nonce), []byte(h.API.Credentials.Secret))
request := WsLoginRequest{
Method: "login",
Params: WsLoginData{
Algo: "HS256",
PKey: h.API.Credentials.Key,
Nonce: nonce,
Signature: crypto.HexEncodeToString(hmac),
},
}
err := h.wsSend(request)
if err != nil {
h.Websocket.SetCanUseAuthenticatedEndpoints(false)
return err
}
return nil
}
// wsPlaceOrder sends a websocket message to submit an order
func (h *HitBTC) wsPlaceOrder(pair currency.Pair, side string, price, quantity float64) error {
if !h.Websocket.CanUseAuthenticatedEndpoints() {
return fmt.Errorf("%v not authenticated, cannot place order", h.Name)
}
request := WsSubmitOrderRequest{
Method: "newOrder",
Params: WsSubmitOrderRequestData{
ClientOrderID: fmt.Sprintf("%v", time.Now().Unix()),
Symbol: pair,
Side: strings.ToLower(side),
Price: price,
Quantity: quantity,
},
ID: int64(requestID.GetInc()),
}
return h.wsSend(request)
}
// wsCancelOrder sends a websocket message to cancel an order
func (h *HitBTC) wsCancelOrder(clientOrderID string) error {
if !h.Websocket.CanUseAuthenticatedEndpoints() {
return fmt.Errorf("%v not authenticated, cannot place order", h.Name)
}
request := WsCancelOrderRequest{
Method: "cancelOrder",
Params: WsCancelOrderRequestData{
ClientOrderID: clientOrderID,
},
ID: int64(requestID.GetInc()),
}
return h.wsSend(request)
}
// wsReplaceOrder sends a websocket message to replace an order
func (h *HitBTC) wsReplaceOrder(clientOrderID string, quantity, price float64) error {
if !h.Websocket.CanUseAuthenticatedEndpoints() {
return fmt.Errorf("%v not authenticated, cannot place order", h.Name)
}
request := WsReplaceOrderRequest{
Method: "cancelReplaceOrder",
Params: WsReplaceOrderRequestData{
ClientOrderID: clientOrderID,
RequestClientID: fmt.Sprintf("%v", time.Now().Unix()),
Quantity: quantity,
Price: price,
},
ID: int64(requestID.GetInc()),
}
return h.wsSend(request)
}
// wsGetActiveOrders sends a websocket message to get all active orders
func (h *HitBTC) wsGetActiveOrders() error {
if !h.Websocket.CanUseAuthenticatedEndpoints() {
return fmt.Errorf("%v not authenticated, cannot place order", h.Name)
}
request := WsReplaceOrderRequest{
Method: "getOrders",
Params: WsReplaceOrderRequestData{},
ID: int64(requestID.GetInc()),
}
return h.wsSend(request)
}
// wsGetTradingBalance sends a websocket message to get trading balance
func (h *HitBTC) wsGetTradingBalance() error {
if !h.Websocket.CanUseAuthenticatedEndpoints() {
return fmt.Errorf("%v not authenticated, cannot place order", h.Name)
}
request := WsReplaceOrderRequest{
Method: "getTradingBalance",
Params: WsReplaceOrderRequestData{},
ID: int64(requestID.GetInc()),
}
return h.wsSend(request)
}