diff --git a/exchanges/bitfinex/bitfinex_test.go b/exchanges/bitfinex/bitfinex_test.go index 36f7b34a..33720816 100644 --- a/exchanges/bitfinex/bitfinex_test.go +++ b/exchanges/bitfinex/bitfinex_test.go @@ -35,8 +35,8 @@ func TestSetup(t *testing.T) { } b.AuthenticatedAPISupport = true // custom rate limit for testing - b.Requester.SetRateLimit(true, time.Second*20, 1) - b.Requester.SetRateLimit(false, time.Second*20, 1) + b.Requester.SetRateLimit(true, time.Millisecond*300, 1) + b.Requester.SetRateLimit(false, time.Millisecond*300, 1) } func TestGetPlatformStatus(t *testing.T) { diff --git a/exchanges/request/request.go b/exchanges/request/request.go index 52e41e50..d26db363 100644 --- a/exchanges/request/request.go +++ b/exchanges/request/request.go @@ -15,14 +15,20 @@ import ( var supportedMethods = []string{"GET", "POST", "HEAD", "PUT", "DELETE", "OPTIONS", "CONNECT"} +const ( + maxRequestJobs = 50 +) + // Requester struct for the request client type Requester struct { - HTTPClient *http.Client - UnauthLimit *RateLimit - AuthLimit *RateLimit - Name string - Cycle time.Time - m sync.Mutex + HTTPClient *http.Client + UnauthLimit *RateLimit + AuthLimit *RateLimit + Name string + Cycle time.Time + m sync.Mutex + Jobs chan Job + WorkerStarted bool } // RateLimit struct @@ -33,6 +39,25 @@ type RateLimit struct { Mutex sync.Mutex } +// JobResult holds a request job result +type JobResult struct { + Error error + Result interface{} +} + +// Job holds a request job +type Job struct { + Request *http.Request + Method string + Path string + Headers map[string]string + Body io.Reader + Result interface{} + JobResult chan *JobResult + AuthRequest bool + Verbose bool +} + // NewRateLimit creates a new RateLimit func NewRateLimit(d time.Duration, rate int) *RateLimit { return &RateLimit{Duration: d, Rate: rate} @@ -124,7 +149,7 @@ func (r *Requester) IncrementRequests(auth bool) { return } - reqs := r.AuthLimit.GetRequests() + reqs := r.UnauthLimit.GetRequests() reqs++ r.UnauthLimit.SetRequests(reqs) } @@ -170,6 +195,7 @@ func New(name string, authLimit, unauthLimit *RateLimit, httpRequester *http.Cli UnauthLimit: unauthLimit, AuthLimit: authLimit, Name: name, + Jobs: make(chan Job, maxRequestJobs), } } @@ -189,6 +215,8 @@ func (r *Requester) IsValidCycle(auth bool) bool { return true } } + + r.StartCycle() return false } @@ -208,7 +236,7 @@ func (r *Requester) checkRequest(method, path string, body io.Reader, headers ma // DoRequest performs a HTTP/HTTPS request with the supplied params func (r *Requester) DoRequest(req *http.Request, method, path string, headers map[string]string, body io.Reader, result interface{}, authRequest, verbose bool) error { if verbose { - log.Printf("%s exchange request path: %s", r.Name, path) + log.Printf("%s exchange request path: %s requires rate limiter: %v", r.Name, path, r.RequiresRateLimiter()) } resp, err := r.HTTPClient.Do(req) @@ -244,6 +272,46 @@ func (r *Requester) DoRequest(req *http.Request, method, path string, headers ma return nil } +func (r *Requester) worker() { + for { + for x := range r.Jobs { + if !r.IsRateLimited(x.AuthRequest) { + r.IncrementRequests(x.AuthRequest) + + err := r.DoRequest(x.Request, x.Method, x.Path, x.Headers, x.Body, x.Result, x.AuthRequest, x.Verbose) + x.JobResult <- &JobResult{ + Error: err, + Result: x.Result, + } + } else { + limit := r.GetRateLimit(x.AuthRequest) + diff := limit.GetDuration() - time.Since(r.Cycle) + if x.Verbose { + log.Printf("%s request. Rate limited! Sleeping for %v", r.Name, diff) + } + time.Sleep(diff) + + for { + if !r.IsRateLimited(x.AuthRequest) { + r.IncrementRequests(x.AuthRequest) + + if x.Verbose { + log.Printf("%s request. No longer rate limited! Doing request", r.Name) + } + + err := r.DoRequest(x.Request, x.Method, x.Path, x.Headers, x.Body, x.Result, x.AuthRequest, x.Verbose) + x.JobResult <- &JobResult{ + Error: err, + Result: x.Result, + } + break + } + } + } + } + } +} + // SendPayload handles sending HTTP/HTTPS requests func (r *Requester) SendPayload(method, path string, headers map[string]string, body io.Reader, result interface{}, authRequest, verbose bool) error { if r == nil || r.Name == "" { @@ -267,33 +335,44 @@ func (r *Requester) SendPayload(method, path string, headers map[string]string, return r.DoRequest(req, method, path, headers, body, result, authRequest, verbose) } + if len(r.Jobs) == maxRequestJobs { + return errors.New("max request jobs reached") + } + r.m.Lock() - if r.Cycle.IsZero() || !r.IsValidCycle(authRequest) { + if !r.WorkerStarted { r.StartCycle() + r.WorkerStarted = true + go r.worker() } r.m.Unlock() - if !r.IsRateLimited(authRequest) && r.IsValidCycle(authRequest) { - r.IncrementRequests(authRequest) - return r.DoRequest(req, method, path, headers, body, result, authRequest, verbose) + jobResult := make(chan *JobResult) + + newJob := Job{ + Request: req, + Method: method, + Path: path, + Headers: headers, + Body: body, + Result: result, + JobResult: jobResult, + AuthRequest: authRequest, + Verbose: verbose, } - r.m.Lock() - for r.IsRateLimited(authRequest) { - limit := r.GetRateLimit(authRequest) - diff := limit.GetDuration() - time.Since(r.Cycle) - log.Printf("%s IS RATE LIMITED. SLEEPING FOR %v", r.Name, diff) - time.Sleep(diff) - - if !r.IsValidCycle(authRequest) { - r.StartCycle() - } - - if !r.IsRateLimited(authRequest) && r.IsValidCycle(authRequest) { - r.IncrementRequests(authRequest) - r.m.Unlock() - return r.DoRequest(req, method, path, headers, body, result, authRequest, verbose) - } + if verbose { + log.Printf("%s request. Attaching new job.", r.Name) } - return nil + r.Jobs <- newJob + + if verbose { + log.Printf("%s request. Waiting for job to complete.", r.Name) + } + resp := <-newJob.JobResult + + if verbose { + log.Printf("%s request. Job complete.", r.Name) + } + return resp.Error }