From 242b02c382b1742b6c052a6de0b479484bb88d03 Mon Sep 17 00:00:00 2001 From: Scott Date: Tue, 29 Oct 2019 14:00:45 +1100 Subject: [PATCH] (Engine) Bugfix: Unlocking an unlocked mutex PANIC + Increase dispatcher job capacity via commandline (#371) * Removes lock unlock timer and instead sets unlocks between getting a nonce and sending a payload. Increases dispatch channel buffer to deal with len(enabledCurrencies) > ~100 * Adds additional comments to help explain the situation * Fixes bug that could unlock mutex too early * Fixes LIES where Gemini gets a nonce and then proceeds to declare it doesn't get a nonce causing an unrecoverable lock * Fun new concept! The creation of a tested timed mutex. Unlocking an unlocked mutex cannot occur and response can be checked to verify whether the mutex was unlocked from timeout or command. * Adds new cmd parameter "dispatchjobbuffer" * Expands comments and renames benchmark. Makes `Timer` property private * Happy little linters * Renames jobBuffer and all related instances to jobs limit * Tiny error message update * Grammatical fix and setting dispatch.Start to use defaults --- common/timedmutex/timed_mutex.go | 77 ++++++++++++++++ common/timedmutex/timed_mutex_test.go | 86 ++++++++++++++++++ common/timedmutex/timed_mutex_types.go | 15 ++++ dispatch/dispatch.go | 19 ++-- dispatch/dispatch_test.go | 19 ++-- dispatch/dispatch_types.go | 5 +- engine/engine.go | 4 +- engine/engine_types.go | 1 + engine/helpers.go | 2 +- exchanges/gemini/gemini.go | 2 +- exchanges/orderbook/orderbook_test.go | 2 +- exchanges/request/request.go | 119 +++---------------------- exchanges/request/request_test.go | 11 +-- exchanges/request/request_types.go | 75 ++++++++++++++++ exchanges/ticker/ticker_test.go | 2 +- main.go | 1 + 16 files changed, 301 insertions(+), 139 deletions(-) create mode 100644 common/timedmutex/timed_mutex.go create mode 100644 common/timedmutex/timed_mutex_test.go create mode 100644 common/timedmutex/timed_mutex_types.go create mode 100644 exchanges/request/request_types.go diff --git a/common/timedmutex/timed_mutex.go b/common/timedmutex/timed_mutex.go new file mode 100644 index 00000000..6f70d762 --- /dev/null +++ b/common/timedmutex/timed_mutex.go @@ -0,0 +1,77 @@ +package timedmutex + +import ( + "sync" + "time" +) + +// NewTimedMutex creates a new timed mutex with a +// specified duration +func NewTimedMutex(length time.Duration) *TimedMutex { + return &TimedMutex{ + duration: length, + } +} + +// LockForDuration will start a timer, lock the mutex, +// then allow the caller to continue +// After the duration, the mutex will be unlocked +func (t *TimedMutex) LockForDuration() { + var wg sync.WaitGroup + wg.Add(1) + go t.lockAndSetTimer(&wg) + wg.Wait() +} + +func (t *TimedMutex) lockAndSetTimer(wg *sync.WaitGroup) { + t.mtx.Lock() + t.setTimer() + wg.Done() +} + +// UnlockIfLocked will unlock the mutex if its currently locked +// Will return true if successfully unlocked +func (t *TimedMutex) UnlockIfLocked() bool { + if t.isTimerNil() { + return false + } + + if !t.stopTimer() { + return false + } + t.mtx.Unlock() + return true +} + +// stopTimer will return true if timer has been stopped by this command +// If the timer has expired, clear the channel +func (t *TimedMutex) stopTimer() bool { + t.timerLock.Lock() + defer t.timerLock.Unlock() + if !t.timer.Stop() { + select { + case <-t.timer.C: + default: + } + return false + } + return true +} + +// isTimerNil safely read locks to detect nil +func (t *TimedMutex) isTimerNil() bool { + t.timerLock.RLock() + defer t.timerLock.RUnlock() + return t.timer == nil +} + +// setTimer safely locks and sets a timer +// which will automatically execute a mutex unlock +// once timer expires +func (t *TimedMutex) setTimer() { + t.timerLock.Lock() + t.timer = time.AfterFunc(t.duration, func() { + t.mtx.Unlock() + }) + t.timerLock.Unlock() +} diff --git a/common/timedmutex/timed_mutex_test.go b/common/timedmutex/timed_mutex_test.go new file mode 100644 index 00000000..bf9c3941 --- /dev/null +++ b/common/timedmutex/timed_mutex_test.go @@ -0,0 +1,86 @@ +package timedmutex + +import ( + "testing" + "time" +) + +func BenchmarkTimedMutexTime(b *testing.B) { + tm := NewTimedMutex(20 * time.Millisecond) + for i := 0; i < b.N; i++ { + tm.LockForDuration() + } +} + +func TestConsistencyOfPanicFreeUnlock(t *testing.T) { + t.Parallel() + duration := 20 * time.Millisecond + tm := NewTimedMutex(duration) + for i := 1; i <= 50; i++ { + testUnlockTime := time.Duration(i) * time.Millisecond + tm.LockForDuration() + time.Sleep(testUnlockTime) + tm.UnlockIfLocked() + } +} + +func TestUnlockAfterTimeout(t *testing.T) { + t.Parallel() + tm := NewTimedMutex(time.Second) + tm.LockForDuration() + time.Sleep(2 * time.Second) + wasUnlocked := tm.UnlockIfLocked() + if wasUnlocked { + t.Error("Mutex should have been unlocked by timeout, not command") + } +} + +func TestUnlockBeforeTimeout(t *testing.T) { + t.Parallel() + tm := NewTimedMutex(2 * time.Second) + tm.LockForDuration() + time.Sleep(time.Second) + wasUnlocked := tm.UnlockIfLocked() + if !wasUnlocked { + t.Error("Mutex should have been unlocked by command, not timeout") + } +} + +// TestUnlockAtSameTimeAsTimeout this test ensures +// that even if the timeout and the command occur at +// the same time, no panics occur. The result of the +// 'who' unlocking this doesn't matter, so long as +// the unlock occurs without this test panicking +func TestUnlockAtSameTimeAsTimeout(t *testing.T) { + t.Parallel() + duration := time.Second + tm := NewTimedMutex(duration) + tm.LockForDuration() + time.Sleep(duration) + tm.UnlockIfLocked() +} + +func TestMultipleUnlocks(t *testing.T) { + t.Parallel() + tm := NewTimedMutex(10 * time.Second) + tm.LockForDuration() + wasUnlocked := tm.UnlockIfLocked() + if !wasUnlocked { + t.Error("Mutex should have been unlocked by command, not timeout") + } + wasUnlocked = tm.UnlockIfLocked() + if wasUnlocked { + t.Error("Mutex should have been already unlocked by command") + } + wasUnlocked = tm.UnlockIfLocked() + if wasUnlocked { + t.Error("Mutex should have been already unlocked by command") + } +} + +func TestJustWaitItOut(t *testing.T) { + t.Parallel() + tm := NewTimedMutex(1 * time.Second) + tm.LockForDuration() + time.Sleep(2 * time.Second) +} diff --git a/common/timedmutex/timed_mutex_types.go b/common/timedmutex/timed_mutex_types.go new file mode 100644 index 00000000..841ebcdc --- /dev/null +++ b/common/timedmutex/timed_mutex_types.go @@ -0,0 +1,15 @@ +package timedmutex + +import ( + "sync" + "time" +) + +// TimedMutex is a blocking mutex which will unlock +// after a specified time +type TimedMutex struct { + mtx sync.Mutex + timerLock sync.RWMutex + timer *time.Timer + duration time.Duration +} diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index b7e5f0f4..63ee5cfe 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -14,7 +14,6 @@ import ( func init() { dispatcher = &Dispatcher{ routes: make(map[uuid.UUID][]chan interface{}), - jobs: make(chan *job, DefaultJobBuffer), outbound: sync.Pool{ New: func() interface{} { // Create unbuffered channel for data pass @@ -25,14 +24,14 @@ func init() { } // Start starts the dispatch system by spawning workers and allocating memory -func Start(workers int) error { +func Start(workers, jobsLimit int) error { if dispatcher == nil { return errors.New(errNotInitialised) } mtx.Lock() defer mtx.Unlock() - return dispatcher.start(workers) + return dispatcher.start(workers, jobsLimit) } // Stop attempts to stop the dispatch service, this will close all pipe channels @@ -78,17 +77,22 @@ func SpawnWorker() error { // start compares atomic running value, sets defaults, overides with // configuration, then spawns workers -func (d *Dispatcher) start(workers int) error { +func (d *Dispatcher) start(workers, channelCapacity int) error { if atomic.LoadUint32(&d.running) == 1 { return errors.New(errAlreadyStarted) } if workers < 1 { log.Warn(log.DispatchMgr, - "Dispatcher: workers cannot be zero using default values") + "Dispatcher: workers cannot be zero, using default values") workers = DefaultMaxWorkers } - + if channelCapacity < 1 { + log.Warn(log.DispatchMgr, + "Dispatcher: jobs limit cannot be zero, using default values") + channelCapacity = DefaultJobsLimit + } + d.jobs = make(chan *job, channelCapacity) d.maxWorkers = int32(workers) d.shutdown = make(chan *sync.WaitGroup) @@ -253,7 +257,8 @@ func (d *Dispatcher) publish(id uuid.UUID, data interface{}) error { select { case d.jobs <- newJob: default: - return fmt.Errorf("dispatcher buffer at max capacity [%d] current worker count [%d], spawn more workers via --dispatchworkers=x", + return fmt.Errorf("dispatcher jobs at limit [%d] current worker count [%d]. Spawn more workers via --dispatchworkers=x"+ + ", or increase the jobs limit via --dispatchjobslimit=x", len(d.jobs), atomic.LoadInt32(&d.count)) } diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index cdbf1aaa..ee39af81 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -12,7 +12,7 @@ import ( var mux *Mux func TestMain(m *testing.M) { - err := Start(DefaultMaxWorkers) + err := Start(DefaultMaxWorkers, 0) if err != nil { fmt.Println(err) os.Exit(1) @@ -34,11 +34,10 @@ func TestDispatcher(t *testing.T) { t.Error("error cannot be nil") } - err = Start(10) + err = Start(10, 0) if err == nil { t.Error("error cannot be nil") } - if IsRunning() { t.Error("should be false") } @@ -59,7 +58,7 @@ func TestDispatcher(t *testing.T) { t.Error("should be true") } - err = Start(10) + err = Start(10, 0) if err == nil { t.Error("error cannot be nil") } @@ -99,11 +98,13 @@ func TestDispatcher(t *testing.T) { t.Error("error cannot be nil") } - err = Start(0) + err = Start(0, 20) if err != nil { t.Error(err) } - + if cap(dispatcher.jobs) != 20 { + t.Errorf("Expected jobs limit to be %v, is %v", 20, cap(dispatcher.jobs)) + } payload := "something" err = dispatcher.publish(uuid.UUID{}, &payload) @@ -141,11 +142,13 @@ func TestDispatcher(t *testing.T) { t.Error("error cannot be nil") } - err = dispatcher.start(10) + err = dispatcher.start(10, -1) if err != nil { t.Error(err) } - + if cap(dispatcher.jobs) != DefaultJobsLimit { + t.Errorf("Expected jobs limit to be %v, is %v", DefaultJobsLimit, cap(dispatcher.jobs)) + } someID, err := uuid.NewV4() if err != nil { t.Error(err) diff --git a/dispatch/dispatch_types.go b/dispatch/dispatch_types.go index 05ce744f..1b9d7d99 100644 --- a/dispatch/dispatch_types.go +++ b/dispatch/dispatch_types.go @@ -8,8 +8,8 @@ import ( ) const ( - // DefaultJobBuffer defines a maxiumum amount of jobs allowed in channel - DefaultJobBuffer = 100 + // DefaultJobsLimit defines a maxiumum amount of jobs allowed in channel + DefaultJobsLimit = 100 // DefaultMaxWorkers is the package default worker ceiling amount DefaultMaxWorkers = 10 @@ -47,7 +47,6 @@ type Dispatcher struct { // MaxWorkers defines max worker ceiling maxWorkers int32 - // Atomic values ----------------------- // Worker counter count int32 diff --git a/engine/engine.go b/engine/engine.go index c97153db..f69514d2 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -207,6 +207,7 @@ func ValidateSettings(b *Engine, s *Settings) { b.Settings.GlobalHTTPProxy = s.GlobalHTTPProxy b.Settings.DispatchMaxWorkerAmount = s.DispatchMaxWorkerAmount + b.Settings.DispatchJobsLimit = s.DispatchJobsLimit } // PrintSettings returns the engine settings @@ -237,6 +238,7 @@ func PrintSettings(s *Settings) { log.Debugf(log.Global, "\t Enable Database manager: %v", s.EnableDatabaseManager) log.Debugf(log.Global, "\t Enable dispatcher: %v", s.EnableDispatcher) log.Debugf(log.Global, "\t Dispatch package max worker amount: %d", s.DispatchMaxWorkerAmount) + log.Debugf(log.Global, "\t Dispatch package jobs limit: %d", s.DispatchJobsLimit) log.Debugf(log.Global, "- FOREX SETTINGS:") log.Debugf(log.Global, "\t Enable currency conveter: %v", s.EnableCurrencyConverter) log.Debugf(log.Global, "\t Enable currency layer: %v", s.EnableCurrencyLayer) @@ -274,7 +276,7 @@ func (e *Engine) Start() error { } if e.Settings.EnableDispatcher { - if err := dispatch.Start(e.Settings.DispatchMaxWorkerAmount); err != nil { + if err := dispatch.Start(e.Settings.DispatchMaxWorkerAmount, e.Settings.DispatchJobsLimit); err != nil { log.Errorf(log.DispatchMgr, "Dispatcher unable to start: %v", err) } } diff --git a/engine/engine_types.go b/engine/engine_types.go index 11d3a3ec..da1ba5ad 100644 --- a/engine/engine_types.go +++ b/engine/engine_types.go @@ -65,4 +65,5 @@ type Settings struct { // Dispatch system settings EnableDispatcher bool DispatchMaxWorkerAmount int + DispatchJobsLimit int } diff --git a/engine/helpers.go b/engine/helpers.go index 4ef42aa5..d628b2a4 100644 --- a/engine/helpers.go +++ b/engine/helpers.go @@ -116,7 +116,7 @@ func SetSubsystem(subsys string, enable bool) error { Bot.ExchangeCurrencyPairManager.Stop() case "dispatch": if enable { - return dispatch.Start(Bot.Settings.DispatchMaxWorkerAmount) + return dispatch.Start(Bot.Settings.DispatchMaxWorkerAmount, Bot.Settings.DispatchJobsLimit) } return dispatch.Stop() } diff --git a/exchanges/gemini/gemini.go b/exchanges/gemini/gemini.go index 0e11a441..794677e4 100644 --- a/exchanges/gemini/gemini.go +++ b/exchanges/gemini/gemini.go @@ -397,7 +397,7 @@ func (g *Gemini) SendAuthenticatedHTTPRequest(method, path string, params map[st nil, result, true, - false, + true, g.Verbose, g.HTTPDebugging, g.HTTPRecording) diff --git a/exchanges/orderbook/orderbook_test.go b/exchanges/orderbook/orderbook_test.go index 9389ad08..9c750478 100644 --- a/exchanges/orderbook/orderbook_test.go +++ b/exchanges/orderbook/orderbook_test.go @@ -15,7 +15,7 @@ import ( ) func TestMain(m *testing.M) { - err := dispatch.Start(1) + err := dispatch.Start(1, dispatch.DefaultJobsLimit) if err != nil { log.Fatal(err) } diff --git a/exchanges/request/request.go b/exchanges/request/request.go index 51cc651b..66d0cae9 100644 --- a/exchanges/request/request.go +++ b/exchanges/request/request.go @@ -11,80 +11,15 @@ import ( "net/http/httputil" "net/url" "strings" - "sync" "time" "github.com/thrasher-corp/gocryptotrader/common" + "github.com/thrasher-corp/gocryptotrader/common/timedmutex" "github.com/thrasher-corp/gocryptotrader/exchanges/mock" "github.com/thrasher-corp/gocryptotrader/exchanges/nonce" log "github.com/thrasher-corp/gocryptotrader/logger" ) -var supportedMethods = []string{http.MethodGet, http.MethodPost, http.MethodHead, - http.MethodPut, http.MethodDelete, http.MethodOptions, http.MethodConnect} - -// Const vars for rate limiter -const ( - DefaultMaxRequestJobs = 50 - DefaultTimeoutRetryAttempts = 3 - - proxyTLSTimeout = 15 * time.Second -) - -// Vars for rate limiter -var ( - MaxRequestJobs = DefaultMaxRequestJobs - TimeoutRetryAttempts = DefaultTimeoutRetryAttempts - DisableRateLimiter bool -) - -// Requester struct for the request client -type Requester struct { - HTTPClient *http.Client - UnauthLimit *RateLimit - AuthLimit *RateLimit - Name string - UserAgent string - Cycle time.Time - timeoutRetryAttempts int - m sync.Mutex - Jobs chan Job - disengage chan struct{} - WorkerStarted bool - Nonce nonce.Nonce - fifoLock sync.Mutex - DisableRateLimiter bool -} - -// RateLimit struct -type RateLimit struct { - Duration time.Duration - Rate int - Requests int - Mutex sync.Mutex -} - -// JobResult holds a request job result -type JobResult struct { - Error error - Result interface{} -} - -// Job holds a request job -type Job struct { - Request *http.Request - Method string - Path string - Headers map[string]string - Body io.Reader - Result interface{} - JobResult chan *JobResult - AuthRequest bool - Verbose bool - HTTPDebugging bool - Record bool -} - // NewRateLimit creates a new RateLimit func NewRateLimit(d time.Duration, rate int) *RateLimit { return &RateLimit{Duration: d, Rate: rate} @@ -237,8 +172,8 @@ func New(name string, authLimit, unauthLimit *RateLimit, httpRequester *http.Cli AuthLimit: authLimit, Name: name, Jobs: make(chan Job, MaxRequestJobs), - disengage: make(chan struct{}, 1), timeoutRetryAttempts: TimeoutRetryAttempts, + timedLock: timedmutex.NewTimedMutex(DefaultMutexLockTimeout), } } @@ -443,27 +378,27 @@ func (r *Requester) worker() { // SendPayload handles sending HTTP/HTTPS requests func (r *Requester) SendPayload(method, path string, headers map[string]string, body io.Reader, result interface{}, authRequest, nonceEnabled, verbose, httpDebugging, record bool) error { if !nonceEnabled { - r.lock() + r.timedLock.LockForDuration() } if r == nil || r.Name == "" { - r.unlock() + r.timedLock.UnlockIfLocked() return errors.New("not initiliased, SetDefaults() called before making request?") } if !IsValidMethod(method) { - r.unlock() + r.timedLock.UnlockIfLocked() return fmt.Errorf("incorrect method supplied %s: supported %s", method, supportedMethods) } if path == "" { - r.unlock() + r.timedLock.UnlockIfLocked() return errors.New("invalid path") } req, err := r.checkRequest(method, path, body, headers) if err != nil { - r.unlock() + r.timedLock.UnlockIfLocked() return err } @@ -478,12 +413,12 @@ func (r *Requester) SendPayload(method, path string, headers map[string]string, } if !r.RequiresRateLimiter() { - r.unlock() + r.timedLock.UnlockIfLocked() return r.DoRequest(req, path, body, result, authRequest, verbose, httpDebugging, record) } if len(r.Jobs) == MaxRequestJobs { - r.unlock() + r.timedLock.UnlockIfLocked() return errors.New("max request jobs reached") } @@ -515,7 +450,7 @@ func (r *Requester) SendPayload(method, path string, headers map[string]string, log.Debugf(log.ExchangeSys, "%s request. Attaching new job.", r.Name) } r.Jobs <- newJob - r.unlock() + r.timedLock.UnlockIfLocked() if verbose { log.Debugf(log.ExchangeSys, "%s request. Waiting for job to complete.", r.Name) @@ -532,7 +467,7 @@ func (r *Requester) SendPayload(method, path string, headers map[string]string, // GetNonce returns a nonce for requests. This locks and enforces concurrent // nonce FIFO on the buffered job channel func (r *Requester) GetNonce(isNano bool) nonce.Value { - r.lock() + r.timedLock.LockForDuration() if r.Nonce.Get() == 0 { if isNano { r.Nonce.Set(time.Now().UnixNano()) @@ -548,7 +483,7 @@ func (r *Requester) GetNonce(isNano bool) nonce.Value { // GetNonceMilli returns a nonce for requests. This locks and enforces concurrent // nonce FIFO on the buffered job channel this is for millisecond func (r *Requester) GetNonceMilli() nonce.Value { - r.lock() + r.timedLock.LockForDuration() if r.Nonce.Get() == 0 { r.Nonce.Set(time.Now().UnixNano() / int64(time.Millisecond)) return r.Nonce.Get() @@ -569,33 +504,3 @@ func (r *Requester) SetProxy(p *url.URL) error { } return nil } - -// lock locks and sets up an issue timer, if something errors out of scope it -// automatically unlocks -func (r *Requester) lock() { - if r.disengage == nil { - r.disengage = make(chan struct{}, 1) - } - var wg sync.WaitGroup - r.fifoLock.Lock() - wg.Add(1) - go func() { - timer := time.NewTimer(50 * time.Millisecond) - wg.Done() - select { - case <-timer.C: - log.Errorf(log.ExchangeSys, "Unlocking due to possible error for %s", r.Name) - r.fifoLock.Unlock() - - case <-r.disengage: - return - } - }() - wg.Wait() -} - -// unlock unlocks mtx and shuts down a timer -func (r *Requester) unlock() { - r.disengage <- struct{}{} - r.fifoLock.Unlock() -} diff --git a/exchanges/request/request_test.go b/exchanges/request/request_test.go index 9acf9a58..37b94077 100644 --- a/exchanges/request/request_test.go +++ b/exchanges/request/request_test.go @@ -199,16 +199,9 @@ func TestCheckRequest(t *testing.T) { } func TestDoRequest(t *testing.T) { - var test = new(Requester) - err := test.SendPayload(http.MethodGet, "https://www.google.com", nil, nil, nil, false, false, true, false, false) - if err == nil { - t.Fatal("Expected error") - } - r := New("", NewRateLimit(time.Second*10, 5), NewRateLimit(time.Second*20, 100), new(http.Client)) - r.Name = "bitfinex" - err = r.SendPayload("BLAH", "https://www.google.com", nil, nil, nil, false, false, true, false, false) + err := r.SendPayload("BLAH", "https://www.google.com", nil, nil, nil, false, false, true, false, false) if err == nil { t.Fatal("Expected error") } @@ -321,7 +314,7 @@ func TestDoRequest(t *testing.T) { } func BenchmarkRequestLockMech(b *testing.B) { - var r = new(Requester) + r := New("", NewRateLimit(time.Second*10, 5), NewRateLimit(time.Second*20, 100), new(http.Client)) var meep interface{} for n := 0; n < b.N; n++ { r.SendPayload(http.MethodGet, "127.0.0.1", nil, nil, &meep, false, false, false, false, false) diff --git a/exchanges/request/request_types.go b/exchanges/request/request_types.go new file mode 100644 index 00000000..f35fe3cb --- /dev/null +++ b/exchanges/request/request_types.go @@ -0,0 +1,75 @@ +package request + +import ( + "io" + "net/http" + "sync" + "time" + + "github.com/thrasher-corp/gocryptotrader/common/timedmutex" + "github.com/thrasher-corp/gocryptotrader/exchanges/nonce" +) + +var supportedMethods = []string{http.MethodGet, http.MethodPost, http.MethodHead, + http.MethodPut, http.MethodDelete, http.MethodOptions, http.MethodConnect} + +// Const vars for rate limiter +const ( + DefaultMaxRequestJobs = 50 + DefaultTimeoutRetryAttempts = 3 + DefaultMutexLockTimeout = 50 * time.Millisecond + proxyTLSTimeout = 15 * time.Second +) + +// Vars for rate limiter +var ( + MaxRequestJobs = DefaultMaxRequestJobs + TimeoutRetryAttempts = DefaultTimeoutRetryAttempts + DisableRateLimiter bool +) + +// Requester struct for the request client +type Requester struct { + HTTPClient *http.Client + UnauthLimit *RateLimit + AuthLimit *RateLimit + Name string + UserAgent string + Cycle time.Time + timeoutRetryAttempts int + m sync.Mutex + Jobs chan Job + WorkerStarted bool + Nonce nonce.Nonce + DisableRateLimiter bool + timedLock *timedmutex.TimedMutex +} + +// RateLimit struct +type RateLimit struct { + Duration time.Duration + Rate int + Requests int + Mutex sync.Mutex +} + +// JobResult holds a request job result +type JobResult struct { + Error error + Result interface{} +} + +// Job holds a request job +type Job struct { + Request *http.Request + Method string + Path string + Headers map[string]string + Body io.Reader + Result interface{} + JobResult chan *JobResult + AuthRequest bool + Verbose bool + HTTPDebugging bool + Record bool +} diff --git a/exchanges/ticker/ticker_test.go b/exchanges/ticker/ticker_test.go index 9cc686d9..328e13f1 100644 --- a/exchanges/ticker/ticker_test.go +++ b/exchanges/ticker/ticker_test.go @@ -15,7 +15,7 @@ import ( ) func TestMain(m *testing.M) { - err := dispatch.Start(1) + err := dispatch.Start(1, dispatch.DefaultJobsLimit) if err != nil { log.Fatal(err) } diff --git a/main.go b/main.go index 4b9b41fa..10816b3e 100644 --- a/main.go +++ b/main.go @@ -49,6 +49,7 @@ func main() { flag.BoolVar(&settings.EnableNTPClient, "ntpclient", true, "enables the NTP client to check system clock drift") flag.BoolVar(&settings.EnableDispatcher, "dispatch", true, "enables the dispatch system") flag.IntVar(&settings.DispatchMaxWorkerAmount, "dispatchworkers", dispatch.DefaultMaxWorkers, "sets the dispatch package max worker generation limit") + flag.IntVar(&settings.DispatchJobsLimit, "dispatchjobslimit", dispatch.DefaultJobsLimit, "sets the dispatch package max jobs limit") // Forex provider settings flag.BoolVar(&settings.EnableCurrencyConverter, "currencyconverter", false, "overrides config and sets up foreign exchange Currency Converter")