diff --git a/engine/portfolio.go b/engine/portfolio.go index 26e4644b..ce53da02 100644 --- a/engine/portfolio.go +++ b/engine/portfolio.go @@ -33,6 +33,8 @@ func (p *portfolioManager) Start() error { Bot.Portfolio = &portfolio.Portfolio Bot.Portfolio.Seed(Bot.Config.Portfolio) p.shutdown = make(chan struct{}) + portfolio.Verbose = Bot.Settings.Verbose + go p.run() return nil } @@ -73,13 +75,19 @@ func (p *portfolioManager) processPortfolio() { pf := portfolio.GetPortfolio() data := pf.GetPortfolioGroupedCoin() for key, value := range data { - success := pf.UpdatePortfolio(value, key) - if success { - log.Debugf(log.PortfolioMgr, - "Portfolio manager: Successfully updated address balance for %s address(es) %s\n", - key, value, - ) + err := pf.UpdatePortfolio(value, key) + if err != nil { + log.Errorf(log.PortfolioMgr, + "PortfolioWatcher error %s for currency %s\n", + err, + key) + continue } + + log.Debugf(log.PortfolioMgr, + "Portfolio manager: Successfully updated address balance for %s address(es) %s\n", + key, + value) } SeedExchangeAccountInfo(GetAllEnabledExchangeAccountInfo().Data) } diff --git a/engine/routines.go b/engine/routines.go index 87ded1e9..b3622a2c 100644 --- a/engine/routines.go +++ b/engine/routines.go @@ -57,11 +57,13 @@ func printConvertCurrencyFormat(origCurrency currency.Code, origPrice float64) s ) } -func printTickerSummary(result *ticker.Price, p currency.Pair, assetType asset.Item, exchangeName string, err error) { +func printTickerSummary(result *ticker.Price, p currency.Pair, assetType asset.Item, exchangeName, protocol string, err error) { if err != nil { - log.Errorf(log.Ticker, "Failed to get %s %s ticker. Error: %s\n", - p.String(), + log.Errorf(log.Ticker, "Failed to get %s %s %s %s ticker. Error: %s\n", exchangeName, + protocol, + p, + assetType, err) return } @@ -70,9 +72,10 @@ func printTickerSummary(result *ticker.Price, p currency.Pair, assetType asset.I if p.Quote.IsFiatCurrency() && p.Quote != Bot.Config.Currency.FiatDisplayCurrency { origCurrency := p.Quote.Upper() - log.Infof(log.Ticker, "%s %s %s: TICKER: Last %s Ask %s Bid %s High %s Low %s Volume %.8f\n", + log.Infof(log.Ticker, "%s %s %s %s: TICKER: Last %s Ask %s Bid %s High %s Low %s Volume %.8f\n", exchangeName, - FormatCurrency(p).String(), + protocol, + FormatCurrency(p), strings.ToUpper(assetType.String()), printConvertCurrencyFormat(origCurrency, result.Last), printConvertCurrencyFormat(origCurrency, result.Ask), @@ -83,9 +86,10 @@ func printTickerSummary(result *ticker.Price, p currency.Pair, assetType asset.I } else { if p.Quote.IsFiatCurrency() && p.Quote == Bot.Config.Currency.FiatDisplayCurrency { - log.Infof(log.Ticker, "%s %s %s: TICKER: Last %s Ask %s Bid %s High %s Low %s Volume %.8f\n", + log.Infof(log.Ticker, "%s %s %s %s: TICKER: Last %s Ask %s Bid %s High %s Low %s Volume %.8f\n", exchangeName, - FormatCurrency(p).String(), + protocol, + FormatCurrency(p), strings.ToUpper(assetType.String()), printCurrencyFormat(result.Last), printCurrencyFormat(result.Ask), @@ -94,9 +98,10 @@ func printTickerSummary(result *ticker.Price, p currency.Pair, assetType asset.I printCurrencyFormat(result.Low), result.Volume) } else { - log.Infof(log.Ticker, "%s %s %s: TICKER: Last %.8f Ask %.8f Bid %.8f High %.8f Low %.8f Volume %.8f\n", + log.Infof(log.Ticker, "%s %s %s %s: TICKER: Last %.8f Ask %.8f Bid %.8f High %.8f Low %.8f Volume %.8f\n", exchangeName, - FormatCurrency(p).String(), + protocol, + FormatCurrency(p), strings.ToUpper(assetType.String()), result.Last, result.Ask, @@ -108,11 +113,12 @@ func printTickerSummary(result *ticker.Price, p currency.Pair, assetType asset.I } } -func printOrderbookSummary(result *orderbook.Base, p currency.Pair, assetType asset.Item, exchangeName string, err error) { +func printOrderbookSummary(result *orderbook.Base, p currency.Pair, assetType asset.Item, exchangeName, protocol string, err error) { if err != nil { - log.Errorf(log.OrderBook, "Failed to get %s %s orderbook of type %s. Error: %s\n", - p, + log.Errorf(log.OrderBook, "Failed to get %s %s %s orderbook of type %s. Error: %s\n", exchangeName, + protocol, + p, assetType, err) return @@ -124,47 +130,50 @@ func printOrderbookSummary(result *orderbook.Base, p currency.Pair, assetType as if p.Quote.IsFiatCurrency() && p.Quote != Bot.Config.Currency.FiatDisplayCurrency { origCurrency := p.Quote.Upper() - log.Infof(log.OrderBook, "%s %s %s: ORDERBOOK: Bids len: %d Amount: %f %s. Total value: %s Asks len: %d Amount: %f %s. Total value: %s\n", + log.Infof(log.OrderBook, "%s %s %s %s: ORDERBOOK: Bids len: %d Amount: %f %s. Total value: %s Asks len: %d Amount: %f %s. Total value: %s\n", exchangeName, - FormatCurrency(p).String(), + protocol, + FormatCurrency(p), strings.ToUpper(assetType.String()), len(result.Bids), bidsAmount, - p.Base.String(), + p.Base, printConvertCurrencyFormat(origCurrency, bidsValue), len(result.Asks), asksAmount, - p.Base.String(), + p.Base, printConvertCurrencyFormat(origCurrency, asksValue), ) } else { if p.Quote.IsFiatCurrency() && p.Quote == Bot.Config.Currency.FiatDisplayCurrency { - log.Infof(log.OrderBook, "%s %s %s: ORDERBOOK: Bids len: %d Amount: %f %s. Total value: %s Asks len: %d Amount: %f %s. Total value: %s\n", + log.Infof(log.OrderBook, "%s %s %s %s: ORDERBOOK: Bids len: %d Amount: %f %s. Total value: %s Asks len: %d Amount: %f %s. Total value: %s\n", exchangeName, - FormatCurrency(p).String(), + protocol, + FormatCurrency(p), strings.ToUpper(assetType.String()), len(result.Bids), bidsAmount, - p.Base.String(), + p.Base, printCurrencyFormat(bidsValue), len(result.Asks), asksAmount, - p.Base.String(), + p.Base, printCurrencyFormat(asksValue), ) } else { - log.Infof(log.OrderBook, "%s %s %s: ORDERBOOK: Bids len: %d Amount: %f %s. Total value: %f Asks len: %d Amount: %f %s. Total value: %f\n", + log.Infof(log.OrderBook, "%s %s %s %s: ORDERBOOK: Bids len: %d Amount: %f %s. Total value: %f Asks len: %d Amount: %f %s. Total value: %f\n", exchangeName, - FormatCurrency(p).String(), + protocol, + FormatCurrency(p), strings.ToUpper(assetType.String()), len(result.Bids), bidsAmount, - p.Base.String(), + p.Base, bidsValue, len(result.Asks), asksAmount, - p.Base.String(), + p.Base, asksValue, ) } @@ -278,73 +287,71 @@ func WebsocketDataHandler(ws *wshandler.Websocket) { log.Warnf(log.WebsocketMgr, "routines.go warning - exchange %s websocket not enabled\n", ws.GetName()) } - default: log.Info(log.WebsocketMgr, d) } - case error: log.Errorf(log.WebsocketMgr, "routines.go exchange %s websocket error - %s", ws.GetName(), data) case wshandler.TradeData: - // Trade Data - // if Bot.Settings.Verbose { - // log.Println("Websocket trades Updated: ", data.(exchange.TradeData)) - // } - - case wshandler.TickerData: - // Ticker data - // if Bot.Settings.Verbose { - // log.Println("Websocket Ticker Updated: ", data.(exchange.TickerData)) - // } - - tickerNew := ticker.Price{ - Last: d.Last, - High: d.High, - Low: d.Low, - Bid: d.Bid, - Ask: d.Ask, - Volume: d.Volume, - QuoteVolume: d.QuoteVolume, - PriceATH: d.PriceATH, - Open: d.Open, - Close: d.Close, - Pair: d.Pair, - LastUpdated: d.Timestamp, + // Websocket Trade Data + if Bot.Settings.Verbose { + log.Infof(log.WebsocketMgr, "%s websocket %s %s trade updated %+v\n", + ws.GetName(), + FormatCurrency(d.CurrencyPair), + d.AssetType, + d) } + case wshandler.FundingData: + // Websocket Funding Data + if Bot.Settings.Verbose { + log.Infof(log.WebsocketMgr, "%s websocket %s %s funding updated %+v\n", + ws.GetName(), + FormatCurrency(d.CurrencyPair), + d.AssetType, + d) + } + case *ticker.Price: + // Websocket Ticker Data if Bot.Settings.EnableExchangeSyncManager && Bot.ExchangeCurrencyPairManager != nil { Bot.ExchangeCurrencyPairManager.update(ws.GetName(), - d.Pair, d.AssetType, SyncItemTicker, nil) + d.Pair, + d.AssetType, + SyncItemTicker, + nil) } - err := ticker.ProcessTicker(ws.GetName(), &tickerNew, d.AssetType) - if err != nil { - log.Errorf(log.WebsocketMgr, "routines.go exchange %s websocket error - %s", ws.GetName(), err) - } - printTickerSummary(&tickerNew, tickerNew.Pair, d.AssetType, ws.GetName(), nil) + err := ticker.ProcessTicker(ws.GetName(), d, d.AssetType) + printTickerSummary(d, d.Pair, d.AssetType, ws.GetName(), "websocket", err) case wshandler.KlineData: - // Kline data + // Websocket Kline Data if Bot.Settings.Verbose { - log.Infof(log.WebsocketMgr, "Websocket Kline Updated: %v\n", d) + log.Infof(log.WebsocketMgr, "%s websocket %s %s kline updated %+v\n", + ws.GetName(), + FormatCurrency(d.Pair), + d.AssetType, + d) } case wshandler.WebsocketOrderbookUpdate: - // Orderbook data + // Websocket Orderbook Data result := data.(wshandler.WebsocketOrderbookUpdate) if Bot.Settings.EnableExchangeSyncManager && Bot.ExchangeCurrencyPairManager != nil { Bot.ExchangeCurrencyPairManager.update(ws.GetName(), - result.Pair, result.Asset, SyncItemOrderbook, nil) + result.Pair, + result.Asset, + SyncItemOrderbook, + nil) } - // TO-DO: printOrderbookSummary - //nolint:gocritic + if Bot.Settings.Verbose { log.Infof(log.WebsocketMgr, - "Websocket %s %s orderbook updated\n", + "%s websocket %s %s orderbook updated\n", ws.GetName(), FormatCurrency(result.Pair), - ) + d.Asset) } default: if Bot.Settings.Verbose { log.Warnf(log.WebsocketMgr, - "Websocket %s Unknown type: %v\n", + "%s websocket Unknown type: %+v\n", ws.GetName(), d) } diff --git a/engine/syncer.go b/engine/syncer.go index 0f816d98..12329fd8 100644 --- a/engine/syncer.go +++ b/engine/syncer.go @@ -413,7 +413,7 @@ func (e *ExchangeCurrencyPairSyncer) worker() { } else { result, err = Bot.Exchanges[x].UpdateTicker(c.Pair, c.AssetType) } - printTickerSummary(result, c.Pair, c.AssetType, exchangeName, err) + printTickerSummary(result, c.Pair, c.AssetType, exchangeName, "REST", err) if err == nil { //nolint:gocritic Bot.CommsRelayer.StageTickerData(exchangeName, c.AssetType, result) if Bot.Config.RemoteControl.WebsocketRPC.Enabled { @@ -453,7 +453,7 @@ func (e *ExchangeCurrencyPairSyncer) worker() { e.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemOrderbook, true) result, err := Bot.Exchanges[x].UpdateOrderbook(c.Pair, c.AssetType) - printOrderbookSummary(result, c.Pair, c.AssetType, exchangeName, err) + printOrderbookSummary(result, c.Pair, c.AssetType, exchangeName, "REST", err) if err == nil { //nolint:gocritic Bot.CommsRelayer.StageOrderbookData(exchangeName, c.AssetType, result) if Bot.Config.RemoteControl.WebsocketRPC.Enabled { diff --git a/exchanges/binance/binance_types.go b/exchanges/binance/binance_types.go index 48a90381..a60e4e0f 100644 --- a/exchanges/binance/binance_types.go +++ b/exchanges/binance/binance_types.go @@ -67,11 +67,11 @@ type OrderbookItem struct { // OrderBookData is resp data from orderbook endpoint type OrderBookData struct { - Code int `json:"code"` - Msg string `json:"msg"` - LastUpdateID int64 `json:"lastUpdateId"` - Bids [][]string `json:"bids"` - Asks [][]string `json:"asks"` + Code int `json:"code"` + Msg string `json:"msg"` + LastUpdateID int64 `json:"lastUpdateId"` + Bids [][2]string `json:"bids"` + Asks [][2]string `json:"asks"` } // OrderBook actual structured data that can be used for orderbook diff --git a/exchanges/binance/binance_websocket.go b/exchanges/binance/binance_websocket.go index 21d40cd3..8bccea52 100644 --- a/exchanges/binance/binance_websocket.go +++ b/exchanges/binance/binance_websocket.go @@ -13,6 +13,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wshandler" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wsorderbook" ) @@ -62,6 +63,8 @@ func (b *Binance) WsConnect() error { } b.WebsocketConn.URL = wsurl + b.WebsocketConn.Verbose = b.Verbose + err = b.WebsocketConn.Dial(&dialer, http.Header{}) if err != nil { return fmt.Errorf("%v - Unable to connect to Websocket. Error: %s", @@ -131,7 +134,7 @@ func (b *Binance) WsHandleData() { b.Websocket.DataHandler <- wshandler.TradeData{ CurrencyPair: currency.NewPairFromFormattedPairs(trade.Symbol, b.GetEnabledPairs(asset.Spot), b.GetPairFormat(asset.Spot, true)), - Timestamp: time.Unix(0, trade.TimeStamp), + Timestamp: time.Unix(0, trade.TimeStamp*int64(time.Millisecond)), Price: price, Amount: amount, Exchange: b.Name, @@ -149,25 +152,25 @@ func (b *Binance) WsHandleData() { continue } - b.Websocket.DataHandler <- wshandler.TickerData{ - Exchange: b.Name, - Open: t.OpenPrice, - Close: t.ClosePrice, - Volume: t.TotalTradedVolume, - QuoteVolume: t.TotalTradedQuoteVolume, - High: t.HighPrice, - Low: t.LowPrice, - Bid: t.BestBidPrice, - Ask: t.BestAskPrice, - Last: t.LastPrice, - Timestamp: time.Unix(0, t.EventTime), - AssetType: asset.Spot, + b.Websocket.DataHandler <- &ticker.Price{ + ExchangeName: b.Name, + Open: t.OpenPrice, + Close: t.ClosePrice, + Volume: t.TotalTradedVolume, + QuoteVolume: t.TotalTradedQuoteVolume, + High: t.HighPrice, + Low: t.LowPrice, + Bid: t.BestBidPrice, + Ask: t.BestAskPrice, + Last: t.LastPrice, + LastUpdated: time.Unix(0, t.EventTime*int64(time.Millisecond)), + AssetType: asset.Spot, Pair: currency.NewPairFromFormattedPairs(t.Symbol, b.GetEnabledPairs(asset.Spot), b.GetPairFormat(asset.Spot, true)), } continue - case "kline": + case "kline_1m": kline := KlineStream{} err := json.Unmarshal(multiStreamData.Data, &kline) if err != nil { @@ -178,13 +181,13 @@ func (b *Binance) WsHandleData() { } var wsKline wshandler.KlineData - wsKline.Timestamp = time.Unix(0, kline.EventTime) + wsKline.Timestamp = time.Unix(0, kline.EventTime*int64(time.Millisecond)) wsKline.Pair = currency.NewPairFromFormattedPairs(kline.Symbol, b.GetEnabledPairs(asset.Spot), b.GetPairFormat(asset.Spot, true)) wsKline.AssetType = asset.Spot wsKline.Exchange = b.Name - wsKline.StartTime = time.Unix(0, kline.Kline.StartTime) - wsKline.CloseTime = time.Unix(0, kline.Kline.CloseTime) + wsKline.StartTime = time.Unix(0, kline.Kline.StartTime*int64(time.Millisecond)) + wsKline.CloseTime = time.Unix(0, kline.Kline.CloseTime*int64(time.Millisecond)) wsKline.Interval = kline.Kline.Interval wsKline.OpenPrice, _ = strconv.ParseFloat(kline.Kline.OpenPrice, 64) wsKline.ClosePrice, _ = strconv.ParseFloat(kline.Kline.ClosePrice, 64) @@ -249,7 +252,6 @@ func (b *Binance) SeedLocalCache(p currency.Pair) error { }) } - newOrderBook.LastUpdated = time.Unix(orderbookNew.LastUpdateID, 0) newOrderBook.Pair = p newOrderBook.AssetType = asset.Spot newOrderBook.ExchangeName = b.Name diff --git a/exchanges/bitfinex/bitfinex_types.go b/exchanges/bitfinex/bitfinex_types.go index 7ef5ceb6..561cd525 100644 --- a/exchanges/bitfinex/bitfinex_types.go +++ b/exchanges/bitfinex/bitfinex_types.go @@ -385,7 +385,7 @@ type WebsocketChanInfo struct { // WebsocketBook holds booking information type WebsocketBook struct { Price float64 - ID int + ID int64 Amount float64 } @@ -395,6 +395,10 @@ type WebsocketTrade struct { Timestamp int64 Price float64 Amount float64 + // Funding rate of the trade + Rate float64 + // Funding offer period in days + Period int64 } // WebsocketCandle candle data diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index 2b54be05..ac8e1a61 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -17,6 +17,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wshandler" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wsorderbook" log "github.com/thrasher-corp/gocryptotrader/logger" @@ -122,6 +123,12 @@ func (b *Bitfinex) WsDataHandler() { } case "[]interface {}": chanData := result.([]interface{}) + if hb, ok := chanData[1].(string); ok { + // Capturing heart beat + if hb == "hb" { + continue + } + } chanID := int(chanData[0].(float64)) chanInfo, ok := b.WebsocketSubdChannels[chanID] if !ok && chanID != 0 { @@ -135,14 +142,14 @@ func (b *Bitfinex) WsDataHandler() { var newOrderbook []WebsocketBook curr := currency.NewPairFromString(chanInfo.Pair) if obSnapBundle, ok := chanData[1].([]interface{}); ok { - switch snapshot := obSnapBundle[0].(type) { + switch id := obSnapBundle[0].(type) { case []interface{}: - for i := range snapshot { - obSnap := snapshot[i].([]interface{}) + for i := range obSnapBundle { + data := obSnapBundle[i].([]interface{}) newOrderbook = append(newOrderbook, WebsocketBook{ - ID: int(obSnap[0].(float64)), - Price: obSnap[1].(float64), - Amount: obSnap[2].(float64)}) + ID: int64(data[0].(float64)), + Price: data[1].(float64), + Amount: data[2].(float64)}) } err := b.WsInsertSnapshot(curr, asset.Spot, @@ -153,7 +160,7 @@ func (b *Bitfinex) WsDataHandler() { } case float64: newOrderbook = append(newOrderbook, WebsocketBook{ - ID: int(snapshot), + ID: int64(id), Price: obSnapBundle[1].(float64), Amount: obSnapBundle[2].(float64)}) err := b.WsUpdateOrderbook(curr, @@ -205,38 +212,50 @@ func (b *Bitfinex) WsDataHandler() { continue case wsTicker: tickerData := chanData[1].([]interface{}) - b.Websocket.DataHandler <- wshandler.TickerData{ - Exchange: b.Name, - Bid: tickerData[0].(float64), - Ask: tickerData[2].(float64), - Last: tickerData[6].(float64), - Volume: tickerData[7].(float64), - High: tickerData[8].(float64), - Low: tickerData[9].(float64), - AssetType: asset.Spot, - Pair: currency.NewPairFromString(chanInfo.Pair), + b.Websocket.DataHandler <- &ticker.Price{ + ExchangeName: b.Name, + Bid: tickerData[0].(float64), + Ask: tickerData[2].(float64), + Last: tickerData[6].(float64), + Volume: tickerData[7].(float64), + High: tickerData[8].(float64), + Low: tickerData[9].(float64), + AssetType: asset.Spot, + Pair: currency.NewPairFromString(chanInfo.Pair), } continue case wsTrades: var trades []WebsocketTrade switch len(chanData) { case 2: - data := chanData[1].([]interface{}) - for i := range data { - y := data[i].([]interface{}) - if _, ok := y[0].(string); ok { + snapshot := chanData[1].([]interface{}) + for i := range snapshot { + elem := snapshot[i].([]interface{}) + if len(elem) == 5 { + trades = append(trades, + WebsocketTrade{ + ID: int64(elem[0].(float64)), + Timestamp: int64(elem[1].(float64)), + Amount: elem[3].(float64), + Rate: elem[4].(float64), + Period: int64(elem[4].(float64)), + }) continue } trades = append(trades, WebsocketTrade{ - ID: int64(y[0].(float64)), - Timestamp: int64(y[1].(float64)), - Price: y[3].(float64), - Amount: y[2].(float64)}) + ID: int64(elem[0].(float64)), + Timestamp: int64(elem[1].(float64)), + Price: elem[3].(float64), + Amount: elem[2].(float64), + }) } case 3: - if chanData[1].(string) == wsTradeExecuted { - // the te update contains less data then the "tu" + if chanData[1].(string) == wsTradeExecutionUpdate || + chanData[1].(string) == wsFundingTradeUpdate { + // "(f)te - trade executed" && "(f)tu - trade updated" + // contain the same amount of data + // "(f)te" gets sent first so we can drop "(f)tu" continue } data := chanData[2].([]interface{}) @@ -246,27 +265,42 @@ func (b *Bitfinex) WsDataHandler() { Price: data[3].(float64), Amount: data[2].(float64)}) } - if len(trades) > 0 { - for i := range trades { - side := order.Buy.String() - newAmount := trades[i].Amount - if newAmount < 0 { - side = order.Sell.String() - newAmount *= -1 - } - b.Websocket.DataHandler <- wshandler.TradeData{ + + for i := range trades { + side := order.Buy.String() + newAmount := trades[i].Amount + if newAmount < 0 { + side = order.Sell.String() + newAmount *= -1 + } + + if trades[i].Rate > 0 { + b.Websocket.DataHandler <- wshandler.FundingData{ CurrencyPair: currency.NewPairFromString(chanInfo.Pair), - Timestamp: time.Unix(trades[i].Timestamp, 0), - Price: trades[i].Price, + Timestamp: time.Unix(0, trades[i].Timestamp*int64(time.Millisecond)), Amount: newAmount, Exchange: b.Name, AssetType: asset.Spot, Side: side, + Rate: trades[i].Rate, + Period: trades[i].Period, } + continue + } + + b.Websocket.DataHandler <- wshandler.TradeData{ + CurrencyPair: currency.NewPairFromString(chanInfo.Pair), + Timestamp: time.Unix(0, trades[i].Timestamp*int64(time.Millisecond)), + Price: trades[i].Price, + Amount: newAmount, + Exchange: b.Name, + AssetType: asset.Spot, + Side: side, } - continue } + continue } + if authResp, ok := chanData[1].(string); ok { switch authResp { case wsHeartbeat, pong: @@ -608,15 +642,19 @@ func (b *Bitfinex) WsInsertSnapshot(p currency.Pair, assetType asset.Item, books } var bid, ask []orderbook.Item for i := range books { - if books[i].Amount >= 0 { - bid = append(bid, orderbook.Item{Amount: books[i].Amount, Price: books[i].Price}) + if books[i].Amount > 0 { + bid = append(bid, orderbook.Item{ + ID: books[i].ID, + Amount: books[i].Amount, + Price: books[i].Price}) } else { - ask = append(ask, orderbook.Item{Amount: books[i].Amount * -1, Price: books[i].Price}) + ask = append(ask, orderbook.Item{ + ID: books[i].ID, + Amount: books[i].Amount * -1, + Price: books[i].Price}) } } - if len(bid) == 0 && len(ask) == 0 { - return errors.New("bitfinex.go error - no orderbooks in item lists") - } + var newOrderBook orderbook.Base newOrderBook.Asks = ask newOrderBook.AssetType = assetType @@ -638,29 +676,41 @@ func (b *Bitfinex) WsInsertSnapshot(p currency.Pair, assetType asset.Item, books // orderbook sides func (b *Bitfinex) WsUpdateOrderbook(p currency.Pair, assetType asset.Item, book []WebsocketBook) error { orderbookUpdate := wsorderbook.WebsocketOrderbookUpdate{ - Asks: []orderbook.Item{}, - Bids: []orderbook.Item{}, Asset: assetType, Pair: p, } - for i := 0; i < len(book); i++ { + for i := range book { switch { case book[i].Price > 0: + orderbookUpdate.Action = "update/insert" if book[i].Amount > 0 { // update bid - orderbookUpdate.Bids = append(orderbookUpdate.Bids, orderbook.Item{Amount: book[i].Amount, Price: book[i].Price}) + orderbookUpdate.Bids = append(orderbookUpdate.Bids, + orderbook.Item{ + ID: book[i].ID, + Amount: book[i].Amount, + Price: book[i].Price}) } else if book[i].Amount < 0 { // update ask - orderbookUpdate.Asks = append(orderbookUpdate.Asks, orderbook.Item{Amount: book[i].Amount * -1, Price: book[i].Price}) + orderbookUpdate.Asks = append(orderbookUpdate.Asks, + orderbook.Item{ + ID: book[i].ID, + Amount: book[i].Amount * -1, + Price: book[i].Price}) } case book[i].Price == 0: + orderbookUpdate.Action = "delete" if book[i].Amount == 1 { // delete bid - orderbookUpdate.Bids = append(orderbookUpdate.Bids, orderbook.Item{Amount: 0, Price: book[i].Price}) + orderbookUpdate.Bids = append(orderbookUpdate.Bids, + orderbook.Item{ + ID: book[i].ID}) } else if book[i].Amount == -1 { // delete ask - orderbookUpdate.Asks = append(orderbookUpdate.Asks, orderbook.Item{Amount: 0, Price: book[i].Price}) + orderbookUpdate.Asks = append(orderbookUpdate.Asks, + orderbook.Item{ + ID: book[i].ID}) } } } @@ -716,6 +766,7 @@ func (b *Bitfinex) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscr req := make(map[string]interface{}) req["event"] = "subscribe" req["channel"] = channelToSubscribe.Channel + if channelToSubscribe.Currency.String() != "" { if channelToSubscribe.Channel == wsCandles { // TODO: Add ability to select timescale @@ -726,11 +777,13 @@ func (b *Bitfinex) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscr asset.Spot).String() } } + if len(channelToSubscribe.Params) > 0 { for k, v := range channelToSubscribe.Params { req[k] = v } } + return b.WebsocketConn.SendMessage(req) } @@ -775,14 +828,6 @@ func (b *Bitfinex) WsSendAuth() error { return nil } -// WsSendUnauth sends an unauthenticated payload -func (b *Bitfinex) WsSendUnauth() error { - req := make(map[string]string) - req["event"] = "unauth" - - return b.WebsocketConn.SendMessage(req) -} - // WsAddSubscriptionChannel adds a new subscription channel to the // WebsocketSubdChannels map in bitfinex.go (Bitfinex struct) func (b *Bitfinex) WsAddSubscriptionChannel(chanID int, channel, pair string) { diff --git a/exchanges/bitfinex/bitfinex_wrapper.go b/exchanges/bitfinex/bitfinex_wrapper.go index 41f58100..db9adc90 100644 --- a/exchanges/bitfinex/bitfinex_wrapper.go +++ b/exchanges/bitfinex/bitfinex_wrapper.go @@ -185,10 +185,10 @@ func (b *Bitfinex) Setup(exch *config.ExchangeConfig) error { b.Websocket.Orderbook.Setup( exch.WebsocketOrderbookBufferLimit, + false, + false, + false, true, - false, - false, - false, exch.Name) return nil } diff --git a/exchanges/btcmarkets/btcmarkets_websocket.go b/exchanges/btcmarkets/btcmarkets_websocket.go index 79cf815a..205509ad 100644 --- a/exchanges/btcmarkets/btcmarkets_websocket.go +++ b/exchanges/btcmarkets/btcmarkets_websocket.go @@ -14,7 +14,9 @@ import ( "github.com/thrasher-corp/gocryptotrader/currency" exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wshandler" log "github.com/thrasher-corp/gocryptotrader/logger" ) @@ -153,6 +155,8 @@ func (b *BTCMarkets) WsHandleData() { Exchange: b.Name, Price: trade.Price, Amount: trade.Volume, + Side: order.SideUnknown.String(), + EventType: order.Unknown.String(), } case tick: var tick WsTick @@ -163,17 +167,18 @@ func (b *BTCMarkets) WsHandleData() { } p := currency.NewPairFromString(tick.Currency) - b.Websocket.DataHandler <- wshandler.TickerData{ - Exchange: b.Name, - Volume: tick.Volume, - High: tick.High24, - Low: tick.Low24h, - Bid: tick.Bid, - Ask: tick.Ask, - Last: tick.Last, - Timestamp: tick.Timestamp, - AssetType: asset.Spot, - Pair: p, + + b.Websocket.DataHandler <- &ticker.Price{ + ExchangeName: b.Name, + Volume: tick.Volume, + High: tick.High24, + Low: tick.Low24h, + Bid: tick.Bid, + Ask: tick.Ask, + Last: tick.Last, + LastUpdated: tick.Timestamp, + AssetType: asset.Spot, + Pair: p, } case fundChange: var transferData WsFundTransfer diff --git a/exchanges/btse/btse_websocket.go b/exchanges/btse/btse_websocket.go index b5bc3997..4293b2ac 100644 --- a/exchanges/btse/btse_websocket.go +++ b/exchanges/btse/btse_websocket.go @@ -70,11 +70,6 @@ func (b *BTSE) WsHandleData() { } switch { case strings.Contains(result["topic"].(string), "tradeHistory"): - log.Warnf(log.ExchangeSys, - "%s: Buy/Sell side functionality is broken for this exchange "+ - "currently! 'gain' has no correlation with buy side or "+ - "sell side", - b.Name) var tradeHistory wsTradeHistory err = json.Unmarshal(resp.Raw, &tradeHistory) if err != nil { @@ -87,8 +82,8 @@ func (b *BTSE) WsHandleData() { side = order.Sell.String() } b.Websocket.DataHandler <- wshandler.TradeData{ - Timestamp: time.Unix(tradeHistory.Data[x].TransactionTime, 0), - CurrencyPair: currency.NewPairFromString(strings.Replace(tradeHistory.Topic, "tradeHistory", "", 1)), + Timestamp: time.Unix(0, tradeHistory.Data[x].TransactionTime*int64(time.Millisecond)), + CurrencyPair: currency.NewPairFromString(strings.Replace(tradeHistory.Topic, "tradeHistory:", "", 1)), AssetType: asset.Spot, Exchange: b.Name, Price: tradeHistory.Data[x].Price, diff --git a/exchanges/btse/btse_wrapper.go b/exchanges/btse/btse_wrapper.go index 3a8e21d0..6575aa82 100644 --- a/exchanges/btse/btse_wrapper.go +++ b/exchanges/btse/btse_wrapper.go @@ -91,12 +91,10 @@ func (b *BTSE) SetDefaults() { CryptoWithdrawalFee: true, }, WebsocketCapabilities: protocol.Features{ - TickerFetching: true, OrderbookFetching: true, + TradeFetching: true, Subscribe: true, Unsubscribe: true, - // TradeHistory is supported but it is currently broken on BTSE's - // API so it has been left as unsupported }, WithdrawPermissions: exchange.NoAPIWithdrawalMethods, }, diff --git a/exchanges/coinbasepro/coinbasepro_websocket.go b/exchanges/coinbasepro/coinbasepro_websocket.go index 6994211b..71e3f2b3 100644 --- a/exchanges/coinbasepro/coinbasepro_websocket.go +++ b/exchanges/coinbasepro/coinbasepro_websocket.go @@ -14,6 +14,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wshandler" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wsorderbook" ) @@ -81,25 +82,25 @@ func (c *CoinbasePro) WsHandleData() { c.Websocket.DataHandler <- errors.New(string(resp.Raw)) case "ticker": - ticker := WebsocketTicker{} - err := json.Unmarshal(resp.Raw, &ticker) + wsTicker := WebsocketTicker{} + err := json.Unmarshal(resp.Raw, &wsTicker) if err != nil { c.Websocket.DataHandler <- err continue } - c.Websocket.DataHandler <- wshandler.TickerData{ - Timestamp: ticker.Time, - Pair: ticker.ProductID, - AssetType: asset.Spot, - Exchange: c.Name, - Open: ticker.Open24H, - High: ticker.High24H, - Low: ticker.Low24H, - Last: ticker.Price, - Volume: ticker.Volume24H, - Bid: ticker.BestBid, - Ask: ticker.BestAsk, + c.Websocket.DataHandler <- &ticker.Price{ + LastUpdated: wsTicker.Time, + Pair: wsTicker.ProductID, + AssetType: asset.Spot, + ExchangeName: c.Name, + Open: wsTicker.Open24H, + High: wsTicker.High24H, + Low: wsTicker.Low24H, + Last: wsTicker.Price, + Volume: wsTicker.Volume24H, + Bid: wsTicker.BestBid, + Ask: wsTicker.BestAsk, } case "snapshot": diff --git a/exchanges/coinbene/coinbene_websocket.go b/exchanges/coinbene/coinbene_websocket.go index 55c87593..69c8c214 100644 --- a/exchanges/coinbene/coinbene_websocket.go +++ b/exchanges/coinbene/coinbene_websocket.go @@ -15,6 +15,7 @@ import ( exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wshandler" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wsorderbook" ) @@ -126,26 +127,26 @@ func (c *Coinbene) WsDataHandler() { } switch { case strings.Contains(result[topic].(string), "ticker"): - var ticker WsTicker - err = json.Unmarshal(stream.Raw, &ticker) + var wsTicker WsTicker + err = json.Unmarshal(stream.Raw, &wsTicker) if err != nil { c.Websocket.DataHandler <- err continue } - for x := range ticker.Data { - c.Websocket.DataHandler <- wshandler.TickerData{ - Volume: ticker.Data[x].Volume24h, - Last: ticker.Data[x].LastPrice, - High: ticker.Data[x].High24h, - Low: ticker.Data[x].Low24h, - Bid: ticker.Data[x].BestBidPrice, - Ask: ticker.Data[x].BestAskPrice, - Pair: currency.NewPairFromFormattedPairs(ticker.Data[x].Symbol, + for x := range wsTicker.Data { + c.Websocket.DataHandler <- &ticker.Price{ + Volume: wsTicker.Data[x].Volume24h, + Last: wsTicker.Data[x].LastPrice, + High: wsTicker.Data[x].High24h, + Low: wsTicker.Data[x].Low24h, + Bid: wsTicker.Data[x].BestBidPrice, + Ask: wsTicker.Data[x].BestAskPrice, + Pair: currency.NewPairFromFormattedPairs(wsTicker.Data[x].Symbol, c.GetEnabledPairs(asset.PerpetualSwap), c.GetPairFormat(asset.PerpetualSwap, true)), - Exchange: c.Name, - AssetType: asset.PerpetualSwap, - Timestamp: ticker.Data[x].Timestamp, + ExchangeName: c.Name, + AssetType: asset.PerpetualSwap, + LastUpdated: wsTicker.Data[x].Timestamp, } } case strings.Contains(result[topic].(string), "tradeList"): diff --git a/exchanges/coinut/coinut_websocket.go b/exchanges/coinut/coinut_websocket.go index a60a2c96..9c1b286e 100644 --- a/exchanges/coinut/coinut_websocket.go +++ b/exchanges/coinut/coinut_websocket.go @@ -16,6 +16,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wshandler" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wsorderbook" log "github.com/thrasher-corp/gocryptotrader/logger" @@ -133,25 +134,25 @@ func (c *COINUT) wsProcessResponse(resp []byte) { case "hb": channels["hb"] <- resp case "inst_tick": - var ticker WsTicker - err := json.Unmarshal(resp, &ticker) + var wsTicker WsTicker + err := json.Unmarshal(resp, &wsTicker) if err != nil { c.Websocket.DataHandler <- err return } - currencyPair := c.instrumentMap.LookupInstrument(ticker.InstID) - c.Websocket.DataHandler <- wshandler.TickerData{ - Exchange: c.Name, - Volume: ticker.Volume24, - QuoteVolume: ticker.Volume24Quote, - Bid: ticker.HighestBuy, - Ask: ticker.LowestSell, - High: ticker.High24, - Low: ticker.Low24, - Last: ticker.Last, - Timestamp: time.Unix(0, ticker.Timestamp), - AssetType: asset.Spot, + currencyPair := c.instrumentMap.LookupInstrument(wsTicker.InstID) + c.Websocket.DataHandler <- &ticker.Price{ + ExchangeName: c.Name, + Volume: wsTicker.Volume24, + QuoteVolume: wsTicker.Volume24Quote, + Bid: wsTicker.HighestBuy, + Ask: wsTicker.LowestSell, + High: wsTicker.High24, + Low: wsTicker.Low24, + Last: wsTicker.Last, + LastUpdated: time.Unix(0, wsTicker.Timestamp), + AssetType: asset.Spot, Pair: currency.NewPairFromFormattedPairs(currencyPair, c.GetEnabledPairs(asset.Spot), c.GetPairFormat(asset.Spot, true)), diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index 8959f1f1..7ab7d5ac 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -15,6 +15,7 @@ import ( exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wshandler" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wsorderbook" log "github.com/thrasher-corp/gocryptotrader/logger" @@ -122,9 +123,9 @@ func (g *Gateio) WsHandleData() { switch { case strings.Contains(result.Method, "ticker"): - var ticker WebsocketTicker + var wsTicker WebsocketTicker var c string - err = json.Unmarshal(result.Params[1], &ticker) + err = json.Unmarshal(result.Params[1], &wsTicker) if err != nil { g.Websocket.DataHandler <- err continue @@ -136,17 +137,17 @@ func (g *Gateio) WsHandleData() { continue } - g.Websocket.DataHandler <- wshandler.TickerData{ - Exchange: g.Name, - Open: ticker.Open, - Close: ticker.Close, - Volume: ticker.BaseVolume, - QuoteVolume: ticker.QuoteVolume, - High: ticker.High, - Low: ticker.Low, - Last: ticker.Last, - AssetType: asset.Spot, - Pair: currency.NewPairFromString(c), + g.Websocket.DataHandler <- &ticker.Price{ + ExchangeName: g.Name, + Open: wsTicker.Open, + Close: wsTicker.Close, + Volume: wsTicker.BaseVolume, + QuoteVolume: wsTicker.QuoteVolume, + High: wsTicker.High, + Low: wsTicker.Low, + Last: wsTicker.Last, + AssetType: asset.Spot, + Pair: currency.NewPairFromString(c), } case strings.Contains(result.Method, "trades"): diff --git a/exchanges/gemini/gemini_websocket.go b/exchanges/gemini/gemini_websocket.go index e4c23b06..545ef6df 100644 --- a/exchanges/gemini/gemini_websocket.go +++ b/exchanges/gemini/gemini_websocket.go @@ -297,11 +297,10 @@ func (g *Gemini) wsProcessUpdate(result WsMarketUpdateResponse, pair currency.Pa for i := 0; i < len(result.Events); i++ { if result.Events[i].Type == "trade" { g.Websocket.DataHandler <- wshandler.TradeData{ - Timestamp: time.Now(), + Timestamp: time.Unix(0, result.Timestamp), CurrencyPair: pair, AssetType: asset.Spot, Exchange: g.Name, - EventTime: result.Timestamp, Price: result.Events[i].Price, Amount: result.Events[i].Amount, Side: result.Events[i].MakerSide, diff --git a/exchanges/hitbtc/hitbtc_websocket.go b/exchanges/hitbtc/hitbtc_websocket.go index aa24a6dc..91d3d713 100644 --- a/exchanges/hitbtc/hitbtc_websocket.go +++ b/exchanges/hitbtc/hitbtc_websocket.go @@ -16,6 +16,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/nonce" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wshandler" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wsorderbook" log "github.com/thrasher-corp/gocryptotrader/logger" @@ -105,30 +106,30 @@ func (h *HitBTC) WsHandleData() { func (h *HitBTC) handleSubscriptionUpdates(resp wshandler.WebsocketResponse, init capture) { switch init.Method { case "ticker": - var ticker WsTicker - err := json.Unmarshal(resp.Raw, &ticker) + var wsTicker WsTicker + err := json.Unmarshal(resp.Raw, &wsTicker) if err != nil { h.Websocket.DataHandler <- err return } - ts, err := time.Parse(time.RFC3339, ticker.Params.Timestamp) + ts, err := time.Parse(time.RFC3339, wsTicker.Params.Timestamp) if err != nil { h.Websocket.DataHandler <- err return } - h.Websocket.DataHandler <- wshandler.TickerData{ - Exchange: h.Name, - Open: ticker.Params.Open, - Volume: ticker.Params.Volume, - QuoteVolume: ticker.Params.VolumeQuote, - High: ticker.Params.High, - Low: ticker.Params.Low, - Bid: ticker.Params.Bid, - Ask: ticker.Params.Ask, - Last: ticker.Params.Last, - Timestamp: ts, - AssetType: asset.Spot, - Pair: currency.NewPairFromFormattedPairs(ticker.Params.Symbol, + h.Websocket.DataHandler <- &ticker.Price{ + ExchangeName: h.Name, + Open: wsTicker.Params.Open, + Volume: wsTicker.Params.Volume, + QuoteVolume: wsTicker.Params.VolumeQuote, + High: wsTicker.Params.High, + Low: wsTicker.Params.Low, + Bid: wsTicker.Params.Bid, + Ask: wsTicker.Params.Ask, + Last: wsTicker.Params.Last, + LastUpdated: ts, + AssetType: asset.Spot, + Pair: currency.NewPairFromFormattedPairs(wsTicker.Params.Symbol, h.GetEnabledPairs(asset.Spot), h.GetPairFormat(asset.Spot, true)), } case "snapshotOrderbook": diff --git a/exchanges/huobi/huobi_websocket.go b/exchanges/huobi/huobi_websocket.go index 93dda0a8..1c1d5373 100644 --- a/exchanges/huobi/huobi_websocket.go +++ b/exchanges/huobi/huobi_websocket.go @@ -15,6 +15,7 @@ import ( exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wshandler" log "github.com/thrasher-corp/gocryptotrader/logger" ) @@ -250,7 +251,7 @@ func (h *HUOBI) wsHandleMarketData(resp WsMessage) { } data := strings.Split(kline.Channel, ".") h.Websocket.DataHandler <- wshandler.KlineData{ - Timestamp: time.Unix(0, kline.Timestamp), + Timestamp: time.Unix(0, kline.Timestamp*int64(time.Millisecond)), Exchange: h.Name, AssetType: asset.Spot, Pair: currency.NewPairFromFormattedPairs(data[1], @@ -274,26 +275,26 @@ func (h *HUOBI) wsHandleMarketData(resp WsMessage) { AssetType: asset.Spot, CurrencyPair: currency.NewPairFromFormattedPairs(data[1], h.GetEnabledPairs(asset.Spot), h.GetPairFormat(asset.Spot, true)), - Timestamp: time.Unix(0, trade.Tick.Timestamp), + Timestamp: time.Unix(0, trade.Tick.Timestamp*int64(time.Millisecond)), } case strings.Contains(init.Channel, "detail"): - var ticker WsTick - err := json.Unmarshal(resp.Raw, &ticker) + var wsTicker WsTick + err := json.Unmarshal(resp.Raw, &wsTicker) if err != nil { h.Websocket.DataHandler <- err return } - data := strings.Split(ticker.Channel, ".") - h.Websocket.DataHandler <- wshandler.TickerData{ - Exchange: h.Name, - Open: ticker.Tick.Open, - Close: ticker.Tick.Close, - Volume: ticker.Tick.Amount, - QuoteVolume: ticker.Tick.Volume, - High: ticker.Tick.High, - Low: ticker.Tick.Low, - Timestamp: time.Unix(0, ticker.Timestamp), - AssetType: asset.Spot, + data := strings.Split(wsTicker.Channel, ".") + h.Websocket.DataHandler <- &ticker.Price{ + ExchangeName: h.Name, + Open: wsTicker.Tick.Open, + Close: wsTicker.Tick.Close, + Volume: wsTicker.Tick.Amount, + QuoteVolume: wsTicker.Tick.Volume, + High: wsTicker.Tick.High, + Low: wsTicker.Tick.Low, + LastUpdated: time.Unix(0, wsTicker.Timestamp*int64(time.Millisecond)), + AssetType: asset.Spot, Pair: currency.NewPairFromFormattedPairs(data[1], h.GetEnabledPairs(asset.Spot), h.GetPairFormat(asset.Spot, true)), } diff --git a/exchanges/huobi/huobi_wrapper.go b/exchanges/huobi/huobi_wrapper.go index 535a5ed1..357d5b26 100644 --- a/exchanges/huobi/huobi_wrapper.go +++ b/exchanges/huobi/huobi_wrapper.go @@ -623,7 +623,7 @@ func (h *HUOBI) GetOrderInfo(orderID string) (order.Detail, error) { CurrencyPair: currency.NewPairFromString(respData.Symbol), OrderType: orderType, OrderSide: orderSide, - OrderDate: time.Unix(respData.CreatedAt, 0), + OrderDate: time.Unix(0, respData.CreatedAt*int64(time.Millisecond)), Status: orderStatus, Price: respData.Price, Amount: respData.Amount, @@ -714,7 +714,7 @@ func (h *HUOBI) GetActiveOrders(req *order.GetOrdersRequest) ([]order.Detail, er CurrencyPair: req.Currencies[i], OrderType: orderType, OrderSide: orderSide, - OrderDate: time.Unix(resp.Data[j].CreatedAt, 0), + OrderDate: time.Unix(0, resp.Data[j].CreatedAt*int64(time.Millisecond)), Status: orderStatus, Price: resp.Data[j].Price, Amount: resp.Data[j].OrderAmount, diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index e6659a99..798377bd 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -15,6 +15,7 @@ import ( exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wshandler" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wsorderbook" log "github.com/thrasher-corp/gocryptotrader/logger" @@ -507,18 +508,17 @@ func (k *Kraken) wsProcessTickers(channelData *WebsocketChannelData, data map[st return } - k.Websocket.DataHandler <- wshandler.TickerData{ - Exchange: k.Name, - Open: openPrice, - Close: closePrice, - Volume: quantity, - High: highPrice, - Low: lowPrice, - Bid: bid, - Ask: ask, - Timestamp: time.Now(), - AssetType: asset.Spot, - Pair: channelData.Pair, + k.Websocket.DataHandler <- &ticker.Price{ + ExchangeName: k.Name, + Open: openPrice, + Close: closePrice, + Volume: quantity, + High: highPrice, + Low: lowPrice, + Bid: bid, + Ask: ask, + AssetType: asset.Spot, + Pair: channelData.Pair, } } @@ -576,7 +576,6 @@ func (k *Kraken) wsProcessTrades(channelData *WebsocketChannelData, data []inter k.Websocket.DataHandler <- wshandler.TradeData{ AssetType: asset.Spot, CurrencyPair: channelData.Pair, - EventTime: time.Now().Unix(), Exchange: k.Name, Price: price, Amount: amount, diff --git a/exchanges/lakebtc/lakebtc_websocket.go b/exchanges/lakebtc/lakebtc_websocket.go index 0bdf0955..ba512b48 100644 --- a/exchanges/lakebtc/lakebtc_websocket.go +++ b/exchanges/lakebtc/lakebtc_websocket.go @@ -12,6 +12,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wshandler" log "github.com/thrasher-corp/gocryptotrader/logger" "github.com/toorop/go-pusher" @@ -152,7 +153,6 @@ func (l *LakeBTC) processTrades(data, channel string) error { AssetType: asset.Spot, Exchange: l.Name, EventType: asset.Spot.String(), - EventTime: tradeData.Trades[i].Date, Price: tradeData.Trades[i].Price, Amount: tradeData.Trades[i].Amount, Side: tradeData.Trades[i].Type, @@ -233,9 +233,9 @@ func (l *LakeBTC) getCurrencyFromChannel(channel string) currency.Pair { return currency.NewPairFromString(curr) } -func (l *LakeBTC) processTicker(ticker string) error { +func (l *LakeBTC) processTicker(wsTicker string) error { var tUpdate map[string]interface{} - err := json.Unmarshal([]byte(ticker), &tUpdate) + err := json.Unmarshal([]byte(wsTicker), &tUpdate) if err != nil { l.Websocket.DataHandler <- err return err @@ -266,16 +266,16 @@ func (l *LakeBTC) processTicker(ticker string) error { return p } - l.Websocket.DataHandler <- wshandler.TickerData{ - Exchange: l.Name, - Bid: processTickerItem(tickerData, order.Buy.Lower()), - High: processTickerItem(tickerData, tickerHighString), - Last: processTickerItem(tickerData, tickerLastString), - Low: processTickerItem(tickerData, tickerLowString), - Ask: processTickerItem(tickerData, order.Sell.Lower()), - Volume: processTickerItem(tickerData, tickerVolumeString), - AssetType: asset.Spot, - Pair: returnCurrency, + l.Websocket.DataHandler <- &ticker.Price{ + ExchangeName: l.Name, + Bid: processTickerItem(tickerData, order.Buy.Lower()), + High: processTickerItem(tickerData, tickerHighString), + Last: processTickerItem(tickerData, tickerLastString), + Low: processTickerItem(tickerData, tickerLowString), + Ask: processTickerItem(tickerData, order.Sell.Lower()), + Volume: processTickerItem(tickerData, tickerVolumeString), + AssetType: asset.Spot, + Pair: returnCurrency, } } return nil diff --git a/exchanges/okgroup/okgroup_websocket.go b/exchanges/okgroup/okgroup_websocket.go index c1ae5fb5..ed0df0d5 100644 --- a/exchanges/okgroup/okgroup_websocket.go +++ b/exchanges/okgroup/okgroup_websocket.go @@ -17,6 +17,7 @@ import ( exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wshandler" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wsorderbook" log "github.com/thrasher-corp/gocryptotrader/logger" @@ -410,20 +411,20 @@ func (o *OKGroup) wsProcessTickers(response *WebsocketDataResponse) { c = currency.NewPairWithDelimiter(f[0], f[1], delimiterDash) } - o.Websocket.DataHandler <- wshandler.TickerData{ - Exchange: o.Name, - Open: response.Data[i].Open24h, - Close: response.Data[i].Last, - Volume: response.Data[i].BaseVolume24h, - QuoteVolume: response.Data[i].QuoteVolume24h, - High: response.Data[i].High24h, - Low: response.Data[i].Low24h, - Bid: response.Data[i].BestBid, - Ask: response.Data[i].BestAsk, - Last: response.Data[i].Last, - Timestamp: response.Data[i].Timestamp, - AssetType: o.GetAssetTypeFromTableName(response.Table), - Pair: c, + o.Websocket.DataHandler <- &ticker.Price{ + ExchangeName: o.Name, + Open: response.Data[i].Open24h, + Close: response.Data[i].Last, + Volume: response.Data[i].BaseVolume24h, + QuoteVolume: response.Data[i].QuoteVolume24h, + High: response.Data[i].High24h, + Low: response.Data[i].Low24h, + Bid: response.Data[i].BestBid, + Ask: response.Data[i].BestAsk, + Last: response.Data[i].Last, + LastUpdated: response.Data[i].Timestamp, + AssetType: o.GetAssetTypeFromTableName(response.Table), + Pair: c, } } } @@ -446,7 +447,6 @@ func (o *OKGroup) wsProcessTrades(response *WebsocketDataResponse) { Amount: response.Data[i].Size, AssetType: o.GetAssetTypeFromTableName(response.Table), CurrencyPair: c, - EventTime: time.Now().Unix(), Exchange: o.Name, Price: response.Data[i].WebsocketTradeResponse.Price, Side: response.Data[i].Side, diff --git a/exchanges/order/order_types.go b/exchanges/order/order_types.go index 6df5c9e8..31c44ff0 100644 --- a/exchanges/order/order_types.go +++ b/exchanges/order/order_types.go @@ -100,11 +100,12 @@ type Side string // Order side types const ( - AnySide Side = "ANY" - Buy Side = "BUY" - Sell Side = "SELL" - Bid Side = "BID" - Ask Side = "ASK" + AnySide Side = "ANY" + Buy Side = "BUY" + Sell Side = "SELL" + Bid Side = "BID" + Ask Side = "ASK" + SideUnknown Side = "SIDEUNKNOWN" ) // Detail holds order detail data diff --git a/exchanges/poloniex/poloniex_websocket.go b/exchanges/poloniex/poloniex_websocket.go index 18b91525..c54120d0 100644 --- a/exchanges/poloniex/poloniex_websocket.go +++ b/exchanges/poloniex/poloniex_websocket.go @@ -16,6 +16,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wshandler" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wsorderbook" log "github.com/thrasher-corp/gocryptotrader/logger" @@ -278,18 +279,17 @@ func (p *Poloniex) wsHandleTickerData(data []interface{}) { return } - p.Websocket.DataHandler <- wshandler.TickerData{ - Exchange: p.Name, - Volume: t.BaseCurrencyVolume24H, - QuoteVolume: t.QuoteCurrencyVolume24H, - High: t.HighestBid, - Low: t.LowestAsk, - Bid: t.HighestBid, - Ask: t.LowestAsk, - Last: t.LastPrice, - Timestamp: time.Now(), - AssetType: asset.Spot, - Pair: currencyPair, + p.Websocket.DataHandler <- &ticker.Price{ + ExchangeName: p.Name, + Volume: t.BaseCurrencyVolume24H, + QuoteVolume: t.QuoteCurrencyVolume24H, + High: t.HighestBid, + Low: t.LowestAsk, + Bid: t.HighestBid, + Ask: t.LowestAsk, + Last: t.LastPrice, + AssetType: asset.Spot, + Pair: currencyPair, } } diff --git a/exchanges/websocket/wshandler/wshandler_types.go b/exchanges/websocket/wshandler/wshandler_types.go index 22e47266..f6d551ae 100644 --- a/exchanges/websocket/wshandler/wshandler_types.go +++ b/exchanges/websocket/wshandler/wshandler_types.go @@ -60,6 +60,7 @@ type Websocket struct { features *protocol.Features } +// WebsocketSetup defines variables for setting up a websocket connection type WebsocketSetup struct { Enabled bool Verbose bool @@ -103,28 +104,21 @@ type TradeData struct { AssetType asset.Item Exchange string EventType string - EventTime int64 Price float64 Amount float64 Side string } -// TickerData defines ticker feed -type TickerData struct { - Exchange string - Open float64 - Close float64 - Volume float64 - QuoteVolume float64 - High float64 - Low float64 - Bid float64 - Ask float64 - Last float64 - PriceATH float64 - Timestamp time.Time - AssetType asset.Item - Pair currency.Pair +// FundingData defines funding data +type FundingData struct { + Timestamp time.Time + CurrencyPair currency.Pair + AssetType asset.Item + Exchange string + Amount float64 + Rate float64 + Period int64 + Side string } // KlineData defines kline feed diff --git a/exchanges/websocket/wsorderbook/wsorderbook.go b/exchanges/websocket/wsorderbook/wsorderbook.go index 26c4de55..906bc4e8 100644 --- a/exchanges/websocket/wsorderbook/wsorderbook.go +++ b/exchanges/websocket/wsorderbook/wsorderbook.go @@ -196,6 +196,29 @@ func (w *WebsocketOrderbookLocal) updateByIDAndAction(o *orderbook.Base, u *Webs sort.Slice(o.Asks, func(i, j int) bool { return o.Asks[i].Price < o.Asks[j].Price }) + + case "update/insert": + updateBids: + for x := range u.Bids { + for y := range o.Bids { + if o.Bids[y].ID == u.Bids[x].ID { + o.Bids[y].Amount = u.Bids[x].Amount + continue updateBids + } + } + o.Bids = append(o.Bids, u.Bids[x]) + } + + updateAsks: + for x := range u.Asks { + for y := range o.Asks { + if o.Asks[y].ID == u.Asks[x].ID { + o.Asks[y].Amount = u.Asks[x].Amount + continue updateAsks + } + } + o.Asks = append(o.Asks, u.Asks[x]) + } } } diff --git a/exchanges/zb/zb.go b/exchanges/zb/zb.go index dfb5d457..462d5742 100644 --- a/exchanges/zb/zb.go +++ b/exchanges/zb/zb.go @@ -141,22 +141,13 @@ func (z *ZB) GetOrders(currency string, pageindex, side int64) ([]Order, error) func (z *ZB) GetMarkets() (map[string]MarketResponseItem, error) { endpoint := fmt.Sprintf("%s/%s/%s", z.API.Endpoints.URL, zbAPIVersion, zbMarkets) - var res interface{} + var res map[string]MarketResponseItem err := z.SendHTTPRequest(endpoint, &res) if err != nil { return nil, err } - list := res.(map[string]interface{}) - result := map[string]MarketResponseItem{} - for k, v := range list { - item := v.(map[string]interface{}) - result[k] = MarketResponseItem{ - AmountScale: item["amountScale"].(float64), - PriceScale: item["priceScale"].(float64), - } - } - return result, nil + return res, nil } // GetLatestSpotPrice returns latest spot price of symbol diff --git a/exchanges/zb/zb_websocket.go b/exchanges/zb/zb_websocket.go index bc4466a9..80777ec4 100644 --- a/exchanges/zb/zb_websocket.go +++ b/exchanges/zb/zb_websocket.go @@ -15,6 +15,7 @@ import ( exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wshandler" log "github.com/thrasher-corp/gocryptotrader/logger" ) @@ -88,25 +89,25 @@ func (z *ZB) WsHandleData() { case strings.Contains(result.Channel, "ticker"): cPair := strings.Split(result.Channel, "_") - var ticker WsTicker - err := json.Unmarshal(fixedJSON, &ticker) + var wsTicker WsTicker + err := json.Unmarshal(fixedJSON, &wsTicker) if err != nil { z.Websocket.DataHandler <- err continue } - z.Websocket.DataHandler <- wshandler.TickerData{ - Exchange: z.Name, - Close: ticker.Data.Last, - Volume: ticker.Data.Volume24Hr, - High: ticker.Data.High, - Low: ticker.Data.Low, - Last: ticker.Data.Last, - Bid: ticker.Data.Buy, - Ask: ticker.Data.Sell, - Timestamp: time.Unix(0, ticker.Date*int64(time.Millisecond)), - AssetType: asset.Spot, - Pair: currency.NewPairFromString(cPair[0]), + z.Websocket.DataHandler <- &ticker.Price{ + ExchangeName: z.Name, + Close: wsTicker.Data.Last, + Volume: wsTicker.Data.Volume24Hr, + High: wsTicker.Data.High, + Low: wsTicker.Data.Low, + Last: wsTicker.Data.Last, + Bid: wsTicker.Data.Buy, + Ask: wsTicker.Data.Sell, + LastUpdated: time.Unix(0, wsTicker.Date*int64(time.Millisecond)), + AssetType: asset.Spot, + Pair: currency.NewPairFromString(cPair[0]), } case strings.Contains(result.Channel, "depth"): @@ -170,11 +171,10 @@ func (z *ZB) WsHandleData() { channelInfo := strings.Split(result.Channel, "_") cPair := currency.NewPairFromString(channelInfo[0]) z.Websocket.DataHandler <- wshandler.TradeData{ - Timestamp: time.Unix(0, t.Date*int64(time.Millisecond)), + Timestamp: time.Unix(t.Date, 0), CurrencyPair: cPair, AssetType: asset.Spot, Exchange: z.Name, - EventTime: t.Date, Price: t.Price, Amount: t.Amount, Side: t.TradeType, diff --git a/portfolio/portfolio.go b/portfolio/portfolio.go index 5f4cd676..a059bf74 100644 --- a/portfolio/portfolio.go +++ b/portfolio/portfolio.go @@ -26,6 +26,9 @@ const ( // Portfolio is variable store holding an array of portfolioAddress var Portfolio Base +// Verbose allows for debug output when sending an http request +var Verbose bool + // GetEthereumBalance single or multiple address information as // EtherchainBalanceResponse func GetEthereumBalance(address string) (EthplorerResponse, error) { @@ -37,8 +40,9 @@ func GetEthereumBalance(address string) (EthplorerResponse, error) { urlPath := fmt.Sprintf( "%s/%s/%s?apiKey=freekey", ethplorerAPIURL, ethplorerAddressInfo, address, ) + result := EthplorerResponse{} - return result, common.SendHTTPGetRequest(urlPath, true, false, &result) + return result, common.SendHTTPGetRequest(urlPath, true, Verbose, &result) } // GetCryptoIDAddress queries CryptoID for an address balance for a @@ -55,7 +59,7 @@ func GetCryptoIDAddress(address string, coinType currency.Code) (float64, error) coinType.Lower(), address) - err = common.SendHTTPGetRequest(url, true, false, &result) + err = common.SendHTTPGetRequest(url, true, Verbose, &result) if err != nil { return 0, err } @@ -200,43 +204,40 @@ func (p *Base) RemoveAddress(address, description string, coinType currency.Code } // UpdatePortfolio adds to the portfolio addresses by coin type -func (p *Base) UpdatePortfolio(addresses []string, coinType currency.Code) bool { +func (p *Base) UpdatePortfolio(addresses []string, coinType currency.Code) error { if strings.Contains(strings.Join(addresses, ","), PortfolioAddressExchange) || strings.Contains(strings.Join(addresses, ","), PortfolioAddressPersonal) { - return true + return nil } - numErrors := 0 if coinType == currency.ETH { for x := range addresses { result, err := GetEthereumBalance(addresses[x]) if err != nil { - numErrors++ - continue + return err } if result.Error.Message != "" { - numErrors++ - continue + return errors.New(result.Error.Message) } + p.AddAddress(addresses[x], PortfolioAddressPersonal, coinType, result.ETH.Balance) } - return numErrors == 0 } for x := range addresses { result, err := GetCryptoIDAddress(addresses[x], coinType) if err != nil { - return false + return err } p.AddAddress(addresses[x], PortfolioAddressPersonal, coinType, result) } - return true + return nil } // GetPortfolioByExchange returns currency portfolio amount by exchange @@ -421,13 +422,19 @@ func StartPortfolioWatcher() { for { data := Portfolio.GetPortfolioGroupedCoin() for key, value := range data { - success := Portfolio.UpdatePortfolio(value, key) - if success { - log.Debugf(log.PortfolioMgr, - "PortfolioWatcher: Successfully updated address balance for %s address(es) %s\n", - key, value, - ) + err := Portfolio.UpdatePortfolio(value, key) + if err != nil { + log.Errorf(log.PortfolioMgr, + "PortfolioWatcher error %s for currency %s\n", + err, + key) + continue } + + log.Debugf(log.PortfolioMgr, + "PortfolioWatcher: Successfully updated address balance for %s address(es) %s\n", + key, + value) } time.Sleep(time.Minute * 10) } diff --git a/portfolio/portfolio_test.go b/portfolio/portfolio_test.go index 2b8029de..4a340abc 100644 --- a/portfolio/portfolio_test.go +++ b/portfolio/portfolio_test.go @@ -256,50 +256,50 @@ func TestUpdatePortfolio(t *testing.T) { portfolio := GetPortfolio() portfolio.Seed(newbase) - value := portfolio.UpdatePortfolio( - []string{"LdP8Qox1VAhCzLJNqrr74YovaWYyNBUWvL"}, currency.LTC, - ) - if !value { - t.Error("portfolio_test.go - UpdatePortfolio error") + err := portfolio.UpdatePortfolio( + []string{"LdP8Qox1VAhCzLJNqrr74YovaWYyNBUWvL"}, + currency.LTC) + if err != nil { + t.Error("portfolio_test.go - UpdatePortfolio error", err) } - value = portfolio.UpdatePortfolio([]string{"Testy"}, currency.LTC) - if value { - t.Error("portfolio_test.go - UpdatePortfolio error") + err = portfolio.UpdatePortfolio([]string{"Testy"}, currency.LTC) + if err == nil { + t.Error("portfolio_test.go - UpdatePortfolio error cannot be nil") } - value = portfolio.UpdatePortfolio( + err = portfolio.UpdatePortfolio( []string{"LdP8Qox1VAhCzLJNqrr74YovaWYyNBUWvL", "LVa8wZ983PvWtdwXZ8viK6SocMENLCXkEy"}, currency.LTC, ) - if !value { - t.Error("portfolio_test.go - UpdatePortfolio error") + if err != nil { + t.Error("portfolio_test.go - UpdatePortfolio error", err) } - value = portfolio.UpdatePortfolio( + err = portfolio.UpdatePortfolio( []string{"LdP8Qox1VAhCzLJNqrr74YovaWYyNBUWvL", "Testy"}, currency.LTC, ) - if value { - t.Error("portfolio_test.go - UpdatePortfolio error") + if err == nil { + t.Error("portfolio_test.go - UpdatePortfolio error cannot be nil") } time.Sleep(time.Second * 5) - value = portfolio.UpdatePortfolio( + err = portfolio.UpdatePortfolio( []string{"0xb794f5ea0ba39494ce839613fffba74279579268", "0xe853c56864a2ebe4576a807d26fdc4a0ada51919"}, currency.ETH, ) - if !value { - t.Error("portfolio_test.go - UpdatePortfolio error") + if err == nil { // eth support seems to have been dropped for cryptoid + t.Error("portfolio_test.go - UpdatePortfolio error cannot be nil") } - value = portfolio.UpdatePortfolio( + err = portfolio.UpdatePortfolio( []string{"0xb794f5ea0ba39494ce839613fffba74279579268", "TESTY"}, currency.ETH, ) - if value { - t.Error("portfolio_test.go - UpdatePortfolio error") + if err == nil { + t.Error("portfolio_test.go - UpdatePortfolio error cannot be nil") } - value = portfolio.UpdatePortfolio( + err = portfolio.UpdatePortfolio( []string{PortfolioAddressExchange, PortfolioAddressPersonal}, currency.LTC) - if !value { - t.Error("portfolio_test.go - UpdatePortfolio error") + if err != nil { + t.Error("portfolio_test.go - UpdatePortfolio error", err) } }