Improve request package and adjust bitfinex rate limiter

Fixes: https://github.com/thrasher-/gocryptotrader/issues/147
This commit is contained in:
Adrian Gallagher
2018-07-17 16:24:41 +10:00
parent 950d66e394
commit a5f51328d4
2 changed files with 110 additions and 31 deletions

View File

@@ -35,8 +35,8 @@ func TestSetup(t *testing.T) {
}
b.AuthenticatedAPISupport = true
// custom rate limit for testing
b.Requester.SetRateLimit(true, time.Second*20, 1)
b.Requester.SetRateLimit(false, time.Second*20, 1)
b.Requester.SetRateLimit(true, time.Millisecond*300, 1)
b.Requester.SetRateLimit(false, time.Millisecond*300, 1)
}
func TestGetPlatformStatus(t *testing.T) {

View File

@@ -15,14 +15,20 @@ import (
var supportedMethods = []string{"GET", "POST", "HEAD", "PUT", "DELETE", "OPTIONS", "CONNECT"}
const (
maxRequestJobs = 50
)
// Requester struct for the request client
type Requester struct {
HTTPClient *http.Client
UnauthLimit *RateLimit
AuthLimit *RateLimit
Name string
Cycle time.Time
m sync.Mutex
HTTPClient *http.Client
UnauthLimit *RateLimit
AuthLimit *RateLimit
Name string
Cycle time.Time
m sync.Mutex
Jobs chan Job
WorkerStarted bool
}
// RateLimit struct
@@ -33,6 +39,25 @@ type RateLimit struct {
Mutex sync.Mutex
}
// JobResult holds a request job result
type JobResult struct {
Error error
Result interface{}
}
// Job holds a request job
type Job struct {
Request *http.Request
Method string
Path string
Headers map[string]string
Body io.Reader
Result interface{}
JobResult chan *JobResult
AuthRequest bool
Verbose bool
}
// NewRateLimit creates a new RateLimit
func NewRateLimit(d time.Duration, rate int) *RateLimit {
return &RateLimit{Duration: d, Rate: rate}
@@ -124,7 +149,7 @@ func (r *Requester) IncrementRequests(auth bool) {
return
}
reqs := r.AuthLimit.GetRequests()
reqs := r.UnauthLimit.GetRequests()
reqs++
r.UnauthLimit.SetRequests(reqs)
}
@@ -170,6 +195,7 @@ func New(name string, authLimit, unauthLimit *RateLimit, httpRequester *http.Cli
UnauthLimit: unauthLimit,
AuthLimit: authLimit,
Name: name,
Jobs: make(chan Job, maxRequestJobs),
}
}
@@ -189,6 +215,8 @@ func (r *Requester) IsValidCycle(auth bool) bool {
return true
}
}
r.StartCycle()
return false
}
@@ -208,7 +236,7 @@ func (r *Requester) checkRequest(method, path string, body io.Reader, headers ma
// DoRequest performs a HTTP/HTTPS request with the supplied params
func (r *Requester) DoRequest(req *http.Request, method, path string, headers map[string]string, body io.Reader, result interface{}, authRequest, verbose bool) error {
if verbose {
log.Printf("%s exchange request path: %s", r.Name, path)
log.Printf("%s exchange request path: %s requires rate limiter: %v", r.Name, path, r.RequiresRateLimiter())
}
resp, err := r.HTTPClient.Do(req)
@@ -244,6 +272,46 @@ func (r *Requester) DoRequest(req *http.Request, method, path string, headers ma
return nil
}
func (r *Requester) worker() {
for {
for x := range r.Jobs {
if !r.IsRateLimited(x.AuthRequest) {
r.IncrementRequests(x.AuthRequest)
err := r.DoRequest(x.Request, x.Method, x.Path, x.Headers, x.Body, x.Result, x.AuthRequest, x.Verbose)
x.JobResult <- &JobResult{
Error: err,
Result: x.Result,
}
} else {
limit := r.GetRateLimit(x.AuthRequest)
diff := limit.GetDuration() - time.Since(r.Cycle)
if x.Verbose {
log.Printf("%s request. Rate limited! Sleeping for %v", r.Name, diff)
}
time.Sleep(diff)
for {
if !r.IsRateLimited(x.AuthRequest) {
r.IncrementRequests(x.AuthRequest)
if x.Verbose {
log.Printf("%s request. No longer rate limited! Doing request", r.Name)
}
err := r.DoRequest(x.Request, x.Method, x.Path, x.Headers, x.Body, x.Result, x.AuthRequest, x.Verbose)
x.JobResult <- &JobResult{
Error: err,
Result: x.Result,
}
break
}
}
}
}
}
}
// SendPayload handles sending HTTP/HTTPS requests
func (r *Requester) SendPayload(method, path string, headers map[string]string, body io.Reader, result interface{}, authRequest, verbose bool) error {
if r == nil || r.Name == "" {
@@ -267,33 +335,44 @@ func (r *Requester) SendPayload(method, path string, headers map[string]string,
return r.DoRequest(req, method, path, headers, body, result, authRequest, verbose)
}
if len(r.Jobs) == maxRequestJobs {
return errors.New("max request jobs reached")
}
r.m.Lock()
if r.Cycle.IsZero() || !r.IsValidCycle(authRequest) {
if !r.WorkerStarted {
r.StartCycle()
r.WorkerStarted = true
go r.worker()
}
r.m.Unlock()
if !r.IsRateLimited(authRequest) && r.IsValidCycle(authRequest) {
r.IncrementRequests(authRequest)
return r.DoRequest(req, method, path, headers, body, result, authRequest, verbose)
jobResult := make(chan *JobResult)
newJob := Job{
Request: req,
Method: method,
Path: path,
Headers: headers,
Body: body,
Result: result,
JobResult: jobResult,
AuthRequest: authRequest,
Verbose: verbose,
}
r.m.Lock()
for r.IsRateLimited(authRequest) {
limit := r.GetRateLimit(authRequest)
diff := limit.GetDuration() - time.Since(r.Cycle)
log.Printf("%s IS RATE LIMITED. SLEEPING FOR %v", r.Name, diff)
time.Sleep(diff)
if !r.IsValidCycle(authRequest) {
r.StartCycle()
}
if !r.IsRateLimited(authRequest) && r.IsValidCycle(authRequest) {
r.IncrementRequests(authRequest)
r.m.Unlock()
return r.DoRequest(req, method, path, headers, body, result, authRequest, verbose)
}
if verbose {
log.Printf("%s request. Attaching new job.", r.Name)
}
return nil
r.Jobs <- newJob
if verbose {
log.Printf("%s request. Waiting for job to complete.", r.Name)
}
resp := <-newJob.JobResult
if verbose {
log.Printf("%s request. Job complete.", r.Name)
}
return resp.Error
}