Engine QA (#367)

* Improved error message when no config is set on startup

* Change inccorect error wording

* bump Bitfinex websocket orderbook return length to max

* temporary fix of incorrect orderbook updates, limit to bid and ask len of 100, will be extended later if needed

* Fixed issue in binance websocket that appended 0 volume bid/ask items

* Fix panic when unmarshalling an empty pair from config

* Add get pair asset method for exchange base
Fix Bitmex orderbook stream
Unbuffer Bitmex orderbook stream

* force syncer to update ticker instead of fetch, which allows a stream

* Fix websocket last price for coinbasepro

* fix websocket ticker for coinut

* Fix websocket orderbook stream Huobi

* increase orderbook depth REST for Huobi

* Fix websocket support and ensure data integrity

* Fix time parsing issue after error checks

* check error, only process enabled currency pairs, signal websocket data processing

* expanded websocket functionality for okgroup

* Add logic to not process zero length slice for orderbooks

* fix websocket ticker only updating enabled and individual book updates

* ZB fixes to order submission/retrieval/cancellation w/ general fixes

* Quiet unnecessary warning

* updated config entry values for REST and websocket (initial hack until I come up with a better solution for asset types)

* Ch GetName function to field access modifyer & rm useless code

* Add in error I missed

* Nits addressed

* some more fixes

* Turned kraken default websocket to true and some small changes

* fixes linter issues

* Ensured okgroup books and sent update through to datahandler. Zb update as well.

* Add test case to get asset type from pair

* Add test for pairs unmarshal

* Add testing and addressed nits

* FIX linter issue

* Addressed Gees nits

* Thanks glorious spotter

* more nitorinos

* Addres even more nits

* Add stringerino 4000

* Fix for panic cause by sort slice out of range, also nits addressed

* fix linter issues

* Changed from function to field access

* Changed from function to field access

* fix for orderbook update panic, removes quick fix - caused by sync item fetching through same protocol

* Add new test and update random generator

* pass in invalid string to future ob fetching, due to futures contract expire and a http 400 error is returned
This commit is contained in:
Ryan O'Hara-Reid
2019-11-04 15:34:30 +11:00
committed by Adrian Gallagher
parent e2c349424f
commit 22ff33cd54
53 changed files with 1813 additions and 1074 deletions

View File

@@ -16,6 +16,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/common/crypto"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wshandler"
log "github.com/thrasher-corp/gocryptotrader/logger"
)
@@ -307,11 +308,35 @@ func (o *OKGroup) GetSpotTokenPairDetails() (resp []GetSpotTokenPairDetailsRespo
return resp, o.SendHTTPRequest(http.MethodGet, okGroupTokenSubsection, OKGroupInstruments, nil, &resp, false)
}
// GetSpotOrderBook Getting the order book of a trading pair. Pagination is not supported here.
// The whole book will be returned for one request. Websocket is recommended here.
func (o *OKGroup) GetSpotOrderBook(request GetSpotOrderBookRequest) (resp GetSpotOrderBookResponse, _ error) {
requestURL := fmt.Sprintf("%v/%v/%v%v", OKGroupInstruments, request.InstrumentID, OKGroupGetSpotOrderBook, FormatParameters(request))
return resp, o.SendHTTPRequest(http.MethodGet, okGroupTokenSubsection, requestURL, nil, &resp, false)
// GetOrderBook Getting the order book of a trading pair. Pagination is not
// supported here. The whole book will be returned for one request. Websocket is
// recommended here.
func (o *OKGroup) GetOrderBook(request GetOrderBookRequest, a asset.Item) (resp GetOrderBookResponse, _ error) {
var requestType, endpoint string
switch a {
case asset.Spot:
endpoint = OKGroupGetSpotOrderBook
requestType = okGroupTokenSubsection
case asset.Futures:
endpoint = OKGroupGetSpotOrderBook
requestType = "futures"
case asset.PerpetualSwap:
endpoint = "depth"
requestType = "swap"
default:
return resp, errors.New("unhandled asset type")
}
requestURL := fmt.Sprintf("%v/%v/%v/%v",
OKGroupInstruments,
request.InstrumentID,
endpoint,
FormatParameters(request))
return resp, o.SendHTTPRequest(http.MethodGet,
requestType,
requestURL,
nil,
&resp,
false)
}
// GetSpotAllTokenPairsInformation Get the last traded price, best bid/ask price, 24 hour trading volume and more info of all trading pairs.

View File

@@ -0,0 +1,78 @@
package okgroup
import (
"log"
"os"
"testing"
"time"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/config"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/websocket/wshandler"
)
const (
apiKey = ""
apiSecret = ""
testAPIURL = "https://www.okex.com/api/"
testAPIVersion = "/v3/"
)
var o OKGroup
func TestMain(m *testing.M) {
cfg := config.GetConfig()
err := cfg.LoadConfig("../../testdata/configtest.json", true)
if err != nil {
log.Fatal("okgroup load config error", err)
}
okgroup, err := cfg.GetExchangeConfig("Okex")
if err != nil {
log.Fatal("okgroup Setup() init error", err)
}
okgroup.API.AuthenticatedSupport = true
okgroup.API.Credentials.Key = apiKey
okgroup.API.Credentials.Secret = apiSecret
o.API.Endpoints.URL = testAPIURL
o.APIVersion = testAPIVersion
o.Requester = request.New("okgroup_test_things",
request.NewRateLimit(time.Second, 10),
request.NewRateLimit(time.Second, 10),
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
)
o.Websocket = wshandler.New()
err = o.Setup(okgroup)
if err != nil {
log.Fatal("okgroup setup error", err)
}
os.Exit(m.Run())
}
func TestGetOrderbook(t *testing.T) {
t.Parallel()
_, err := o.GetOrderBook(GetOrderBookRequest{InstrumentID: "BTC-USDT"},
asset.Spot)
if err != nil {
t.Error(err)
}
// futures expire and break test, will need to mock this in the future
_, err = o.GetOrderBook(GetOrderBookRequest{InstrumentID: "Payload"},
asset.Futures)
if err == nil {
t.Error("error cannot be nil")
}
_, err = o.GetOrderBook(GetOrderBookRequest{InstrumentID: "BTC-USD-SWAP"},
asset.PerpetualSwap)
if err != nil {
t.Error(err)
}
}

View File

@@ -274,15 +274,15 @@ type GetSpotTokenPairDetailsResponse struct {
TickSize string `json:"tick_size"`
}
// GetSpotOrderBookRequest request data for GetSpotOrderBook
type GetSpotOrderBookRequest struct {
// GetOrderBookRequest request data for GetOrderBook
type GetOrderBookRequest struct {
Size int64 `url:"size,string,omitempty"` // [optional] number of results per request. Maximum 200
Depth float64 `url:"depth,string,omitempty"` // [optional] the aggregation of the book. e.g . 0.1,0.001
InstrumentID string `url:"-"` // [required] trading pairs
}
// GetSpotOrderBookResponse response data for GetSpotOrderBook
type GetSpotOrderBookResponse struct {
// GetOrderBookResponse response data
type GetOrderBookResponse struct {
Timestamp time.Time `json:"timestamp"`
Asks [][]string `json:"asks"` // [[0]: "Price", [1]: "Size", [2]: "Num_orders"], ...
Bids [][]string `json:"bids"` // [[0]: "Price", [1]: "Size", [2]: "Num_orders"], ...
@@ -678,37 +678,16 @@ type GetFuturesContractInformationResponse struct {
UnderlyingIndex string `json:"underlying_index"`
}
// GetFuturesOrderBookRequest request data for GetFuturesOrderBook
type GetFuturesOrderBookRequest struct {
InstrumentID string `url:"-"` // [required] Contract ID, e.g. "BTC-USD-180213"
Size int64 `url:"size,omitempty"` // [optional] The size of the price range (max: 200)
}
// FuturesOrderbookItem stores an individual futures orderbook item
type FuturesOrderbookItem struct {
Price float64
Size int64
ForceLiquidatedOrders int64 // Number of force liquidated orders
NumberOrders int64 // Number of orders on the price
}
// GetFuturesOrderBookResponse response data for GetFuturesOrderBook
type GetFuturesOrderBookResponse struct {
Asks []FuturesOrderbookItem
Bids []FuturesOrderbookItem
Timestamp time.Time
}
// GetFuturesTokenInfoResponse response data for GetFuturesOrderBook
type GetFuturesTokenInfoResponse struct {
BestAsk float64 `json:"best_ask,string"`
BestBid float64 `json:"best_bid,string"`
High24h float64 `json:"high_24h,string"`
InstrumentID currency.Pair `json:"instrument_id"`
Last float64 `json:"last,string"`
Low24h float64 `json:"low_24h,string"`
Timestamp time.Time `json:"timestamp"`
Volume24h float64 `json:"volume_24h,string"`
BestAsk float64 `json:"best_ask,string"`
BestBid float64 `json:"best_bid,string"`
High24h float64 `json:"high_24h,string"`
InstrumentID string `json:"instrument_id"`
Last float64 `json:"last,string"`
Low24h float64 `json:"low_24h,string"`
Timestamp time.Time `json:"timestamp"`
Volume24h float64 `json:"volume_24h,string"`
}
// GetFuturesFilledOrderRequest request data for GetFuturesFilledOrder
@@ -1059,14 +1038,14 @@ type GetSwapOrderBookResponse struct {
// GetAllSwapTokensInformationResponse response data for GetAllSwapTokensInformation
type GetAllSwapTokensInformationResponse struct {
InstrumentID currency.Pair `json:"instrument_id"`
Last float64 `json:"last,string"`
High24H float64 `json:"high_24h,string"`
Low24H float64 `json:"low_24h,string"`
BestBid float64 `json:"best_bid,string"`
BestAsk float64 `json:"best_ask,string"`
Volume24H float64 `json:"volume_24h,string"`
Timestamp time.Time `json:"timestamp"`
InstrumentID string `json:"instrument_id"`
Last float64 `json:"last,string"`
High24H float64 `json:"high_24h,string"`
Low24H float64 `json:"low_24h,string"`
BestBid float64 `json:"best_bid,string"`
BestAsk float64 `json:"best_ask,string"`
Volume24H float64 `json:"volume_24h,string"`
Timestamp time.Time `json:"timestamp"`
}
// GetSwapFilledOrdersDataRequest request data for GetSwapFilledOrdersData

View File

@@ -140,11 +140,36 @@ const (
okGroupWsFuturesOrder = okGroupWsFuturesSubsection + okGroupWsOrder
okGroupWsRateLimit = 30
allowableIterations = 25
delimiterColon = ":"
delimiterDash = "-"
delimiterUnderscore = "_"
)
// orderbookMutex Ensures if two entries arrive at once, only one can be processed at a time
// orderbookMutex Ensures if two entries arrive at once, only one can be
// processed at a time
var orderbookMutex sync.Mutex
var defaultSubscribedChannels = []string{okGroupWsSpotDepth, okGroupWsSpotCandle300s, okGroupWsSpotTicker, okGroupWsSpotTrade}
var defaultSpotSubscribedChannels = []string{okGroupWsSpotDepth,
okGroupWsSpotCandle300s,
okGroupWsSpotTicker,
okGroupWsSpotTrade}
var defaultFuturesSubscribedChannels = []string{okGroupWsFuturesDepth,
okGroupWsFuturesCandle300s,
okGroupWsFuturesTicker,
okGroupWsFuturesTrade}
var defaultIndexSubscribedChannels = []string{okGroupWsIndexCandle300s,
okGroupWsIndexTicker}
var defaultSwapSubscribedChannels = []string{okGroupWsSwapDepth,
okGroupWsSwapCandle300s,
okGroupWsSwapTicker,
okGroupWsSwapTrade,
okGroupWsSwapFundingRate,
okGroupWsSwapMarkPrice}
// WsConnect initiates a websocket connection
func (o *OKGroup) WsConnect() error {
@@ -161,13 +186,15 @@ func (o *OKGroup) WsConnect() error {
o.Websocket.GetWebsocketURL())
}
wg := sync.WaitGroup{}
wg.Add(2)
wg.Add(1)
go o.WsHandleData(&wg)
go o.wsPingHandler(&wg)
if o.GetAuthenticatedAPISupport(exchange.WebsocketAuthentication) {
err = o.WsLogin()
if err != nil {
log.Errorf(log.ExchangeSys, "%v - authentication failed: %v\n", o.Name, err)
log.Errorf(log.ExchangeSys,
"%v - authentication failed: %v\n",
o.Name,
err)
}
}
@@ -177,36 +204,6 @@ func (o *OKGroup) WsConnect() error {
return nil
}
// wsPingHandler sends a message "ping" every 27 to maintain the connection to the websocket
func (o *OKGroup) wsPingHandler(wg *sync.WaitGroup) {
o.Websocket.Wg.Add(1)
defer o.Websocket.Wg.Done()
ticker := time.NewTicker(time.Second * 27)
defer ticker.Stop()
wg.Done()
for {
select {
case <-o.Websocket.ShutdownC:
return
case <-ticker.C:
if !o.Websocket.IsConnected() {
continue
}
err := o.WebsocketConn.Connection.WriteMessage(websocket.TextMessage, []byte("ping"))
if o.Verbose {
log.Debugf(log.ExchangeSys, "%v sending ping", o.GetName())
}
if err != nil {
o.Websocket.DataHandler <- err
}
}
}
}
// WsHandleData handles the read data from the websocket connection
func (o *OKGroup) WsHandleData(wg *sync.WaitGroup) {
o.Websocket.Wg.Add(1)
@@ -240,7 +237,11 @@ func (o *OKGroup) WsHandleData(wg *sync.WaitGroup) {
err = common.JSONDecode(resp.Raw, &errorResponse)
if err == nil && errorResponse.ErrorCode > 0 {
if o.Verbose {
log.Debugf(log.ExchangeSys, "WS Error Event: %v Message: %v", errorResponse.Event, errorResponse.Message)
log.Debugf(log.ExchangeSys,
"WS Error Event: %v Message: %v for %s",
errorResponse.Event,
errorResponse.Message,
o.Name)
}
o.WsHandleErrorResponse(errorResponse)
continue
@@ -252,10 +253,12 @@ func (o *OKGroup) WsHandleData(wg *sync.WaitGroup) {
o.Websocket.SetCanUseAuthenticatedEndpoints(eventResponse.Success)
}
if o.Verbose {
log.Debugf(log.ExchangeSys, "WS Event: %v on Channel: %v", eventResponse.Event, eventResponse.Channel)
log.Debugf(log.ExchangeSys,
"WS Event: %v on Channel: %v for %s",
eventResponse.Event,
eventResponse.Channel,
o.Name)
}
o.Websocket.DataHandler <- eventResponse
continue
}
}
}
@@ -273,7 +276,10 @@ func (o *OKGroup) WsLogin() error {
base64 := crypto.Base64Encode(hmac)
request := WebsocketEventRequest{
Operation: "login",
Arguments: []string{o.API.Credentials.Key, o.API.Credentials.ClientID, fmt.Sprintf("%v", unixTime), base64},
Arguments: []string{o.API.Credentials.Key,
o.API.Credentials.ClientID,
fmt.Sprintf("%v", unixTime),
base64},
}
err := o.WebsocketConn.SendMessage(request)
if err != nil {
@@ -286,7 +292,9 @@ func (o *OKGroup) WsLogin() error {
// WsHandleErrorResponse sends an error message to ws handler
func (o *OKGroup) WsHandleErrorResponse(event WebsocketErrorResponse) {
errorMessage := fmt.Sprintf("%v error - %v message: %s ",
o.GetName(), event.ErrorCode, event.Message)
o.Name,
event.ErrorCode,
event.Message)
if o.Verbose {
log.Error(log.ExchangeSys, errorMessage)
}
@@ -314,28 +322,54 @@ func (o *OKGroup) GetWsChannelWithoutOrderType(table string) string {
// eg "spot/ticker:BTCUSD" results in "SPOT"
func (o *OKGroup) GetAssetTypeFromTableName(table string) asset.Item {
assetIndex := strings.Index(table, "/")
return asset.Item(table[:assetIndex])
switch table[:assetIndex] {
case asset.Futures.String():
return asset.Futures
case asset.Spot.String():
return asset.Spot
case "swap":
return asset.PerpetualSwap
case asset.Index.String():
return asset.Index
default:
log.Warnf(log.ExchangeSys, "%s unhandled asset type %s",
o.Name,
table[:assetIndex])
return asset.Item(table[:assetIndex])
}
}
// WsHandleDataResponse classifies the WS response and sends to appropriate handler
func (o *OKGroup) WsHandleDataResponse(response *WebsocketDataResponse) {
switch o.GetWsChannelWithoutOrderType(response.Table) {
case okGroupWsCandle60s, okGroupWsCandle180s, okGroupWsCandle300s, okGroupWsCandle900s,
okGroupWsCandle1800s, okGroupWsCandle3600s, okGroupWsCandle7200s, okGroupWsCandle14400s,
okGroupWsCandle21600s, okGroupWsCandle43200s, okGroupWsCandle86400s, okGroupWsCandle604900s:
case okGroupWsCandle60s, okGroupWsCandle180s, okGroupWsCandle300s,
okGroupWsCandle900s, okGroupWsCandle1800s, okGroupWsCandle3600s,
okGroupWsCandle7200s, okGroupWsCandle14400s, okGroupWsCandle21600s,
okGroupWsCandle43200s, okGroupWsCandle86400s, okGroupWsCandle604900s:
o.wsProcessCandles(response)
case okGroupWsDepth, okGroupWsDepth5:
// Locking, orderbooks cannot be processed out of order
orderbookMutex.Lock()
err := o.WsProcessOrderBook(response)
if err != nil {
pair := currency.NewPairDelimiter(response.Data[0].InstrumentID, "-")
channelToResubscribe := wshandler.WebsocketChannelSubscription{
Channel: response.Table,
Currency: pair,
for i := range response.Data {
a := o.GetAssetTypeFromTableName(response.Table)
var c currency.Pair
switch a {
case asset.Futures, asset.PerpetualSwap:
f := strings.Split(response.Data[i].InstrumentID, delimiterDash)
c = currency.NewPairWithDelimiter(f[0]+delimiterDash+f[1], f[2], delimiterDash)
default:
f := strings.Split(response.Data[i].InstrumentID, delimiterDash)
c = currency.NewPairWithDelimiter(f[0], f[1], delimiterDash)
}
channelToResubscribe := wshandler.WebsocketChannelSubscription{
Channel: response.Table,
Currency: c,
}
o.Websocket.ResubscribeToChannel(channelToResubscribe)
}
o.Websocket.ResubscribeToChannel(channelToResubscribe)
}
orderbookMutex.Unlock()
case okGroupWsTicker:
@@ -343,26 +377,37 @@ func (o *OKGroup) WsHandleDataResponse(response *WebsocketDataResponse) {
case okGroupWsTrade:
o.wsProcessTrades(response)
default:
logDataResponse(response)
logDataResponse(response, o.Name)
}
}
// logDataResponse will log the details of any websocket data event
// where there is no websocket datahandler for it
func logDataResponse(response *WebsocketDataResponse) {
func logDataResponse(response *WebsocketDataResponse, exchangeName string) {
for i := range response.Data {
log.Errorf(log.ExchangeSys, "Unhandled channel: '%v'. Instrument '%v' Timestamp '%v', Data '%v",
log.Warnf(log.ExchangeSys,
"%s Unhandled channel: '%v'. Instrument '%v' Timestamp '%v'",
exchangeName,
response.Table,
response.Data[i].InstrumentID,
response.Data[i].Timestamp,
response.Data[i])
response.Data[i].Timestamp)
}
}
// wsProcessTickers converts ticker data and sends it to the datahandler
func (o *OKGroup) wsProcessTickers(response *WebsocketDataResponse) {
for i := range response.Data {
instrument := currency.NewPairDelimiter(response.Data[i].InstrumentID, "-")
a := o.GetAssetTypeFromTableName(response.Table)
var c currency.Pair
switch a {
case asset.Futures, asset.PerpetualSwap:
f := strings.Split(response.Data[i].InstrumentID, delimiterDash)
c = currency.NewPairWithDelimiter(f[0]+delimiterDash+f[1], f[2], delimiterUnderscore)
default:
f := strings.Split(response.Data[i].InstrumentID, delimiterDash)
c = currency.NewPairWithDelimiter(f[0], f[1], delimiterDash)
}
o.Websocket.DataHandler <- wshandler.TickerData{
Exchange: o.Name,
Open: response.Data[i].Open24h,
@@ -376,7 +421,7 @@ func (o *OKGroup) wsProcessTickers(response *WebsocketDataResponse) {
Last: response.Data[i].Last,
Timestamp: response.Data[i].Timestamp,
AssetType: o.GetAssetTypeFromTableName(response.Table),
Pair: instrument,
Pair: c,
}
}
}
@@ -384,11 +429,21 @@ func (o *OKGroup) wsProcessTickers(response *WebsocketDataResponse) {
// wsProcessTrades converts trade data and sends it to the datahandler
func (o *OKGroup) wsProcessTrades(response *WebsocketDataResponse) {
for i := range response.Data {
instrument := currency.NewPairDelimiter(response.Data[i].InstrumentID, "-")
a := o.GetAssetTypeFromTableName(response.Table)
var c currency.Pair
switch a {
case asset.Futures, asset.PerpetualSwap:
f := strings.Split(response.Data[i].InstrumentID, delimiterDash)
c = currency.NewPairWithDelimiter(f[0]+delimiterDash+f[1], f[2], delimiterUnderscore)
default:
f := strings.Split(response.Data[i].InstrumentID, delimiterDash)
c = currency.NewPairWithDelimiter(f[0], f[1], delimiterDash)
}
o.Websocket.DataHandler <- wshandler.TradeData{
Amount: response.Data[i].Size,
AssetType: o.GetAssetTypeFromTableName(response.Table),
CurrencyPair: instrument,
CurrencyPair: c,
EventTime: time.Now().Unix(),
Exchange: o.GetName(),
Price: response.Data[i].WebsocketTradeResponse.Price,
@@ -401,10 +456,24 @@ func (o *OKGroup) wsProcessTrades(response *WebsocketDataResponse) {
// wsProcessCandles converts candle data and sends it to the data handler
func (o *OKGroup) wsProcessCandles(response *WebsocketDataResponse) {
for i := range response.Data {
instrument := currency.NewPairDelimiter(response.Data[i].InstrumentID, "-")
timeData, err := time.Parse(time.RFC3339Nano, response.Data[i].WebsocketCandleResponse.Candle[0])
a := o.GetAssetTypeFromTableName(response.Table)
var c currency.Pair
switch a {
case asset.Futures, asset.PerpetualSwap:
f := strings.Split(response.Data[i].InstrumentID, delimiterDash)
c = currency.NewPairWithDelimiter(f[0]+delimiterDash+f[1], f[2], delimiterUnderscore)
default:
f := strings.Split(response.Data[i].InstrumentID, delimiterDash)
c = currency.NewPairWithDelimiter(f[0], f[1], delimiterDash)
}
timeData, err := time.Parse(time.RFC3339Nano,
response.Data[i].WebsocketCandleResponse.Candle[0])
if err != nil {
log.Warnf(log.ExchangeSys, "%v Time data could not be parsed: %v", o.GetName(), response.Data[i].Candle[0])
log.Warnf(log.ExchangeSys,
"%v Time data could not be parsed: %v",
o.Name,
response.Data[i].Candle[0])
}
candleIndex := strings.LastIndex(response.Table, okGroupWsCandle)
@@ -416,16 +485,36 @@ func (o *OKGroup) wsProcessCandles(response *WebsocketDataResponse) {
klineData := wshandler.KlineData{
AssetType: o.GetAssetTypeFromTableName(response.Table),
Pair: instrument,
Pair: c,
Exchange: o.GetName(),
Timestamp: timeData,
Interval: candleInterval,
}
klineData.OpenPrice, _ = strconv.ParseFloat(response.Data[i].Candle[1], 64)
klineData.HighPrice, _ = strconv.ParseFloat(response.Data[i].Candle[2], 64)
klineData.LowPrice, _ = strconv.ParseFloat(response.Data[i].Candle[3], 64)
klineData.ClosePrice, _ = strconv.ParseFloat(response.Data[i].Candle[4], 64)
klineData.Volume, _ = strconv.ParseFloat(response.Data[i].Candle[5], 64)
klineData.OpenPrice, err = strconv.ParseFloat(response.Data[i].Candle[1], 64)
if err != nil {
o.Websocket.DataHandler <- err
continue
}
klineData.HighPrice, err = strconv.ParseFloat(response.Data[i].Candle[2], 64)
if err != nil {
o.Websocket.DataHandler <- err
continue
}
klineData.LowPrice, err = strconv.ParseFloat(response.Data[i].Candle[3], 64)
if err != nil {
o.Websocket.DataHandler <- err
continue
}
klineData.ClosePrice, err = strconv.ParseFloat(response.Data[i].Candle[4], 64)
if err != nil {
o.Websocket.DataHandler <- err
continue
}
klineData.Volume, err = strconv.ParseFloat(response.Data[i].Candle[5], 64)
if err != nil {
o.Websocket.DataHandler <- err
continue
}
o.Websocket.DataHandler <- klineData
}
@@ -434,57 +523,96 @@ func (o *OKGroup) wsProcessCandles(response *WebsocketDataResponse) {
// WsProcessOrderBook Validates the checksum and updates internal orderbook values
func (o *OKGroup) WsProcessOrderBook(response *WebsocketDataResponse) (err error) {
for i := range response.Data {
instrument := currency.NewPairDelimiter(response.Data[i].InstrumentID, "-")
a := o.GetAssetTypeFromTableName(response.Table)
var c currency.Pair
switch a {
case asset.Futures, asset.PerpetualSwap:
f := strings.Split(response.Data[i].InstrumentID, delimiterDash)
c = currency.NewPairWithDelimiter(f[0]+delimiterDash+f[1], f[2], delimiterUnderscore)
default:
f := strings.Split(response.Data[i].InstrumentID, delimiterDash)
c = currency.NewPairWithDelimiter(f[0], f[1], delimiterDash)
}
if response.Action == okGroupWsOrderbookPartial {
err = o.WsProcessPartialOrderBook(&response.Data[i], instrument, response.Table)
err = o.WsProcessPartialOrderBook(&response.Data[i], c, a)
if err != nil {
return
}
} else if response.Action == okGroupWsOrderbookUpdate {
err = o.WsProcessUpdateOrderbook(&response.Data[i], instrument, response.Table)
if len(response.Data[i].Asks) == 0 && len(response.Data[i].Bids) == 0 {
continue
}
err = o.WsProcessUpdateOrderbook(&response.Data[i], c, a)
if err != nil {
return
}
}
}
return
}
// AppendWsOrderbookItems adds websocket orderbook data bid/asks into an orderbook item array
func (o *OKGroup) AppendWsOrderbookItems(entries [][]interface{}) (orderbookItems []orderbook.Item) {
func (o *OKGroup) AppendWsOrderbookItems(entries [][]interface{}) ([]orderbook.Item, error) {
var items []orderbook.Item
for j := range entries {
amount, _ := strconv.ParseFloat(entries[j][1].(string), 64)
price, _ := strconv.ParseFloat(entries[j][0].(string), 64)
orderbookItems = append(orderbookItems, orderbook.Item{
Amount: amount,
Price: price,
})
amount, err := strconv.ParseFloat(entries[j][1].(string), 64)
if err != nil {
return nil, err
}
price, err := strconv.ParseFloat(entries[j][0].(string), 64)
if err != nil {
return nil, err
}
items = append(items, orderbook.Item{Amount: amount, Price: price})
}
return
return items, nil
}
// WsProcessPartialOrderBook takes websocket orderbook data and creates an orderbook
// Calculates checksum to ensure it is valid
func (o *OKGroup) WsProcessPartialOrderBook(wsEventData *WebsocketDataWrapper, instrument currency.Pair, tableName string) error {
func (o *OKGroup) WsProcessPartialOrderBook(wsEventData *WebsocketDataWrapper, instrument currency.Pair, a asset.Item) error {
signedChecksum := o.CalculatePartialOrderbookChecksum(wsEventData)
if signedChecksum != wsEventData.Checksum {
return fmt.Errorf("channel: %v. Orderbook partial for %v checksum invalid", tableName, instrument)
return fmt.Errorf("%s channel: %s. Orderbook partial for %v checksum invalid",
o.Name,
a,
instrument)
}
if o.Verbose {
log.Debug(log.ExchangeSys, "Passed checksum!")
log.Debugf(log.ExchangeSys,
"%s passed checksum for instrument %s",
o.Name,
instrument)
}
asks := o.AppendWsOrderbookItems(wsEventData.Asks)
bids := o.AppendWsOrderbookItems(wsEventData.Bids)
asks, err := o.AppendWsOrderbookItems(wsEventData.Asks)
if err != nil {
return err
}
bids, err := o.AppendWsOrderbookItems(wsEventData.Bids)
if err != nil {
return err
}
newOrderBook := orderbook.Base{
Asks: asks,
Bids: bids,
AssetType: o.GetAssetTypeFromTableName(tableName),
AssetType: a,
LastUpdated: wsEventData.Timestamp,
Pair: instrument,
ExchangeName: o.GetName(),
}
err := o.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
err = o.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
if err != nil {
return err
}
o.Websocket.DataHandler <- wshandler.WebsocketOrderbookUpdate{
Exchange: o.GetName(),
Asset: o.GetAssetTypeFromTableName(tableName),
Asset: a,
Pair: instrument,
}
return nil
@@ -492,123 +620,209 @@ func (o *OKGroup) WsProcessPartialOrderBook(wsEventData *WebsocketDataWrapper, i
// WsProcessUpdateOrderbook updates an existing orderbook using websocket data
// After merging WS data, it will sort, validate and finally update the existing orderbook
func (o *OKGroup) WsProcessUpdateOrderbook(wsEventData *WebsocketDataWrapper, instrument currency.Pair, tableName string) error {
func (o *OKGroup) WsProcessUpdateOrderbook(wsEventData *WebsocketDataWrapper, instrument currency.Pair, a asset.Item) error {
update := wsorderbook.WebsocketOrderbookUpdate{
AssetType: asset.Spot,
CurrencyPair: instrument,
UpdateTime: wsEventData.Timestamp,
Asset: a,
Pair: instrument,
UpdateTime: wsEventData.Timestamp,
}
update.Asks = o.AppendWsOrderbookItems(wsEventData.Asks)
update.Bids = o.AppendWsOrderbookItems(wsEventData.Bids)
err := o.Websocket.Orderbook.Update(&update)
if err != nil {
log.Error(log.ExchangeSys, err)
}
updatedOb := o.Websocket.Orderbook.GetOrderbook(instrument, asset.Spot)
checksum := o.CalculateUpdateOrderbookChecksum(updatedOb)
if checksum == wsEventData.Checksum {
if o.Verbose {
log.Debug(log.ExchangeSys, "Orderbook valid")
}
o.Websocket.DataHandler <- wshandler.WebsocketOrderbookUpdate{
Exchange: o.GetName(),
Asset: o.GetAssetTypeFromTableName(tableName),
Pair: instrument,
}
} else {
if o.Verbose {
log.Warnln(log.ExchangeSys, "Orderbook invalid")
}
return fmt.Errorf("channel: %v. Orderbook update for %v checksum invalid. Received %v Calculated %v", tableName, instrument, wsEventData.Checksum, checksum)
var err error
update.Asks, err = o.AppendWsOrderbookItems(wsEventData.Asks)
if err != nil {
return err
}
update.Bids, err = o.AppendWsOrderbookItems(wsEventData.Bids)
if err != nil {
return err
}
err = o.Websocket.Orderbook.Update(&update)
if err != nil {
return err
}
updatedOb := o.Websocket.Orderbook.GetOrderbook(instrument, a)
checksum := o.CalculateUpdateOrderbookChecksum(updatedOb)
if checksum != wsEventData.Checksum {
// re-sub
log.Warnf(log.ExchangeSys, "%s checksum failure for item %s",
o.Name,
wsEventData.InstrumentID)
return errors.New("checksum failed")
}
o.Websocket.DataHandler <- wshandler.WebsocketOrderbookUpdate{
Exchange: o.GetName(),
Asset: a,
Pair: instrument,
}
return nil
}
// CalculatePartialOrderbookChecksum alternates over the first 25 bid and ask entries from websocket data
// The checksum is made up of the price and the quantity with a semicolon (:) deliminating them
// This will also work when there are less than 25 entries (for whatever reason)
// CalculatePartialOrderbookChecksum alternates over the first 25 bid and ask
// entries from websocket data. The checksum is made up of the price and the
// quantity with a semicolon (:) deliminating them. This will also work when
// there are less than 25 entries (for whatever reason)
// eg Bid:Ask:Bid:Ask:Ask:Ask
func (o *OKGroup) CalculatePartialOrderbookChecksum(orderbookData *WebsocketDataWrapper) int32 {
var checksum string
iterations := 25
for i := 0; i < iterations; i++ {
bidsMessage := ""
askMessage := ""
for i := 0; i < allowableIterations; i++ {
if len(orderbookData.Bids)-1 >= i {
bidsMessage = fmt.Sprintf("%v:%v:", orderbookData.Bids[i][0], orderbookData.Bids[i][1])
checksum += orderbookData.Bids[i][0].(string) +
delimiterColon +
orderbookData.Bids[i][1].(string) +
delimiterColon
}
if len(orderbookData.Asks)-1 >= i {
askMessage = fmt.Sprintf("%v:%v:", orderbookData.Asks[i][0], orderbookData.Asks[i][1])
}
if checksum == "" {
checksum = fmt.Sprintf("%v%v", bidsMessage, askMessage)
} else {
checksum = fmt.Sprintf("%v%v%v", checksum, bidsMessage, askMessage)
checksum += orderbookData.Asks[i][0].(string) +
delimiterColon +
orderbookData.Asks[i][1].(string) +
delimiterColon
}
}
checksum = strings.TrimSuffix(checksum, ":")
checksum = strings.TrimSuffix(checksum, delimiterColon)
return int32(crc32.ChecksumIEEE([]byte(checksum)))
}
// CalculateUpdateOrderbookChecksum alternates over the first 25 bid and ask entries of a merged orderbook
// The checksum is made up of the price and the quantity with a semicolon (:) deliminating them
// This will also work when there are less than 25 entries (for whatever reason)
// CalculateUpdateOrderbookChecksum alternates over the first 25 bid and ask
// entries of a merged orderbook. The checksum is made up of the price and the
// quantity with a semicolon (:) deliminating them. This will also work when
// there are less than 25 entries (for whatever reason)
// eg Bid:Ask:Bid:Ask:Ask:Ask
func (o *OKGroup) CalculateUpdateOrderbookChecksum(orderbookData *orderbook.Base) int32 {
var checksum string
iterations := 25
for i := 0; i < iterations; i++ {
bidsMessage := ""
askMessage := ""
for i := 0; i < allowableIterations; i++ {
if len(orderbookData.Bids)-1 >= i {
price := strconv.FormatFloat(orderbookData.Bids[i].Price, 'f', -1, 64)
amount := strconv.FormatFloat(orderbookData.Bids[i].Amount, 'f', -1, 64)
bidsMessage = fmt.Sprintf("%v:%v:", price, amount)
checksum += price + delimiterColon + amount + delimiterColon
}
if len(orderbookData.Asks)-1 >= i {
price := strconv.FormatFloat(orderbookData.Asks[i].Price, 'f', -1, 64)
amount := strconv.FormatFloat(orderbookData.Asks[i].Amount, 'f', -1, 64)
askMessage = fmt.Sprintf("%v:%v:", price, amount)
}
if checksum == "" {
checksum = fmt.Sprintf("%v%v", bidsMessage, askMessage)
} else {
checksum = fmt.Sprintf("%v%v%v", checksum, bidsMessage, askMessage)
checksum += price + delimiterColon + amount + delimiterColon
}
}
checksum = strings.TrimSuffix(checksum, ":")
checksum = strings.TrimSuffix(checksum, delimiterColon)
return int32(crc32.ChecksumIEEE([]byte(checksum)))
}
// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be
// handled by ManageSubscriptions()
func (o *OKGroup) GenerateDefaultSubscriptions() {
enabledCurrencies := o.GetEnabledPairs(asset.Spot)
var subscriptions []wshandler.WebsocketChannelSubscription
if o.GetAuthenticatedAPISupport(exchange.WebsocketAuthentication) {
defaultSubscribedChannels = append(defaultSubscribedChannels, okGroupWsSpotMarginAccount, okGroupWsSpotAccount, okGroupWsSpotOrder)
}
for i := range defaultSubscribedChannels {
for j := range enabledCurrencies {
enabledCurrencies[j].Delimiter = "-"
subscriptions = append(subscriptions, wshandler.WebsocketChannelSubscription{
Channel: defaultSubscribedChannels[i],
Currency: enabledCurrencies[j],
})
assets := o.GetAssetTypes()
for x := range assets {
enabledCurrencies := o.GetEnabledPairs(assets[x])
if len(enabledCurrencies) == 0 {
continue
}
switch assets[x] {
case asset.Spot:
for i := range enabledCurrencies {
for y := range defaultSpotSubscribedChannels {
subscriptions = append(subscriptions,
wshandler.WebsocketChannelSubscription{
Channel: defaultSpotSubscribedChannels[y],
Currency: o.FormatExchangeCurrency(enabledCurrencies[i],
asset.Spot),
})
}
}
if o.GetAuthenticatedAPISupport(exchange.WebsocketAuthentication) {
subscriptions = append(subscriptions,
wshandler.WebsocketChannelSubscription{
Channel: okGroupWsSpotMarginAccount,
},
wshandler.WebsocketChannelSubscription{
Channel: okGroupWsSpotAccount,
},
wshandler.WebsocketChannelSubscription{
Channel: okGroupWsSpotOrder,
})
}
case asset.Futures:
for i := range enabledCurrencies {
for y := range defaultFuturesSubscribedChannels {
subscriptions = append(subscriptions,
wshandler.WebsocketChannelSubscription{
Channel: defaultFuturesSubscribedChannels[y],
Currency: o.FormatExchangeCurrency(enabledCurrencies[i],
asset.Futures),
})
}
}
if o.GetAuthenticatedAPISupport(exchange.WebsocketAuthentication) {
subscriptions = append(subscriptions,
wshandler.WebsocketChannelSubscription{
Channel: okGroupWsFuturesAccount,
},
wshandler.WebsocketChannelSubscription{
Channel: okGroupWsFuturesPosition,
},
wshandler.WebsocketChannelSubscription{
Channel: okGroupWsFuturesOrder,
})
}
case asset.PerpetualSwap:
for i := range enabledCurrencies {
for y := range defaultSwapSubscribedChannels {
subscriptions = append(subscriptions,
wshandler.WebsocketChannelSubscription{
Channel: defaultSwapSubscribedChannels[y],
Currency: o.FormatExchangeCurrency(enabledCurrencies[i],
asset.PerpetualSwap),
})
}
}
if o.GetAuthenticatedAPISupport(exchange.WebsocketAuthentication) {
subscriptions = append(subscriptions,
wshandler.WebsocketChannelSubscription{
Channel: okGroupWsSwapAccount,
},
wshandler.WebsocketChannelSubscription{
Channel: okGroupWsSwapPosition,
},
wshandler.WebsocketChannelSubscription{
Channel: okGroupWsSwapOrder,
})
}
case asset.Index:
for i := range enabledCurrencies {
for y := range defaultIndexSubscribedChannels {
subscriptions = append(subscriptions,
wshandler.WebsocketChannelSubscription{
Channel: defaultIndexSubscribedChannels[y],
Currency: o.FormatExchangeCurrency(enabledCurrencies[i], asset.Index),
})
}
}
default:
o.Websocket.DataHandler <- errors.New("unhandled asset type")
}
}
o.Websocket.SubscribeToChannels(subscriptions)
}
// Subscribe sends a websocket message to receive data from the channel
func (o *OKGroup) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscription) error {
c := channelToSubscribe.Currency.String()
request := WebsocketEventRequest{
Operation: "subscribe",
Arguments: []string{fmt.Sprintf("%v:%v", channelToSubscribe.Channel, channelToSubscribe.Currency.String())},
Arguments: []string{channelToSubscribe.Channel + delimiterColon + c},
}
if strings.EqualFold(channelToSubscribe.Channel, okGroupWsSpotAccount) {
request.Arguments = []string{fmt.Sprintf("%v:%v", channelToSubscribe.Channel, channelToSubscribe.Currency.Base.String())}
request.Arguments = []string{channelToSubscribe.Channel +
delimiterColon +
channelToSubscribe.Currency.Base.String()}
}
return o.WebsocketConn.SendMessage(request)
@@ -618,7 +832,9 @@ func (o *OKGroup) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscri
func (o *OKGroup) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubscription) error {
request := WebsocketEventRequest{
Operation: "unsubscribe",
Arguments: []string{fmt.Sprintf("%v:%v", channelToSubscribe.Channel, channelToSubscribe.Currency.String())},
Arguments: []string{channelToSubscribe.Channel +
delimiterColon +
channelToSubscribe.Currency.String()},
}
return o.WebsocketConn.SendMessage(request)
}

View File

@@ -1,6 +1,7 @@
package okgroup
import (
"errors"
"fmt"
"strconv"
"strings"
@@ -79,62 +80,93 @@ func (o *OKGroup) FetchOrderbook(p currency.Pair, assetType asset.Item) (resp or
}
// UpdateOrderbook updates and returns the orderbook for a currency pair
func (o *OKGroup) UpdateOrderbook(p currency.Pair, assetType asset.Item) (resp orderbook.Base, err error) {
orderbookNew, err := o.GetSpotOrderBook(GetSpotOrderBookRequest{
InstrumentID: o.FormatExchangeCurrency(p, assetType).String(),
})
func (o *OKGroup) UpdateOrderbook(p currency.Pair, a asset.Item) (orderbook.Base, error) {
var resp orderbook.Base
if a == asset.Index {
return resp, errors.New("no orderbooks for index")
}
orderbookNew, err := o.GetOrderBook(GetOrderBookRequest{
InstrumentID: o.FormatExchangeCurrency(p, a).String(),
}, a)
if err != nil {
return
return resp, err
}
for x := range orderbookNew.Bids {
amount, convErr := strconv.ParseFloat(orderbookNew.Bids[x][1], 64)
if convErr != nil {
log.Errorf(log.ExchangeSys,
"Could not convert %v to float64",
orderbookNew.Bids[x][1])
return resp, err
}
price, convErr := strconv.ParseFloat(orderbookNew.Bids[x][0], 64)
if convErr != nil {
log.Errorf(log.ExchangeSys,
"Could not convert %v to float64",
orderbookNew.Bids[x][0])
return resp, err
}
var liquidationOrders, orderCount int64
// Contract specific variables
if len(orderbookNew.Bids[x]) == 4 {
liquidationOrders, convErr = strconv.ParseInt(orderbookNew.Bids[x][2], 10, 64)
if convErr != nil {
return resp, err
}
orderCount, convErr = strconv.ParseInt(orderbookNew.Bids[x][3], 10, 64)
if convErr != nil {
return resp, err
}
}
resp.Bids = append(resp.Bids, orderbook.Item{
Amount: amount,
Price: price,
Amount: amount,
Price: price,
LiquidationOrders: liquidationOrders,
OrderCount: orderCount,
})
}
for x := range orderbookNew.Asks {
amount, convErr := strconv.ParseFloat(orderbookNew.Asks[x][1], 64)
if convErr != nil {
log.Errorf(log.ExchangeSys,
"Could not convert %v to float64",
orderbookNew.Asks[x][1])
return resp, err
}
price, convErr := strconv.ParseFloat(orderbookNew.Asks[x][0], 64)
if convErr != nil {
log.Errorf(log.ExchangeSys,
"Could not convert %v to float64",
orderbookNew.Asks[x][0])
return resp, err
}
var liquidationOrders, orderCount int64
// Contract specific variables
if len(orderbookNew.Asks[x]) == 4 {
liquidationOrders, convErr = strconv.ParseInt(orderbookNew.Asks[x][2], 10, 64)
if convErr != nil {
return resp, err
}
orderCount, convErr = strconv.ParseInt(orderbookNew.Asks[x][3], 10, 64)
if convErr != nil {
return resp, err
}
}
resp.Asks = append(resp.Asks, orderbook.Item{
Amount: amount,
Price: price,
Amount: amount,
Price: price,
LiquidationOrders: liquidationOrders,
OrderCount: orderCount,
})
}
resp.Pair = p
resp.AssetType = assetType
resp.AssetType = a
resp.ExchangeName = o.Name
err = resp.Process()
if err != nil {
return
return resp, err
}
return orderbook.Get(o.Name, p, assetType)
return orderbook.Get(o.Name, p, a)
}
// GetAccountInfo retrieves balances for all enabled currencies