Update request.go to fix concurrency nonce issues (#285)

* Updates nonce generation to adhere to fifo channel buffer before request executes by routine

* removed unused variables, lns etc

* Fix requested changes and added in timer that disengages lock if out of scope error occurs

* Fixed woopsy daisy issue

* Add benchmark, reduce time in force to unlock before stack insertion, add nil check for edge case

* Remove unusued waitgroup field

* use return nonce.Value and method, rm redundant nonce code, fix tests.

* Fix linter issue: unnecessary conversion
This commit is contained in:
Ryan O'Hara-Reid
2019-05-06 13:46:34 +10:00
committed by Adrian Gallagher
parent 1967507d40
commit 35b94268e0
46 changed files with 312 additions and 258 deletions

View File

@@ -13,6 +13,7 @@ import (
"time"
"github.com/thrasher-/gocryptotrader/common"
"github.com/thrasher-/gocryptotrader/exchanges/nonce"
log "github.com/thrasher-/gocryptotrader/logger"
)
@@ -36,7 +37,10 @@ type Requester struct {
timeoutRetryAttempts int
m sync.Mutex
Jobs chan Job
disengage chan struct{}
WorkerStarted bool
Nonce nonce.Nonce
fifoLock sync.Mutex
}
// RateLimit struct
@@ -214,6 +218,7 @@ func New(name string, authLimit, unauthLimit *RateLimit, httpRequester *http.Cli
AuthLimit: authLimit,
Name: name,
Jobs: make(chan Job, maxRequestJobs),
disengage: make(chan struct{}, 1),
timeoutRetryAttempts: defaultTimeoutRetryAttempts,
}
}
@@ -391,29 +396,39 @@ func (r *Requester) worker() {
}
// SendPayload handles sending HTTP/HTTPS requests
func (r *Requester) SendPayload(method, path string, headers map[string]string, body io.Reader, result interface{}, authRequest, verbose bool) error {
func (r *Requester) SendPayload(method, path string, headers map[string]string, body io.Reader, result interface{}, authRequest, nonceEnabled, verbose bool) error {
if !nonceEnabled {
r.lock()
}
if r == nil || r.Name == "" {
r.unlock()
return errors.New("not initiliased, SetDefaults() called before making request?")
}
if !IsValidMethod(method) {
r.unlock()
return fmt.Errorf("incorrect method supplied %s: supported %s", method, supportedMethods)
}
if path == "" {
r.unlock()
return errors.New("invalid path")
}
req, err := r.checkRequest(method, path, body, headers)
if err != nil {
r.unlock()
return err
}
if !r.RequiresRateLimiter() {
r.unlock()
return r.DoRequest(req, path, body, result, authRequest, verbose)
}
if len(r.Jobs) == maxRequestJobs {
r.unlock()
return errors.New("max request jobs reached")
}
@@ -443,6 +458,7 @@ func (r *Requester) SendPayload(method, path string, headers map[string]string,
log.Debugf("%s request. Attaching new job.", r.Name)
}
r.Jobs <- newJob
r.unlock()
if verbose {
log.Debugf("%s request. Waiting for job to complete.", r.Name)
@@ -455,6 +471,34 @@ func (r *Requester) SendPayload(method, path string, headers map[string]string,
return resp.Error
}
// GetNonce returns a nonce for requests. This locks and enforces concurrent
// nonce FIFO on the buffered job channel
func (r *Requester) GetNonce(isNano bool) nonce.Value {
r.lock()
if r.Nonce.Get() == 0 {
if isNano {
r.Nonce.Set(time.Now().UnixNano())
} else {
r.Nonce.Set(time.Now().Unix())
}
return r.Nonce.Get()
}
r.Nonce.Inc()
return r.Nonce.Get()
}
// GetNonceMilli returns a nonce for requests. This locks and enforces concurrent
// nonce FIFO on the buffered job channel this is for millisecond
func (r *Requester) GetNonceMilli() nonce.Value {
r.lock()
if r.Nonce.Get() == 0 {
r.Nonce.Set(time.Now().UnixNano() / int64(time.Millisecond))
return r.Nonce.Get()
}
r.Nonce.Inc()
return r.Nonce.Get()
}
// SetProxy sets a proxy address to the client transport
func (r *Requester) SetProxy(p *url.URL) error {
if p.String() == "" {
@@ -467,3 +511,33 @@ func (r *Requester) SetProxy(p *url.URL) error {
}
return nil
}
// lock locks and sets up an issue timer, if something errors out of scope it
// automatically unlocks
func (r *Requester) lock() {
if r.disengage == nil {
r.disengage = make(chan struct{}, 1)
}
var wg sync.WaitGroup
r.fifoLock.Lock()
wg.Add(1)
go func() {
timer := time.NewTimer(50 * time.Millisecond)
wg.Done()
select {
case <-timer.C:
log.Errorf("Unlocking due to possible error for %s", r.Name)
r.fifoLock.Unlock()
case <-r.disengage:
return
}
}()
wg.Wait()
}
// unlock unlocks mtx and shuts down a timer
func (r *Requester) unlock() {
r.disengage <- struct{}{}
r.fifoLock.Unlock()
}

View File

@@ -199,8 +199,8 @@ func TestCheckRequest(t *testing.T) {
}
func TestDoRequest(t *testing.T) {
var test *Requester
err := test.SendPayload(http.MethodGet, "https://www.google.com", nil, nil, nil, false, true)
var test = new(Requester)
err := test.SendPayload(http.MethodGet, "https://www.google.com", nil, nil, nil, false, false, true)
if err == nil {
t.Fatal("not iniitalised")
}
@@ -211,17 +211,17 @@ func TestDoRequest(t *testing.T) {
}
r.Name = "bitfinex"
err = r.SendPayload("BLAH", "https://www.google.com", nil, nil, nil, false, true)
err = r.SendPayload("BLAH", "https://www.google.com", nil, nil, nil, false, false, true)
if err == nil {
t.Fatal("unexpected values")
}
err = r.SendPayload(http.MethodGet, "", nil, nil, nil, false, true)
err = r.SendPayload(http.MethodGet, "", nil, nil, nil, false, false, true)
if err == nil {
t.Fatal("unexpected values")
}
err = r.SendPayload(http.MethodGet, "https://www.google.com", nil, nil, nil, false, true)
err = r.SendPayload(http.MethodGet, "https://www.google.com", nil, nil, nil, false, false, true)
if err != nil {
t.Fatal("unexpected values")
}
@@ -233,7 +233,7 @@ func TestDoRequest(t *testing.T) {
r.SetRateLimit(false, time.Second, 0)
r.SetRateLimit(true, time.Second, 0)
err = r.SendPayload(http.MethodGet, "https://www.google.com", nil, nil, nil, false, true)
err = r.SendPayload(http.MethodGet, "https://www.google.com", nil, nil, nil, false, false, true)
if err != nil {
t.Fatal("unexpected values")
}
@@ -250,7 +250,7 @@ func TestDoRequest(t *testing.T) {
t.Fatal("unexepcted values")
}
err = r.SendPayload(http.MethodGet, "https://www.google.com", nil, nil, nil, false, true)
err = r.SendPayload(http.MethodGet, "https://www.google.com", nil, nil, nil, false, false, true)
if err != nil {
t.Fatal("unexpected values")
}
@@ -261,27 +261,27 @@ func TestDoRequest(t *testing.T) {
t.Fatal("unexepcted values")
}
err = r.SendPayload(http.MethodGet, "https://www.google.com", nil, nil, nil, true, true)
err = r.SendPayload(http.MethodGet, "https://www.google.com", nil, nil, nil, true, false, true)
if err != nil {
t.Fatal("unexpected values")
}
var result interface{}
err = r.SendPayload(http.MethodGet, "https://www.google.com", nil, nil, result, false, true)
err = r.SendPayload(http.MethodGet, "https://www.google.com", nil, nil, result, false, false, true)
if err != nil {
t.Fatal(err)
}
headers := make(map[string]string)
headers["content-type"] = "content/text"
err = r.SendPayload(http.MethodPost, "https://bitfinex.com", headers, nil, result, false, true)
err = r.SendPayload(http.MethodPost, "https://bitfinex.com", headers, nil, result, false, false, true)
if err != nil {
t.Fatal(err)
}
r.StartCycle()
r.UnauthLimit.SetRequests(100)
err = r.SendPayload(http.MethodGet, "https://www.google.com", nil, nil, result, false, false)
err = r.SendPayload(http.MethodGet, "https://www.google.com", nil, nil, result, false, false, false)
if err != nil {
t.Fatal("unexpected values")
}
@@ -297,7 +297,7 @@ func TestDoRequest(t *testing.T) {
}
r.HTTPClient.Timeout = 1 * time.Second
err = r.SendPayload(http.MethodPost, "https://httpstat.us/200?sleep=20000", nil, nil, nil, false, true)
err = r.SendPayload(http.MethodPost, "https://httpstat.us/200?sleep=20000", nil, nil, nil, false, false, true)
if err == nil {
t.Fatal(err)
}
@@ -322,3 +322,11 @@ func TestDoRequest(t *testing.T) {
t.Error("failed to set proxy")
}
}
func BenchmarkRequestLockMech(b *testing.B) {
var r = new(Requester)
var meep interface{}
for n := 0; n < b.N; n++ {
r.SendPayload(http.MethodGet, "127.0.0.1", nil, nil, &meep, false, false, false)
}
}