Files
gocryptotrader/exchanges/request/request.go
Andrew 7fccc03164 (Logger) Rename package to log (#444)
* renamed package to log to stop side import requirement

* reverted comment changes

* reverted comment changes

* one more reverting wording back to logger

* wording changes on comments
2020-02-12 18:09:56 +11:00

237 lines
5.5 KiB
Go

package request
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/httputil"
"net/url"
"sync/atomic"
"time"
"github.com/thrasher-corp/gocryptotrader/common/timedmutex"
"github.com/thrasher-corp/gocryptotrader/exchanges/mock"
"github.com/thrasher-corp/gocryptotrader/exchanges/nonce"
"github.com/thrasher-corp/gocryptotrader/log"
)
// New builds a Requester identified by name that sends traffic through
// httpRequester and is rate limited by l. The retry count is seeded from
// TimeoutRetryAttempts and the timed lock is created with
// DefaultMutexLockTimeout.
func New(name string, httpRequester *http.Client, l Limiter) *Requester {
	requester := new(Requester)
	requester.HTTPClient = httpRequester
	requester.Limiter = l
	requester.Name = name
	requester.timeoutRetryAttempts = TimeoutRetryAttempts
	requester.timedLock = timedmutex.NewTimedMutex(DefaultMutexLockTimeout)
	return requester
}
// SendPayload handles sending HTTP/HTTPS requests.
//
// The item is validated and turned into an *http.Request, optionally dumped
// for debugging, and then executed via doRequest. When the item does not use
// a nonce the timed lock is taken here; when it does, the lock is expected to
// already be held (GetNonce/GetNonceMilli take it). In both cases the lock is
// released, if held, before returning.
//
// It returns an error when the requester or item is nil, when validation
// fails, when MaxRequestJobs requests are already in flight, or when the
// request itself fails.
func (r *Requester) SendPayload(i *Item) error {
	// Guard before the first dereference: i.NonceEnabled and r.timedLock are
	// both touched before validateRequest gets a chance to reject nil values.
	if r == nil {
		return errors.New("not initialised, SetDefaults() called before making request?")
	}
	if i == nil {
		return errors.New("request item cannot be nil")
	}

	if !i.NonceEnabled {
		r.timedLock.LockForDuration()
	}
	// Single release point covering every return path below, including a
	// panic propagating out of doRequest.
	defer r.timedLock.UnlockIfLocked()

	req, err := i.validateRequest(r)
	if err != nil {
		return err
	}

	if i.HTTPDebugging {
		// Err not evaluated due to validation check above
		dump, _ := httputil.DumpRequestOut(req, true)
		log.Debugf(log.RequestSys, "DumpRequest:\n%s", dump)
	}

	// Reserve the job slot in one atomic step so that concurrent callers
	// cannot both pass a separate load-then-add check and exceed the limit.
	if atomic.AddInt32(&r.jobs, 1) > MaxRequestJobs {
		atomic.AddInt32(&r.jobs, -1)
		return errors.New("max request jobs reached")
	}
	// Deferred after the unlock above, so it runs first: the slot is handed
	// back before the lock is released, matching the original ordering.
	defer atomic.AddInt32(&r.jobs, -1)

	return r.doRequest(req, i)
}
// validateRequest checks that the requester and the item are usable and, if
// so, builds the outgoing *http.Request from the item's method, path and
// body. Every entry in the item's Headers is added to the request, and the
// requester's UserAgent is added only when it is non-empty and the item did
// not already supply one.
func (i *Item) validateRequest(r *Requester) (*http.Request, error) {
	// Cases are evaluated top to bottom, so i is known to be non-nil by the
	// time its Path is inspected.
	switch {
	case r == nil || r.Name == "":
		return nil, errors.New("not initialised, SetDefaults() called before making request?")
	case i == nil:
		return nil, errors.New("request item cannot be nil")
	case i.Path == "":
		return nil, errors.New("invalid path")
	}

	httpReq, buildErr := http.NewRequest(i.Method, i.Path, i.Body)
	if buildErr != nil {
		return nil, buildErr
	}

	for key, value := range i.Headers {
		httpReq.Header.Add(key, value)
	}

	// Nothing more to do when there is no default agent configured or the
	// caller already provided their own.
	if r.UserAgent == "" || httpReq.Header.Get(userAgent) != "" {
		return httpReq, nil
	}
	httpReq.Header.Add(userAgent, r.UserAgent)
	return httpReq, nil
}
// doRequest performs a HTTP/HTTPS request with the supplied params.
//
// The request is attempted up to timeoutRetryAttempts+1 times; only errors
// that report themselves as timeouts via net.Error trigger another attempt.
// Each attempt first goes through InitiateRateLimit for the item's endpoint.
// On a response with a status code in the 200-202 range the body is
// unmarshalled as JSON into p.Result.
//
// It returns an error when p is nil, when rate limiting or the HTTP round
// trip fails, when the body cannot be read or recorded, when the status code
// is outside 200-202, when JSON decoding fails, or when every attempt timed
// out.
func (r *Requester) doRequest(req *http.Request, p *Item) error {
	if p == nil {
		return errors.New("request item cannot be nil")
	}

	if p.Verbose {
		log.Debugf(log.RequestSys,
			"%s request path: %s",
			r.Name,
			p.Path)

		for k, d := range req.Header {
			log.Debugf(log.RequestSys,
				"%s request header [%s]: %s",
				r.Name,
				k,
				d)
		}

		log.Debugf(log.RequestSys,
			"%s request type: %s",
			r.Name,
			req.Method)

		if p.Body != nil {
			log.Debugf(log.RequestSys,
				"%s request body: %v",
				r.Name,
				p.Body)
		}
	}

	var timeoutError error
	for i := 0; i < r.timeoutRetryAttempts+1; i++ {
		// A previous attempt may have consumed the request body. When the
		// request knows how to recreate it, rewind before sending again so
		// the retry does not go out with a drained body.
		if i > 0 && req.GetBody != nil {
			body, err := req.GetBody()
			if err != nil {
				return err
			}
			req.Body = body
		}

		// Initiate a rate limit reservation and sleep on requested endpoint
		err := r.InitiateRateLimit(p.Endpoint)
		if err != nil {
			return err
		}

		resp, err := r.HTTPClient.Do(req)
		if err != nil {
			if timeoutErr, ok := err.(net.Error); ok && timeoutErr.Timeout() {
				if p.Verbose {
					log.Errorf(log.RequestSys,
						"%s request has timed-out retrying request, count %d",
						r.Name,
						i)
				}
				timeoutError = err
				continue
			}
			return err
		}
		// Every path from here on returns from the function, so this defer
		// fires at most once per call and guarantees the body is closed on
		// the error returns as well as on success.
		defer resp.Body.Close()

		contents, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			return err
		}

		if p.HTTPRecording {
			// This dumps http responses for future mocking implementations
			err = mock.HTTPRecord(resp, r.Name, contents)
			if err != nil {
				return fmt.Errorf("mock recording failure %s", err)
			}
		}

		if resp.StatusCode < http.StatusOK ||
			resp.StatusCode > http.StatusAccepted {
			return fmt.Errorf("%s unsuccessful HTTP status code: %d raw response: %s",
				r.Name,
				resp.StatusCode,
				string(contents))
		}

		if p.HTTPDebugging {
			dump, err := httputil.DumpResponse(resp, false)
			if err != nil {
				log.Errorf(log.RequestSys, "DumpResponse invalid response: %v:", err)
			} else {
				// Only print the header dump when there is one to print.
				log.Debugf(log.RequestSys, "DumpResponse Headers (%v):\n%s", p.Path, dump)
			}
			log.Debugf(log.RequestSys, "DumpResponse Body (%v):\n %s", p.Path, string(contents))
		}

		if p.Verbose {
			log.Debugf(log.RequestSys,
				"HTTP status: %s, Code: %v",
				resp.Status,
				resp.StatusCode)
			// The body was already logged by the debugging branch above.
			if !p.HTTPDebugging {
				log.Debugf(log.RequestSys,
					"%s raw response: %s",
					r.Name,
					string(contents))
			}
		}
		return json.Unmarshal(contents, p.Result)
	}
	return fmt.Errorf("request.go error - failed to retry request %s",
		timeoutError)
}
// GetNonce takes the requester's timed lock and returns the next nonce.
// While the stored nonce is still zero it is seeded from the current Unix
// time, in nanoseconds when isNano is true and in seconds otherwise; on
// every later call the stored value is incremented instead. The lock is not
// released here.
func (r *Requester) GetNonce(isNano bool) nonce.Value {
	r.timedLock.LockForDuration()

	// Already seeded: just advance the counter.
	if r.Nonce.Get() != 0 {
		r.Nonce.Inc()
		return r.Nonce.Get()
	}

	now := time.Now()
	seed := now.Unix()
	if isNano {
		seed = now.UnixNano()
	}
	r.Nonce.Set(seed)
	return r.Nonce.Get()
}
// GetNonceMilli takes the requester's timed lock and returns the next nonce
// at millisecond resolution. While the stored nonce is still zero it is
// seeded from the current Unix time in milliseconds; on every later call the
// stored value is incremented instead. The lock is not released here.
func (r *Requester) GetNonceMilli() nonce.Value {
	r.timedLock.LockForDuration()

	// Already seeded: just advance the counter.
	if r.Nonce.Get() != 0 {
		r.Nonce.Inc()
		return r.Nonce.Get()
	}

	millis := time.Now().UnixNano() / int64(time.Millisecond)
	r.Nonce.Set(millis)
	return r.Nonce.Get()
}
// SetProxy sets a proxy address to the client transport.
//
// The HTTP client's Transport is replaced with a new http.Transport that
// routes through p and uses proxyTLSTimeout for the TLS handshake. It returns
// an error when p is nil or renders to an empty string.
func (r *Requester) SetProxy(p *url.URL) error {
	// Check for nil first: calling String on a nil *url.URL would panic
	// rather than report the missing proxy.
	if p == nil || p.String() == "" {
		return errors.New("no proxy URL supplied")
	}

	r.HTTPClient.Transport = &http.Transport{
		Proxy:               http.ProxyURL(p),
		TLSHandshakeTimeout: proxyTLSTimeout,
	}
	return nil
}