Kucoin: Fix ProcessMarketSnapshot and add a test (#1392)

* Kucoin: Fix ProcessMarketSnapshot and add a test

* Kucoin: ProcessMarketSnapshot: move the check before the ticker

* Kucoin: remove time.sleep from the test, add if statement to processmarketSnapshot

* Kucoin: ProcessMarketSnapshot to send margin, spot and both margin and spot pair data

* Kucoin: range over listOfAssetsCurrencyPairEnabledFor which returns a slice

* Kucoin: linter fix and pointers placed

* Kucoin: removed AssetWebsocketSupport and ku.CurrencyPairs.IsAssetEnabled, linter error fixed, comment amended
This commit is contained in:
Bea
2023-11-21 07:28:38 +07:00
committed by GitHub
parent dff0581890
commit 06024fcb56
5 changed files with 168 additions and 175 deletions

View File

@@ -24,7 +24,9 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)
@@ -2415,6 +2417,68 @@ func TestProcessOrderbook(t *testing.T) {
}
}
func TestProcessMarketSnapshot(t *testing.T) {
t.Parallel()
n := new(Kucoin)
sharedtestvalues.TestFixtureToDataHandler(t, ku, n, "testdata/wsMarketSnapshot.json", n.wsHandleData)
seen := 0
for reading := true; reading; {
select {
default:
reading = false
case resp := <-n.GetBase().Websocket.DataHandler:
seen++
switch v := resp.(type) {
case *ticker.Price:
switch seen {
// spot only
case 1:
assert.Equal(t, time.UnixMilli(1698740324415), v.LastUpdated, "datetime")
assert.Equal(t, 0.00001402100000000000, v.High, "high")
assert.Equal(t, 0.000012508, v.Last, "lastTradedPrice")
assert.Equal(t, 0.00001129200000000000, v.Low, "low")
assert.Equal(t, currency.NewPairWithDelimiter("XMR", "BTC", "-"), v.Pair, "symbol")
assert.Equal(t, 28474.47280000000000000000, v.Volume, "volume")
assert.Equal(t, 0.37038038297340000000, v.QuoteVolume, "volValue")
// margin only
case 2:
assert.Equal(t, time.UnixMilli(1698740324483), v.LastUpdated, "datetime")
assert.Equal(t, 0.00000039450000000000, v.High, "high")
assert.Equal(t, 0.0000003897, v.Last, "lastTradedPrice")
assert.Equal(t, 0.00000034200000000000, v.Low, "low")
assert.Equal(t, currency.NewPairWithDelimiter("MTV", "BTC", "-"), v.Pair, "symbol")
assert.Equal(t, 316078.69700000000000000000, v.Volume, "volume")
assert.Equal(t, 0.11768519138877000000, v.QuoteVolume, "volValue")
// both margin and spot
case 3:
assert.Equal(t, time.UnixMilli(1698740324437), v.LastUpdated, "datetime")
assert.Equal(t, 0.00008486000000000000, v.High, "high")
assert.Equal(t, 0.00008318, v.Last, "lastTradedPrice")
assert.Equal(t, 0.00007152000000000000, v.Low, "low")
assert.Equal(t, currency.NewPairWithDelimiter("BTC", "USDT", "-"), v.Pair, "symbol")
assert.Equal(t, 17062.45450000000000000000, v.Volume, "volume")
assert.Equal(t, 1.33076678861000000000, v.QuoteVolume, "volValue")
}
case error:
t.Error(v)
default:
t.Errorf("Got unexpected data: %T %v", v, v)
}
}
}
assert.Equal(t, 4, seen, "Number of messages")
}
func TestSubscribeMarketSnapshot(t *testing.T) {
t.Parallel()
setupWS()
s := []stream.ChannelSubscription{
{Channel: marketTickerSnapshotForCurrencyChannel,
Currency: currency.Pair{Base: currency.BTC}},
}
err := ku.Subscribe(s)
assert.NoError(t, err, "Subscribe to MarketSnapshot should not error")
}
func TestSeedLocalCache(t *testing.T) {
t.Parallel()
pair, err := currency.NewPairFromString("ETH-USDT")

View File

@@ -232,7 +232,7 @@ func (ku *Kucoin) wsHandleData(respData []byte) error {
return ku.processTicker(resp.Data, instruments)
case strings.HasPrefix(marketTickerSnapshotChannel, topicInfo[0]) ||
strings.HasPrefix(marketTickerSnapshotForCurrencyChannel, topicInfo[0]):
return ku.processMarketSnapshot(resp.Data, topicInfo[1])
return ku.processMarketSnapshot(resp.Data)
case strings.HasPrefix(marketOrderbookLevel2Channels, topicInfo[0]):
return ku.processOrderbookWithDepth(respData, topicInfo[1])
case strings.HasPrefix(marketOrderbookLevel2to5Channel, topicInfo[0]),
@@ -677,30 +677,24 @@ func (ku *Kucoin) processOrderChangeEvent(respData []byte) error {
if err != nil {
return err
}
orderChange := order.Detail{
Price: response.Price,
Amount: response.Size,
ExecutedAmount: response.FilledSize,
RemainingAmount: response.RemainSize,
Exchange: ku.Name,
OrderID: response.OrderID,
ClientOrderID: response.ClientOid,
Type: oType,
Side: side,
Status: oStatus,
AssetType: asset.Spot,
Date: response.OrderTime.Time(),
LastUpdated: response.Timestamp.Time(),
Pair: pair,
}
assetPairEnabled := ku.listOfAssetsCurrencyPairEnabledFor(pair)
if assetPairEnabled[asset.Spot] && ku.CurrencyPairs.IsAssetEnabled(asset.Spot) == nil {
ku.Websocket.DataHandler <- &orderChange
}
if assetPairEnabled[asset.Margin] && ku.CurrencyPairs.IsAssetEnabled(asset.Margin) == nil {
marginOrderChange := orderChange
marginOrderChange.AssetType = asset.Margin
ku.Websocket.DataHandler <- &marginOrderChange
// TODO should amend this function as we need to know the order asset type when we call it
for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) {
ku.Websocket.DataHandler <- &order.Detail{
Price: response.Price,
Amount: response.Size,
ExecutedAmount: response.FilledSize,
RemainingAmount: response.RemainSize,
Exchange: ku.Name,
OrderID: response.OrderID,
ClientOrderID: response.ClientOid,
Type: oType,
Side: side,
Status: oStatus,
AssetType: assetType,
Date: response.OrderTime.Time(),
LastUpdated: response.Timestamp.Time(),
Pair: pair,
}
}
return nil
}
@@ -724,26 +718,17 @@ func (ku *Kucoin) processTradeData(respData []byte, instrument string) error {
if err != nil {
return err
}
tradeData := trade.Data{
CurrencyPair: pair,
Timestamp: response.Time.Time(),
Price: response.Price,
Amount: response.Size,
Side: side,
Exchange: ku.Name,
TID: response.TradeID,
AssetType: asset.Spot,
}
assetPairEnabled := ku.listOfAssetsCurrencyPairEnabledFor(pair)
if assetPairEnabled[asset.Spot] && ku.CurrencyPairs.IsAssetEnabled(asset.Spot) == nil {
err = ku.Websocket.Trade.Update(saveTradeData, tradeData)
if err != nil {
return err
}
}
if assetPairEnabled[asset.Margin] && ku.CurrencyPairs.IsAssetEnabled(asset.Margin) == nil {
tradeData.AssetType = asset.Margin
err := ku.Websocket.Trade.Update(saveTradeData, tradeData)
for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) {
err = ku.Websocket.Trade.Update(saveTradeData, trade.Data{
CurrencyPair: pair,
Timestamp: response.Time.Time(),
Price: response.Price,
Amount: response.Size,
Side: side,
Exchange: ku.Name,
TID: response.TradeID,
AssetType: assetType,
})
if err != nil {
return err
}
@@ -761,26 +746,19 @@ func (ku *Kucoin) processTicker(respData []byte, instrument string) error {
if err != nil {
return err
}
spotTickerPrice := ticker.Price{
AssetType: asset.Spot,
Last: response.Price,
LastUpdated: response.Timestamp.Time(),
ExchangeName: ku.Name,
Pair: pair,
Ask: response.BestAsk,
Bid: response.BestBid,
AskSize: response.BestAskSize,
BidSize: response.BestBidSize,
Volume: response.Size,
}
assetEnabledPairs := ku.listOfAssetsCurrencyPairEnabledFor(pair)
if assetEnabledPairs[asset.Spot] && ku.AssetWebsocketSupport.IsAssetWebsocketSupported(asset.Spot) && ku.CurrencyPairs.IsAssetEnabled(asset.Spot) == nil {
ku.Websocket.DataHandler <- &spotTickerPrice
}
if assetEnabledPairs[asset.Margin] && ku.AssetWebsocketSupport.IsAssetWebsocketSupported(asset.Margin) && ku.CurrencyPairs.IsAssetEnabled(asset.Margin) == nil {
marginTickerPrice := spotTickerPrice
marginTickerPrice.AssetType = asset.Margin
ku.Websocket.DataHandler <- &marginTickerPrice
for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) {
ku.Websocket.DataHandler <- &ticker.Price{
AssetType: assetType,
Last: response.Price,
LastUpdated: response.Timestamp.Time(),
ExchangeName: ku.Name,
Pair: pair,
Ask: response.BestAsk,
Bid: response.BestBid,
AskSize: response.BestAskSize,
BidSize: response.BestBidSize,
Volume: response.Size,
}
}
return nil
}
@@ -799,26 +777,20 @@ func (ku *Kucoin) processCandlesticks(respData []byte, instrument, intervalStrin
if err != nil {
return err
}
candlestickData := stream.KlineData{
Timestamp: response.Time.Time(),
Pair: pair,
AssetType: asset.Spot,
Exchange: ku.Name,
StartTime: resp.Candles.StartTime,
Interval: intervalString,
OpenPrice: resp.Candles.OpenPrice,
ClosePrice: resp.Candles.ClosePrice,
HighPrice: resp.Candles.HighPrice,
LowPrice: resp.Candles.LowPrice,
Volume: resp.Candles.TransactionVolume,
}
assetEnabledPairs := ku.listOfAssetsCurrencyPairEnabledFor(pair)
if assetEnabledPairs[asset.Spot] && ku.AssetWebsocketSupport.IsAssetWebsocketSupported(asset.Spot) && ku.CurrencyPairs.IsAssetEnabled(asset.Spot) == nil {
ku.Websocket.DataHandler <- candlestickData
}
if assetEnabledPairs[asset.Margin] && ku.AssetWebsocketSupport.IsAssetWebsocketSupported(asset.Margin) && ku.CurrencyPairs.IsAssetEnabled(asset.Margin) == nil {
candlestickData.AssetType = asset.Margin
ku.Websocket.DataHandler <- candlestickData
for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) {
ku.Websocket.DataHandler <- stream.KlineData{
Timestamp: response.Time.Time(),
Pair: pair,
AssetType: assetType,
Exchange: ku.Name,
StartTime: resp.Candles.StartTime,
Interval: intervalString,
OpenPrice: resp.Candles.OpenPrice,
ClosePrice: resp.Candles.ClosePrice,
HighPrice: resp.Candles.HighPrice,
LowPrice: resp.Candles.LowPrice,
Volume: resp.Candles.TransactionVolume,
}
}
return nil
}
@@ -836,28 +808,15 @@ func (ku *Kucoin) processOrderbookWithDepth(respData []byte, instrument string)
return err
}
var init bool
assetEnabledPairs := ku.listOfAssetsCurrencyPairEnabledFor(pair)
if assetEnabledPairs[asset.Spot] && ku.CurrencyPairs.IsAssetEnabled(asset.Spot) == nil {
init, err = ku.UpdateLocalBuffer(result.Result, asset.Spot)
for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) {
init, err = ku.UpdateLocalBuffer(result.Result, assetType)
if err != nil {
if init {
return nil
}
return fmt.Errorf("%v - UpdateLocalCache for asset type: %v error: %s",
ku.Name,
asset.Spot,
err)
}
}
if assetEnabledPairs[asset.Margin] && ku.CurrencyPairs.IsAssetEnabled(asset.Margin) == nil {
init, err = ku.UpdateLocalBuffer(result.Result, asset.Margin)
if err != nil {
if init {
return nil
}
return fmt.Errorf("%v - UpdateLocalCache for asset type: %v error: %s",
ku.Name,
asset.Margin,
assetType,
err)
}
}
@@ -913,65 +872,45 @@ func (ku *Kucoin) processOrderbook(respData []byte, symbol string) error {
return err
}
var init bool
assetEnabledPairs := ku.listOfAssetsCurrencyPairEnabledFor(pair)
if assetEnabledPairs[asset.Spot] && ku.CurrencyPairs.IsAssetEnabled(asset.Spot) == nil {
init, err = ku.UpdateLocalBuffer(response, asset.Spot)
for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) {
init, err = ku.UpdateLocalBuffer(response, assetType)
if err != nil {
if init {
return nil
}
return fmt.Errorf("%v - UpdateLocalCache for asset type %v error: %s",
ku.Name,
asset.Spot,
err)
}
}
if assetEnabledPairs[asset.Margin] && ku.CurrencyPairs.IsAssetEnabled(asset.Margin) == nil {
init, err = ku.UpdateLocalBuffer(response, asset.Margin)
if err != nil {
if init {
return nil
}
return fmt.Errorf("%v - UpdateLocalCache for asset type %v error: %s",
ku.Name,
asset.Margin,
assetType,
err)
}
}
return nil
}
func (ku *Kucoin) processMarketSnapshot(respData []byte, instrument string) error {
func (ku *Kucoin) processMarketSnapshot(respData []byte) error {
response := WsSpotTicker{}
err := json.Unmarshal(respData, &response)
if err != nil {
return err
}
pair, err := currency.NewPairFromString(instrument)
pair, err := currency.NewPairFromString(response.Data.Symbol)
if err != nil {
return err
}
spotTickerPrice := ticker.Price{
ExchangeName: ku.Name,
AssetType: asset.Spot,
Last: response.Data.LastTradedPrice,
Pair: pair,
Low: response.Data.Low,
High: response.Data.High,
QuoteVolume: response.Data.VolValue,
Volume: response.Data.Vol,
Open: response.Data.Open,
Close: response.Data.Close,
LastUpdated: response.Data.Datetime.Time(),
}
assetEnabledPairs := ku.listOfAssetsCurrencyPairEnabledFor(pair)
if assetEnabledPairs[asset.Spot] && ku.AssetWebsocketSupport.IsAssetWebsocketSupported(asset.Spot) && ku.CurrencyPairs.IsAssetEnabled(asset.Spot) == nil {
ku.Websocket.DataHandler <- &spotTickerPrice
}
if assetEnabledPairs[asset.Margin] && ku.AssetWebsocketSupport.IsAssetWebsocketSupported(asset.Margin) && ku.CurrencyPairs.IsAssetEnabled(asset.Margin) == nil {
marginTickerPrice := spotTickerPrice
marginTickerPrice.AssetType = asset.Margin
ku.Websocket.DataHandler <- &marginTickerPrice
for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) {
ku.Websocket.DataHandler <- &ticker.Price{
ExchangeName: ku.Name,
AssetType: assetType,
Last: response.Data.LastTradedPrice,
Pair: pair,
Low: response.Data.Low,
High: response.Data.High,
QuoteVolume: response.Data.VolValue,
Volume: response.Data.Vol,
Open: response.Data.Open,
Close: response.Data.Close,
LastUpdated: response.Data.Datetime.Time(),
}
}
return nil
}
@@ -1897,16 +1836,13 @@ func (o *orderbookManager) stopNeedsFetchingBook(pair currency.Pair, assetType a
return nil
}
func (ku *Kucoin) listOfAssetsCurrencyPairEnabledFor(cp currency.Pair) map[asset.Item]bool {
assetTypes := ku.CurrencyPairs.GetAssetTypes(true)
// we need this all asset types on the map even if their value is false
assetPairEnabled := map[asset.Item]bool{asset.Spot: false, asset.Futures: false, asset.Margin: false}
for i := range assetTypes {
pairs, err := ku.GetEnabledPairs(assetTypes[i])
if err != nil {
continue
func (ku *Kucoin) listOfAssetsCurrencyPairEnabledFor(cp currency.Pair) []asset.Item {
assets := []asset.Item{}
for _, a := range ku.CurrencyPairs.GetAssetTypes(true) {
pairs, err := ku.GetEnabledPairs(a)
if err == nil && pairs.Contains(cp, true) {
assets = append(assets, a)
}
assetPairEnabled[assetTypes[i]] = pairs.Contains(cp, true)
}
return assetPairEnabled
return assets
}

View File

@@ -405,29 +405,19 @@ func (ku *Kucoin) UpdateTickers(ctx context.Context, assetType asset.Item) error
if !pairs.Contains(pair, true) {
continue
}
tick := ticker.Price{
Last: ticks.Tickers[t].Last,
High: ticks.Tickers[t].High,
Low: ticks.Tickers[t].Low,
Volume: ticks.Tickers[t].Volume,
Ask: ticks.Tickers[t].Sell,
Bid: ticks.Tickers[t].Buy,
Pair: pair,
ExchangeName: ku.Name,
AssetType: assetType,
LastUpdated: ticks.Time.Time(),
}
assetEnabledPairs := ku.listOfAssetsCurrencyPairEnabledFor(pair)
if assetEnabledPairs[asset.Spot] && ku.CurrencyPairs.IsAssetEnabled(asset.Spot) == nil {
err = ticker.ProcessTicker(&tick)
if err != nil {
return err
}
}
if assetEnabledPairs[asset.Margin] && ku.CurrencyPairs.IsAssetEnabled(asset.Margin) == nil {
marginTick := tick
marginTick.AssetType = asset.Margin
err = ticker.ProcessTicker(&marginTick)
for _, assetType := range ku.listOfAssetsCurrencyPairEnabledFor(pair) {
err = ticker.ProcessTicker(&ticker.Price{
Last: ticks.Tickers[t].Last,
High: ticks.Tickers[t].High,
Low: ticks.Tickers[t].Low,
Volume: ticks.Tickers[t].Volume,
Ask: ticks.Tickers[t].Sell,
Bid: ticks.Tickers[t].Buy,
Pair: pair,
ExchangeName: ku.Name,
AssetType: assetType,
LastUpdated: ticks.Time.Time(),
})
if err != nil {
return err
}

View File

@@ -0,0 +1,3 @@
{"type":"message","topic":"/market/snapshot:BTC","subject":"trade.snapshot","data":{"sequence":1698740324504,"data":{"averagePrice":0.00001164,"baseCurrency":"XMR","board":0,"buy":0.00001252,"changePrice":0.00000104800000000000,"changeRate":0.0914,"close":0.000012508,"datetime":1698740324415,"high":0.00001402100000000000,"lastTradedPrice":0.000012508,"low":0.00001129200000000000,"makerCoefficient":2.000000,"makerFeeRate":0.001,"marginTrade":false,"mark":0,"market":"BTC","marketChange1h":{"changeRate":0,"high":0,"low":0,"open":0,"vol":0,"volValue":0},"marketChange24h":{"changePrice":0.00000104800000000000,"changeRate":0.0914,"high":0.00001402100000000000,"low":0.00001129200000000000,"open":0.00001146000000000000,"vol":28474.47280000000000000000,"volValue":0.37038038297340000000},"marketChange4h":{"changePrice":0.00000009600000000000,"changeRate":0.0077,"high":0.00001308400000000000,"low":0.00001241200000000000,"open":0.00001241200000000000,"vol":7090.00000000000000000000,"volValue":0.08885800028840000000},"markets":["BTC"],"open":0.00001146000000000000,"quoteCurrency":"BTC","sell":0.000013191,"sort":100,"symbol":"XMR-BTC","symbolCode":"XMR-BTC","takerCoefficient":2.000000,"takerFeeRate":0.001,"trading":true,"vol":28474.47280000000000000000,"volValue":0.37038038297340000000}}}
{"type":"message","topic":"/market/snapshot:BTC","subject":"trade.snapshot","data":{"sequence":1698740324488,"data":{"averagePrice":0.00000037,"baseCurrency":"MTV","board":0,"buy":0.0000003641,"changePrice":0.00000004770000000000,"changeRate":0.1394,"close":0.0000003897,"datetime":1698740324483,"high":0.00000039450000000000,"lastTradedPrice":0.0000003897,"low":0.00000034200000000000,"makerCoefficient":2.000000,"makerFeeRate":0.001,"marginTrade":false,"mark":0,"market":"BTC","marketChange1h":{"changeRate":0,"high":0,"low":0,"open":0,"vol":0,"volValue":0},"marketChange24h":{"changePrice":0.00000004770000000000,"changeRate":0.1394,"high":0.00000039450000000000,"low":0.00000034200000000000,"open":0.00000034200000000000,"vol":316078.69700000000000000000,"volValue":0.11768519138877000000},"marketChange4h":{"changePrice":0.00000003290000000000,"changeRate":0.0922,"high":0.00000038970000000000,"low":0.00000035680000000000,"open":0.00000035680000000000,"vol":2309.46880000000000000000,"volValue":0.00089999999136000000},"markets":["BTC"],"open":0.00000034200000000000,"quoteCurrency":"BTC","sell":0.0000004022,"sort":100,"symbol":"MTV-BTC","symbolCode":"MTV-BTC","takerCoefficient":2.000000,"takerFeeRate":0.001,"trading":true,"vol":316078.69700000000000000000,"volValue":0.11768519138877000000}}}
{"type":"message","topic":"/market/snapshot:BTC","subject":"trade.snapshot","data":{"sequence":1698740324508,"data":{"averagePrice":0.00007307,"baseCurrency":"BTC","board":0,"buy":0.00008388,"changePrice":0.00001166000000000000,"changeRate":0.1630,"close":0.00008318,"datetime":1698740324437,"high":0.00008486000000000000,"lastTradedPrice":0.00008318,"low":0.00007152000000000000,"makerCoefficient":1.000000,"makerFeeRate":0.001,"marginTrade":false,"mark":0,"market":"USDT","marketChange1h":{"changePrice":-0.00000116000000000000,"changeRate":-0.0137,"high":0.00008434000000000000,"low":0.00008318000000000000,"open":0.00008434000000000000,"vol":189.33430000000000000000,"volValue":0.01578748292300000000},"marketChange24h":{"changePrice":0.00001166000000000000,"changeRate":0.1630,"high":0.00008486000000000000,"low":0.00007152000000000000,"open":0.00007152000000000000,"vol":17062.45450000000000000000,"volValue":1.33076678861000000000},"marketChange4h":{"changePrice":0.00000143000000000000,"changeRate":0.0174,"high":0.00008486000000000000,"low":0.00008175000000000000,"open":0.00008175000000000000,"vol":1752.55690000000000000000,"volValue":0.14543003812900000000},"markets":["BTC"],"open":0.00007152000000000000,"quoteCurrency":"USDT","sell":0.00008421,"sort":100,"symbol":"BTC-USDT","symbolCode":"BTC-USDT","takerCoefficient":1.000000,"takerFeeRate":0.001,"trading":true,"vol":17062.45450000000000000000,"volValue":1.33076678861000000000}}}

File diff suppressed because one or more lines are too long