mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-05-14 07:26:47 +00:00
* drop common uuid v4 func and imported package as needed * removed common functions regarding json marshal and unmarshal and used the json package directly. WRT unmarshal it was calling reflect and converted to string which is also checked in the JSON package so it was doing a double up, this will be a tiny gain as it was directly used in the requester package for all our outbound requests. * add in string * explicitly throw away return error value * atleast return the error that websocket initialise returns * return error when not connected * fix comment * Adds comments * move package declarations * drop append whenever we call supported * remove unused import * Change incorrect spelling * fix tests * fix go import issue
513 lines
13 KiB
Go
513 lines
13 KiB
Go
package request
|
|
|
|
import (
|
|
"compress/gzip"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net"
|
|
"net/http"
|
|
"net/http/httputil"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/thrasher-corp/gocryptotrader/common"
|
|
"github.com/thrasher-corp/gocryptotrader/common/timedmutex"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/mock"
|
|
"github.com/thrasher-corp/gocryptotrader/exchanges/nonce"
|
|
log "github.com/thrasher-corp/gocryptotrader/logger"
|
|
)
|
|
|
|
// NewRateLimit creates a new RateLimit
|
|
func NewRateLimit(d time.Duration, rate int) *RateLimit {
|
|
return &RateLimit{Duration: d, Rate: rate}
|
|
}
|
|
|
|
// String returns the rate limiter in string notation
|
|
func (r *RateLimit) String() string {
|
|
return fmt.Sprintf("Rate limiter set to %d requests per %v", r.Rate, r.Duration)
|
|
}
|
|
|
|
// GetRate returns the ratelimit rate
|
|
func (r *RateLimit) GetRate() int {
|
|
r.Mutex.Lock()
|
|
defer r.Mutex.Unlock()
|
|
return r.Rate
|
|
}
|
|
|
|
// SetRate sets the ratelimit rate
|
|
func (r *RateLimit) SetRate(rate int) {
|
|
r.Mutex.Lock()
|
|
defer r.Mutex.Unlock()
|
|
r.Rate = rate
|
|
}
|
|
|
|
// GetRequests returns the number of requests for the ratelimit
|
|
func (r *RateLimit) GetRequests() int {
|
|
r.Mutex.Lock()
|
|
defer r.Mutex.Unlock()
|
|
return r.Requests
|
|
}
|
|
|
|
// SetRequests sets requests counter for the rateliit
|
|
func (r *RateLimit) SetRequests(l int) {
|
|
r.Mutex.Lock()
|
|
defer r.Mutex.Unlock()
|
|
r.Requests = l
|
|
}
|
|
|
|
// SetDuration sets the duration for the ratelimit
|
|
func (r *RateLimit) SetDuration(d time.Duration) {
|
|
r.Mutex.Lock()
|
|
defer r.Mutex.Unlock()
|
|
r.Duration = d
|
|
}
|
|
|
|
// GetDuration gets the duration for the ratelimit
|
|
func (r *RateLimit) GetDuration() time.Duration {
|
|
r.Mutex.Lock()
|
|
defer r.Mutex.Unlock()
|
|
return r.Duration
|
|
}
|
|
|
|
// StartCycle restarts the cycle time and requests counters
|
|
func (r *Requester) StartCycle() {
|
|
r.Cycle = time.Now()
|
|
r.AuthLimit.SetRequests(0)
|
|
r.UnauthLimit.SetRequests(0)
|
|
}
|
|
|
|
// IsRateLimited returns whether or not the request Requester is rate limited
|
|
func (r *Requester) IsRateLimited(auth bool) bool {
|
|
if auth {
|
|
if r.AuthLimit.GetRequests() >= r.AuthLimit.GetRate() && r.IsValidCycle(auth) {
|
|
return true
|
|
}
|
|
} else {
|
|
if r.UnauthLimit.GetRequests() >= r.UnauthLimit.GetRate() && r.IsValidCycle(auth) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// RequiresRateLimiter returns whether or not the request Requester requires a rate limiter
|
|
func (r *Requester) RequiresRateLimiter() bool {
|
|
if DisableRateLimiter {
|
|
return false
|
|
}
|
|
|
|
if r.AuthLimit.GetRate() != 0 || r.UnauthLimit.GetRate() != 0 {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// IncrementRequests increments the ratelimiter request counter for either auth or unauth
|
|
// requests
|
|
func (r *Requester) IncrementRequests(auth bool) {
|
|
if auth {
|
|
reqs := r.AuthLimit.GetRequests()
|
|
reqs++
|
|
r.AuthLimit.SetRequests(reqs)
|
|
return
|
|
}
|
|
|
|
reqs := r.UnauthLimit.GetRequests()
|
|
reqs++
|
|
r.UnauthLimit.SetRequests(reqs)
|
|
}
|
|
|
|
// DecrementRequests decrements the ratelimiter request counter for either auth or unauth
|
|
// requests
|
|
func (r *Requester) DecrementRequests(auth bool) {
|
|
if auth {
|
|
reqs := r.AuthLimit.GetRequests()
|
|
reqs--
|
|
r.AuthLimit.SetRequests(reqs)
|
|
return
|
|
}
|
|
|
|
reqs := r.AuthLimit.GetRequests()
|
|
reqs--
|
|
r.UnauthLimit.SetRequests(reqs)
|
|
}
|
|
|
|
// SetRateLimit sets the request Requester ratelimiter
|
|
func (r *Requester) SetRateLimit(auth bool, duration time.Duration, rate int) {
|
|
if auth {
|
|
r.AuthLimit.SetRate(rate)
|
|
r.AuthLimit.SetDuration(duration)
|
|
return
|
|
}
|
|
r.UnauthLimit.SetRate(rate)
|
|
r.UnauthLimit.SetDuration(duration)
|
|
}
|
|
|
|
// GetRateLimit gets the request Requester ratelimiter
|
|
func (r *Requester) GetRateLimit(auth bool) *RateLimit {
|
|
if auth {
|
|
return r.AuthLimit
|
|
}
|
|
return r.UnauthLimit
|
|
}
|
|
|
|
// SetTimeoutRetryAttempts sets the amount of times the job will be retried
|
|
// if it times out
|
|
func (r *Requester) SetTimeoutRetryAttempts(n int) error {
|
|
if n < 0 {
|
|
return errors.New("routines.go error - timeout retry attempts cannot be less than zero")
|
|
}
|
|
r.timeoutRetryAttempts = n
|
|
return nil
|
|
}
|
|
|
|
// New returns a new Requester
|
|
func New(name string, authLimit, unauthLimit *RateLimit, httpRequester *http.Client) *Requester {
|
|
return &Requester{
|
|
HTTPClient: httpRequester,
|
|
UnauthLimit: unauthLimit,
|
|
AuthLimit: authLimit,
|
|
Name: name,
|
|
Jobs: make(chan Job, MaxRequestJobs),
|
|
timeoutRetryAttempts: TimeoutRetryAttempts,
|
|
timedLock: timedmutex.NewTimedMutex(DefaultMutexLockTimeout),
|
|
}
|
|
}
|
|
|
|
// IsValidMethod returns whether the supplied method is supported
|
|
func IsValidMethod(method string) bool {
|
|
return common.StringDataCompareInsensitive(supportedMethods, method)
|
|
}
|
|
|
|
// IsValidCycle checks to see whether the current request cycle is valid or not
|
|
func (r *Requester) IsValidCycle(auth bool) bool {
|
|
if auth {
|
|
if time.Since(r.Cycle) < r.AuthLimit.GetDuration() {
|
|
return true
|
|
}
|
|
} else {
|
|
if time.Since(r.Cycle) < r.UnauthLimit.GetDuration() {
|
|
return true
|
|
}
|
|
}
|
|
|
|
r.StartCycle()
|
|
return false
|
|
}
|
|
|
|
func (r *Requester) checkRequest(method, path string, body io.Reader, headers map[string]string) (*http.Request, error) {
|
|
req, err := http.NewRequest(method, path, body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for k, v := range headers {
|
|
req.Header.Add(k, v)
|
|
}
|
|
|
|
if r.UserAgent != "" && req.Header.Get("User-Agent") == "" {
|
|
req.Header.Add("User-Agent", r.UserAgent)
|
|
}
|
|
|
|
return req, nil
|
|
}
|
|
|
|
// DoRequest performs a HTTP/HTTPS request with the supplied params
|
|
func (r *Requester) DoRequest(req *http.Request, path string, body io.Reader, result interface{}, authRequest, verbose, httpDebug, httpRecord bool) error {
|
|
if verbose {
|
|
log.Debugf(log.Global,
|
|
"%s exchange request path: %s requires rate limiter: %v",
|
|
r.Name,
|
|
path,
|
|
r.RequiresRateLimiter())
|
|
|
|
for k, d := range req.Header {
|
|
log.Debugf(log.Global, "%s exchange request header [%s]: %s", r.Name, k, d)
|
|
}
|
|
log.Debugf(log.Global,
|
|
"%s exchange request type: %s", r.Name, req.Method)
|
|
log.Debugf(log.Global,
|
|
"%s exchange request body: %v", r.Name, body)
|
|
}
|
|
|
|
var timeoutError error
|
|
for i := 0; i < r.timeoutRetryAttempts+1; i++ {
|
|
resp, err := r.HTTPClient.Do(req)
|
|
if err != nil {
|
|
if timeoutErr, ok := err.(net.Error); ok && timeoutErr.Timeout() {
|
|
if verbose {
|
|
log.Errorf(log.ExchangeSys, "%s request has timed-out retrying request, count %d",
|
|
r.Name,
|
|
i)
|
|
}
|
|
timeoutError = err
|
|
continue
|
|
}
|
|
|
|
if r.RequiresRateLimiter() {
|
|
r.DecrementRequests(authRequest)
|
|
}
|
|
return err
|
|
}
|
|
if resp == nil {
|
|
if r.RequiresRateLimiter() {
|
|
r.DecrementRequests(authRequest)
|
|
}
|
|
return errors.New("resp is nil")
|
|
}
|
|
|
|
var reader io.ReadCloser
|
|
switch resp.Header.Get("Content-Encoding") {
|
|
case "gzip":
|
|
reader, err = gzip.NewReader(resp.Body)
|
|
defer reader.Close()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
case "json":
|
|
reader = resp.Body
|
|
|
|
default:
|
|
switch {
|
|
case strings.Contains(resp.Header.Get("Content-Type"), "application/json"):
|
|
reader = resp.Body
|
|
|
|
default:
|
|
if verbose {
|
|
log.Warnf(log.ExchangeSys,
|
|
"%s request response content type differs from JSON; received %v [path: %s]\n",
|
|
r.Name,
|
|
resp.Header.Get("Content-Type"),
|
|
path)
|
|
}
|
|
reader = resp.Body
|
|
}
|
|
}
|
|
|
|
contents, err := ioutil.ReadAll(reader)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if httpRecord {
|
|
// This dumps http responses for future mocking implementations
|
|
err = mock.HTTPRecord(resp, r.Name, contents)
|
|
if err != nil {
|
|
return fmt.Errorf("mock recording failure %s", err)
|
|
}
|
|
}
|
|
|
|
if resp.StatusCode != 200 && resp.StatusCode != 201 && resp.StatusCode != 202 {
|
|
err = fmt.Errorf("unsuccessful HTTP status code: %d", resp.StatusCode)
|
|
if verbose {
|
|
err = fmt.Errorf("%s\n%s", err.Error(),
|
|
fmt.Sprintf("%s exchange raw response: %s", r.Name, string(contents)))
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
if httpDebug {
|
|
dump, err := httputil.DumpResponse(resp, false)
|
|
if err != nil {
|
|
log.Errorf(log.Global, "DumpResponse invalid response: %v:", err)
|
|
}
|
|
log.Debugf(log.Global, "DumpResponse Headers (%v):\n%s", path, dump)
|
|
log.Debugf(log.Global, "DumpResponse Body (%v):\n %s", path, string(contents))
|
|
}
|
|
|
|
resp.Body.Close()
|
|
if verbose {
|
|
log.Debugf(log.ExchangeSys, "HTTP status: %s, Code: %v", resp.Status, resp.StatusCode)
|
|
if !httpDebug {
|
|
log.Debugf(log.ExchangeSys, "%s exchange raw response: %s", r.Name, string(contents))
|
|
}
|
|
}
|
|
|
|
if result != nil {
|
|
return json.Unmarshal(contents, result)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
return fmt.Errorf("request.go error - failed to retry request %s",
|
|
timeoutError)
|
|
}
|
|
|
|
func (r *Requester) worker() {
|
|
for {
|
|
for x := range r.Jobs {
|
|
if !r.IsRateLimited(x.AuthRequest) {
|
|
r.IncrementRequests(x.AuthRequest)
|
|
|
|
err := r.DoRequest(x.Request, x.Path, x.Body, x.Result, x.AuthRequest, x.Verbose, x.HTTPDebugging, x.Record)
|
|
x.JobResult <- &JobResult{
|
|
Error: err,
|
|
Result: x.Result,
|
|
}
|
|
} else {
|
|
limit := r.GetRateLimit(x.AuthRequest)
|
|
diff := limit.GetDuration() - time.Since(r.Cycle)
|
|
if x.Verbose {
|
|
log.Debugf(log.ExchangeSys, "%s request. Rate limited! Sleeping for %v", r.Name, diff)
|
|
}
|
|
time.Sleep(diff)
|
|
|
|
for {
|
|
if r.IsRateLimited(x.AuthRequest) {
|
|
time.Sleep(time.Millisecond)
|
|
continue
|
|
}
|
|
r.IncrementRequests(x.AuthRequest)
|
|
|
|
if x.Verbose {
|
|
log.Debugf(log.ExchangeSys, "%s request. No longer rate limited! Doing request", r.Name)
|
|
}
|
|
|
|
err := r.DoRequest(x.Request, x.Path, x.Body, x.Result, x.AuthRequest, x.Verbose, x.HTTPDebugging, x.Record)
|
|
x.JobResult <- &JobResult{
|
|
Error: err,
|
|
Result: x.Result,
|
|
}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// SendPayload handles sending HTTP/HTTPS requests
|
|
func (r *Requester) SendPayload(method, path string, headers map[string]string, body io.Reader, result interface{}, authRequest, nonceEnabled, verbose, httpDebugging, record bool) error {
|
|
if !nonceEnabled {
|
|
r.timedLock.LockForDuration()
|
|
}
|
|
|
|
if r == nil || r.Name == "" {
|
|
r.timedLock.UnlockIfLocked()
|
|
return errors.New("not initiliased, SetDefaults() called before making request?")
|
|
}
|
|
|
|
if !IsValidMethod(method) {
|
|
r.timedLock.UnlockIfLocked()
|
|
return fmt.Errorf("incorrect method supplied %s: supported %s", method, supportedMethods)
|
|
}
|
|
|
|
if path == "" {
|
|
r.timedLock.UnlockIfLocked()
|
|
return errors.New("invalid path")
|
|
}
|
|
|
|
req, err := r.checkRequest(method, path, body, headers)
|
|
if err != nil {
|
|
r.timedLock.UnlockIfLocked()
|
|
return err
|
|
}
|
|
|
|
if httpDebugging {
|
|
dump, err := httputil.DumpRequestOut(req, true)
|
|
if err != nil {
|
|
log.Errorf(log.Global,
|
|
"DumpRequest invalid response %v:", err)
|
|
}
|
|
log.Debugf(log.Global,
|
|
"DumpRequest:\n%s", dump)
|
|
}
|
|
|
|
if !r.RequiresRateLimiter() {
|
|
r.timedLock.UnlockIfLocked()
|
|
return r.DoRequest(req, path, body, result, authRequest, verbose, httpDebugging, record)
|
|
}
|
|
|
|
if len(r.Jobs) == MaxRequestJobs {
|
|
r.timedLock.UnlockIfLocked()
|
|
return errors.New("max request jobs reached")
|
|
}
|
|
|
|
r.m.Lock()
|
|
if !r.WorkerStarted {
|
|
r.StartCycle()
|
|
r.WorkerStarted = true
|
|
go r.worker()
|
|
}
|
|
r.m.Unlock()
|
|
|
|
jobResult := make(chan *JobResult)
|
|
|
|
newJob := Job{
|
|
Request: req,
|
|
Method: method,
|
|
Path: path,
|
|
Headers: headers,
|
|
Body: body,
|
|
Result: result,
|
|
JobResult: jobResult,
|
|
AuthRequest: authRequest,
|
|
Verbose: verbose,
|
|
HTTPDebugging: httpDebugging,
|
|
Record: record,
|
|
}
|
|
|
|
if verbose {
|
|
log.Debugf(log.ExchangeSys, "%s request. Attaching new job.", r.Name)
|
|
}
|
|
r.Jobs <- newJob
|
|
r.timedLock.UnlockIfLocked()
|
|
|
|
if verbose {
|
|
log.Debugf(log.ExchangeSys, "%s request. Waiting for job to complete.", r.Name)
|
|
}
|
|
resp := <-newJob.JobResult
|
|
|
|
if verbose {
|
|
log.Debugf(log.ExchangeSys, "%s request. Job complete.", r.Name)
|
|
}
|
|
|
|
return resp.Error
|
|
}
|
|
|
|
// GetNonce returns a nonce for requests. This locks and enforces concurrent
|
|
// nonce FIFO on the buffered job channel
|
|
func (r *Requester) GetNonce(isNano bool) nonce.Value {
|
|
r.timedLock.LockForDuration()
|
|
if r.Nonce.Get() == 0 {
|
|
if isNano {
|
|
r.Nonce.Set(time.Now().UnixNano())
|
|
} else {
|
|
r.Nonce.Set(time.Now().Unix())
|
|
}
|
|
return r.Nonce.Get()
|
|
}
|
|
r.Nonce.Inc()
|
|
return r.Nonce.Get()
|
|
}
|
|
|
|
// GetNonceMilli returns a nonce for requests. This locks and enforces concurrent
|
|
// nonce FIFO on the buffered job channel this is for millisecond
|
|
func (r *Requester) GetNonceMilli() nonce.Value {
|
|
r.timedLock.LockForDuration()
|
|
if r.Nonce.Get() == 0 {
|
|
r.Nonce.Set(time.Now().UnixNano() / int64(time.Millisecond))
|
|
return r.Nonce.Get()
|
|
}
|
|
r.Nonce.Inc()
|
|
return r.Nonce.Get()
|
|
}
|
|
|
|
// SetProxy sets a proxy address to the client transport
|
|
func (r *Requester) SetProxy(p *url.URL) error {
|
|
if p.String() == "" {
|
|
return errors.New("no proxy URL supplied")
|
|
}
|
|
|
|
r.HTTPClient.Transport = &http.Transport{
|
|
Proxy: http.ProxyURL(p),
|
|
TLSHandshakeTimeout: proxyTLSTimeout,
|
|
}
|
|
return nil
|
|
}
|