gateio: Remove listOfAssetsCurrencyPairEnabledFor method and optimise websocket asset processing (#1723)

* remove listOfAssetsCurrencyPairEnabledFor and optimise paths

* add helper method and further optimise

* glorious: nits thanksssss!

---------

Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
This commit is contained in:
Ryan O'Hara-Reid
2024-11-20 16:15:38 +11:00
committed by GitHub
parent 105591b2a8
commit 1fab9c72d2
2 changed files with 127 additions and 210 deletions

View File

@@ -2064,23 +2064,23 @@ type WsOrderbookTickerData struct {
// WsOrderbookUpdate represents websocket orderbook update push data
type WsOrderbookUpdate struct {
UpdateTimeMs types.Time `json:"t"`
IgnoreField string `json:"e"`
UpdateTime types.Time `json:"E"`
CurrencyPair currency.Pair `json:"s"`
FirstOrderbookUpdatedID int64 `json:"U"` // First update order book id in this event since last update
LastOrderbookUpdatedID int64 `json:"u"`
Bids [][2]string `json:"b"`
Asks [][2]string `json:"a"`
UpdateTimeMs types.Time `json:"t"`
IgnoreField string `json:"e"`
UpdateTime types.Time `json:"E"`
CurrencyPair currency.Pair `json:"s"`
FirstOrderbookUpdatedID int64 `json:"U"` // First update order book id in this event since last update
LastOrderbookUpdatedID int64 `json:"u"`
Bids [][2]types.Number `json:"b"`
Asks [][2]types.Number `json:"a"`
}
// WsOrderbookSnapshot represents a websocket orderbook snapshot push data
type WsOrderbookSnapshot struct {
UpdateTimeMs types.Time `json:"t"`
LastUpdateID int64 `json:"lastUpdateId"`
CurrencyPair currency.Pair `json:"s"`
Bids [][2]string `json:"bids"`
Asks [][2]string `json:"asks"`
UpdateTimeMs types.Time `json:"t"`
LastUpdateID int64 `json:"lastUpdateId"`
CurrencyPair currency.Pair `json:"s"`
Bids [][2]types.Number `json:"bids"`
Asks [][2]types.Number `json:"asks"`
}
// WsSpotOrder represents an order push data through the websocket channel.

View File

@@ -74,6 +74,8 @@ var subscriptionNames = map[string]string{
subscription.AllTradesChannel: spotTradesChannel,
}
var standardMarginAssetTypes = []asset.Item{asset.Spot, asset.Margin, asset.CrossMargin}
// WsConnectSpot initiates a websocket connection
func (g *Gateio) WsConnectSpot(ctx context.Context, conn stream.Connection) error {
err := g.CurrencyPairs.IsAssetEnabled(asset.Spot)
@@ -160,37 +162,28 @@ func (g *Gateio) WsHandleSpotData(_ context.Context, respRaw []byte) error {
func (g *Gateio) processTicker(incoming []byte, pushTime time.Time) error {
var data WsTicker
err := json.Unmarshal(incoming, &data)
if err != nil {
if err := json.Unmarshal(incoming, &data); err != nil {
return err
}
tickerPrice := ticker.Price{
ExchangeName: g.Name,
Volume: data.BaseVolume.Float64(),
QuoteVolume: data.QuoteVolume.Float64(),
High: data.High24H.Float64(),
Low: data.Low24H.Float64(),
Last: data.Last.Float64(),
Bid: data.HighestBid.Float64(),
Ask: data.LowestAsk.Float64(),
AssetType: asset.Spot,
Pair: data.CurrencyPair,
LastUpdated: pushTime,
}
assetPairEnabled := g.listOfAssetsCurrencyPairEnabledFor(data.CurrencyPair)
if assetPairEnabled[asset.Spot] {
g.Websocket.DataHandler <- &tickerPrice
}
if assetPairEnabled[asset.Margin] {
marginTicker := tickerPrice
marginTicker.AssetType = asset.Margin
g.Websocket.DataHandler <- &marginTicker
}
if assetPairEnabled[asset.CrossMargin] {
crossMarginTicker := tickerPrice
crossMarginTicker.AssetType = asset.CrossMargin
g.Websocket.DataHandler <- &crossMarginTicker
out := make([]ticker.Price, 0, len(standardMarginAssetTypes))
for _, a := range standardMarginAssetTypes {
if enabled, _ := g.CurrencyPairs.IsPairEnabled(data.CurrencyPair, a); enabled {
out = append(out, ticker.Price{
ExchangeName: g.Name,
Volume: data.BaseVolume.Float64(),
QuoteVolume: data.QuoteVolume.Float64(),
High: data.High24H.Float64(),
Low: data.Low24H.Float64(),
Last: data.Last.Float64(),
Bid: data.HighestBid.Float64(),
Ask: data.LowestAsk.Float64(),
AssetType: a,
Pair: data.CurrencyPair,
LastUpdated: pushTime,
})
}
}
g.Websocket.DataHandler <- out
return nil
}
@@ -201,8 +194,7 @@ func (g *Gateio) processTrades(incoming []byte) error {
}
var data WsTrade
err := json.Unmarshal(incoming, &data)
if err != nil {
if err := json.Unmarshal(incoming, &data); err != nil {
return err
}
@@ -210,21 +202,19 @@ func (g *Gateio) processTrades(incoming []byte) error {
if err != nil {
return err
}
tData := trade.Data{
Timestamp: data.CreateTimeMs.Time(),
CurrencyPair: data.CurrencyPair,
AssetType: asset.Spot,
Exchange: g.Name,
Price: data.Price.Float64(),
Amount: data.Amount.Float64(),
Side: side,
TID: strconv.FormatInt(data.ID, 10),
}
for _, assetType := range []asset.Item{asset.Spot, asset.Margin, asset.CrossMargin} {
if g.listOfAssetsCurrencyPairEnabledFor(data.CurrencyPair)[assetType] {
tData.AssetType = assetType
if err := g.Websocket.Trade.Update(saveTradeData, tData); err != nil {
for _, a := range standardMarginAssetTypes {
if enabled, _ := g.CurrencyPairs.IsPairEnabled(data.CurrencyPair, a); enabled {
if err := g.Websocket.Trade.Update(saveTradeData, trade.Data{
Timestamp: data.CreateTimeMs.Time(),
CurrencyPair: data.CurrencyPair,
AssetType: a,
Exchange: g.Name,
Price: data.Price.Float64(),
Amount: data.Amount.Float64(),
Side: side,
TID: strconv.FormatInt(data.ID, 10),
}); err != nil {
return err
}
}
@@ -235,8 +225,7 @@ func (g *Gateio) processTrades(incoming []byte) error {
func (g *Gateio) processCandlestick(incoming []byte) error {
var data WsCandlesticks
err := json.Unmarshal(incoming, &data)
if err != nil {
if err := json.Unmarshal(incoming, &data); err != nil {
return err
}
icp := strings.Split(data.NameOfSubscription, currency.UnderscoreDelimiter)
@@ -247,42 +236,33 @@ func (g *Gateio) processCandlestick(incoming []byte) error {
if err != nil {
return err
}
spotCandlestick := stream.KlineData{
Pair: currencyPair,
AssetType: asset.Spot,
Exchange: g.Name,
StartTime: data.Timestamp.Time(),
Interval: icp[0],
OpenPrice: data.OpenPrice.Float64(),
ClosePrice: data.ClosePrice.Float64(),
HighPrice: data.HighestPrice.Float64(),
LowPrice: data.LowestPrice.Float64(),
Volume: data.TotalVolume.Float64(),
}
assetPairEnabled := g.listOfAssetsCurrencyPairEnabledFor(currencyPair)
if assetPairEnabled[asset.Spot] {
g.Websocket.DataHandler <- spotCandlestick
}
if assetPairEnabled[asset.Margin] {
marginCandlestick := spotCandlestick
marginCandlestick.AssetType = asset.Margin
g.Websocket.DataHandler <- marginCandlestick
}
if assetPairEnabled[asset.CrossMargin] {
crossMarginCandlestick := spotCandlestick
crossMarginCandlestick.AssetType = asset.CrossMargin
g.Websocket.DataHandler <- crossMarginCandlestick
out := make([]stream.KlineData, 0, len(standardMarginAssetTypes))
for _, a := range standardMarginAssetTypes {
if enabled, _ := g.CurrencyPairs.IsPairEnabled(currencyPair, a); enabled {
out = append(out, stream.KlineData{
Pair: currencyPair,
AssetType: a,
Exchange: g.Name,
StartTime: data.Timestamp.Time(),
Interval: icp[0],
OpenPrice: data.OpenPrice.Float64(),
ClosePrice: data.ClosePrice.Float64(),
HighPrice: data.HighestPrice.Float64(),
LowPrice: data.LowestPrice.Float64(),
Volume: data.TotalVolume.Float64(),
})
}
}
g.Websocket.DataHandler <- out
return nil
}
func (g *Gateio) processOrderbookTicker(incoming []byte, updatePushedAt time.Time) error {
var data WsOrderbookTickerData
err := json.Unmarshal(incoming, &data)
if err != nil {
if err := json.Unmarshal(incoming, &data); err != nil {
return err
}
return g.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{
Exchange: g.Name,
Pair: data.CurrencyPair,
@@ -296,145 +276,96 @@ func (g *Gateio) processOrderbookTicker(incoming []byte, updatePushedAt time.Tim
func (g *Gateio) processOrderbookUpdate(incoming []byte, updatePushedAt time.Time) error {
var data WsOrderbookUpdate
err := json.Unmarshal(incoming, &data)
if err != nil {
if err := json.Unmarshal(incoming, &data); err != nil {
return err
}
assetPairEnabled := g.listOfAssetsCurrencyPairEnabledFor(data.CurrencyPair)
if !fetchedCurrencyPairSnapshotOrderbook[data.CurrencyPair.String()] {
var orderbooks *orderbook.Base
orderbooks, err = g.FetchOrderbook(context.Background(), data.CurrencyPair, asset.Spot) // currency pair orderbook data for Spot, Margin, and Cross Margin is same
if len(data.Asks) == 0 && len(data.Bids) == 0 {
return nil
}
enabledAssets := make([]asset.Item, 0, len(standardMarginAssetTypes))
for _, a := range standardMarginAssetTypes {
if enabled, _ := g.CurrencyPairs.IsPairEnabled(data.CurrencyPair, a); enabled {
enabledAssets = append(enabledAssets, a)
}
}
sPair := data.CurrencyPair.String()
if !fetchedCurrencyPairSnapshotOrderbook[sPair] {
orderbooks, err := g.FetchOrderbook(context.Background(), data.CurrencyPair, asset.Spot) // currency pair orderbook data for Spot, Margin, and Cross Margin is same
if err != nil {
return err
}
// TODO: handle orderbook update synchronisation
for _, assetType := range []asset.Item{asset.Spot, asset.Margin, asset.CrossMargin} {
if !assetPairEnabled[assetType] {
continue
}
for _, a := range enabledAssets {
assetOrderbook := *orderbooks
assetOrderbook.Asset = assetType
assetOrderbook.Asset = a
err = g.Websocket.Orderbook.LoadSnapshot(&assetOrderbook)
if err != nil {
return err
}
}
fetchedCurrencyPairSnapshotOrderbook[data.CurrencyPair.String()] = true
fetchedCurrencyPairSnapshotOrderbook[sPair] = true
}
updates := orderbook.Update{
UpdateTime: data.UpdateTimeMs.Time(),
UpdatePushedAt: updatePushedAt,
Pair: data.CurrencyPair,
}
updates.Asks = make([]orderbook.Tranche, len(data.Asks))
asks := make([]orderbook.Tranche, len(data.Asks))
for x := range data.Asks {
updates.Asks[x].Price, err = strconv.ParseFloat(data.Asks[x][0], 64)
if err != nil {
return err
}
updates.Asks[x].Amount, err = strconv.ParseFloat(data.Asks[x][1], 64)
if err != nil {
return err
}
asks[x].Price = data.Asks[x][0].Float64()
asks[x].Amount = data.Asks[x][1].Float64()
}
updates.Bids = make([]orderbook.Tranche, len(data.Bids))
bids := make([]orderbook.Tranche, len(data.Bids))
for x := range data.Bids {
updates.Bids[x].Price, err = strconv.ParseFloat(data.Bids[x][0], 64)
if err != nil {
return err
}
updates.Bids[x].Amount, err = strconv.ParseFloat(data.Bids[x][1], 64)
if err != nil {
return err
}
}
if len(updates.Asks) == 0 && len(updates.Bids) == 0 {
return nil
}
if assetPairEnabled[asset.Spot] {
updates.Asset = asset.Spot
err = g.Websocket.Orderbook.Update(&updates)
if err != nil {
return err
}
}
if assetPairEnabled[asset.Margin] {
marginUpdates := updates
marginUpdates.Asset = asset.Margin
err = g.Websocket.Orderbook.Update(&marginUpdates)
if err != nil {
return err
}
}
if assetPairEnabled[asset.CrossMargin] {
crossMarginUpdate := updates
crossMarginUpdate.Asset = asset.CrossMargin
err = g.Websocket.Orderbook.Update(&crossMarginUpdate)
if err != nil {
bids[x].Price = data.Bids[x][0].Float64()
bids[x].Amount = data.Bids[x][1].Float64()
}
for _, a := range enabledAssets {
if err := g.Websocket.Orderbook.Update(&orderbook.Update{
UpdateTime: data.UpdateTimeMs.Time(),
UpdatePushedAt: updatePushedAt,
Pair: data.CurrencyPair,
Asset: a,
Asks: asks,
Bids: bids,
}); err != nil {
return err
}
}
return nil
}
func (g *Gateio) processOrderbookSnapshot(incoming []byte, updatePushedAt time.Time) error {
var data WsOrderbookSnapshot
err := json.Unmarshal(incoming, &data)
if err != nil {
if err := json.Unmarshal(incoming, &data); err != nil {
return err
}
assetPairEnabled := g.listOfAssetsCurrencyPairEnabledFor(data.CurrencyPair)
bases := orderbook.Base{
Exchange: g.Name,
Pair: data.CurrencyPair,
Asset: asset.Spot,
LastUpdated: data.UpdateTimeMs.Time(),
UpdatePushedAt: updatePushedAt,
LastUpdateID: data.LastUpdateID,
VerifyOrderbook: g.CanVerifyOrderbook,
}
bases.Asks = make([]orderbook.Tranche, len(data.Asks))
asks := make([]orderbook.Tranche, len(data.Asks))
for x := range data.Asks {
bases.Asks[x].Price, err = strconv.ParseFloat(data.Asks[x][0], 64)
if err != nil {
return err
}
bases.Asks[x].Amount, err = strconv.ParseFloat(data.Asks[x][1], 64)
if err != nil {
return err
}
asks[x].Price = data.Asks[x][0].Float64()
asks[x].Amount = data.Asks[x][1].Float64()
}
bases.Bids = make([]orderbook.Tranche, len(data.Bids))
bids := make([]orderbook.Tranche, len(data.Bids))
for x := range data.Bids {
bases.Bids[x].Price, err = strconv.ParseFloat(data.Bids[x][0], 64)
if err != nil {
return err
}
bases.Bids[x].Amount, err = strconv.ParseFloat(data.Bids[x][1], 64)
if err != nil {
return err
}
bids[x].Price = data.Bids[x][0].Float64()
bids[x].Amount = data.Bids[x][1].Float64()
}
if assetPairEnabled[asset.Spot] {
err = g.Websocket.Orderbook.LoadSnapshot(&bases)
if err != nil {
return err
}
}
if assetPairEnabled[asset.Margin] {
marginBases := bases
marginBases.Asset = asset.Margin
err = g.Websocket.Orderbook.LoadSnapshot(&marginBases)
if err != nil {
return err
}
}
if assetPairEnabled[asset.CrossMargin] {
crossMarginBases := bases
crossMarginBases.Asset = asset.CrossMargin
err = g.Websocket.Orderbook.LoadSnapshot(&crossMarginBases)
if err != nil {
return err
for _, a := range standardMarginAssetTypes {
if enabled, _ := g.CurrencyPairs.IsPairEnabled(data.CurrencyPair, a); enabled {
if err := g.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{
Exchange: g.Name,
Pair: data.CurrencyPair,
Asset: a,
LastUpdated: data.UpdateTimeMs.Time(),
UpdatePushedAt: updatePushedAt,
Bids: bids,
Asks: asks,
}); err != nil {
return err
}
}
}
return nil
@@ -715,20 +646,6 @@ func (g *Gateio) Unsubscribe(ctx context.Context, conn stream.Connection, subs s
return g.manageSubs(ctx, unsubscribeEvent, conn, subs)
}
func (g *Gateio) listOfAssetsCurrencyPairEnabledFor(cp currency.Pair) map[asset.Item]bool {
assetTypes := g.CurrencyPairs.GetAssetTypes(true)
// we need this all asset types on the map even if their value is false
assetPairEnabled := map[asset.Item]bool{asset.Spot: false, asset.Options: false, asset.Futures: false, asset.CrossMargin: false, asset.Margin: false, asset.DeliveryFutures: false}
for i := range assetTypes {
pairs, err := g.GetEnabledPairs(assetTypes[i])
if err != nil {
continue
}
assetPairEnabled[assetTypes[i]] = pairs.Contains(cp, true)
}
return assetPairEnabled
}
// GenerateWebsocketMessageID generates a message ID for the individual connection
func (g *Gateio) GenerateWebsocketMessageID(bool) int64 {
return g.Counter.IncrementAndGet()