exchanges: Update GateIO exchange to V4 (#1058)

* Adding Public Endpoints and test functions

* Adding public endpoints and test functions

* Adding private spot endpoints

* Adding private endpoints and corresponding tests for margin

* Adding Margin Private endpoints

* Adding cross margin and flash swap endpoints

* Adding futures private endpoints

* Adding futures private endpoints and corresponding tests

* Adding Options and SubAccount endpoints and their unit tests

* Adding Wrapper functions

* Complete wrapper functions and corresponding unit test functions

* Fixing wrapper issues and adding websocket functions

* Update of Spot websocket and adding futures websocket handlers

* completed futures WS push data endpoints

* Completed Options websocket endpoints

* Adding websocket support for delivery futures and slight update on endpoint funcs

* Added Delivery websocket support and fix linter issues

* Update on Unit tests

* fix slight currency format error

* Fix slight endpoint tempos

* Update on conditional statements and unit tests issues

* fixing slight tempos

* Slight model and websocket data push method change

* Fix unit test tempos and updating models

* Fix on code structures and update on unit tests

* Slight code fix

* Remove print statements

* Update on tradable pairs fetch eps

* Fix websocket tempos

* Adding types to websocket routine manager

* Fix slight issues

* Slight fixes

* Updating wrapper funcs and models

* Slight update

* Update on test

* Update on tradable pairs

* update conditional statements

* Fixing slight issues

* Updating unit tests

* Minor fixes depending review comments

* Remove redundant method declaration

* Adding missing intervals

* Updating fetch tradable pairs

* update tradable pairs issues

* Addressing small tempos

* Slight fix on ticker

* Minor Fixes

* Minor review comment fixes

* Unit test and minor code updates

* Slight code updates

* Minor updates depending review comments

* Fixes

* Updating incoming message matcher

* Fix missing merge issue

* Fix minor wrapper issues

* Updating ratelimit and other issues

* Updating endpoint models and adding missing eps

* Update on code structure and models

* Minor codespell fixes

* Minor update on models

* fix unit test panic

* Minor race fix

* Fix issues in generating signature and unit tests

* Minor update on wrapper and unit tests

* Minor fix on wrapper

* Mini linter issues fix

* Minor fix

* endpoint fixes and slight update

* Minor fixes

* Updating exchange functions and unit tests

* Unit test and wrapper updates

* Remove options candlestick support

* Minor unit test and wrapper fix

* Unit test update

* minor fix on unit test and wrapper

* endpoints constants name change

* Add minor wrapper issues

* endpoint constants update

* endpoint url updates

* Updating subscriptions

* fixing dual mode endpoint methods

* minor fix

* rm small tempo

* Update on websocket orderbook handling

* Orderbook and currency pair update

* fix linter and test issues

* minor helper function update

* Fix wrapper coverage and wrapper issues

* delete unused variables

* Minor fix on ReadData() call

* separating websocket handlers

* separating websocket handlers

* Minor fix on enabled pair

* minor fix

* check instrument availability in spot

* create a separate subscriber for sake of multiple websocket connection

* linter fix

* minor websocket and gateio endpoints fix

* fix nil pointer exception

* minor fixes

* spelling fix decerializes -> deserializes

* fix Bitfinex unit test issues

* minor unknown currency pair labling fix

* minor currency pair handling fix

* slight update on GetDepositAddress wrapper unit test

* setting max request job to 200

* fixing numerical and timestamp type convert

* fix value overflow error

* change method of parsing orderbook price

* unifying timestamp conversion types to gateioTime

---------

Co-authored-by: Samuael Adnew <samuaelad@Samuaels-MacBook-Air.local>
This commit is contained in:
Samuael A
2023-05-30 04:03:53 +00:00
committed by GitHub
parent d6bff305c7
commit 3eac6d12bd
25 changed files with 14353 additions and 2871 deletions

1
.gitignore vendored
View File

@@ -48,3 +48,4 @@ __debug_bin
# Coverage reports
coverage.txt
wrapperconfig.json

File diff suppressed because one or more lines are too long

View File

@@ -187,7 +187,7 @@ func (b *BaseCodes) Register(c string, newRole Role) Code {
var format bool
// Digits fool upper and lower casing. So find first letter and check case.
for x := range c {
if !unicode.IsDigit(rune(c[x])) {
if unicode.IsLetter(rune(c[x])) {
format = unicode.IsUpper(rune(c[x]))
break
}

View File

@@ -2999,6 +2999,7 @@ var (
USDFL = NewCode("USDFL")
FLUSD = NewCode("FLUSD")
DUSD = NewCode("DUSD")
STETH = NewCode("STETH")
stables = Currencies{
USDT,

View File

@@ -84,17 +84,29 @@ func NewPairFromIndex(currencyPair, index string) (Pair, error) {
// NewPairFromString converts currency string into a new CurrencyPair
// with or without delimiter
func NewPairFromString(currencyPair string) (Pair, error) {
for x := range delimiters {
if strings.Contains(currencyPair, delimiters[x]) {
return NewPairDelimiter(currencyPair, delimiters[x])
}
}
if len(currencyPair) < 3 {
return EMPTYPAIR,
fmt.Errorf("%w from %s string too short to be a current pair",
fmt.Errorf("%w from %s string too short to be a currency pair",
errCannotCreatePair,
currencyPair)
}
var delimiter string
pairStrings := []string{currencyPair}
for x := range delimiters {
if strings.Contains(pairStrings[0], delimiters[x]) {
values := strings.SplitN(pairStrings[0], delimiters[x], 2)
if delimiter != "" {
values[1] += delimiter + pairStrings[1]
pairStrings = values
} else {
pairStrings = values
}
delimiter = delimiters[x]
}
}
if delimiter != "" {
return Pair{Base: NewCode(pairStrings[0]), Delimiter: delimiter, Quote: NewCode(pairStrings[1])}, nil
}
return NewPairFromStrings(currencyPair[0:3], currencyPair[3:])
}

View File

@@ -507,6 +507,25 @@ func TestNewPairFromString(t *testing.T) {
actual, expected,
)
}
pairMap := map[string]Pair{
"BTC_USDT-20230630-45000-C": {Base: NewCode("BTC"), Delimiter: UnderscoreDelimiter, Quote: NewCode("USDT-20230630-45000-C")},
"BTC-USD-221007": {Base: NewCode("BTC"), Delimiter: DashDelimiter, Quote: NewCode("USD-221007")},
"IHT_ETH": {Base: NewCode("IHT"), Delimiter: UnderscoreDelimiter, Quote: NewCode("ETH")},
"BTC-USD-220930-30000-P": {Base: NewCode("BTC"), Delimiter: DashDelimiter, Quote: NewCode("USD-220930-30000-P")},
"XBTUSDTM": {Base: NewCode("XBT"), Delimiter: "", Quote: NewCode("USDTM")},
"BTC-PERPETUAL": {Base: NewCode("BTC"), Delimiter: DashDelimiter, Quote: NewCode("PERPETUAL")},
"SOL-21OCT22-20-C": {Base: NewCode("SOL"), Delimiter: DashDelimiter, Quote: NewCode("21OCT22-20-C")},
"SOL-FS-30DEC22_28OCT22": {Base: NewCode("SOL"), Delimiter: DashDelimiter, Quote: NewCode("FS-30DEC22_28OCT22")},
}
for key, expectedPair := range pairMap {
pair, err = NewPairFromString(key)
if err != nil {
t.Fatal(err)
}
if !pair.Equal(expectedPair) || pair.Delimiter != expectedPair.Delimiter {
t.Errorf("Pair(): %s was not equal to expected value: %s", pair.String(), expectedPair.String())
}
}
}
func TestNewPairFromFormattedPairs(t *testing.T) {

View File

@@ -234,6 +234,24 @@ func (m *WebsocketRoutineManager) websocketDataHandler(exchName string, data int
return err
}
m.syncer.PrintTickerSummary(d, "websocket", err)
case []ticker.Price:
for x := range d {
if m.syncer.IsRunning() {
err := m.syncer.Update(exchName,
d[x].Pair,
d[x].AssetType,
SyncItemTicker,
nil)
if err != nil {
return err
}
}
err := ticker.ProcessTicker(&d[x])
if err != nil {
return err
}
m.syncer.PrintTickerSummary(&d[x], "websocket", err)
}
case stream.KlineData:
if m.verbose {
log.Infof(log.WebsocketMgr, "%s websocket %s %s kline updated %+v",
@@ -242,6 +260,16 @@ func (m *WebsocketRoutineManager) websocketDataHandler(exchName string, data int
d.AssetType,
d)
}
case []stream.KlineData:
for x := range d {
if m.verbose {
log.Infof(log.WebsocketMgr, "%s websocket %s %s kline updated %+v",
exchName,
m.FormatCurrency(d[x].Pair),
d[x].AssetType,
d)
}
}
case *orderbook.Depth:
base, err := d.Retrieve()
if err != nil {
@@ -281,6 +309,30 @@ func (m *WebsocketRoutineManager) websocketDataHandler(exchName string, data int
}
m.printOrderSummary(d, true)
}
case []order.Detail:
for x := range d {
if !m.orderManager.Exists(&d[x]) {
err := m.orderManager.Add(&d[x])
if err != nil {
return err
}
m.printOrderSummary(&d[x], false)
} else {
od, err := m.orderManager.GetByExchangeAndID(d[x].Exchange, d[x].OrderID)
if err != nil {
return err
}
err = od.UpdateOrderFromDetail(&d[x])
if err != nil {
return err
}
err = m.orderManager.UpdateExistingOrder(od)
if err != nil {
return err
}
m.printOrderSummary(&d[x], true)
}
}
case order.ClassificationError:
return fmt.Errorf("%w %s", d.Err, d.Error())
case stream.UnhandledMessageWarning:
@@ -289,6 +341,12 @@ func (m *WebsocketRoutineManager) websocketDataHandler(exchName string, data int
if m.verbose {
m.printAccountHoldingsChangeSummary(d)
}
case []account.Change:
if m.verbose {
for x := range d {
m.printAccountHoldingsChangeSummary(d[x])
}
}
case []trade.Data:
if m.verbose {
log.Infof(log.Trade, "%+v", d)

View File

@@ -23,12 +23,14 @@ const (
Empty Item = 0
Spot Item = 1 << iota
Margin
CrossMargin
MarginFunding
Index
Binary
PerpetualContract
PerpetualSwap
Futures
DeliveryFutures
UpsideProfitContract
DownsideProfitContract
CoinMarginedFutures
@@ -36,17 +38,19 @@ const (
USDCMarginedFutures
Options
futuresFlag = PerpetualContract | PerpetualSwap | Futures | UpsideProfitContract | DownsideProfitContract | CoinMarginedFutures | USDTMarginedFutures | USDCMarginedFutures
supportedFlag = Spot | Margin | MarginFunding | Index | Binary | PerpetualContract | PerpetualSwap | Futures | UpsideProfitContract | DownsideProfitContract | CoinMarginedFutures | USDTMarginedFutures | USDCMarginedFutures | Options
futuresFlag = PerpetualContract | PerpetualSwap | Futures | DeliveryFutures | UpsideProfitContract | DownsideProfitContract | CoinMarginedFutures | USDTMarginedFutures | USDCMarginedFutures
supportedFlag = Spot | Margin | CrossMargin | MarginFunding | Index | Binary | PerpetualContract | PerpetualSwap | Futures | DeliveryFutures | UpsideProfitContract | DownsideProfitContract | CoinMarginedFutures | USDTMarginedFutures | USDCMarginedFutures | Options
spot = "spot"
margin = "margin"
crossMargin = "cross_margin" // for Gateio exchange
marginFunding = "marginfunding"
index = "index"
binary = "binary"
perpetualContract = "perpetualcontract"
perpetualSwap = "perpetualswap"
futures = "futures"
deliveryFutures = "delivery"
upsideProfitContract = "upsideprofitcontract"
downsideProfitContract = "downsideprofitcontract"
coinMarginedFutures = "coinmarginedfutures"
@@ -56,7 +60,7 @@ const (
)
var (
supportedList = Items{Spot, Margin, MarginFunding, Index, Binary, PerpetualContract, PerpetualSwap, Futures, UpsideProfitContract, DownsideProfitContract, CoinMarginedFutures, USDTMarginedFutures, USDCMarginedFutures, Options}
supportedList = Items{Spot, Margin, CrossMargin, MarginFunding, Index, Binary, PerpetualContract, PerpetualSwap, Futures, DeliveryFutures, UpsideProfitContract, DownsideProfitContract, CoinMarginedFutures, USDTMarginedFutures, USDCMarginedFutures, Options}
)
// Supported returns a list of supported asset types
@@ -71,6 +75,8 @@ func (a Item) String() string {
return spot
case Margin:
return margin
case CrossMargin:
return crossMargin
case MarginFunding:
return marginFunding
case Index:
@@ -83,6 +89,8 @@ func (a Item) String() string {
return perpetualSwap
case Futures:
return futures
case DeliveryFutures:
return deliveryFutures
case UpsideProfitContract:
return upsideProfitContract
case DownsideProfitContract:
@@ -170,6 +178,10 @@ func New(input string) (Item, error) {
return Margin, nil
case marginFunding:
return MarginFunding, nil
case crossMargin:
return CrossMargin, nil
case deliveryFutures:
return DeliveryFutures, nil
case index:
return Index, nil
case binary:

View File

@@ -1189,7 +1189,7 @@ func TestWsCandleResponse(t *testing.T) {
}
func TestWsOrderSnapshot(t *testing.T) {
b.WsAddSubscriptionChannel(0, "account", "N/A")
b.WsAddSubscriptionChannel(0, "account", "")
pressXToJSON := `[0,"os",[[34930659963,null,1574955083558,"tETHUSD",1574955083558,1574955083573,0.201104,0.201104,"EXCHANGE LIMIT",null,null,null,0,"ACTIVE",null,null,120,0,0,0,null,null,null,0,0,null,null,null,"BFX",null,null,null]]]`
err := b.wsHandleData([]byte(pressXToJSON))
if err != nil {
@@ -1203,7 +1203,7 @@ func TestWsOrderSnapshot(t *testing.T) {
}
func TestWsNotifications(t *testing.T) {
b.WsAddSubscriptionChannel(0, "account", "N/A")
b.WsAddSubscriptionChannel(0, "account", "")
pressXToJSON := `[0,"n",[1575282446099,"fon-req",null,null,[41238905,null,null,null,-1000,null,null,null,null,null,null,null,null,null,0.002,2,null,null,null,null,null],null,"SUCCESS","Submitting funding bid of 1000.0 USD at 0.2000 for 2 days."]]`
err := b.wsHandleData([]byte(pressXToJSON))
if err != nil {
@@ -1218,7 +1218,7 @@ func TestWsNotifications(t *testing.T) {
}
func TestWSFundingOfferSnapshotAndUpdate(t *testing.T) {
b.WsAddSubscriptionChannel(0, "account", "N/A")
b.WsAddSubscriptionChannel(0, "account", "")
pressXToJSON := `[0,"fos",[[41237920,"fETH",1573912039000,1573912039000,0.5,0.5,"LIMIT",null,null,0,"ACTIVE",null,null,null,0.0024,2,0,0,null,0,null]]]`
if err := b.wsHandleData([]byte(pressXToJSON)); err != nil {
t.Error(err)
@@ -1231,7 +1231,7 @@ func TestWSFundingOfferSnapshotAndUpdate(t *testing.T) {
}
func TestWSFundingCreditSnapshotAndUpdate(t *testing.T) {
b.WsAddSubscriptionChannel(0, "account", "N/A")
b.WsAddSubscriptionChannel(0, "account", "")
pressXToJSON := `[0,"fcs",[[26223578,"fUST",1,1575052261000,1575296187000,350,0,"ACTIVE",null,null,null,0,30,1575052261000,1575293487000,0,0,null,0,null,0,"tBTCUST"],[26223711,"fUSD",-1,1575291961000,1575296187000,180,0,"ACTIVE",null,null,null,0.002,7,1575282446000,1575295587000,0,0,null,0,null,0,"tETHUSD"]]]`
if err := b.wsHandleData([]byte(pressXToJSON)); err != nil {
t.Error(err)
@@ -1244,7 +1244,7 @@ func TestWSFundingCreditSnapshotAndUpdate(t *testing.T) {
}
func TestWSFundingLoanSnapshotAndUpdate(t *testing.T) {
b.WsAddSubscriptionChannel(0, "account", "N/A")
b.WsAddSubscriptionChannel(0, "account", "")
pressXToJSON := `[0,"fls",[[2995442,"fUSD",-1,1575291961000,1575295850000,820,0,"ACTIVE",null,null,null,0.002,7,1575282446000,1575295850000,0,0,null,0,null,0]]]`
if err := b.wsHandleData([]byte(pressXToJSON)); err != nil {
t.Error(err)
@@ -1257,7 +1257,7 @@ func TestWSFundingLoanSnapshotAndUpdate(t *testing.T) {
}
func TestWSWalletSnapshot(t *testing.T) {
b.WsAddSubscriptionChannel(0, "account", "N/A")
b.WsAddSubscriptionChannel(0, "account", "")
pressXToJSON := `[0,"ws",[["exchange","SAN",19.76,0,null,null,null]]]`
if err := b.wsHandleData([]byte(pressXToJSON)); err != nil {
t.Error(err)
@@ -1265,7 +1265,7 @@ func TestWSWalletSnapshot(t *testing.T) {
}
func TestWSBalanceUpdate(t *testing.T) {
b.WsAddSubscriptionChannel(0, "account", "N/A")
b.WsAddSubscriptionChannel(0, "account", "")
const pressXToJSON = `[0,"bu",[4131.85,4131.85]]`
if err := b.wsHandleData([]byte(pressXToJSON)); err != nil {
t.Error(err)
@@ -1273,7 +1273,7 @@ func TestWSBalanceUpdate(t *testing.T) {
}
func TestWSMarginInfoUpdate(t *testing.T) {
b.WsAddSubscriptionChannel(0, "account", "N/A")
b.WsAddSubscriptionChannel(0, "account", "")
const pressXToJSON = `[0,"miu",["base",[-13.014640000000007,0,49331.70267297,49318.68803297,27]]]`
if err := b.wsHandleData([]byte(pressXToJSON)); err != nil {
t.Error(err)
@@ -1281,7 +1281,7 @@ func TestWSMarginInfoUpdate(t *testing.T) {
}
func TestWSFundingInfoUpdate(t *testing.T) {
b.WsAddSubscriptionChannel(0, "account", "N/A")
b.WsAddSubscriptionChannel(0, "account", "")
const pressXToJSON = `[0,"fiu",["sym","tETHUSD",[149361.09689202666,149639.26293509,830.0182168075556,895.0658432466332]]]`
if err := b.wsHandleData([]byte(pressXToJSON)); err != nil {
t.Error(err)
@@ -1289,7 +1289,7 @@ func TestWSFundingInfoUpdate(t *testing.T) {
}
func TestWSFundingTrade(t *testing.T) {
b.WsAddSubscriptionChannel(0, "account", "N/A")
b.WsAddSubscriptionChannel(0, "account", "")
pressXToJSON := `[0,"fte",[636854,"fUSD",1575282446000,41238905,-1000,0.002,7,null]]`
if err := b.wsHandleData([]byte(pressXToJSON)); err != nil {
t.Error(err)

View File

@@ -172,7 +172,7 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
}
if status == "OK" {
b.Websocket.DataHandler <- d
b.WsAddSubscriptionChannel(0, "account", "N/A")
b.WsAddSubscriptionChannel(0, "account", "")
} else if status == "fail" {
if code, ok := d["code"].(string); ok {
return fmt.Errorf("websocket unable to AUTH. Error code: %s",
@@ -238,7 +238,7 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
if err != nil {
return err
}
case len(pairInfo) == 1:
case len(pairInfo) == 1 && chanInfo.Pair != "":
newPair := pairInfo[0]
if newPair[0] == 'f' {
chanAsset = asset.MarginFunding

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,133 @@
package gateio
import (
"encoding/json"
"fmt"
"strconv"
"time"
)
type gateioTime time.Time
// UnmarshalJSON deserializes json, and timestamp information.
func (a *gateioTime) UnmarshalJSON(data []byte) error {
var value interface{}
err := json.Unmarshal(data, &value)
if err != nil {
return err
}
var standard int64
switch val := value.(type) {
case float64:
standard = int64(val)
case int64:
standard = val
case int32:
standard = int64(val)
case string:
if val == "" {
return nil
}
parsedValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return err
}
standard = int64(parsedValue)
default:
return fmt.Errorf("cannot unmarshal %T into gateioTime", val)
}
if standard > 9999999999 {
*a = gateioTime(time.UnixMilli(standard))
} else {
*a = gateioTime(time.Unix(standard, 0))
}
return nil
}
// Time represents a time instance.
func (a *gateioTime) Time() time.Time { return time.Time(*a) }
type gateioNumericalValue float64
// UnmarshalJSON is custom type json unmarshaller for gateioNumericalValue
func (a *gateioNumericalValue) UnmarshalJSON(data []byte) error {
var num interface{}
err := json.Unmarshal(data, &num)
if err != nil {
return err
}
switch d := num.(type) {
case float64:
*a = gateioNumericalValue(d)
case string:
if d == "" {
*a = gateioNumericalValue(0)
return nil
}
convNum, err := strconv.ParseFloat(d, 64)
if err != nil {
return err
}
*a = gateioNumericalValue(convNum)
}
return nil
}
// Float64 returns float64 value from gateioNumericalValue instance.
func (a *gateioNumericalValue) Float64() float64 { return float64(*a) }
// UnmarshalJSON to deserialize timestamp information and create OrderbookItem instance from the list of asks and bids data.
func (a *Orderbook) UnmarshalJSON(data []byte) error {
type Alias Orderbook
type askorbid struct {
Price gateioNumericalValue `json:"p"`
Size float64 `json:"s"`
}
chil := &struct {
*Alias
Current float64 `json:"current"`
Update float64 `json:"update"`
Asks []askorbid `json:"asks"`
Bids []askorbid `json:"bids"`
}{
Alias: (*Alias)(a),
}
err := json.Unmarshal(data, &chil)
if err != nil {
return err
}
a.Current = time.UnixMilli(int64(chil.Current * 1000))
a.Update = time.UnixMilli(int64(chil.Update * 1000))
a.Asks = make([]OrderbookItem, len(chil.Asks))
a.Bids = make([]OrderbookItem, len(chil.Bids))
for x := range chil.Asks {
a.Asks[x] = OrderbookItem{
Amount: chil.Asks[x].Size,
Price: chil.Asks[x].Price.Float64(),
}
}
for x := range chil.Bids {
a.Bids[x] = OrderbookItem{
Amount: chil.Bids[x].Size,
Price: chil.Bids[x].Price.Float64(),
}
}
return nil
}
// UnmarshalJSON deserialises the JSON info, including the timestamp
func (a *WsUserPersonalTrade) UnmarshalJSON(data []byte) error {
type Alias WsUserPersonalTrade
chil := &struct {
*Alias
CreateTimeMicroS float64 `json:"create_time_ms,string"`
}{
Alias: (*Alias)(a),
}
if err := json.Unmarshal(data, chil); err != nil {
return err
}
a.CreateTimeMicroS = time.UnixMicro(int64(chil.CreateTimeMicroS * 1000))
return nil
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,333 @@
package gateio
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/log"
)
const (
// delivery real trading urls
deliveryRealUSDTTradingURL = "wss://fx-ws.gateio.ws/v4/ws/delivery/usdt"
deliveryRealBTCTradingURL = "wss://fx-ws.gateio.ws/v4/ws/delivery/btc"
// delivery testnet urls
deliveryTestNetBTCTradingURL = "wss://fx-ws-testnet.gateio.ws/v4/ws/delivery/btc"
deliveryTestNetUSDTTradingURL = "wss://fx-ws-testnet.gateio.ws/v4/ws/delivery/usdt"
)
var defaultDeliveryFuturesSubscriptions = []string{
futuresTickersChannel,
futuresTradesChannel,
futuresOrderbookChannel,
futuresCandlesticksChannel,
}
// responseDeliveryFuturesStream a channel thought which the data coming from the two websocket connection will go through.
var responseDeliveryFuturesStream = make(chan stream.Response)
var fetchedFuturesCurrencyPairSnapshotOrderbook = make(map[string]bool)
// WsDeliveryFuturesConnect initiates a websocket connection for delivery futures account
func (g *Gateio) WsDeliveryFuturesConnect() error {
if !g.Websocket.IsEnabled() || !g.IsEnabled() {
return errors.New(stream.WebsocketNotEnabled)
}
err := g.CurrencyPairs.IsAssetEnabled(asset.DeliveryFutures)
if err != nil {
return err
}
var dialer websocket.Dialer
err = g.Websocket.SetWebsocketURL(deliveryRealUSDTTradingURL, false, true)
if err != nil {
return err
}
err = g.Websocket.Conn.Dial(&dialer, http.Header{})
if err != nil {
return err
}
err = g.Websocket.SetupNewConnection(stream.ConnectionSetup{
URL: deliveryRealBTCTradingURL,
RateLimit: gateioWebsocketRateLimit,
ResponseCheckTimeout: g.Config.WebsocketResponseCheckTimeout,
ResponseMaxLimit: g.Config.WebsocketResponseMaxLimit,
Authenticated: true,
})
if err != nil {
return err
}
err = g.Websocket.AuthConn.Dial(&dialer, http.Header{})
if err != nil {
return err
}
g.Websocket.Wg.Add(3)
go g.wsReadDeliveryFuturesData()
go g.wsFunnelDeliveryFuturesConnectionData(g.Websocket.Conn)
go g.wsFunnelDeliveryFuturesConnectionData(g.Websocket.AuthConn)
if g.Verbose {
log.Debugf(log.ExchangeSys, "successful connection to %v\n",
g.Websocket.GetWebsocketURL())
}
pingMessage, err := json.Marshal(WsInput{
ID: g.Websocket.Conn.GenerateMessageID(false),
Time: time.Now().Unix(),
Channel: futuresPingChannel,
})
if err != nil {
return err
}
g.Websocket.Conn.SetupPingHandler(stream.PingHandler{
Websocket: true,
Delay: time.Second * 5,
MessageType: websocket.PingMessage,
Message: pingMessage,
})
return nil
}
// wsReadDeliveryFuturesData read coming messages thought the websocket connection and pass the data to wsHandleFuturesData for further process.
func (g *Gateio) wsReadDeliveryFuturesData() {
defer g.Websocket.Wg.Done()
for {
select {
case <-g.Websocket.ShutdownC:
select {
case resp := <-responseDeliveryFuturesStream:
err := g.wsHandleFuturesData(resp.Raw, asset.DeliveryFutures)
if err != nil {
select {
case g.Websocket.DataHandler <- err:
default:
log.Errorf(log.WebsocketMgr, "%s websocket handle data error: %v", g.Name, err)
}
}
default:
}
return
case resp := <-responseDeliveryFuturesStream:
err := g.wsHandleFuturesData(resp.Raw, asset.DeliveryFutures)
if err != nil {
g.Websocket.DataHandler <- err
}
}
}
}
// wsFunnelDeliveryFuturesConnectionData receives data from multiple connection and pass the data
// to wsRead through a channel responseStream
func (g *Gateio) wsFunnelDeliveryFuturesConnectionData(ws stream.Connection) {
defer g.Websocket.Wg.Done()
for {
resp := ws.ReadMessage()
if resp.Raw == nil {
return
}
responseDeliveryFuturesStream <- stream.Response{Raw: resp.Raw}
}
}
// GenerateDeliveryFuturesDefaultSubscriptions returns delivery futures default subscriptions params.
func (g *Gateio) GenerateDeliveryFuturesDefaultSubscriptions() ([]stream.ChannelSubscription, error) {
_, err := g.GetCredentials(context.Background())
if err != nil {
g.Websocket.SetCanUseAuthenticatedEndpoints(false)
}
channelsToSubscribe := defaultDeliveryFuturesSubscriptions
if g.Websocket.CanUseAuthenticatedEndpoints() {
channelsToSubscribe = append(
channelsToSubscribe,
futuresOrdersChannel,
futuresUserTradesChannel,
futuresBalancesChannel,
)
}
pairs, err := g.GetAvailablePairs(asset.DeliveryFutures)
if err != nil {
return nil, err
}
var subscriptions []stream.ChannelSubscription
for i := range channelsToSubscribe {
for j := range pairs {
params := make(map[string]interface{})
switch channelsToSubscribe[i] {
case futuresOrderbookChannel:
params["limit"] = 20
params["interval"] = "0"
case futuresCandlesticksChannel:
params["interval"] = kline.FiveMin
}
fpair, err := g.FormatExchangeCurrency(pairs[j], asset.DeliveryFutures)
if err != nil {
return nil, err
}
subscriptions = append(subscriptions, stream.ChannelSubscription{
Channel: channelsToSubscribe[i],
Currency: fpair.Upper(),
Params: params,
})
}
}
return subscriptions, nil
}
// DeliveryFuturesSubscribe sends a websocket message to stop receiving data from the channel
func (g *Gateio) DeliveryFuturesSubscribe(channelsToUnsubscribe []stream.ChannelSubscription) error {
return g.handleDeliveryFuturesSubscription("subscribe", channelsToUnsubscribe)
}
// DeliveryFuturesUnsubscribe sends a websocket message to stop receiving data from the channel
func (g *Gateio) DeliveryFuturesUnsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) error {
return g.handleDeliveryFuturesSubscription("unsubscribe", channelsToUnsubscribe)
}
// handleDeliveryFuturesSubscription sends a websocket message to receive data from the channel
func (g *Gateio) handleDeliveryFuturesSubscription(event string, channelsToSubscribe []stream.ChannelSubscription) error {
payloads, err := g.generateDeliveryFuturesPayload(event, channelsToSubscribe)
if err != nil {
return err
}
var errs error
var respByte []byte
// con represents the websocket connection. 0 - for usdt settle and 1 - for btc settle connections.
for con, val := range payloads {
for k := range val {
if con == 0 {
respByte, err = g.Websocket.Conn.SendMessageReturnResponse(val[k].ID, val[k])
} else {
respByte, err = g.Websocket.AuthConn.SendMessageReturnResponse(val[k].ID, val[k])
}
if err != nil {
errs = common.AppendError(errs, err)
continue
}
var resp WsEventResponse
if err = json.Unmarshal(respByte, &resp); err != nil {
errs = common.AppendError(errs, err)
} else {
if resp.Error != nil && resp.Error.Code != 0 {
errs = common.AppendError(errs, fmt.Errorf("error while %s to channel %s error code: %d message: %s", val[k].Event, val[k].Channel, resp.Error.Code, resp.Error.Message))
continue
}
g.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[k])
}
}
}
return errs
}
func (g *Gateio) generateDeliveryFuturesPayload(event string, channelsToSubscribe []stream.ChannelSubscription) ([2][]WsInput, error) {
if len(channelsToSubscribe) == 0 {
return [2][]WsInput{}, errors.New("cannot generate payload, no channels supplied")
}
var creds *account.Credentials
var err error
if g.Websocket.CanUseAuthenticatedEndpoints() {
creds, err = g.GetCredentials(context.TODO())
if err != nil {
g.Websocket.SetCanUseAuthenticatedEndpoints(false)
}
}
payloads := [2][]WsInput{}
for i := range channelsToSubscribe {
var auth *WsAuthInput
timestamp := time.Now()
var params []string
params = []string{channelsToSubscribe[i].Currency.String()}
if g.Websocket.CanUseAuthenticatedEndpoints() {
switch channelsToSubscribe[i].Channel {
case futuresOrdersChannel, futuresUserTradesChannel,
futuresLiquidatesChannel, futuresAutoDeleveragesChannel,
futuresAutoPositionCloseChannel, futuresBalancesChannel,
futuresReduceRiskLimitsChannel, futuresPositionsChannel,
futuresAutoOrdersChannel:
value, ok := channelsToSubscribe[i].Params["user"].(string)
if ok {
params = append(
[]string{value},
params...)
}
var sigTemp string
sigTemp, err = g.generateWsSignature(creds.Secret, event, channelsToSubscribe[i].Channel, timestamp)
if err != nil {
return [2][]WsInput{}, err
}
auth = &WsAuthInput{
Method: "api_key",
Key: creds.Key,
Sign: sigTemp,
}
}
}
frequency, okay := channelsToSubscribe[i].Params["frequency"].(kline.Interval)
if okay {
var frequencyString string
frequencyString, err = g.GetIntervalString(frequency)
if err != nil {
return payloads, err
}
params = append(params, frequencyString)
}
levelString, okay := channelsToSubscribe[i].Params["level"].(string)
if okay {
params = append(params, levelString)
}
limit, okay := channelsToSubscribe[i].Params["limit"].(int)
if okay {
params = append(params, strconv.Itoa(limit))
}
accuracy, okay := channelsToSubscribe[i].Params["accuracy"].(string)
if okay {
params = append(params, accuracy)
}
switch channelsToSubscribe[i].Channel {
case futuresCandlesticksChannel:
interval, okay := channelsToSubscribe[i].Params["interval"].(kline.Interval)
if okay {
var intervalString string
intervalString, err = g.GetIntervalString(interval)
if err != nil {
return payloads, err
}
params = append([]string{intervalString}, params...)
}
case futuresOrderbookChannel:
intervalString, okay := channelsToSubscribe[i].Params["interval"].(string)
if okay {
params = append(params, intervalString)
}
}
if strings.HasPrefix(channelsToSubscribe[i].Currency.Quote.Upper().String(), "USDT") {
payloads[0] = append(payloads[0], WsInput{
ID: g.Websocket.Conn.GenerateMessageID(false),
Event: event,
Channel: channelsToSubscribe[i].Channel,
Payload: params,
Auth: auth,
Time: timestamp.Unix(),
})
} else {
payloads[1] = append(payloads[1], WsInput{
ID: g.Websocket.Conn.GenerateMessageID(false),
Event: event,
Channel: channelsToSubscribe[i].Channel,
Payload: params,
Auth: auth,
Time: timestamp.Unix(),
})
}
}
return payloads, nil
}

View File

@@ -0,0 +1,870 @@
package gateio
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/fill"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
"github.com/thrasher-corp/gocryptotrader/log"
)
const (
futuresWebsocketBtcURL = "wss://fx-ws.gateio.ws/v4/ws/btc"
futuresWebsocketUsdtURL = "wss://fx-ws.gateio.ws/v4/ws/usdt"
futuresPingChannel = "futures.ping"
futuresTickersChannel = "futures.tickers"
futuresTradesChannel = "futures.trades"
futuresOrderbookChannel = "futures.order_book"
futuresOrderbookTickerChannel = "futures.book_ticker"
futuresOrderbookUpdateChannel = "futures.order_book_update"
futuresCandlesticksChannel = "futures.candlesticks"
futuresOrdersChannel = "futures.orders"
// authenticated channels
futuresUserTradesChannel = "futures.usertrades"
futuresLiquidatesChannel = "futures.liquidates"
futuresAutoDeleveragesChannel = "futures.auto_deleverages"
futuresAutoPositionCloseChannel = "futures.position_closes"
futuresBalancesChannel = "futures.balances"
futuresReduceRiskLimitsChannel = "futures.reduce_risk_limits"
futuresPositionsChannel = "futures.positions"
futuresAutoOrdersChannel = "futures.autoorders"
)
var defaultFuturesSubscriptions = []string{
futuresTickersChannel,
futuresTradesChannel,
futuresOrderbookChannel,
futuresOrderbookUpdateChannel,
futuresCandlesticksChannel,
}
// responseFuturesStream a channel thought which the data coming from the two websocket connection will go through.
var responseFuturesStream = make(chan stream.Response)
// WsFuturesConnect initiates a websocket connection for futures account
func (g *Gateio) WsFuturesConnect() error {
if !g.Websocket.IsEnabled() || !g.IsEnabled() {
return errors.New(stream.WebsocketNotEnabled)
}
err := g.CurrencyPairs.IsAssetEnabled(asset.Futures)
if err != nil {
return err
}
var dialer websocket.Dialer
err = g.Websocket.SetWebsocketURL(futuresWebsocketUsdtURL, false, true)
if err != nil {
return err
}
err = g.Websocket.Conn.Dial(&dialer, http.Header{})
if err != nil {
return err
}
err = g.Websocket.SetupNewConnection(stream.ConnectionSetup{
URL: futuresWebsocketBtcURL,
RateLimit: gateioWebsocketRateLimit,
ResponseCheckTimeout: g.Config.WebsocketResponseCheckTimeout,
ResponseMaxLimit: g.Config.WebsocketResponseMaxLimit,
Authenticated: true,
})
if err != nil {
return err
}
err = g.Websocket.AuthConn.Dial(&dialer, http.Header{})
if err != nil {
return err
}
g.Websocket.Wg.Add(3)
go g.wsReadFuturesData()
go g.wsFunnelFuturesConnectionData(g.Websocket.Conn)
go g.wsFunnelFuturesConnectionData(g.Websocket.AuthConn)
if g.Verbose {
log.Debugf(log.ExchangeSys, "Successful connection to %v\n",
g.Websocket.GetWebsocketURL())
}
pingMessage, err := json.Marshal(WsInput{
ID: g.Websocket.Conn.GenerateMessageID(false),
Time: func() int64 {
return time.Now().Unix()
}(),
Channel: futuresPingChannel,
})
if err != nil {
return err
}
g.Websocket.Conn.SetupPingHandler(stream.PingHandler{
Websocket: true,
MessageType: websocket.PingMessage,
Delay: time.Second * 15,
Message: pingMessage,
})
return nil
}
// GenerateFuturesDefaultSubscriptions returns default subscriptions information.
func (g *Gateio) GenerateFuturesDefaultSubscriptions() ([]stream.ChannelSubscription, error) {
channelsToSubscribe := defaultFuturesSubscriptions
if g.Websocket.CanUseAuthenticatedEndpoints() {
channelsToSubscribe = append(channelsToSubscribe,
futuresOrdersChannel,
futuresUserTradesChannel,
futuresBalancesChannel,
)
}
pairs, err := g.GetEnabledPairs(asset.Futures)
if err != nil {
return nil, err
}
subscriptions := make([]stream.ChannelSubscription, len(channelsToSubscribe)*len(pairs))
count := 0
for i := range channelsToSubscribe {
for j := range pairs {
params := make(map[string]interface{})
switch channelsToSubscribe[i] {
case futuresOrderbookChannel:
params["limit"] = 100
params["interval"] = "0"
case futuresCandlesticksChannel:
params["interval"] = kline.FiveMin
case futuresOrderbookUpdateChannel:
params["frequency"] = kline.ThousandMilliseconds
params["level"] = "100"
}
fpair, err := g.FormatExchangeCurrency(pairs[j], asset.Futures)
if err != nil {
return nil, err
}
subscriptions[count] = stream.ChannelSubscription{
Channel: channelsToSubscribe[i],
Currency: fpair.Upper(),
Params: params,
}
count++
}
}
return subscriptions, nil
}
// FuturesSubscribe sends a websocket message to stop receiving data from the channel
func (g *Gateio) FuturesSubscribe(channelsToUnsubscribe []stream.ChannelSubscription) error {
return g.handleFuturesSubscription("subscribe", channelsToUnsubscribe)
}
// FuturesUnsubscribe sends a websocket message to stop receiving data from the channel
func (g *Gateio) FuturesUnsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) error {
return g.handleFuturesSubscription("unsubscribe", channelsToUnsubscribe)
}
// wsReadFuturesData read coming messages thought the websocket connection and pass the data to wsHandleData for further process.
func (g *Gateio) wsReadFuturesData() {
defer g.Websocket.Wg.Done()
for {
select {
case <-g.Websocket.ShutdownC:
select {
case resp := <-responseFuturesStream:
err := g.wsHandleFuturesData(resp.Raw, asset.Futures)
if err != nil {
select {
case g.Websocket.DataHandler <- err:
default:
log.Errorf(log.WebsocketMgr, "%s websocket handle data error: %v", g.Name, err)
}
}
default:
}
return
case resp := <-responseFuturesStream:
err := g.wsHandleFuturesData(resp.Raw, asset.Futures)
if err != nil {
g.Websocket.DataHandler <- err
}
}
}
}
// wsFunnelFuturesConnectionData receives data from multiple connection and pass the data
// to wsRead through a channel responseStream
func (g *Gateio) wsFunnelFuturesConnectionData(ws stream.Connection) {
defer g.Websocket.Wg.Done()
for {
resp := ws.ReadMessage()
if resp.Raw == nil {
return
}
responseFuturesStream <- stream.Response{Raw: resp.Raw}
}
}
func (g *Gateio) wsHandleFuturesData(respRaw []byte, assetType asset.Item) error {
var result WsResponse
var eventResponse WsEventResponse
err := json.Unmarshal(respRaw, &eventResponse)
if err == nil &&
(eventResponse.Result != nil || eventResponse.Error != nil) &&
(eventResponse.Event == "subscribe" || eventResponse.Event == "unsubscribe") {
if !g.Websocket.Match.IncomingWithData(eventResponse.ID, respRaw) {
return fmt.Errorf("couldn't match subscription message with ID: %d", eventResponse.ID)
}
return nil
}
err = json.Unmarshal(respRaw, &result)
if err != nil {
return err
}
switch result.Channel {
// Futures push datas.
case futuresTickersChannel:
return g.processFuturesTickers(respRaw, assetType)
case futuresTradesChannel:
return g.processFuturesTrades(respRaw, assetType)
case futuresOrderbookChannel:
return g.processFuturesOrderbookSnapshot(result.Event, respRaw, assetType)
case futuresOrderbookTickerChannel:
return g.processFuturesOrderbookTicker(respRaw)
case futuresOrderbookUpdateChannel:
return g.processFuturesAndOptionsOrderbookUpdate(respRaw, assetType)
case futuresCandlesticksChannel:
return g.processFuturesCandlesticks(respRaw, assetType)
case futuresOrdersChannel:
return g.processFuturesOrdersPushData(respRaw, assetType)
case futuresUserTradesChannel:
return g.procesFuturesUserTrades(respRaw, assetType)
case futuresLiquidatesChannel:
return g.processFuturesLiquidatesNotification(respRaw)
case futuresAutoDeleveragesChannel:
return g.processFuturesAutoDeleveragesNotification(respRaw)
case futuresAutoPositionCloseChannel:
return g.processPositionCloseData(respRaw)
case futuresBalancesChannel:
return g.processBalancePushData(respRaw, assetType)
case futuresReduceRiskLimitsChannel:
return g.processFuturesReduceRiskLimitNotification(respRaw)
case futuresPositionsChannel:
return g.processFuturesPositionsNotification(respRaw)
case futuresAutoOrdersChannel:
return g.processFuturesAutoOrderPushData(respRaw)
default:
g.Websocket.DataHandler <- stream.UnhandledMessageWarning{
Message: g.Name + stream.UnhandledMessage + string(respRaw),
}
return errors.New(stream.UnhandledMessage)
}
}
// handleFuturesSubscription sends a websocket message to receive data from the channel
func (g *Gateio) handleFuturesSubscription(event string, channelsToSubscribe []stream.ChannelSubscription) error {
payloads, err := g.generateFuturesPayload(event, channelsToSubscribe)
if err != nil {
return err
}
var errs error
var respByte []byte
// con represents the websocket connection. 0 - for usdt settle and 1 - for btc settle connections.
for con, val := range payloads {
for k := range val {
if con == 0 {
respByte, err = g.Websocket.Conn.SendMessageReturnResponse(val[k].ID, val[k])
} else {
respByte, err = g.Websocket.AuthConn.SendMessageReturnResponse(val[k].ID, val[k])
}
if err != nil {
errs = common.AppendError(errs, err)
continue
}
var resp WsEventResponse
if err = json.Unmarshal(respByte, &resp); err != nil {
errs = common.AppendError(errs, err)
} else {
if resp.Error != nil && resp.Error.Code != 0 {
errs = common.AppendError(errs, fmt.Errorf("error while %s to channel %s error code: %d message: %s", val[k].Event, val[k].Channel, resp.Error.Code, resp.Error.Message))
continue
}
g.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[k])
}
}
}
if errs != nil {
return errs
}
return nil
}
func (g *Gateio) generateFuturesPayload(event string, channelsToSubscribe []stream.ChannelSubscription) ([2][]WsInput, error) {
if len(channelsToSubscribe) == 0 {
return [2][]WsInput{}, errors.New("cannot generate payload, no channels supplied")
}
var creds *account.Credentials
var err error
if g.Websocket.CanUseAuthenticatedEndpoints() {
creds, err = g.GetCredentials(context.TODO())
if err != nil {
g.Websocket.SetCanUseAuthenticatedEndpoints(false)
}
}
payloads := [2][]WsInput{}
for i := range channelsToSubscribe {
var auth *WsAuthInput
timestamp := time.Now()
var params []string
params = []string{channelsToSubscribe[i].Currency.String()}
if g.Websocket.CanUseAuthenticatedEndpoints() {
switch channelsToSubscribe[i].Channel {
case futuresOrdersChannel, futuresUserTradesChannel,
futuresLiquidatesChannel, futuresAutoDeleveragesChannel,
futuresAutoPositionCloseChannel, futuresBalancesChannel,
futuresReduceRiskLimitsChannel, futuresPositionsChannel,
futuresAutoOrdersChannel:
value, ok := channelsToSubscribe[i].Params["user"].(string)
if ok {
params = append(
[]string{value},
params...)
}
var sigTemp string
sigTemp, err = g.generateWsSignature(creds.Secret, event, channelsToSubscribe[i].Channel, timestamp)
if err != nil {
return [2][]WsInput{}, err
}
auth = &WsAuthInput{
Method: "api_key",
Key: creds.Key,
Sign: sigTemp,
}
}
}
frequency, okay := channelsToSubscribe[i].Params["frequency"].(kline.Interval)
if okay {
var frequencyString string
frequencyString, err = g.GetIntervalString(frequency)
if err != nil {
return payloads, err
}
params = append(params, frequencyString)
}
levelString, okay := channelsToSubscribe[i].Params["level"].(string)
if okay {
params = append(params, levelString)
}
limit, okay := channelsToSubscribe[i].Params["limit"].(int)
if okay {
params = append(params, strconv.Itoa(limit))
}
accuracy, okay := channelsToSubscribe[i].Params["accuracy"].(string)
if okay {
params = append(params, accuracy)
}
switch channelsToSubscribe[i].Channel {
case futuresCandlesticksChannel:
interval, okay := channelsToSubscribe[i].Params["interval"].(kline.Interval)
if okay {
var intervalString string
intervalString, err = g.GetIntervalString(interval)
if err != nil {
return payloads, err
}
params = append([]string{intervalString}, params...)
}
case futuresOrderbookChannel:
intervalString, okay := channelsToSubscribe[i].Params["interval"].(string)
if okay {
params = append(params, intervalString)
}
}
if strings.HasPrefix(channelsToSubscribe[i].Currency.Quote.Upper().String(), "USDT") {
payloads[0] = append(payloads[0], WsInput{
ID: g.Websocket.Conn.GenerateMessageID(false),
Event: event,
Channel: channelsToSubscribe[i].Channel,
Payload: params,
Auth: auth,
Time: timestamp.Unix(),
})
} else {
payloads[1] = append(payloads[1], WsInput{
ID: g.Websocket.Conn.GenerateMessageID(false),
Event: event,
Channel: channelsToSubscribe[i].Channel,
Payload: params,
Auth: auth,
Time: timestamp.Unix(),
})
}
}
return payloads, nil
}
func (g *Gateio) processFuturesTickers(data []byte, assetType asset.Item) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsFutureTicker `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
tickerPriceDatas := make([]ticker.Price, len(resp.Result))
for x := range resp.Result {
currencyPair, err := currency.NewPairFromString(resp.Result[x].Contract)
if err != nil {
return err
}
tickerPriceDatas[x] = ticker.Price{
ExchangeName: g.Name,
Volume: resp.Result[x].Volume24HBase,
QuoteVolume: resp.Result[x].Volume24HQuote,
High: resp.Result[x].High24H,
Low: resp.Result[x].Low24H,
Last: resp.Result[x].Last,
AssetType: assetType,
Pair: currencyPair,
LastUpdated: time.Unix(resp.Time, 0),
}
}
g.Websocket.DataHandler <- tickerPriceDatas
return nil
}
func (g *Gateio) processFuturesTrades(data []byte, assetType asset.Item) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsFuturesTrades `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
trades := make([]trade.Data, len(resp.Result))
for x := range resp.Result {
currencyPair, err := currency.NewPairFromString(resp.Result[x].Contract)
if err != nil {
return err
}
trades[x] = trade.Data{
Timestamp: resp.Result[x].CreateTimeMs.Time(),
CurrencyPair: currencyPair,
AssetType: assetType,
Exchange: g.Name,
Price: resp.Result[x].Price,
Amount: resp.Result[x].Size,
TID: strconv.FormatInt(resp.Result[x].ID, 10),
}
}
return trade.AddTradesToBuffer(g.Name, trades...)
}
func (g *Gateio) processFuturesCandlesticks(data []byte, assetType asset.Item) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []FuturesCandlestick `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
klineDatas := make([]stream.KlineData, len(resp.Result))
for x := range resp.Result {
icp := strings.Split(resp.Result[x].Name, currency.UnderscoreDelimiter)
if len(icp) < 3 {
return errors.New("malformed futures candlestick websocket push data")
}
currencyPair, err := currency.NewPairFromString(strings.Join(icp[1:], currency.UnderscoreDelimiter))
if err != nil {
return err
}
klineDatas[x] = stream.KlineData{
Pair: currencyPair,
AssetType: assetType,
Exchange: g.Name,
StartTime: resp.Result[x].Timestamp.Time(),
Interval: icp[0],
OpenPrice: resp.Result[x].OpenPrice,
ClosePrice: resp.Result[x].ClosePrice,
HighPrice: resp.Result[x].HighestPrice,
LowPrice: resp.Result[x].LowestPrice,
Volume: resp.Result[x].Volume,
}
}
g.Websocket.DataHandler <- klineDatas
return nil
}
func (g *Gateio) processFuturesOrderbookTicker(data []byte) error {
var response WsResponse
orderbookTicker := &WsFuturesOrderbookTicker{}
response.Result = orderbookTicker
err := json.Unmarshal(data, &response)
if err != nil {
return err
}
g.Websocket.DataHandler <- response
return nil
}
func (g *Gateio) processFuturesAndOptionsOrderbookUpdate(data []byte, assetType asset.Item) error {
var response WsResponse
update := &WsFuturesAndOptionsOrderbookUpdate{}
response.Result = update
err := json.Unmarshal(data, &response)
if err != nil {
return err
}
pair, err := currency.NewPairFromString(update.ContractName)
if err != nil {
return err
}
if (assetType == asset.Options && !fetchedOptionsCurrencyPairSnapshotOrderbook[update.ContractName]) ||
(assetType != asset.Options && !fetchedFuturesCurrencyPairSnapshotOrderbook[update.ContractName]) {
orderbooks, err := g.FetchOrderbook(context.Background(), pair, assetType)
if err != nil {
return err
}
if orderbooks.LastUpdateID < update.FirstUpdatedID || orderbooks.LastUpdateID > update.LastUpdatedID {
return nil
}
err = g.Websocket.Orderbook.LoadSnapshot(orderbooks)
if err != nil {
return err
}
if assetType == asset.Options {
fetchedOptionsCurrencyPairSnapshotOrderbook[update.ContractName] = true
} else {
fetchedFuturesCurrencyPairSnapshotOrderbook[update.ContractName] = true
}
}
updates := orderbook.Update{
UpdateTime: time.UnixMilli(update.TimestampInMs),
Pair: pair,
Asset: assetType,
}
updates.Bids = make([]orderbook.Item, len(update.Bids))
updates.Asks = make([]orderbook.Item, len(update.Asks))
for x := range updates.Asks {
updates.Asks[x] = orderbook.Item{
Amount: update.Asks[x].Size,
Price: update.Asks[x].Price,
}
}
for x := range updates.Bids {
updates.Bids[x] = orderbook.Item{
Amount: update.Bids[x].Size,
Price: update.Bids[x].Price,
}
}
if len(updates.Asks) == 0 && len(updates.Bids) == 0 {
return errors.New("malformed orderbook data")
}
return g.Websocket.Orderbook.Update(&updates)
}
func (g *Gateio) processFuturesOrderbookSnapshot(event string, data []byte, assetType asset.Item) error {
if event == "all" {
var response WsResponse
snapshot := &WsFuturesOrderbookSnapshot{}
response.Result = snapshot
err := json.Unmarshal(data, &response)
if err != nil {
return err
}
pair, err := currency.NewPairFromString(snapshot.Contract)
if err != nil {
return err
}
base := orderbook.Base{
Asset: assetType,
Exchange: g.Name,
Pair: pair,
LastUpdated: snapshot.TimestampInMs.Time(),
VerifyOrderbook: g.CanVerifyOrderbook,
}
base.Bids = make([]orderbook.Item, len(snapshot.Bids))
base.Asks = make([]orderbook.Item, len(snapshot.Asks))
for x := range base.Asks {
base.Asks[x] = orderbook.Item{
Amount: snapshot.Asks[x].Size,
Price: snapshot.Asks[x].Price,
}
}
for x := range base.Bids {
base.Bids[x] = orderbook.Item{
Amount: snapshot.Bids[x].Size,
Price: snapshot.Bids[x].Price,
}
}
return g.Websocket.Orderbook.LoadSnapshot(&base)
}
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsFuturesOrderbookUpdateEvent `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
dataMap := map[string][2][]orderbook.Item{}
for x := range resp.Result {
ab, ok := dataMap[resp.Result[x].CurrencyPair]
if !ok {
ab = [2][]orderbook.Item{}
}
if resp.Result[x].Amount > 0 {
ab[1] = append(ab[1], orderbook.Item{
Price: resp.Result[x].Price,
Amount: resp.Result[x].Amount,
})
} else {
ab[0] = append(ab[0], orderbook.Item{
Price: resp.Result[x].Price,
Amount: -resp.Result[x].Amount,
})
}
if !ok {
dataMap[resp.Result[x].CurrencyPair] = ab
}
}
if len(dataMap) == 0 {
return errors.New("missing orderbook ask and bid data")
}
for key, ab := range dataMap {
currencyPair, err := currency.NewPairFromString(key)
if err != nil {
return err
}
err = g.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{
Asks: ab[0],
Bids: ab[1],
Asset: assetType,
Exchange: g.Name,
Pair: currencyPair,
LastUpdated: time.Unix(resp.Time, 0),
VerifyOrderbook: g.CanVerifyOrderbook,
})
if err != nil {
return err
}
}
return nil
}
func (g *Gateio) processFuturesOrdersPushData(data []byte, assetType asset.Item) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsFuturesOrder `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
orderDetails := make([]order.Detail, len(resp.Result))
for x := range resp.Result {
currencyPair, err := currency.NewPairFromString(resp.Result[x].Contract)
if err != nil {
return err
}
status, err := order.StringToOrderStatus(func() string {
if resp.Result[x].Status == "finished" {
return "cancelled"
}
return resp.Result[x].Status
}())
if err != nil {
return err
}
orderDetails[x] = order.Detail{
Amount: resp.Result[x].Size,
Exchange: g.Name,
OrderID: strconv.FormatInt(resp.Result[x].ID, 10),
Status: status,
Pair: currencyPair,
LastUpdated: resp.Result[x].FinishTimeMs.Time(),
Date: resp.Result[x].CreateTimeMs.Time(),
ExecutedAmount: resp.Result[x].Size - resp.Result[x].Left,
Price: resp.Result[x].Price,
AssetType: assetType,
AccountID: resp.Result[x].User,
CloseTime: resp.Result[x].FinishTimeMs.Time(),
}
}
g.Websocket.DataHandler <- orderDetails
return nil
}
func (g *Gateio) procesFuturesUserTrades(data []byte, assetType asset.Item) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsFuturesUserTrade `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
fills := make([]fill.Data, len(resp.Result))
for x := range resp.Result {
currencyPair, err := currency.NewPairFromString(resp.Result[x].Contract)
if err != nil {
return err
}
fills[x] = fill.Data{
Timestamp: resp.Result[x].CreateTimeMs.Time(),
Exchange: g.Name,
CurrencyPair: currencyPair,
OrderID: resp.Result[x].OrderID,
TradeID: resp.Result[x].ID,
Price: resp.Result[x].Price,
Amount: resp.Result[x].Size,
AssetType: assetType,
}
}
return g.Websocket.Fills.Update(fills...)
}
func (g *Gateio) processFuturesLiquidatesNotification(data []byte) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsFuturesLiquidationNotification `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
g.Websocket.DataHandler <- &resp
return nil
}
func (g *Gateio) processFuturesAutoDeleveragesNotification(data []byte) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsFuturesAutoDeleveragesNotification `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
g.Websocket.DataHandler <- &resp
return nil
}
func (g *Gateio) processPositionCloseData(data []byte) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsPositionClose `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
g.Websocket.DataHandler <- &resp
return nil
}
func (g *Gateio) processBalancePushData(data []byte, assetType asset.Item) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsBalance `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
accountChange := make([]account.Change, len(resp.Result))
for x := range resp.Result {
info := strings.Split(resp.Result[x].Text, currency.UnderscoreDelimiter)
if len(info) != 2 {
return errors.New("malformed text")
}
code := currency.NewCode(info[0])
accountChange[x] = account.Change{
Exchange: g.Name,
Currency: code,
Asset: assetType,
Amount: resp.Result[x].Balance,
Account: resp.Result[x].User,
}
}
g.Websocket.DataHandler <- accountChange
return nil
}
func (g *Gateio) processFuturesReduceRiskLimitNotification(data []byte) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsFuturesReduceRiskLimitNotification `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
g.Websocket.DataHandler <- &resp
return nil
}
func (g *Gateio) processFuturesPositionsNotification(data []byte) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsFuturesPosition `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
g.Websocket.DataHandler <- &resp
return nil
}
func (g *Gateio) processFuturesAutoOrderPushData(data []byte) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsFuturesAutoOrder `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
g.Websocket.DataHandler <- &resp
return nil
}

View File

@@ -0,0 +1,780 @@
package gateio
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/fill"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
"github.com/thrasher-corp/gocryptotrader/exchanges/trade"
"github.com/thrasher-corp/gocryptotrader/log"
)
const (
optionsWebsocketURL = "wss://op-ws.gateio.live/v4/ws"
optionsWebsocketTestnetURL = "wss://op-ws-testnet.gateio.live/v4/ws"
// channels
optionsPingChannel = "options.ping"
optionsContractTickersChannel = "options.contract_tickers"
optionsUnderlyingTickersChannel = "options.ul_tickers"
optionsTradesChannel = "options.trades"
optionsUnderlyingTradesChannel = "options.ul_trades"
optionsUnderlyingPriceChannel = "options.ul_price"
optionsMarkPriceChannel = "options.mark_price"
optionsSettlementChannel = "options.settlements"
optionsContractsChannel = "options.contracts"
optionsContractCandlesticksChannel = "options.contract_candlesticks"
optionsUnderlyingCandlesticksChannel = "options.ul_candlesticks"
optionsOrderbookChannel = "options.order_book"
optionsOrderbookTickerChannel = "options.book_ticker"
optionsOrderbookUpdateChannel = "options.order_book_update"
optionsOrdersChannel = "options.orders"
optionsUserTradesChannel = "options.usertrades"
optionsLiquidatesChannel = "options.liquidates"
optionsUserSettlementChannel = "options.user_settlements"
optionsPositionCloseChannel = "options.position_closes"
optionsBalancesChannel = "options.balances"
optionsPositionsChannel = "options.positions"
)
var defaultOptionsSubscriptions = []string{
optionsContractTickersChannel,
optionsUnderlyingTickersChannel,
optionsTradesChannel,
optionsUnderlyingTradesChannel,
optionsContractCandlesticksChannel,
optionsUnderlyingCandlesticksChannel,
optionsOrderbookChannel,
optionsOrderbookUpdateChannel,
}
var fetchedOptionsCurrencyPairSnapshotOrderbook = make(map[string]bool)
// WsOptionsConnect initiates a websocket connection to options websocket endpoints.
func (g *Gateio) WsOptionsConnect() error {
if !g.Websocket.IsEnabled() || !g.IsEnabled() {
return errors.New(stream.WebsocketNotEnabled)
}
err := g.CurrencyPairs.IsAssetEnabled(asset.Options)
if err != nil {
return err
}
var dialer websocket.Dialer
err = g.Websocket.SetWebsocketURL(optionsWebsocketURL, false, true)
if err != nil {
return err
}
err = g.Websocket.Conn.Dial(&dialer, http.Header{})
if err != nil {
return err
}
pingMessage, err := json.Marshal(WsInput{
ID: g.Websocket.Conn.GenerateMessageID(false),
Time: time.Now().Unix(),
Channel: optionsPingChannel,
})
if err != nil {
return err
}
g.Websocket.Wg.Add(1)
go g.wsReadOptionsConnData()
g.Websocket.Conn.SetupPingHandler(stream.PingHandler{
Websocket: true,
Delay: time.Second * 5,
MessageType: websocket.PingMessage,
Message: pingMessage,
})
return nil
}
// GenerateOptionsDefaultSubscriptions generates list of channel subscriptions for options asset type.
func (g *Gateio) GenerateOptionsDefaultSubscriptions() ([]stream.ChannelSubscription, error) {
channelsToSubscribe := defaultOptionsSubscriptions
var userID int64
if g.Websocket.CanUseAuthenticatedEndpoints() {
var err error
_, err = g.GetCredentials(context.TODO())
if err != nil {
g.Websocket.SetCanUseAuthenticatedEndpoints(false)
goto getEnabledPairs
}
response, err := g.GetSubAccountBalances(context.Background(), "")
if err != nil {
return nil, err
}
if len(response) != 0 {
channelsToSubscribe = append(channelsToSubscribe,
optionsUserTradesChannel,
optionsBalancesChannel,
)
userID = response[0].UserID
} else if g.Verbose {
log.Errorf(log.ExchangeSys, "no subaccount found for authenticated options channel subscriptions")
}
}
getEnabledPairs:
var subscriptions []stream.ChannelSubscription
pairs, err := g.GetEnabledPairs(asset.Options)
if err != nil {
return nil, err
}
for i := range channelsToSubscribe {
for j := range pairs {
params := make(map[string]interface{})
switch channelsToSubscribe[i] {
case optionsOrderbookChannel:
params["accuracy"] = "0"
params["level"] = "20"
case optionsContractCandlesticksChannel, optionsUnderlyingCandlesticksChannel:
params["interval"] = kline.FiveMin
case optionsOrderbookUpdateChannel:
params["interval"] = kline.ThousandMilliseconds
params["level"] = "20"
case optionsOrdersChannel,
optionsUserTradesChannel,
optionsLiquidatesChannel,
optionsUserSettlementChannel,
optionsPositionCloseChannel,
optionsBalancesChannel,
optionsPositionsChannel:
if userID == 0 {
continue
}
params["user_id"] = userID
}
fpair, err := g.FormatExchangeCurrency(pairs[j], asset.Options)
if err != nil {
return nil, err
}
subscriptions = append(subscriptions, stream.ChannelSubscription{
Channel: channelsToSubscribe[i],
Currency: fpair.Upper(),
Params: params,
})
}
}
return subscriptions, nil
}
func (g *Gateio) generateOptionsPayload(event string, channelsToSubscribe []stream.ChannelSubscription) ([]WsInput, error) {
if len(channelsToSubscribe) == 0 {
return nil, errors.New("cannot generate payload, no channels supplied")
}
var err error
var intervalString string
payloads := make([]WsInput, len(channelsToSubscribe))
for i := range channelsToSubscribe {
var auth *WsAuthInput
timestamp := time.Now()
var params []string
switch channelsToSubscribe[i].Channel {
case optionsUnderlyingTickersChannel,
optionsUnderlyingTradesChannel,
optionsUnderlyingPriceChannel,
optionsUnderlyingCandlesticksChannel:
var uly currency.Pair
uly, err = g.GetUnderlyingFromCurrencyPair(channelsToSubscribe[i].Currency)
if err != nil {
return nil, err
}
params = append(params, uly.String())
case optionsBalancesChannel:
// options.balance channel does not require underlying or contract
default:
channelsToSubscribe[i].Currency.Delimiter = currency.UnderscoreDelimiter
params = append(params, channelsToSubscribe[i].Currency.String())
}
switch channelsToSubscribe[i].Channel {
case optionsOrderbookChannel:
accuracy, ok := channelsToSubscribe[i].Params["accuracy"].(string)
if !ok {
return nil, fmt.Errorf("%w, invalid options orderbook accuracy", orderbook.ErrOrderbookInvalid)
}
level, ok := channelsToSubscribe[i].Params["level"].(string)
if !ok {
return nil, fmt.Errorf("%w, invalid options orderbook level", orderbook.ErrOrderbookInvalid)
}
params = append(
params,
level,
accuracy,
)
case optionsUserTradesChannel,
optionsBalancesChannel,
optionsOrdersChannel,
optionsLiquidatesChannel,
optionsUserSettlementChannel,
optionsPositionCloseChannel,
optionsPositionsChannel:
userID, ok := channelsToSubscribe[i].Params["user_id"].(int64)
if !ok {
continue
}
params = append([]string{strconv.FormatInt(userID, 10)}, params...)
var creds *account.Credentials
creds, err = g.GetCredentials(context.Background())
if err != nil {
return nil, err
}
var sigTemp string
sigTemp, err = g.generateWsSignature(creds.Secret, event, channelsToSubscribe[i].Channel, timestamp)
if err != nil {
return nil, err
}
auth = &WsAuthInput{
Method: "api_key",
Key: creds.Key,
Sign: sigTemp,
}
case optionsOrderbookUpdateChannel:
interval, ok := channelsToSubscribe[i].Params["interval"].(kline.Interval)
if !ok {
return nil, fmt.Errorf("%w, missing options orderbook interval", orderbook.ErrOrderbookInvalid)
}
intervalString, err = g.GetIntervalString(interval)
if err != nil {
return nil, err
}
params = append(params,
intervalString)
if value, ok := channelsToSubscribe[i].Params["level"].(int); ok {
params = append(params, strconv.Itoa(value))
}
case optionsContractCandlesticksChannel,
optionsUnderlyingCandlesticksChannel:
interval, ok := channelsToSubscribe[i].Params["interval"].(kline.Interval)
if !ok {
return nil, errors.New("missing options underlying candlesticks interval")
}
intervalString, err = g.GetIntervalString(interval)
if err != nil {
return nil, err
}
params = append(
[]string{intervalString},
params...)
}
payloads[i] = WsInput{
ID: g.Websocket.Conn.GenerateMessageID(false),
Event: event,
Channel: channelsToSubscribe[i].Channel,
Payload: params,
Auth: auth,
Time: timestamp.Unix(),
}
}
return payloads, nil
}
// wsReadOptionsConnData receives and passes on websocket messages for processing
func (g *Gateio) wsReadOptionsConnData() {
defer g.Websocket.Wg.Done()
for {
resp := g.Websocket.Conn.ReadMessage()
if resp.Raw == nil {
return
}
err := g.wsHandleOptionsData(resp.Raw)
if err != nil {
g.Websocket.DataHandler <- err
}
}
}
// OptionsSubscribe sends a websocket message to stop receiving data for asset type options
func (g *Gateio) OptionsSubscribe(channelsToUnsubscribe []stream.ChannelSubscription) error {
return g.handleOptionsSubscription("subscribe", channelsToUnsubscribe)
}
// OptionsUnsubscribe sends a websocket message to stop receiving data for asset type options
func (g *Gateio) OptionsUnsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) error {
return g.handleOptionsSubscription("unsubscribe", channelsToUnsubscribe)
}
// handleOptionsSubscription sends a websocket message to receive data from the channel
func (g *Gateio) handleOptionsSubscription(event string, channelsToSubscribe []stream.ChannelSubscription) error {
payloads, err := g.generateOptionsPayload(event, channelsToSubscribe)
if err != nil {
return err
}
var errs error
for k := range payloads {
result, err := g.Websocket.Conn.SendMessageReturnResponse(payloads[k].ID, payloads[k])
if err != nil {
errs = common.AppendError(errs, err)
continue
}
var resp WsEventResponse
if err = json.Unmarshal(result, &resp); err != nil {
errs = common.AppendError(errs, err)
} else {
if resp.Error != nil && resp.Error.Code != 0 {
errs = common.AppendError(errs, fmt.Errorf("error while %s to channel %s asset type: options error code: %d message: %s", payloads[k].Event, payloads[k].Channel, resp.Error.Code, resp.Error.Message))
continue
}
if payloads[k].Event == "subscribe" {
g.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[k])
} else {
g.Websocket.RemoveSuccessfulUnsubscriptions(channelsToSubscribe[k])
}
}
}
return errs
}
func (g *Gateio) wsHandleOptionsData(respRaw []byte) error {
var result WsResponse
var eventResponse WsEventResponse
err := json.Unmarshal(respRaw, &eventResponse)
if err == nil &&
(eventResponse.Result != nil || eventResponse.Error != nil) &&
(eventResponse.Event == "subscribe" || eventResponse.Event == "unsubscribe") {
if !g.Websocket.Match.IncomingWithData(eventResponse.ID, respRaw) {
return fmt.Errorf("couldn't match subscription message with ID: %d", eventResponse.ID)
}
return nil
}
err = json.Unmarshal(respRaw, &result)
if err != nil {
return err
}
switch result.Channel {
case optionsContractTickersChannel:
return g.processOptionsContractTickers(respRaw)
case optionsUnderlyingTickersChannel:
return g.processOptionsUnderlyingTicker(respRaw)
case optionsTradesChannel,
optionsUnderlyingTradesChannel:
return g.processOptionsTradesPushData(respRaw)
case optionsUnderlyingPriceChannel:
return g.processOptionsUnderlyingPricePushData(respRaw)
case optionsMarkPriceChannel:
return g.processOptionsMarkPrice(respRaw)
case optionsSettlementChannel:
return g.processOptionsSettlementPushData(respRaw)
case optionsContractsChannel:
return g.processOptionsContractPushData(respRaw)
case optionsContractCandlesticksChannel,
optionsUnderlyingCandlesticksChannel:
return g.processOptionsCandlestickPushData(respRaw)
case optionsOrderbookChannel:
return g.processOptionsOrderbookSnapshotPushData(result.Event, respRaw)
case optionsOrderbookTickerChannel:
return g.processOrderbookTickerPushData(respRaw)
case optionsOrderbookUpdateChannel:
return g.processFuturesAndOptionsOrderbookUpdate(respRaw, asset.Options)
case optionsOrdersChannel:
return g.processOptionsOrderPushData(respRaw)
case optionsUserTradesChannel:
return g.processOptionsUserTradesPushData(respRaw)
case optionsLiquidatesChannel:
return g.processOptionsLiquidatesPushData(respRaw)
case optionsUserSettlementChannel:
return g.processOptionsUsersPersonalSettlementsPushData(respRaw)
case optionsPositionCloseChannel:
return g.processPositionCloseData(respRaw)
case optionsBalancesChannel:
return g.processBalancePushData(respRaw, asset.Options)
case optionsPositionsChannel:
return g.processOptionsPositionPushData(respRaw)
default:
g.Websocket.DataHandler <- stream.UnhandledMessageWarning{
Message: g.Name + stream.UnhandledMessage + string(respRaw),
}
return errors.New(stream.UnhandledMessage)
}
}
func (g *Gateio) processOptionsContractTickers(data []byte) error {
var response WsResponse
tickerData := OptionsTicker{}
response.Result = &tickerData
err := json.Unmarshal(data, &response)
if err != nil {
return err
}
currencyPair, err := currency.NewPairFromString(tickerData.Name)
if err != nil {
return err
}
g.Websocket.DataHandler <- &ticker.Price{
Pair: currencyPair,
Last: tickerData.LastPrice.Float64(),
Bid: tickerData.Bid1Price,
Ask: tickerData.Ask1Price,
AskSize: tickerData.Ask1Size,
BidSize: tickerData.Bid1Size,
ExchangeName: g.Name,
AssetType: asset.Options,
}
return nil
}
func (g *Gateio) processOptionsUnderlyingTicker(data []byte) error {
var response WsResponse
response.Result = &WsOptionUnderlyingTicker{}
err := json.Unmarshal(data, &response)
if err != nil {
return err
}
g.Websocket.DataHandler <- &response
return nil
}
func (g *Gateio) processOptionsTradesPushData(data []byte) error {
saveTradeData := g.IsSaveTradeDataEnabled()
if !saveTradeData &&
!g.IsTradeFeedEnabled() {
return nil
}
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsOptionsTrades `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
trades := make([]trade.Data, len(resp.Result))
for x := range resp.Result {
currencyPair, err := currency.NewPairFromString(resp.Result[x].Contract)
if err != nil {
return err
}
trades[x] = trade.Data{
Timestamp: resp.Result[x].CreateTimeMs.Time(),
CurrencyPair: currencyPair,
AssetType: asset.Options,
Exchange: g.Name,
Price: resp.Result[x].Price,
Amount: resp.Result[x].Size,
TID: strconv.FormatInt(resp.Result[x].ID, 10),
}
}
return g.Websocket.Trade.Update(saveTradeData, trades...)
}
func (g *Gateio) processOptionsUnderlyingPricePushData(data []byte) error {
var response WsResponse
priceD := WsOptionsUnderlyingPrice{}
response.Result = &priceD
err := json.Unmarshal(data, &response)
if err != nil {
return err
}
g.Websocket.DataHandler <- &response
return nil
}
func (g *Gateio) processOptionsMarkPrice(data []byte) error {
var response WsResponse
markPrice := WsOptionsMarkPrice{}
response.Result = &markPrice
err := json.Unmarshal(data, &response)
if err != nil {
return err
}
g.Websocket.DataHandler <- &response
return nil
}
func (g *Gateio) processOptionsSettlementPushData(data []byte) error {
var response WsResponse
settlementData := WsOptionsSettlement{}
response.Result = &settlementData
err := json.Unmarshal(data, &response)
if err != nil {
return err
}
g.Websocket.DataHandler <- &response
return nil
}
func (g *Gateio) processOptionsContractPushData(data []byte) error {
var response WsResponse
contractData := WsOptionsContract{}
response.Result = &contractData
err := json.Unmarshal(data, &response)
if err != nil {
return err
}
g.Websocket.DataHandler <- &response
return nil
}
func (g *Gateio) processOptionsCandlestickPushData(data []byte) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsOptionsContractCandlestick `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
klineDatas := make([]stream.KlineData, len(resp.Result))
for x := range resp.Result {
icp := strings.Split(resp.Result[x].NameOfSubscription, currency.UnderscoreDelimiter)
if len(icp) < 3 {
return errors.New("malformed options candlestick websocket push data")
}
currencyPair, err := currency.NewPairFromString(strings.Join(icp[1:], currency.UnderscoreDelimiter))
if err != nil {
return err
}
klineDatas[x] = stream.KlineData{
Pair: currencyPair,
AssetType: asset.Options,
Exchange: g.Name,
StartTime: time.Unix(resp.Result[x].Timestamp, 0),
Interval: icp[0],
OpenPrice: resp.Result[x].OpenPrice,
ClosePrice: resp.Result[x].ClosePrice,
HighPrice: resp.Result[x].HighestPrice,
LowPrice: resp.Result[x].LowestPrice,
Volume: resp.Result[x].Amount,
}
}
g.Websocket.DataHandler <- klineDatas
return nil
}
func (g *Gateio) processOrderbookTickerPushData(data []byte) error {
var response WsResponse
orderbookTicker := WsOptionsOrderbookTicker{}
response.Result = &orderbookTicker
err := json.Unmarshal(data, &orderbookTicker)
if err != nil {
return err
}
g.Websocket.DataHandler <- &response
return nil
}
func (g *Gateio) processOptionsOrderbookSnapshotPushData(event string, data []byte) error {
if event == "all" {
var response WsResponse
snapshot := WsOptionsOrderbookSnapshot{}
response.Result = &snapshot
err := json.Unmarshal(data, &response)
if err != nil {
return err
}
pair, err := currency.NewPairFromString(snapshot.Contract)
if err != nil {
return err
}
base := orderbook.Base{
Asset: asset.Options,
Exchange: g.Name,
Pair: pair,
LastUpdated: snapshot.Timestamp.Time(),
VerifyOrderbook: g.CanVerifyOrderbook,
}
base.Asks = make([]orderbook.Item, len(snapshot.Asks))
base.Bids = make([]orderbook.Item, len(snapshot.Bids))
for x := range base.Asks {
base.Asks[x] = orderbook.Item{
Amount: snapshot.Asks[x].Size,
Price: snapshot.Asks[x].Price,
}
}
for x := range base.Bids {
base.Bids[x] = orderbook.Item{
Amount: snapshot.Bids[x].Size,
Price: snapshot.Bids[x].Price,
}
}
return g.Websocket.Orderbook.LoadSnapshot(&base)
}
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsFuturesOrderbookUpdateEvent `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
dataMap := map[string][2][]orderbook.Item{}
for x := range resp.Result {
ab, ok := dataMap[resp.Result[x].CurrencyPair]
if !ok {
ab = [2][]orderbook.Item{}
}
if resp.Result[x].Amount > 0 {
ab[1] = append(ab[1], orderbook.Item{
Price: resp.Result[x].Price,
Amount: resp.Result[x].Amount,
})
} else {
ab[0] = append(ab[0], orderbook.Item{
Price: resp.Result[x].Price,
Amount: -resp.Result[x].Amount,
})
}
if !ok {
dataMap[resp.Result[x].CurrencyPair] = ab
}
}
if len(dataMap) == 0 {
return errors.New("missing orderbook ask and bid data")
}
for key, ab := range dataMap {
currencyPair, err := currency.NewPairFromString(key)
if err != nil {
return err
}
err = g.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{
Asks: ab[0],
Bids: ab[1],
Asset: asset.Options,
Exchange: g.Name,
Pair: currencyPair,
LastUpdated: time.Unix(resp.Time, 0),
VerifyOrderbook: g.CanVerifyOrderbook,
})
if err != nil {
return err
}
}
return nil
}
func (g *Gateio) processOptionsOrderPushData(data []byte) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsOptionsOrder `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
orderDetails := make([]order.Detail, len(resp.Result))
for x := range resp.Result {
currencyPair, err := currency.NewPairFromString(resp.Result[x].Contract)
if err != nil {
return err
}
status, err := order.StringToOrderStatus(func() string {
if resp.Result[x].Status == "finished" {
return "cancelled"
}
return resp.Result[x].Status
}())
if err != nil {
return err
}
orderDetails[x] = order.Detail{
Amount: resp.Result[x].Size,
Exchange: g.Name,
OrderID: strconv.FormatInt(resp.Result[x].ID, 10),
Status: status,
Pair: currencyPair,
Date: resp.Result[x].CreationTimeMs.Time(),
ExecutedAmount: resp.Result[x].Size - resp.Result[x].Left,
Price: resp.Result[x].Price,
AssetType: asset.Options,
AccountID: resp.Result[x].User,
}
}
g.Websocket.DataHandler <- orderDetails
return nil
}
func (g *Gateio) processOptionsUserTradesPushData(data []byte) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsOptionsUserTrade `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
fills := make([]fill.Data, len(resp.Result))
for x := range resp.Result {
currencyPair, err := currency.NewPairFromString(resp.Result[x].Contract)
if err != nil {
return err
}
fills[x] = fill.Data{
Timestamp: resp.Result[x].CreateTimeMs.Time(),
Exchange: g.Name,
CurrencyPair: currencyPair,
OrderID: resp.Result[x].OrderID,
TradeID: resp.Result[x].ID,
Price: resp.Result[x].Price,
Amount: resp.Result[x].Size,
}
}
return g.Websocket.Fills.Update(fills...)
}
func (g *Gateio) processOptionsLiquidatesPushData(data []byte) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsOptionsLiquidates `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
g.Websocket.DataHandler <- &resp
return nil
}
func (g *Gateio) processOptionsUsersPersonalSettlementsPushData(data []byte) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsOptionsUserSettlement `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
g.Websocket.DataHandler <- &resp
return nil
}
func (g *Gateio) processOptionsPositionPushData(data []byte) error {
resp := struct {
Time int64 `json:"time"`
Channel string `json:"channel"`
Event string `json:"event"`
Result []WsOptionsPosition `json:"result"`
}{}
err := json.Unmarshal(data, &resp)
if err != nil {
return err
}
g.Websocket.DataHandler <- &resp
return nil
}

View File

@@ -0,0 +1,118 @@
package gateio
import (
"context"
"fmt"
"time"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"golang.org/x/time/rate"
)
// GateIO endpoints limits.
const (
spotDefaultEPL request.EndpointLimit = iota
spotPrivateEPL
spotPlaceOrdersEPL
spotCancelOrdersEPL
perpetualSwapDefaultEPL
perpetualSwapPlaceOrdersEPL
perpetualSwapPrivateEPL
perpetualSwapCancelOrdersEPL
walletEPL
withdrawalEPL
// Request rates per interval
spotPublicRate = 900
spotPrivateRate = 900
spotPlaceOrdersRate = 10
spotCancelOrdersRate = 500
perpetualSwapPublicRate = 300
perpetualSwapPlaceOrdersRate = 100
perpetualSwapPrivateRate = 400
perpetualSwapCancelOrdersRate = 400
walletRate = 200
withdrawalRate = 1
// interval
oneSecondInterval = time.Second
threeSecondsInterval = time.Second * 3
)
// RateLimitter represents a rate limiter structure for gateIO endpoints.
type RateLimitter struct {
SpotDefault *rate.Limiter
SpotPrivate *rate.Limiter
SpotPlaceOrders *rate.Limiter
SpotCancelOrders *rate.Limiter
PerpetualSwapDefault *rate.Limiter
PerpetualSwapPlaceOrders *rate.Limiter
PerpetualSwapPrivate *rate.Limiter
PerpetualSwapCancelOrders *rate.Limiter
Wallet *rate.Limiter
Withdrawal *rate.Limiter
}
// Limit executes rate limiting functionality
// implements the request.Limiter interface
func (r *RateLimitter) Limit(ctx context.Context, epl request.EndpointLimit) error {
var limiter *rate.Limiter
var tokens int
switch epl {
case spotDefaultEPL:
limiter, tokens = r.SpotDefault, 1
case spotPrivateEPL:
return r.SpotPrivate.Wait(ctx)
case spotPlaceOrdersEPL:
return r.SpotPlaceOrders.Wait(ctx)
case spotCancelOrdersEPL:
return r.SpotCancelOrders.Wait(ctx)
case perpetualSwapDefaultEPL:
limiter, tokens = r.PerpetualSwapDefault, 1
case perpetualSwapPlaceOrdersEPL:
return r.PerpetualSwapPlaceOrders.Wait(ctx)
case perpetualSwapPrivateEPL:
return r.PerpetualSwapPrivate.Wait(ctx)
case perpetualSwapCancelOrdersEPL:
return r.PerpetualSwapCancelOrders.Wait(ctx)
case walletEPL:
return r.Wallet.Wait(ctx)
case withdrawalEPL:
return r.Withdrawal.Wait(ctx)
default:
}
var finalDelay time.Duration
var reserves = make([]*rate.Reservation, tokens)
for i := 0; i < tokens; i++ {
reserves[i] = limiter.Reserve()
finalDelay = reserves[i].Delay()
}
if dl, ok := ctx.Deadline(); ok && dl.Before(time.Now().Add(finalDelay)) {
for x := range reserves {
reserves[x].Cancel()
}
return fmt.Errorf("rate limit delay of %s will exceed deadline: %w",
finalDelay,
context.DeadlineExceeded)
}
time.Sleep(finalDelay)
return nil
}
// SetRateLimit returns the rate limiter for the exchange
func SetRateLimit() *RateLimitter {
return &RateLimitter{
SpotDefault: request.NewRateLimit(oneSecondInterval, spotPublicRate),
SpotPrivate: request.NewRateLimit(oneSecondInterval, spotPrivateRate),
SpotPlaceOrders: request.NewRateLimit(oneSecondInterval, spotPlaceOrdersRate),
SpotCancelOrders: request.NewRateLimit(oneSecondInterval, spotCancelOrdersRate),
PerpetualSwapDefault: request.NewRateLimit(oneSecondInterval, perpetualSwapPublicRate),
PerpetualSwapPlaceOrders: request.NewRateLimit(oneSecondInterval, perpetualSwapPlaceOrdersRate),
PerpetualSwapPrivate: request.NewRateLimit(oneSecondInterval, perpetualSwapPrivateRate),
PerpetualSwapCancelOrders: request.NewRateLimit(oneSecondInterval, perpetualSwapCancelOrdersRate),
Wallet: request.NewRateLimit(oneSecondInterval, walletRate),
Withdrawal: request.NewRateLimit(threeSecondsInterval, withdrawalRate),
}
}

View File

@@ -251,6 +251,12 @@ func (k *Item) FormatDates() {
// durationToWord returns english version of interval
func durationToWord(in Interval) string {
switch in {
case HundredMilliseconds:
return "hundredmillisec"
case ThousandMilliseconds:
return "thousandmillisec"
case TenSecond:
return "tensec"
case FifteenSecond:
return "fifteensecond"
case OneMin:
@@ -291,6 +297,10 @@ func durationToWord(in Interval) string {
return "twoweek"
case OneMonth:
return "onemonth"
case ThreeMonth:
return "threemonth"
case SixMonth:
return "sixmonth"
case OneYear:
return "oneyear"
default:

View File

@@ -153,6 +153,18 @@ func TestDurationToWord(t *testing.T) {
name string
interval Interval
}{
{
"hundredmillisec",
HundredMilliseconds,
},
{
"thousandmillisec",
ThousandMilliseconds,
},
{
"tensec",
TenSecond,
},
{
"FifteenSecond",
FifteenSecond,
@@ -233,6 +245,14 @@ func TestDurationToWord(t *testing.T) {
"OneMonth",
OneMonth,
},
{
"ThreeMonth",
ThreeMonth,
},
{
"SixMonth",
SixMonth,
},
{
"OneYear",
OneYear,

View File

@@ -11,32 +11,36 @@ import (
// Consts here define basic time intervals
const (
FifteenSecond = Interval(15 * time.Second)
OneMin = Interval(time.Minute)
ThreeMin = 3 * OneMin
FiveMin = 5 * OneMin
TenMin = 10 * OneMin
FifteenMin = 15 * OneMin
ThirtyMin = 30 * OneMin
OneHour = Interval(time.Hour)
TwoHour = 2 * OneHour
ThreeHour = 3 * OneHour
FourHour = 4 * OneHour
SixHour = 6 * OneHour
EightHour = 8 * OneHour
TwelveHour = 12 * OneHour
OneDay = 24 * OneHour
TwoDay = 2 * OneDay
ThreeDay = 3 * OneDay
FiveDay = 5 * OneDay
SevenDay = 7 * OneDay
FifteenDay = 15 * OneDay
OneWeek = 7 * OneDay
TwoWeek = 2 * OneWeek
OneMonth = 30 * OneDay
ThreeMonth = 3 * OneMonth
SixMonth = 6 * OneMonth
OneYear = 365 * OneDay
HundredMilliseconds = Interval(100 * time.Millisecond)
ThousandMilliseconds = 10 * HundredMilliseconds
TenSecond = Interval(10 * time.Second)
FifteenSecond = Interval(15 * time.Second)
ThirtySecond = 2 * FifteenSecond
OneMin = Interval(time.Minute)
ThreeMin = 3 * OneMin
FiveMin = 5 * OneMin
TenMin = 10 * OneMin
FifteenMin = 15 * OneMin
ThirtyMin = 30 * OneMin
OneHour = Interval(time.Hour)
TwoHour = 2 * OneHour
ThreeHour = 3 * OneHour
FourHour = 4 * OneHour
SixHour = 6 * OneHour
EightHour = 8 * OneHour
TwelveHour = 12 * OneHour
OneDay = 24 * OneHour
TwoDay = 2 * OneDay
ThreeDay = 3 * OneDay
SevenDay = 7 * OneDay
FifteenDay = 15 * OneDay
OneWeek = 7 * OneDay
TwoWeek = 2 * OneWeek
OneMonth = 30 * OneDay
ThreeMonth = 90 * OneDay
SixMonth = 2 * ThreeMonth
OneYear = 365 * OneDay
FiveDay = 5 * OneDay
)
var (
@@ -84,6 +88,9 @@ var (
// SupportedIntervals is a list of all supported intervals
SupportedIntervals = []Interval{
HundredMilliseconds,
ThousandMilliseconds,
TenSecond,
FifteenSecond,
OneMin,
ThreeMin,
@@ -100,6 +107,7 @@ var (
TwelveHour,
OneDay,
ThreeDay,
FiveDay,
SevenDay,
FifteenDay,
OneWeek,
@@ -108,6 +116,8 @@ var (
ThreeMonth,
SixMonth,
OneYear,
ThreeMonth,
SixMonth,
}
)

View File

@@ -236,7 +236,7 @@ func (r *Requester) doRequest(ctx context.Context, endpoint EndpointLimit, newRe
}
if resp.StatusCode < http.StatusOK ||
resp.StatusCode > http.StatusAccepted {
resp.StatusCode > http.StatusNoContent {
return fmt.Errorf("%s unsuccessful HTTP status code: %d raw response: %s",
r.name,
resp.StatusCode,

File diff suppressed because one or more lines are too long