From 596be31b6a327dc2979de9a9b885cfa70b438121 Mon Sep 17 00:00:00 2001 From: Adrian Gallagher Date: Wed, 23 Oct 2019 11:30:40 +1100 Subject: [PATCH] dispatcher: Use int32 for atomic operations to prevent crash on ARM 32bit systems (#370) See https://github.com/golang/go/issues/599 --- dispatch/dispatch.go | 18 +++++++++--------- dispatch/dispatch_types.go | 4 ++-- engine/engine_types.go | 2 +- main.go | 2 +- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 8fb8b096..b7e5f0f4 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -25,7 +25,7 @@ func init() { } // Start starts the dispatch system by spawning workers and allocating memory -func Start(workers int64) error { +func Start(workers int) error { if dispatcher == nil { return errors.New(errNotInitialised) } @@ -78,7 +78,7 @@ func SpawnWorker() error { // start compares atomic running value, sets defaults, overides with // configuration, then spawns workers -func (d *Dispatcher) start(workers int64) error { +func (d *Dispatcher) start(workers int) error { if atomic.LoadUint32(&d.running) == 1 { return errors.New(errAlreadyStarted) } @@ -89,14 +89,14 @@ func (d *Dispatcher) start(workers int64) error { workers = DefaultMaxWorkers } - d.maxWorkers = workers + d.maxWorkers = int32(workers) d.shutdown = make(chan *sync.WaitGroup) - if atomic.LoadInt64(&d.count) != 0 { + if atomic.LoadInt32(&d.count) != 0 { return errors.New("dispatcher leaked workers found") } - for i := int64(0); i < d.maxWorkers; i++ { + for i := int32(0); i < d.maxWorkers; i++ { err := d.spawnWorker() if err != nil { return err @@ -162,7 +162,7 @@ func (d *Dispatcher) dropWorker() { // spawnWorker allocates a new worker for job processing func (d *Dispatcher) spawnWorker() error { - if atomic.LoadInt64(&d.count) >= d.maxWorkers { + if atomic.LoadInt32(&d.count) >= d.maxWorkers { return errors.New("dispatcher cannot spawn more workers; ceiling reached") } var spawnWg sync.WaitGroup @@ -174,7 +174,7 @@ func (d *Dispatcher) spawnWorker() error { // Relayer routine relays communications across the defined routes func (d *Dispatcher) relayer(i *sync.WaitGroup) { - atomic.AddInt64(&d.count, 1) + atomic.AddInt32(&d.count, 1) d.wg.Add(1) timeout := time.NewTimer(0) i.Done() @@ -219,7 +219,7 @@ func (d *Dispatcher) relayer(i *sync.WaitGroup) { default: } } - atomic.AddInt64(&d.count, -1) + atomic.AddInt32(&d.count, -1) if v != nil { v.Done() } @@ -255,7 +255,7 @@ func (d *Dispatcher) publish(id uuid.UUID, data interface{}) error { default: return fmt.Errorf("dispatcher buffer at max capacity [%d] current worker count [%d], spawn more workers via --dispatchworkers=x", len(d.jobs), - atomic.LoadInt64(&d.count)) + atomic.LoadInt32(&d.count)) } return nil diff --git a/dispatch/dispatch_types.go b/dispatch/dispatch_types.go index b52bb42f..05ce744f 100644 --- a/dispatch/dispatch_types.go +++ b/dispatch/dispatch_types.go @@ -46,11 +46,11 @@ type Dispatcher struct { outbound sync.Pool // MaxWorkers defines max worker ceiling - maxWorkers int64 + maxWorkers int32 // Atomic values ----------------------- // Worker counter - count int64 + count int32 // Dispatch status running uint32 diff --git a/engine/engine_types.go b/engine/engine_types.go index 0372df9a..11d3a3ec 100644 --- a/engine/engine_types.go +++ b/engine/engine_types.go @@ -64,5 +64,5 @@ type Settings struct { // Dispatch system settings EnableDispatcher bool - DispatchMaxWorkerAmount int64 + DispatchMaxWorkerAmount int } diff --git a/main.go b/main.go index 99b1fd1d..4b9b41fa 100644 --- a/main.go +++ b/main.go @@ -48,7 +48,7 @@ func main() { flag.DurationVar(&settings.EventManagerDelay, "eventmanagerdelay", time.Duration(0), "sets the event managers sleep delay between event checking") flag.BoolVar(&settings.EnableNTPClient, "ntpclient", true, "enables the NTP client to check system clock drift") flag.BoolVar(&settings.EnableDispatcher, "dispatch", true, "enables the dispatch system") - flag.Int64Var(&settings.DispatchMaxWorkerAmount, "dispatchworkers", dispatch.DefaultMaxWorkers, "sets the dispatch package max worker generation limit") + flag.IntVar(&settings.DispatchMaxWorkerAmount, "dispatchworkers", dispatch.DefaultMaxWorkers, "sets the dispatch package max worker generation limit") // Forex provider settings flag.BoolVar(&settings.EnableCurrencyConverter, "currencyconverter", false, "overrides config and sets up foreign exchange Currency Converter")