Implement Request Retry and Backoff (#491)

Refactoring the timeout retries into a more general 'retry policy' with
 support for retrying on HTTP 429 (Too Many Requests) and other responses
 with a `Retry-After` header

The delay between requests is controlled by a combination of a 'backoff'
 (currently only a simple linear backoff), and honouring the
 `Retry-After` value (longest delay wins)

This makes the 'rate limiter' an optional argument as well, removing the
 use of `nil` when one isn't supplied

Signed-off-by: David Ackroyd <daveo.ackroyd@gmail.com>
This commit is contained in:
David Ackroyd
2020-05-05 13:12:29 +10:00
committed by GitHub
parent 70615279bd
commit 56e535001c
80 changed files with 791 additions and 250 deletions

View File

@@ -2,6 +2,7 @@ package alphapoint
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -520,7 +521,7 @@ func (a *Alphapoint) SendHTTPRequest(method, path string, data map[string]interf
return errors.New("unable to JSON request")
}
return a.SendPayload(&request.Item{
return a.SendPayload(context.Background(), &request.Item{
Method: method,
Path: path,
Headers: headers,
@@ -554,7 +555,7 @@ func (a *Alphapoint) SendAuthenticatedHTTPRequest(method, path string, data map[
return errors.New("unable to JSON request")
}
return a.SendPayload(&request.Item{
return a.SendPayload(context.Background(), &request.Item{
Method: method,
Path: path,
Headers: headers,

View File

@@ -70,8 +70,7 @@ func (a *Alphapoint) SetDefaults() {
}
a.Requester = request.New(a.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
nil)
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout))
}
// FetchTradablePairs returns a list of the exchanges tradable pairs

View File

@@ -2,6 +2,7 @@ package binance
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -475,7 +476,7 @@ func (b *Binance) GetAccount() (*Account, error) {
// SendHTTPRequest sends an unauthenticated request
func (b *Binance) SendHTTPRequest(path string, f request.EndpointLimit, result interface{}) error {
return b.SendPayload(&request.Item{
return b.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -494,7 +495,8 @@ func (b *Binance) SendAuthHTTPRequest(method, path string, params url.Values, f
if params == nil {
params = url.Values{}
}
params.Set("recvWindow", strconv.FormatInt(convert.RecvWindow(5*time.Second), 10))
recvWindow := 5 * time.Second
params.Set("recvWindow", strconv.FormatInt(convert.RecvWindow(recvWindow), 10))
params.Set("timestamp", strconv.FormatInt(time.Now().Unix()*1000, 10))
signature := params.Encode()
@@ -518,7 +520,9 @@ func (b *Binance) SendAuthHTTPRequest(method, path string, params url.Values, f
Message string `json:"msg"`
}{}
err := b.SendPayload(&request.Item{
ctx, cancel := context.WithTimeout(context.Background(), recvWindow)
defer cancel()
err := b.SendPayload(ctx, &request.Item{
Method: method,
Path: path,
Headers: headers,
@@ -698,7 +702,7 @@ func (b *Binance) GetWsAuthStreamKey() (string, error) {
path := b.API.Endpoints.URL + userAccountStream
headers := make(map[string]string)
headers["X-MBX-APIKEY"] = b.API.Credentials.Key
err := b.SendPayload(&request.Item{
err := b.SendPayload(context.Background(), &request.Item{
Method: http.MethodPost,
Path: path,
Headers: headers,
@@ -728,7 +732,7 @@ func (b *Binance) MaintainWsAuthStreamKey() error {
path = common.EncodeURLValues(path, params)
headers := make(map[string]string)
headers["X-MBX-APIKEY"] = b.API.Credentials.Key
return b.SendPayload(&request.Item{
return b.SendPayload(context.Background(), &request.Item{
Method: http.MethodPut,
Path: path,
Headers: headers,

View File

@@ -118,7 +118,7 @@ func (b *Binance) SetDefaults() {
b.Requester = request.New(b.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
SetRateLimit())
request.WithLimiter(SetRateLimit()))
b.API.Endpoints.URLDefault = apiURL
b.API.Endpoints.URL = b.API.Endpoints.URLDefault

View File

@@ -1,6 +1,7 @@
package bitfinex
import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -1126,7 +1127,7 @@ func (b *Bitfinex) CloseMarginFunding(swapID int64) (Offer, error) {
// SendHTTPRequest sends an unauthenticated request
func (b *Bitfinex) SendHTTPRequest(path string, result interface{}, e request.EndpointLimit) error {
return b.SendPayload(&request.Item{
return b.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -1171,7 +1172,7 @@ func (b *Bitfinex) SendAuthenticatedHTTPRequest(method, path string, params map[
headers["X-BFX-PAYLOAD"] = PayloadBase64
headers["X-BFX-SIGNATURE"] = crypto.HexEncodeToString(hmac)
return b.SendPayload(&request.Item{
return b.SendPayload(context.Background(), &request.Item{
Method: method,
Path: b.API.Endpoints.URL + bitfinexAPIVersion + path,
Headers: headers,

View File

@@ -127,7 +127,7 @@ func (b *Bitfinex) SetDefaults() {
b.Requester = request.New(b.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
SetRateLimit())
request.WithLimiter(SetRateLimit()))
b.API.Endpoints.URLDefault = bitfinexAPIURLBase
b.API.Endpoints.URL = b.API.Endpoints.URLDefault

View File

@@ -1,6 +1,7 @@
package bitflyer
import (
"context"
"errors"
"fmt"
"net/http"
@@ -304,7 +305,7 @@ func (b *Bitflyer) GetTradingCommission() {
// SendHTTPRequest sends an unauthenticated request
func (b *Bitflyer) SendHTTPRequest(path string, result interface{}) error {
return b.SendPayload(&request.Item{
return b.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,

View File

@@ -91,7 +91,7 @@ func (b *Bitflyer) SetDefaults() {
b.Requester = request.New(b.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
SetRateLimit())
request.WithLimiter(SetRateLimit()))
b.API.Endpoints.URLDefault = japanURL
b.API.Endpoints.URL = b.API.Endpoints.URLDefault

View File

@@ -2,6 +2,7 @@ package bithumb
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -451,7 +452,7 @@ func (b *Bithumb) MarketSellOrder(currency string, units float64) (MarketSell, e
// SendHTTPRequest sends an unauthenticated HTTP request
func (b *Bithumb) SendHTTPRequest(path string, result interface{}) error {
return b.SendPayload(&request.Item{
return b.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -494,7 +495,7 @@ func (b *Bithumb) SendAuthenticatedHTTPRequest(path string, params url.Values, r
Message string `json:"message"`
}{}
err := b.SendPayload(&request.Item{
err := b.SendPayload(context.Background(), &request.Item{
Method: http.MethodPost,
Path: b.API.Endpoints.URL + path,
Headers: headers,

View File

@@ -106,7 +106,7 @@ func (b *Bithumb) SetDefaults() {
b.Requester = request.New(b.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
SetRateLimit())
request.WithLimiter(SetRateLimit()))
b.API.Endpoints.URLDefault = apiURL
b.API.Endpoints.URL = b.API.Endpoints.URLDefault

View File

@@ -2,6 +2,7 @@ package bitmex
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
@@ -769,7 +770,7 @@ func (b *Bitmex) SendHTTPRequest(path string, params Parameter, result interface
if err != nil {
return err
}
err = b.SendPayload(&request.Item{
err = b.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: encodedPath,
Result: &respCheck,
@@ -783,7 +784,7 @@ func (b *Bitmex) SendHTTPRequest(path string, params Parameter, result interface
return b.CaptureError(respCheck, result)
}
}
err := b.SendPayload(&request.Item{
err := b.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: &respCheck,
@@ -804,7 +805,8 @@ func (b *Bitmex) SendAuthenticatedHTTPRequest(verb, path string, params Paramete
b.Name)
}
timestamp := time.Now().Add(time.Second * 10).UnixNano()
expires := time.Now().Add(time.Second * 10)
timestamp := expires.UnixNano()
timestampStr := strconv.FormatInt(timestamp, 10)
timestampNew := timestampStr[:13]
@@ -834,7 +836,9 @@ func (b *Bitmex) SendAuthenticatedHTTPRequest(verb, path string, params Paramete
var respCheck interface{}
err := b.SendPayload(&request.Item{
ctx, cancel := context.WithDeadline(context.Background(), expires)
defer cancel()
err := b.SendPayload(ctx, &request.Item{
Method: verb,
Path: b.API.Endpoints.URL + path,
Headers: headers,

View File

@@ -139,7 +139,7 @@ func (b *Bitmex) SetDefaults() {
b.Requester = request.New(b.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
SetRateLimit())
request.WithLimiter(SetRateLimit()))
b.API.Endpoints.URLDefault = bitmexAPIURL
b.API.Endpoints.URL = b.API.Endpoints.URLDefault

View File

@@ -2,6 +2,7 @@ package bitstamp
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -608,7 +609,7 @@ func (b *Bitstamp) TransferAccountBalance(amount float64, currency, subAccount s
// SendHTTPRequest sends an unauthenticated HTTP request
func (b *Bitstamp) SendHTTPRequest(path string, result interface{}) error {
return b.SendPayload(&request.Item{
return b.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -661,7 +662,7 @@ func (b *Bitstamp) SendAuthenticatedHTTPRequest(path string, v2 bool, values url
Reason interface{} `json:"reason"`
}{}
err := b.SendPayload(&request.Item{
err := b.SendPayload(context.Background(), &request.Item{
Method: http.MethodPost,
Path: path,
Headers: headers,

View File

@@ -111,7 +111,7 @@ func (b *Bitstamp) SetDefaults() {
b.Requester = request.New(b.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
request.NewBasicRateLimit(bitstampRateInterval, bitstampRequestRate))
request.WithLimiter(request.NewBasicRateLimit(bitstampRateInterval, bitstampRequestRate)))
b.API.Endpoints.URLDefault = bitstampAPIURL
b.API.Endpoints.URL = b.API.Endpoints.URLDefault

View File

@@ -1,6 +1,7 @@
package bittrex
import (
"context"
"errors"
"fmt"
"net/http"
@@ -430,7 +431,7 @@ func (b *Bittrex) GetDepositHistory(currency string) (DepositHistory, error) {
// SendHTTPRequest sends an unauthenticated HTTP request
func (b *Bittrex) SendHTTPRequest(path string, result interface{}) error {
return b.SendPayload(&request.Item{
return b.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -458,7 +459,7 @@ func (b *Bittrex) SendAuthenticatedHTTPRequest(path string, values url.Values, r
headers := make(map[string]string)
headers["apisign"] = crypto.HexEncodeToString(hmac)
return b.SendPayload(&request.Item{
return b.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: rawQuery,
Headers: headers,

View File

@@ -101,7 +101,7 @@ func (b *Bittrex) SetDefaults() {
b.Requester = request.New(b.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
request.NewBasicRateLimit(bittrexRateInterval, bittrexRequestRate))
request.WithLimiter(request.NewBasicRateLimit(bittrexRateInterval, bittrexRequestRate)))
b.API.Endpoints.URLDefault = bittrexAPIURL
b.API.Endpoints.URL = b.API.Endpoints.URLDefault

View File

@@ -2,6 +2,7 @@ package btcmarkets
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -740,7 +741,7 @@ func (b *BTCMarkets) CancelBatchOrders(ids []string) (BatchCancelResponse, error
// SendHTTPRequest sends an unauthenticated HTTP request
func (b *BTCMarkets) SendHTTPRequest(path string, result interface{}) error {
return b.SendPayload(&request.Item{
return b.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -757,7 +758,8 @@ func (b *BTCMarkets) SendAuthenticatedRequest(method, path string, data, result
b.Name)
}
strTime := strconv.FormatInt(time.Now().UTC().UnixNano()/1000000, 10)
now := time.Now()
strTime := strconv.FormatInt(now.UTC().UnixNano()/1000000, 10)
var body io.Reader
var payload, hmac []byte
@@ -786,7 +788,10 @@ func (b *BTCMarkets) SendAuthenticatedRequest(method, path string, data, result
headers["BM-AUTH-TIMESTAMP"] = strTime
headers["BM-AUTH-SIGNATURE"] = crypto.Base64Encode(hmac)
return b.SendPayload(&request.Item{
// The timestamp included with an authenticated request must be within +/- 30 seconds of the server timestamp
ctx, cancel := context.WithDeadline(context.Background(), now.Add(30*time.Second))
defer cancel()
return b.SendPayload(ctx, &request.Item{
Method: method,
Path: btcMarketsAPIURL + btcMarketsAPIVersion + path,
Headers: headers,

View File

@@ -115,7 +115,7 @@ func (b *BTCMarkets) SetDefaults() {
b.Requester = request.New(b.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
SetRateLimit())
request.WithLimiter(SetRateLimit()))
b.API.Endpoints.WebsocketURL = btcMarketsWSURL
b.Websocket = wshandler.New()

View File

@@ -2,6 +2,7 @@ package btse
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -188,7 +189,7 @@ func (b *BTSE) GetFills(orderID, symbol, before, after, limit, username string)
// SendHTTPRequest sends an HTTP request to the desired endpoint
func (b *BTSE) SendHTTPRequest(method, endpoint string, result interface{}) error {
return b.SendPayload(&request.Item{
return b.SendPayload(context.Background(), &request.Item{
Method: method,
Path: b.API.Endpoints.URL + btseAPIPath + endpoint,
Result: result,
@@ -238,13 +239,14 @@ func (b *BTSE) SendAuthenticatedHTTPRequest(method, endpoint string, req map[str
b.Name, method, path, string(payload))
}
return b.SendPayload(&request.Item{
return b.SendPayload(context.Background(), &request.Item{
Method: method,
Path: b.API.Endpoints.URL + path,
Headers: headers,
Body: body,
Result: result,
AuthRequest: true,
NonceEnabled: true,
Verbose: b.Verbose,
HTTPDebugging: b.HTTPDebugging,
HTTPRecording: b.HTTPRecording,

View File

@@ -108,8 +108,7 @@ func (b *BTSE) SetDefaults() {
}
b.Requester = request.New(b.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
nil)
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout))
b.API.Endpoints.URLDefault = btseAPIURL
b.API.Endpoints.URL = b.API.Endpoints.URLDefault

View File

@@ -2,6 +2,7 @@ package coinbasepro
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -720,7 +721,7 @@ func (c *CoinbasePro) GetTrailingVolume() ([]Volume, error) {
// SendHTTPRequest sends an unauthenticated HTTP request
func (c *CoinbasePro) SendHTTPRequest(path string, result interface{}) error {
return c.SendPayload(&request.Item{
return c.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -750,7 +751,8 @@ func (c *CoinbasePro) SendAuthenticatedHTTPRequest(method, path string, params m
}
}
n := strconv.FormatInt(time.Now().Unix(), 10)
now := time.Now()
n := strconv.FormatInt(now.Unix(), 10)
message := n + method + "/" + path + string(payload)
hmac := crypto.GetHMAC(crypto.HashSHA256, []byte(message), []byte(c.API.Credentials.Secret))
headers := make(map[string]string)
@@ -760,7 +762,10 @@ func (c *CoinbasePro) SendAuthenticatedHTTPRequest(method, path string, params m
headers["CB-ACCESS-PASSPHRASE"] = c.API.Credentials.ClientID
headers["Content-Type"] = "application/json"
return c.SendPayload(&request.Item{
// Timestamp must be within 30 seconds of the api service time
ctx, cancel := context.WithDeadline(context.Background(), now.Add(30*time.Second))
defer cancel()
return c.SendPayload(ctx, &request.Item{
Method: method,
Path: c.API.Endpoints.URL + path,
Headers: headers,

View File

@@ -120,7 +120,7 @@ func (c *CoinbasePro) SetDefaults() {
c.Requester = request.New(c.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
SetRateLimit())
request.WithLimiter(SetRateLimit()))
c.API.Endpoints.URLDefault = coinbaseproAPIURL
c.API.Endpoints.URL = c.API.Endpoints.URLDefault

View File

@@ -2,6 +2,7 @@ package coinbene
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -1096,7 +1097,7 @@ func (c *Coinbene) SendHTTPRequest(path string, f request.EndpointLimit, result
Message string `json:"message"`
}{}
if err := c.SendPayload(&request.Item{
if err := c.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: &resp,
@@ -1128,7 +1129,8 @@ func (c *Coinbene) SendAuthHTTPRequest(method, path, epPath string, isSwap bool,
if isSwap {
authPath = coinbeneSwapAuthPath
}
timestamp := time.Now().UTC().Format("2006-01-02T15:04:05.999Z")
now := time.Now()
timestamp := now.UTC().Format("2006-01-02T15:04:05.999Z")
var finalBody io.Reader
var preSign string
switch {
@@ -1175,7 +1177,10 @@ func (c *Coinbene) SendAuthHTTPRequest(method, path, epPath string, isSwap bool,
Message string `json:"message"`
}{}
if err := c.SendPayload(&request.Item{
// Expiry of timestamp doesn't appear to be documented, so making a reasonable assumption
ctx, cancel := context.WithDeadline(context.Background(), now.Add(15*time.Second))
defer cancel()
if err := c.SendPayload(ctx, &request.Item{
Method: method,
Path: path,
Headers: headers,

View File

@@ -121,7 +121,7 @@ func (c *Coinbene) SetDefaults() {
}
c.Requester = request.New(c.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
SetRateLimit())
request.WithLimiter(SetRateLimit()))
c.API.Endpoints.URLDefault = coinbeneAPIURL
c.API.Endpoints.URL = c.API.Endpoints.URLDefault

View File

@@ -2,6 +2,7 @@ package coinut
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -290,13 +291,14 @@ func (c *COINUT) SendHTTPRequest(apiRequest string, params map[string]interface{
headers["Content-Type"] = "application/json"
var rawMsg json.RawMessage
err = c.SendPayload(&request.Item{
err = c.SendPayload(context.Background(), &request.Item{
Method: http.MethodPost,
Path: c.API.Endpoints.URL,
Headers: headers,
Body: bytes.NewBuffer(payload),
Result: &rawMsg,
AuthRequest: authenticated,
NonceEnabled: true,
Verbose: c.Verbose,
HTTPDebugging: c.HTTPDebugging,
HTTPRecording: c.HTTPRecording,

View File

@@ -117,8 +117,7 @@ func (c *COINUT) SetDefaults() {
}
c.Requester = request.New(c.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
nil)
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout))
c.API.Endpoints.URLDefault = coinutAPIURL
c.API.Endpoints.URL = c.API.Endpoints.URLDefault

View File

@@ -36,8 +36,7 @@ const (
func (e *Base) checkAndInitRequester() {
if e.Requester == nil {
e.Requester = request.New(e.Name,
new(http.Client),
nil)
new(http.Client))
}
}

View File

@@ -66,8 +66,7 @@ func TestHTTPClient(t *testing.T) {
b := Base{Name: "RAWR"}
b.Requester = request.New(b.Name,
new(http.Client),
nil)
new(http.Client))
b.SetHTTPClientTimeout(time.Second * 5)
if b.GetHTTPClient().Timeout != time.Second*5 {
@@ -92,8 +91,7 @@ func TestSetClientProxyAddress(t *testing.T) {
t.Parallel()
requester := request.New("rawr",
&http.Client{},
nil)
&http.Client{})
newBase := Base{
Name: "rawr",

View File

@@ -1,6 +1,7 @@
package exmo
import (
"context"
"errors"
"fmt"
"net/http"
@@ -300,7 +301,7 @@ func (e *EXMO) GetWalletHistory(date int64) (WalletHistory, error) {
// SendHTTPRequest sends an unauthenticated HTTP request
func (e *EXMO) SendHTTPRequest(path string, result interface{}) error {
return e.SendPayload(&request.Item{
return e.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -339,7 +340,7 @@ func (e *EXMO) SendAuthenticatedHTTPRequest(method, endpoint string, vals url.Va
path := fmt.Sprintf("%s/v%s/%s", e.API.Endpoints.URL, exmoAPIVersion, endpoint)
return e.SendPayload(&request.Item{
return e.SendPayload(context.Background(), &request.Item{
Method: method,
Path: path,
Headers: headers,

View File

@@ -108,7 +108,7 @@ func (e *EXMO) SetDefaults() {
e.Requester = request.New(e.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
request.NewBasicRateLimit(exmoRateInterval, exmoRequestRate))
request.WithLimiter(request.NewBasicRateLimit(exmoRateInterval, exmoRequestRate)))
e.API.Endpoints.URLDefault = exmoAPIURL
e.API.Endpoints.URL = e.API.Endpoints.URLDefault

View File

@@ -1,6 +1,7 @@
package gateio
import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -304,7 +305,7 @@ func (g *Gateio) CancelExistingOrder(orderID int64, symbol string) (bool, error)
// SendHTTPRequest sends an unauthenticated HTTP request
func (g *Gateio) SendHTTPRequest(path string, result interface{}) error {
return g.SendPayload(&request.Item{
return g.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -403,7 +404,7 @@ func (g *Gateio) SendAuthenticatedHTTPRequest(method, endpoint, param string, re
urlPath := fmt.Sprintf("%s/%s/%s", g.API.Endpoints.URL, gateioAPIVersion, endpoint)
var intermidiary json.RawMessage
err := g.SendPayload(&request.Item{
err := g.SendPayload(context.Background(), &request.Item{
Method: method,
Path: urlPath,
Headers: headers,

View File

@@ -115,8 +115,7 @@ func (g *Gateio) SetDefaults() {
}
g.Requester = request.New(g.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
nil)
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout))
g.API.Endpoints.URLDefault = gateioTradeURL
g.API.Endpoints.URL = g.API.Endpoints.URLDefault

View File

@@ -1,6 +1,7 @@
package gemini
import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -342,7 +343,7 @@ func (g *Gemini) PostHeartbeat() (string, error) {
// SendHTTPRequest sends an unauthenticated request
func (g *Gemini) SendHTTPRequest(path string, result interface{}) error {
return g.SendPayload(&request.Item{
return g.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -387,7 +388,7 @@ func (g *Gemini) SendAuthenticatedHTTPRequest(method, path string, params map[st
headers["X-GEMINI-SIGNATURE"] = crypto.HexEncodeToString(hmac)
headers["Cache-Control"] = "no-cache"
return g.SendPayload(&request.Item{
return g.SendPayload(context.Background(), &request.Item{
Method: method,
Path: g.API.Endpoints.URL + "/v1/" + path,
Headers: headers,

View File

@@ -110,7 +110,7 @@ func (g *Gemini) SetDefaults() {
g.Requester = request.New(g.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
SetRateLimit())
request.WithLimiter(SetRateLimit()))
g.API.Endpoints.URLDefault = geminiAPIURL
g.API.Endpoints.URL = g.API.Endpoints.URLDefault

View File

@@ -2,6 +2,7 @@ package hitbtc
import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
@@ -524,7 +525,7 @@ func (h *HitBTC) TransferBalance(currency, from, to string, amount float64) (boo
// SendHTTPRequest sends an unauthenticated HTTP request
func (h *HitBTC) SendHTTPRequest(path string, result interface{}) error {
return h.SendPayload(&request.Item{
return h.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -546,7 +547,7 @@ func (h *HitBTC) SendAuthenticatedHTTPRequest(method, endpoint string, values ur
path := fmt.Sprintf("%s/%s", h.API.Endpoints.URL, endpoint)
return h.SendPayload(&request.Item{
return h.SendPayload(context.Background(), &request.Item{
Method: method,
Path: path,
Headers: headers,

View File

@@ -117,7 +117,7 @@ func (h *HitBTC) SetDefaults() {
h.Requester = request.New(h.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
SetRateLimit())
request.WithLimiter(SetRateLimit()))
h.API.Endpoints.URLDefault = apiURL
h.API.Endpoints.URL = h.API.Endpoints.URLDefault

View File

@@ -2,6 +2,7 @@ package huobi
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -710,7 +711,7 @@ func (h *HUOBI) QueryWithdrawQuotas(cryptocurrency string) (WithdrawQuota, error
// SendHTTPRequest sends an unauthenticated HTTP request
func (h *HUOBI) SendHTTPRequest(path string, result interface{}) error {
return h.SendPayload(&request.Item{
return h.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -730,10 +731,11 @@ func (h *HUOBI) SendAuthenticatedHTTPRequest(method, endpoint string, values url
values = url.Values{}
}
now := time.Now()
values.Set("AccessKeyId", h.API.Credentials.Key)
values.Set("SignatureMethod", "HmacSHA256")
values.Set("SignatureVersion", "2")
values.Set("Timestamp", time.Now().UTC().Format("2006-01-02T15:04:05"))
values.Set("Timestamp", now.UTC().Format("2006-01-02T15:04:05"))
if isVersion2API {
endpoint = fmt.Sprintf("/v%s/%s", huobiAPIVersion2, endpoint)
@@ -765,8 +767,11 @@ func (h *HUOBI) SendAuthenticatedHTTPRequest(method, endpoint string, values url
body = encoded
}
// Time difference between your timestamp and standard should be less than 1 minute.
ctx, cancel := context.WithDeadline(context.Background(), now.Add(time.Minute))
defer cancel()
interim := json.RawMessage{}
err := h.SendPayload(&request.Item{
err := h.SendPayload(ctx, &request.Item{
Method: method,
Path: urlPath,
Headers: headers,

View File

@@ -115,7 +115,7 @@ func (h *HUOBI) SetDefaults() {
h.Requester = request.New(h.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
SetRateLimit())
request.WithLimiter(SetRateLimit()))
h.API.Endpoints.URLDefault = huobiAPIURL
h.API.Endpoints.URL = h.API.Endpoints.URLDefault

View File

@@ -2,6 +2,7 @@ package itbit
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -274,7 +275,7 @@ func (i *ItBit) WalletTransfer(walletID, sourceWallet, destWallet string, amount
// SendHTTPRequest sends an unauthenticated HTTP request
func (i *ItBit) SendHTTPRequest(path string, result interface{}) error {
return i.SendPayload(&request.Item{
return i.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -336,7 +337,7 @@ func (i *ItBit) SendAuthenticatedHTTPRequest(method, path string, params map[str
RequestID string `json:"requestId"`
}{}
err = i.SendPayload(&request.Item{
err = i.SendPayload(context.Background(), &request.Item{
Method: method,
Path: urlPath,
Headers: headers,

View File

@@ -98,8 +98,7 @@ func (i *ItBit) SetDefaults() {
}
i.Requester = request.New(i.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
nil)
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout))
i.API.Endpoints.URLDefault = itbitAPIURL
i.API.Endpoints.URL = i.API.Endpoints.URLDefault

View File

@@ -1,6 +1,7 @@
package kraken
import (
"context"
"errors"
"fmt"
"net/http"
@@ -859,7 +860,7 @@ func GetError(apiErrors []string) error {
// SendHTTPRequest sends an unauthenticated HTTP requests
func (k *Kraken) SendHTTPRequest(path string, result interface{}) error {
return k.SendPayload(&request.Item{
return k.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -895,7 +896,7 @@ func (k *Kraken) SendAuthenticatedHTTPRequest(method string, params url.Values,
headers["API-Key"] = k.API.Credentials.Key
headers["API-Sign"] = signature
return k.SendPayload(&request.Item{
return k.SendPayload(context.Background(), &request.Item{
Method: http.MethodPost,
Path: k.API.Endpoints.URL + path,
Headers: headers,

View File

@@ -127,7 +127,7 @@ func (k *Kraken) SetDefaults() {
k.Requester = request.New(k.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
request.NewBasicRateLimit(krakenRateInterval, krakenRequestRate))
request.WithLimiter(request.NewBasicRateLimit(krakenRateInterval, krakenRequestRate)))
k.API.Endpoints.URLDefault = krakenAPIURL
k.API.Endpoints.URL = k.API.Endpoints.URLDefault

View File

@@ -1,6 +1,7 @@
package lakebtc
import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -268,7 +269,7 @@ func (l *LakeBTC) CreateWithdraw(amount float64, accountID string) (Withdraw, er
// SendHTTPRequest sends an unauthenticated http request
func (l *LakeBTC) SendHTTPRequest(path string, result interface{}) error {
return l.SendPayload(&request.Item{
return l.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -308,7 +309,7 @@ func (l *LakeBTC) SendAuthenticatedHTTPRequest(method, params string, result int
headers["Authorization"] = "Basic " + crypto.Base64Encode([]byte(l.API.Credentials.Key+":"+crypto.HexEncodeToString(hmac)))
headers["Content-Type"] = "application/json-rpc"
return l.SendPayload(&request.Item{
return l.SendPayload(context.Background(), &request.Item{
Method: http.MethodPost,
Path: l.API.Endpoints.URL,
Headers: headers,

View File

@@ -105,8 +105,7 @@ func (l *LakeBTC) SetDefaults() {
}
l.Requester = request.New(l.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
nil)
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout))
l.API.Endpoints.URLDefault = lakeBTCAPIURL
l.API.Endpoints.URL = l.API.Endpoints.URLDefault

View File

@@ -2,6 +2,7 @@ package lbank
import (
"bytes"
"context"
"crypto"
"crypto/rand"
"crypto/rsa"
@@ -494,7 +495,7 @@ func ErrorCapture(code int64) error {
// SendHTTPRequest sends an unauthenticated HTTP request
func (l *Lbank) SendHTTPRequest(path string, result interface{}) error {
return l.SendPayload(&request.Item{
return l.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -564,7 +565,7 @@ func (l *Lbank) SendAuthHTTPRequest(method, endpoint string, vals url.Values, re
headers := make(map[string]string)
headers["Content-Type"] = "application/x-www-form-urlencoded"
return l.SendPayload(&request.Item{
return l.SendPayload(context.Background(), &request.Item{
Method: method,
Path: endpoint,
Headers: headers,

View File

@@ -99,8 +99,7 @@ func (l *Lbank) SetDefaults() {
}
l.Requester = request.New(l.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
nil)
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout))
l.API.Endpoints.URLDefault = lbankAPIURL
l.API.Endpoints.URL = l.API.Endpoints.URLDefault

View File

@@ -2,6 +2,7 @@ package localbitcoins
import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
@@ -730,7 +731,7 @@ func (l *LocalBitcoins) GetOrderbook(currency string) (Orderbook, error) {
// SendHTTPRequest sends an unauthenticated HTTP request
func (l *LocalBitcoins) SendHTTPRequest(path string, result interface{}) error {
return l.SendPayload(&request.Item{
return l.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -767,7 +768,7 @@ func (l *LocalBitcoins) SendAuthenticatedHTTPRequest(method, path string, params
path += "?" + encoded
}
return l.SendPayload(&request.Item{
return l.SendPayload(context.Background(), &request.Item{
Method: method,
Path: l.API.Endpoints.URL + path,
Headers: headers,

View File

@@ -98,8 +98,7 @@ func (l *LocalBitcoins) SetDefaults() {
}
l.Requester = request.New(l.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
nil)
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout))
l.API.Endpoints.URLDefault = localbitcoinsAPIURL
l.API.Endpoints.URL = l.API.Endpoints.URLDefault

View File

@@ -120,7 +120,7 @@ func (o *OKCoin) SetDefaults() {
o.Requester = request.New(o.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
// TODO: Specify each individual endpoint rate limits as per docs
request.NewBasicRateLimit(okCoinRateInterval, okCoinStandardRequestRate),
request.WithLimiter(request.NewBasicRateLimit(okCoinRateInterval, okCoinStandardRequestRate)),
)
o.API.Endpoints.URLDefault = okCoinAPIURL

View File

@@ -154,7 +154,7 @@ func (o *OKEX) SetDefaults() {
o.Requester = request.New(o.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
// TODO: Specify each individual endpoint rate limits as per docs
request.NewBasicRateLimit(okExRateInterval, okExRequestRate),
request.WithLimiter(request.NewBasicRateLimit(okExRateInterval, okExRequestRate)),
)
o.API.Endpoints.URLDefault = okExAPIURL

View File

@@ -2,6 +2,7 @@ package okgroup
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -562,7 +563,8 @@ func (o *OKGroup) SendHTTPRequest(httpMethod, requestType, requestPath string, d
o.Name)
}
utcTime := time.Now().UTC().Format(time.RFC3339)
now := time.Now()
utcTime := now.UTC().Format(time.RFC3339)
payload := []byte("")
if data != nil {
@@ -595,6 +597,9 @@ func (o *OKGroup) SendHTTPRequest(httpMethod, requestType, requestPath string, d
headers["OK-ACCESS-PASSPHRASE"] = o.API.Credentials.ClientID
}
// Requests that have a 30+ second difference between the timestamp and the API service time will be considered expired or rejected
ctx, cancel := context.WithDeadline(context.Background(), now.Add(30*time.Second))
defer cancel()
var intermediary json.RawMessage
type errCapFormat struct {
Error int64 `json:"error_code,omitempty"`
@@ -604,7 +609,7 @@ func (o *OKGroup) SendHTTPRequest(httpMethod, requestType, requestPath string, d
errCap := errCapFormat{}
errCap.Result = true
err = o.SendPayload(&request.Item{
err = o.SendPayload(ctx, &request.Item{
Method: strings.ToUpper(httpMethod),
Path: path,
Headers: headers,

View File

@@ -2,6 +2,7 @@ package poloniex
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -749,7 +750,7 @@ func (p *Poloniex) ToggleAutoRenew(orderNumber int64) (bool, error) {
// SendHTTPRequest sends an unauthenticated HTTP request
func (p *Poloniex) SendHTTPRequest(path string, result interface{}) error {
return p.SendPayload(&request.Item{
return p.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -780,7 +781,7 @@ func (p *Poloniex) SendAuthenticatedHTTPRequest(method, endpoint string, values
path := fmt.Sprintf("%s/%s", p.API.Endpoints.URL, poloniexAPITradingEndpoint)
return p.SendPayload(&request.Item{
return p.SendPayload(context.Background(), &request.Item{
Method: method,
Path: path,
Headers: headers,

View File

@@ -115,7 +115,7 @@ func (p *Poloniex) SetDefaults() {
p.Requester = request.New(p.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
SetRateLimit())
request.WithLimiter(SetRateLimit()))
p.API.Endpoints.URLDefault = poloniexAPIURL
p.API.Endpoints.URL = p.API.Endpoints.URLDefault

View File

@@ -0,0 +1,22 @@
package request
import (
"time"
)
// DefaultBackoff is a default strategy for backoff after a retryable request failure.
func DefaultBackoff() Backoff {
return LinearBackoff(100*time.Millisecond, time.Second)
}
// LinearBackoff applies a backoff increasing by a base amount with each retry capped at a maximum duration.
func LinearBackoff(base, max time.Duration) Backoff {
return func(n int) time.Duration {
d := base * time.Duration(n)
if d > max {
return max
}
return d
}
}

View File

@@ -0,0 +1,79 @@
package request_test
import (
"testing"
"time"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
)
func TestLinearBackoff(t *testing.T) {
type args struct {
Backoff request.Backoff
}
type want struct {
Delays map[int]time.Duration
}
testTable := map[string]struct {
Args args
Want want
}{
"Default": {
Args: args{Backoff: request.DefaultBackoff()},
Want: want{Delays: map[int]time.Duration{
1: 100 * time.Millisecond,
2: 200 * time.Millisecond,
3: 300 * time.Millisecond,
4: 400 * time.Millisecond,
9: 900 * time.Millisecond,
10: time.Second,
11: time.Second,
}},
},
"Fixed": {
Args: args{Backoff: request.LinearBackoff(100*time.Millisecond, 100*time.Millisecond)},
Want: want{Delays: map[int]time.Duration{
1: 100 * time.Millisecond,
2: 100 * time.Millisecond,
3: 100 * time.Millisecond,
}},
},
"Quick Cap": {
Args: args{Backoff: request.LinearBackoff(400*time.Millisecond, time.Second)},
Want: want{Delays: map[int]time.Duration{
1: 400 * time.Millisecond,
2: 800 * time.Millisecond,
3: time.Second,
4: time.Second,
}},
},
"Slow Cap": {
Args: args{Backoff: request.LinearBackoff(50*time.Millisecond, time.Minute)},
Want: want{Delays: map[int]time.Duration{
1: 50 * time.Millisecond,
2: 100 * time.Millisecond,
3: 150 * time.Millisecond,
19: time.Second - 50*time.Millisecond,
20: time.Second,
21: time.Second + 50*time.Millisecond,
1199: time.Minute - 50*time.Millisecond,
1200: time.Minute,
1201: time.Minute,
}},
},
}
for name, tt := range testTable {
tt := tt
t.Run(name, func(t *testing.T) {
t.Parallel()
for n, exp := range tt.Want.Delays {
got := tt.Args.Backoff(n)
if got != exp {
t.Errorf("incorrect backoff duration\nexp: %s\ngot: %s", exp, got)
}
}
})
}
}

View File

@@ -64,8 +64,8 @@ func (r *Requester) InitiateRateLimit(e EndpointLimit) error {
return nil
}
if r.Limiter != nil {
return r.Limiter.Limit(e)
if r.limiter != nil {
return r.limiter.Limit(e)
}
return nil

View File

@@ -0,0 +1,22 @@
package request
// WithBackoff configures the backoff strategy for a Requester.
func WithBackoff(b Backoff) RequesterOption {
return func(r *Requester) {
r.backoff = b
}
}
// WithLimiter configures the rate limiter for a Requester.
func WithLimiter(l Limiter) RequesterOption {
return func(r *Requester) {
r.limiter = l
}
}
// WithRetryPolicy configures the retry policy for a Requester.
func WithRetryPolicy(p RetryPolicy) RequesterOption {
return func(r *Requester) {
r.retryPolicy = p
}
}

View File

@@ -1,9 +1,11 @@
package request
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
@@ -19,23 +21,30 @@ import (
)
// New returns a new Requester
func New(name string, httpRequester *http.Client, l Limiter) *Requester {
return &Requester{
HTTPClient: httpRequester,
Limiter: l,
Name: name,
timeoutRetryAttempts: TimeoutRetryAttempts,
timedLock: timedmutex.NewTimedMutex(DefaultMutexLockTimeout),
func New(name string, httpRequester *http.Client, opts ...RequesterOption) *Requester {
r := &Requester{
HTTPClient: httpRequester,
Name: name,
backoff: DefaultBackoff(),
retryPolicy: DefaultRetryPolicy,
maxRetries: MaxRetryAttempts,
timedLock: timedmutex.NewTimedMutex(DefaultMutexLockTimeout),
}
for _, o := range opts {
o(r)
}
return r
}
// SendPayload handles sending HTTP/HTTPS requests
func (r *Requester) SendPayload(i *Item) error {
func (r *Requester) SendPayload(ctx context.Context, i *Item) error {
if !i.NonceEnabled {
r.timedLock.LockForDuration()
}
req, err := i.validateRequest(r)
req, err := i.validateRequest(ctx, r)
if err != nil {
r.timedLock.UnlockIfLocked()
return err
@@ -61,7 +70,7 @@ func (r *Requester) SendPayload(i *Item) error {
}
// validateRequest validates the requester item fields
func (i *Item) validateRequest(r *Requester) (*http.Request, error) {
func (i *Item) validateRequest(ctx context.Context, r *Requester) (*http.Request, error) {
if r == nil || r.Name == "" {
return nil, errors.New("not initialised, SetDefaults() called before making request?")
}
@@ -74,7 +83,7 @@ func (i *Item) validateRequest(r *Requester) (*http.Request, error) {
return nil, errors.New("invalid path")
}
req, err := http.NewRequest(i.Method, i.Path, i.Body)
req, err := http.NewRequestWithContext(ctx, i.Method, i.Path, i.Body)
if err != nil {
return nil, err
}
@@ -122,8 +131,7 @@ func (r *Requester) doRequest(req *http.Request, p *Item) error {
}
}
var timeoutError error
for i := 0; i < r.timeoutRetryAttempts+1; i++ {
for attempt := 1; ; attempt++ {
// Initiate a rate limit reservation and sleep on requested endpoint
err := r.InitiateRateLimit(p.Endpoint)
if err != nil {
@@ -131,18 +139,52 @@ func (r *Requester) doRequest(req *http.Request, p *Item) error {
}
resp, err := r.HTTPClient.Do(req)
if err != nil {
if timeoutErr, ok := err.(net.Error); ok && timeoutErr.Timeout() {
if p.Verbose {
log.Errorf(log.RequestSys,
"%s request has timed-out retrying request, count %d",
r.Name,
i)
}
timeoutError = err
continue
if retry, checkErr := r.retryPolicy(resp, err); checkErr != nil {
return checkErr
} else if retry {
if err == nil {
// If the body isn't fully read, the connection cannot be re-used
r.drainBody(resp.Body)
}
return err
// Can't currently regenerate nonce and signatures with fresh values for retries, so for now, we must not retry
if p.NonceEnabled {
if timeoutErr, ok := err.(net.Error); !ok || !timeoutErr.Timeout() {
return fmt.Errorf("request.go error - unable to retry request using nonce, err: %v", err)
}
}
if attempt > r.maxRetries {
if err != nil {
return fmt.Errorf("request.go error - failed to retry request, err: %v", err)
}
return fmt.Errorf("request.go error - failed to retry request, status: %s", resp.Status)
}
after := RetryAfter(resp, time.Now())
backoff := r.backoff(attempt)
delay := backoff
if after > backoff {
delay = after
}
if d, ok := req.Context().Deadline(); ok && d.After(time.Now().Add(delay)) {
if err != nil {
return fmt.Errorf("request.go error - deadline would be exceeded by retry, err: %v", err)
}
return fmt.Errorf("request.go error - deadline would be exceeded by retry, status: %s", resp.Status)
}
if p.Verbose {
log.Errorf(log.RequestSys,
"%s request has failed. Retrying request in %s, attempt %d",
r.Name,
delay,
attempt)
}
time.Sleep(delay)
continue
}
contents, err := ioutil.ReadAll(resp.Body)
@@ -193,8 +235,6 @@ func (r *Requester) doRequest(req *http.Request, p *Item) error {
}
return nil
}
return fmt.Errorf("request.go error - failed to retry request %s",
timeoutError)
}
// GetNonce returns a nonce for requests. This locks and enforces concurrent
@@ -237,3 +277,13 @@ func (r *Requester) SetProxy(p *url.URL) error {
}
return nil
}
func (r *Requester) drainBody(body io.ReadCloser) {
defer body.Close()
if _, err := io.Copy(ioutil.Discard, io.LimitReader(body, drainBodyLimit)); err != nil {
log.Errorf(log.RequestSys,
"%s failed to drain request body %s",
r.Name,
err)
}
}

View File

@@ -1,15 +1,20 @@
package request
import (
"context"
"errors"
"fmt"
"io"
"log"
"math"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -22,7 +27,9 @@ var testURL string
var serverLimit *rate.Limiter
func TestMain(m *testing.M) {
serverLimit = NewRateLimit(time.Millisecond*500, 1)
serverLimitInterval := time.Millisecond * 500
serverLimit = NewRateLimit(serverLimitInterval, 1)
serverLimitRetry := NewRateLimit(serverLimitInterval, 1)
sm := http.NewServeMux()
sm.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "application/json")
@@ -46,6 +53,22 @@ func TestMain(m *testing.M) {
}
io.WriteString(w, `{"response":true}`)
})
sm.HandleFunc("/rate-retry", func(w http.ResponseWriter, req *http.Request) {
if !serverLimitRetry.Allow() {
w.Header().Add("Retry-After", strconv.Itoa(int(math.Round(serverLimitInterval.Seconds()))))
http.Error(w,
http.StatusText(http.StatusTooManyRequests),
http.StatusTooManyRequests)
io.WriteString(w, `{"response":false}`)
return
}
io.WriteString(w, `{"response":true}`)
})
sm.HandleFunc("/always-retry", func(w http.ResponseWriter, req *http.Request) {
w.Header().Add("Retry-After", time.Now().Format(time.RFC1123))
w.WriteHeader(http.StatusTooManyRequests)
io.WriteString(w, `{"response":false}`)
})
server := httptest.NewServer(sm)
testURL = server.URL
@@ -83,40 +106,40 @@ func TestCheckRequest(t *testing.T) {
t.Parallel()
r := New("TestRequest",
new(http.Client),
nil)
new(http.Client))
ctx := context.Background()
var check *Item
_, err := check.validateRequest(&Requester{})
_, err := check.validateRequest(ctx, &Requester{})
if err == nil {
t.Fatal(unexpected)
}
_, err = check.validateRequest(nil)
_, err = check.validateRequest(ctx, nil)
if err == nil {
t.Fatal(unexpected)
}
_, err = check.validateRequest(r)
_, err = check.validateRequest(ctx, r)
if err == nil {
t.Fatal(unexpected)
}
check = &Item{}
_, err = check.validateRequest(r)
_, err = check.validateRequest(ctx, r)
if err == nil {
t.Fatal(unexpected)
}
check.Path = testURL
check.Method = " " // Forces method check; "" automatically converts to GET
_, err = check.validateRequest(r)
_, err = check.validateRequest(ctx, r)
if err == nil {
t.Fatal(unexpected)
}
check.Method = http.MethodPost
_, err = check.validateRequest(r)
_, err = check.validateRequest(ctx, r)
if err != nil {
t.Fatal(err)
}
@@ -128,7 +151,7 @@ func TestCheckRequest(t *testing.T) {
// Test user agent set
r.UserAgent = "r00t axxs"
req, err := check.validateRequest(r)
req, err := check.validateRequest(ctx, r)
if err != nil {
t.Fatal(err)
}
@@ -174,28 +197,39 @@ func TestDoRequest(t *testing.T) {
t.Parallel()
r := New("test",
new(http.Client),
&globalshell)
WithLimiter(&globalshell))
ctx := context.Background()
err := r.SendPayload(&Item{})
err := r.SendPayload(ctx, &Item{})
if err == nil {
t.Fatal(unexpected)
}
if !strings.Contains(err.Error(), "invalid path") {
t.Fatal(err)
}
err = r.SendPayload(&Item{Method: http.MethodGet})
err = r.SendPayload(ctx, &Item{Method: http.MethodGet})
if err == nil {
t.Fatal(unexpected)
}
if !strings.Contains(err.Error(), "invalid path") {
t.Fatal(err)
}
err = r.SendPayload(&Item{
// Invalid/missing endpoint limit
err = r.SendPayload(ctx, &Item{
Method: http.MethodGet,
Path: testURL,
})
if err == nil {
t.Fatal(unexpected)
}
if !strings.Contains(err.Error(), "cannot execute functionality") {
t.Fatal(err)
}
// force debug
err = r.SendPayload(&Item{
err = r.SendPayload(ctx, &Item{
Method: http.MethodGet,
Path: testURL,
HTTPDebugging: true,
@@ -204,28 +238,39 @@ func TestDoRequest(t *testing.T) {
if err == nil {
t.Fatal(unexpected)
}
if !strings.Contains(err.Error(), "cannot execute functionality") {
t.Fatal(err)
}
// max request job ceiling
r.jobs = MaxRequestJobs
err = r.SendPayload(&Item{
Method: http.MethodGet,
Path: testURL,
err = r.SendPayload(ctx, &Item{
Method: http.MethodGet,
Path: testURL,
Endpoint: UnAuth,
})
if err == nil {
t.Fatal(unexpected)
}
if !strings.Contains(err.Error(), "max request jobs reached") {
t.Fatal(err)
}
// reset jobs
r.jobs = 0
// timeout checker
r.HTTPClient.Timeout = time.Millisecond * 50
err = r.SendPayload(&Item{
Method: http.MethodGet,
Path: testURL + "/timeout",
err = r.SendPayload(ctx, &Item{
Method: http.MethodGet,
Path: testURL + "/timeout",
Endpoint: UnAuth,
})
if err == nil {
t.Fatal(unexpected)
}
if !strings.Contains(err.Error(), "failed to retry request") {
t.Fatal(err)
}
// reset timeout
r.HTTPClient.Timeout = 0
@@ -233,7 +278,7 @@ func TestDoRequest(t *testing.T) {
var resp struct {
Response bool `json:"response"`
}
err = r.SendPayload(&Item{
err = r.SendPayload(ctx, &Item{
Method: http.MethodGet,
Path: testURL,
Result: &resp,
@@ -250,7 +295,7 @@ func TestDoRequest(t *testing.T) {
var respErr struct {
Error bool `json:"error"`
}
err = r.SendPayload(&Item{
err = r.SendPayload(ctx, &Item{
Method: http.MethodGet,
Path: testURL,
Result: &respErr,
@@ -263,7 +308,8 @@ func TestDoRequest(t *testing.T) {
t.Fatal(unexpected)
}
// Check rate limit
// Check client side rate limit
var failed int32
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
@@ -271,7 +317,7 @@ func TestDoRequest(t *testing.T) {
var resp struct {
Response bool `json:"response"`
}
payloadError := r.SendPayload(&Item{
payloadError := r.SendPayload(ctx, &Item{
Method: http.MethodGet,
Path: testURL + "/rate",
Result: &resp,
@@ -280,21 +326,102 @@ func TestDoRequest(t *testing.T) {
})
wg.Done()
if payloadError != nil {
atomic.StoreInt32(&failed, 1)
log.Fatal(payloadError)
}
if !resp.Response {
atomic.StoreInt32(&failed, 1)
log.Fatal(unexpected)
}
}(&wg)
}
wg.Wait()
if failed != 0 {
t.Fatal("request failed")
}
}
func TestDoRequest_Retries(t *testing.T) {
t.Parallel()
backoff := func(n int) time.Duration {
return 0
}
r := New("test", new(http.Client), WithBackoff(backoff))
var failed int32
var wg sync.WaitGroup
wg.Add(4)
for i := 0; i < 4; i++ {
go func(wg *sync.WaitGroup) {
defer wg.Done()
var resp struct {
Response bool `json:"response"`
}
payloadError := r.SendPayload(context.Background(), &Item{
Method: http.MethodGet,
Path: testURL + "/rate-retry",
Result: &resp,
AuthRequest: true,
Endpoint: Auth,
})
if payloadError != nil {
atomic.StoreInt32(&failed, 1)
log.Fatal(payloadError)
}
if !resp.Response {
atomic.StoreInt32(&failed, 1)
log.Fatal(unexpected)
}
}(&wg)
}
wg.Wait()
if failed != 0 {
t.Fatal("request failed")
}
}
func TestDoRequest_RetryNonRecoverable(t *testing.T) {
t.Parallel()
backoff := func(n int) time.Duration {
return 0
}
r := New("test", new(http.Client), WithBackoff(backoff))
payloadError := r.SendPayload(context.Background(), &Item{
Method: http.MethodGet,
Path: testURL + "/always-retry",
})
if payloadError == nil {
t.Fatal("expected an error")
}
}
func TestDoRequest_NotRetryable(t *testing.T) {
t.Parallel()
retry := func(resp *http.Response, err error) (bool, error) {
return false, errors.New("not retryable")
}
backoff := func(n int) time.Duration {
return time.Duration(n) * time.Millisecond
}
r := New("test", new(http.Client), WithRetryPolicy(retry), WithBackoff(backoff))
payloadError := r.SendPayload(context.Background(), &Item{
Method: http.MethodGet,
Path: testURL + "/always-retry",
})
if payloadError == nil {
t.Fatal("expected an error")
}
}
func TestGetNonce(t *testing.T) {
t.Parallel()
r := New("test",
new(http.Client),
&globalshell)
WithLimiter(&globalshell))
n1 := r.GetNonce(false)
n2 := r.GetNonce(false)
@@ -304,7 +431,7 @@ func TestGetNonce(t *testing.T) {
r2 := New("test",
new(http.Client),
&globalshell)
WithLimiter(&globalshell))
n3 := r2.GetNonce(true)
n4 := r2.GetNonce(true)
if n3 == n4 {
@@ -316,7 +443,7 @@ func TestGetNonceMillis(t *testing.T) {
t.Parallel()
r := New("test",
new(http.Client),
&globalshell)
WithLimiter(&globalshell))
m1 := r.GetNonceMilli()
m2 := r.GetNonceMilli()
if m1 == m2 {
@@ -328,7 +455,7 @@ func TestSetProxy(t *testing.T) {
t.Parallel()
r := New("test",
new(http.Client),
&globalshell)
WithLimiter(&globalshell))
u, err := url.Parse("http://www.google.com")
if err != nil {
t.Fatal(err)
@@ -350,15 +477,16 @@ func TestSetProxy(t *testing.T) {
func TestBasicLimiter(t *testing.T) {
r := New("test",
new(http.Client),
NewBasicRateLimit(time.Second, 1))
WithLimiter(NewBasicRateLimit(time.Second, 1)))
i := Item{
Path: "http://www.google.com",
Method: http.MethodGet,
}
ctx := context.Background()
tn := time.Now()
_ = r.SendPayload(&i)
_ = r.SendPayload(&i)
_ = r.SendPayload(ctx, &i)
_ = r.SendPayload(ctx, &i)
if time.Since(tn) < time.Second {
t.Error("rate limit issues")
}
@@ -367,10 +495,11 @@ func TestBasicLimiter(t *testing.T) {
func TestEnableDisableRateLimit(t *testing.T) {
r := New("TestRequest",
new(http.Client),
NewBasicRateLimit(time.Minute, 1))
WithLimiter(NewBasicRateLimit(time.Minute, 1)))
ctx := context.Background()
var resp interface{}
err := r.SendPayload(&Item{
err := r.SendPayload(ctx, &Item{
Method: http.MethodGet,
Path: testURL,
Result: &resp,
@@ -391,7 +520,7 @@ func TestEnableDisableRateLimit(t *testing.T) {
t.Fatal(err)
}
err = r.SendPayload(&Item{
err = r.SendPayload(ctx, &Item{
Method: http.MethodGet,
Path: testURL,
Result: &resp,
@@ -415,7 +544,7 @@ func TestEnableDisableRateLimit(t *testing.T) {
ti := time.NewTicker(time.Second)
c := make(chan struct{})
go func(c chan struct{}) {
err = r.SendPayload(&Item{
err = r.SendPayload(ctx, &Item{
Method: http.MethodGet,
Path: testURL,
Result: &resp,

View File

@@ -11,30 +11,33 @@ import (
// Const vars for rate limiter
const (
DefaultMaxRequestJobs int32 = 50
DefaultTimeoutRetryAttempts = 3
DefaultMutexLockTimeout = 50 * time.Millisecond
proxyTLSTimeout = 15 * time.Second
userAgent = "User-Agent"
DefaultMaxRequestJobs int32 = 50
DefaultMaxRetryAttempts = 3
DefaultMutexLockTimeout = 50 * time.Millisecond
drainBodyLimit = 100000
proxyTLSTimeout = 15 * time.Second
userAgent = "User-Agent"
)
// Vars for rate limiter
var (
MaxRequestJobs = DefaultMaxRequestJobs
TimeoutRetryAttempts = DefaultTimeoutRetryAttempts
MaxRequestJobs = DefaultMaxRequestJobs
MaxRetryAttempts = DefaultMaxRetryAttempts
)
// Requester struct for the request client
type Requester struct {
HTTPClient *http.Client
Limiter Limiter
Name string
UserAgent string
timeoutRetryAttempts int
jobs int32
Nonce nonce.Nonce
disableRateLimiter int32
timedLock *timedmutex.TimedMutex
HTTPClient *http.Client
limiter Limiter
Name string
UserAgent string
maxRetries int
jobs int32
Nonce nonce.Nonce
disableRateLimiter int32
backoff Backoff
retryPolicy RetryPolicy
timedLock *timedmutex.TimedMutex
}
// Item is a temp item for requests
@@ -52,3 +55,12 @@ type Item struct {
IsReserved bool
Endpoint EndpointLimit
}
// Backoff determines how long to wait between request attempts.
type Backoff func(n int) time.Duration
// RetryPolicy determines whether the request should be retried.
type RetryPolicy func(resp *http.Response, err error) (bool, error)
// RequesterOption is a function option that can be applied to configure a Requester when creating it.
type RequesterOption func(*Requester)

View File

@@ -0,0 +1,55 @@
package request
import (
"net"
"net/http"
"strconv"
"time"
)
const (
headerRetryAfter = "Retry-After"
)
// DefaultRetryPolicy determines whether the request should be retried, implemented with a default strategy.
func DefaultRetryPolicy(resp *http.Response, err error) (bool, error) {
if err != nil {
if timeoutErr, ok := err.(net.Error); ok && timeoutErr.Timeout() {
return true, nil
}
return false, err
}
if resp.StatusCode == http.StatusTooManyRequests {
return true, nil
}
if resp.Header.Get(headerRetryAfter) != "" {
return true, nil
}
return false, nil
}
// RetryAfter parses the Retry-After header in the response to determine the minimum
// duration needed to wait before retrying.
func RetryAfter(resp *http.Response, now time.Time) time.Duration {
if resp == nil {
return 0
}
after := resp.Header.Get(headerRetryAfter)
if after == "" {
return 0
}
if sec, err := strconv.ParseInt(after, 10, 32); err == nil {
return time.Duration(sec) * time.Second
}
if when, err := time.Parse(time.RFC1123, after); err == nil {
return when.Sub(now)
}
return 0
}

View File

@@ -0,0 +1,124 @@
package request_test
import (
"net"
"net/http"
"reflect"
"testing"
"time"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
)
func TestDefaultRetryPolicy(t *testing.T) {
type args struct {
Error error
Response *http.Response
}
type want struct {
Error error
Retry bool
}
testTable := map[string]struct {
Args args
Want want
}{
"DNS Error": {
Args: args{Error: &net.DNSError{Err: "fake"}},
Want: want{Error: &net.DNSError{Err: "fake"}},
},
"DNS Timeout": {
Args: args{Error: &net.DNSError{Err: "fake", IsTimeout: true}},
Want: want{Retry: true},
},
"Too Many Requests": {
Args: args{Response: &http.Response{StatusCode: http.StatusTooManyRequests}},
Want: want{Retry: true},
},
"Not Found": {
Args: args{Response: &http.Response{StatusCode: http.StatusNotFound}},
},
"Retry After": {
Args: args{Response: &http.Response{StatusCode: http.StatusTeapot, Header: http.Header{"Retry-After": []string{"0.5"}}}},
Want: want{Retry: true},
},
}
for name, tt := range testTable {
tt := tt
t.Run(name, func(t *testing.T) {
t.Parallel()
retry, err := request.DefaultRetryPolicy(tt.Args.Response, tt.Args.Error)
if exp := tt.Want.Error; exp != nil {
if !reflect.DeepEqual(err, exp) {
t.Fatalf("unexpected error\nexp: %#v, got: %#v", exp, err)
}
return
}
if err != nil {
t.Fatalf("unexpected error\nexp: <nil>, got: %#v", err)
}
if tt.Want.Retry != retry {
t.Fatalf("incorrect retry flag\nexp: %v, got: %v", tt.Want.Retry, retry)
}
})
}
}
func TestRetryAfter(t *testing.T) {
now := time.Date(2020, time.April, 20, 13, 31, 13, 0, time.UTC)
type args struct {
Now time.Time
Response *http.Response
}
type want struct {
Delay time.Duration
}
testTable := map[string]struct {
Args args
Want want
}{
"No Response": {},
"Empty Header": {
Args: args{Response: &http.Response{StatusCode: http.StatusTooManyRequests, Header: http.Header{"Retry-After": []string{""}}}},
},
"Partial Seconds": {
Args: args{Response: &http.Response{StatusCode: http.StatusTooManyRequests, Header: http.Header{"Retry-After": []string{"0.5"}}}},
},
"Delay Seconds": {
Args: args{Response: &http.Response{StatusCode: http.StatusTooManyRequests, Header: http.Header{"Retry-After": []string{"3"}}}},
Want: want{Delay: 3 * time.Second},
},
"Invalid HTTP Date RFC3339": {
Args: args{
Now: now,
Response: &http.Response{StatusCode: http.StatusTeapot, Header: http.Header{"Retry-After": []string{"2020-04-02T13:31:18Z"}}},
},
},
"Valid HTTP Date": {
Args: args{
Now: now,
Response: &http.Response{StatusCode: http.StatusTeapot, Header: http.Header{"Retry-After": []string{"Mon, 20 Apr 2020 13:31:18 GMT"}}},
},
Want: want{Delay: 5 * time.Second},
},
}
for name, tt := range testTable {
tt := tt
t.Run(name, func(t *testing.T) {
t.Parallel()
delay := request.RetryAfter(tt.Args.Response, tt.Args.Now)
if exp := tt.Want.Delay; delay != exp {
t.Fatalf("unexpected delay\nexp: %v, got: %v", exp, delay)
}
})
}
}

View File

@@ -1,6 +1,7 @@
package yobit
import (
"context"
"errors"
"fmt"
"net/http"
@@ -257,7 +258,7 @@ func (y *Yobit) RedeemCoupon(coupon string) (RedeemCoupon, error) {
// SendHTTPRequest sends an unauthenticated HTTP request
func (y *Yobit) SendHTTPRequest(path string, result interface{}) error {
return y.SendPayload(&request.Item{
return y.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -297,7 +298,7 @@ func (y *Yobit) SendAuthenticatedHTTPRequest(path string, params url.Values, res
headers["Sign"] = crypto.HexEncodeToString(hmac)
headers["Content-Type"] = "application/x-www-form-urlencoded"
return y.SendPayload(&request.Item{
return y.SendPayload(context.Background(), &request.Item{
Method: http.MethodPost,
Path: apiPrivateURL,
Headers: headers,

View File

@@ -105,7 +105,7 @@ func (y *Yobit) SetDefaults() {
y.Requester = request.New(y.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
// Server responses are cached every 2 seconds.
request.NewBasicRateLimit(time.Second, 1))
request.WithLimiter(request.NewBasicRateLimit(time.Second, 1)))
y.API.Endpoints.URLDefault = apiPublicURL
y.API.Endpoints.URL = y.API.Endpoints.URLDefault

View File

@@ -1,6 +1,7 @@
package zb
import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -277,7 +278,7 @@ func (z *ZB) GetCryptoAddress(currency currency.Code) (UserAddress, error) {
// SendHTTPRequest sends an unauthenticated HTTP request
func (z *ZB) SendHTTPRequest(path string, result interface{}) error {
return z.SendPayload(&request.Item{
return z.SendPayload(context.Background(), &request.Item{
Method: http.MethodGet,
Path: path,
Result: result,
@@ -299,7 +300,8 @@ func (z *ZB) SendAuthenticatedHTTPRequest(httpMethod string, params url.Values,
[]byte(params.Encode()),
[]byte(crypto.Sha1ToHex(z.API.Credentials.Secret)))
params.Set("reqTime", fmt.Sprintf("%d", convert.UnixMillis(time.Now())))
now := time.Now()
params.Set("reqTime", fmt.Sprintf("%d", convert.UnixMillis(now)))
params.Set("sign", fmt.Sprintf("%x", hmac))
urlPath := fmt.Sprintf("%s/%s?%s",
@@ -314,7 +316,10 @@ func (z *ZB) SendAuthenticatedHTTPRequest(httpMethod string, params url.Values,
Message string `json:"message"`
}{}
err := z.SendPayload(&request.Item{
// Expiry of timestamp doesn't appear to be documented, so making a reasonable assumption
ctx, cancel := context.WithDeadline(context.Background(), now.Add(15*time.Second))
defer cancel()
err := z.SendPayload(ctx, &request.Item{
Method: httpMethod,
Path: urlPath,
Body: strings.NewReader(""),

View File

@@ -115,7 +115,7 @@ func (z *ZB) SetDefaults() {
z.Requester = request.New(z.Name,
common.NewHTTPClientWithTimeout(exchange.DefaultHTTPTimeout),
// TODO: Implement full rate limit for endpoints
request.NewBasicRateLimit(zbRateInterval, zbReqRate))
request.WithLimiter(request.NewBasicRateLimit(zbRateInterval, zbReqRate)))
z.API.Endpoints.URLDefault = zbTradeURL
z.API.Endpoints.URL = z.API.Endpoints.URLDefault