mirror of
https://github.com/d0zingcat/gocryptotrader.git
synced 2026-05-13 15:09:42 +00:00
orderbook/buffer: data integrity and resubscription pass (#910)
* orderbook/buffer: data integrity and resubscription pass * btcmarkets: REMOVE THAT LIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIINE!!!!!!!!!!!!!!!!! * buffer: reinstate publish, refaactor, invalidate more and comments * buffer/orderbook: improve update and snapshot performance. Move Update type to orderbook package to util. pointer through entire function calls. (cleanup). Change action string to uint8 for easier comparison. Add parsing helper. Update current test benchmark comments. * dispatch: change publish func to variadic id param * dispatch: remove sender receiver wait time as this adds overhead and complexity. update tests. * dispatch: don't create pointers for every job container * rpcserver: fix assertion issues with data publishing change * linter: fixes * glorious: nits addr * depth: change validation handling to incorporate and store err * linter: fix more issues * dispatch: fix race * travis: update before fetching * depth: wrap and return wrapped error in invalidate call and fix tests * btcmarkets: fix commenting * workflow: check * workflow: check * orderbook: check error * buffer/depth: return invalidation error and fix tests * gctcli: display errors on orderbook streams * buffer: remove unused types * orderbook/bitmex: shift function to bitmex * orderbook: Add specific comments to unexported functions that don't have locking require locking. * orderbook: restrict published data functionality to orderbook.Outbound interface * common: add assertion failure helper for error * dispatch: remove atomics, add mutex protection, remove add/remove worker, redo main tests * dispatch: export function * engine: revert and change sub logger to manager * engine: remove old test * dispatch: add common variable ;) * btcmarket: don't overflow int in tests on 32bit systems * ci: force 1.17.7 usage for go * Revert "ci: force 1.17.7 usage for go" This reverts commit af2f95563bf218cf2b9f36a9fcf3258e2c6a2d91. * golangci: bump version add and remove linter items * Revert "golangci: bump version add and remove linter items" This reverts commit 3c98bffc9d030e39faca0387ea40c151df2ab06b. * dispatch: remove unsused mutex from mux * order: slight optimizations * nits: glorious * dispatch: fix regression on uuid generation and input inline with master * linter: fix * linter: fix * glorious: nit - rm slice segration * account: fix test after merge * coinbasepro: revert change * account: close channel instead of needing a receiver, push alert in routine to prepare for waiter. Co-authored-by: Ryan O'Hara-Reid <ryan.oharareid@thrasher.io>
This commit is contained in:
@@ -4,156 +4,149 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/gofrs/uuid"
|
||||
"github.com/thrasher-corp/gocryptotrader/log"
|
||||
)
|
||||
|
||||
// ErrNotRunning defines an error when the dispatcher is not running
|
||||
var ErrNotRunning = errors.New("dispatcher not running")
|
||||
var (
|
||||
// ErrNotRunning defines an error when the dispatcher is not running
|
||||
ErrNotRunning = errors.New("dispatcher not running")
|
||||
|
||||
errDispatcherNotInitialized = errors.New("dispatcher not initialised")
|
||||
errDispatcherAlreadyRunning = errors.New("dispatcher already running")
|
||||
errDispatchShutdown = errors.New("dispatcher did not shutdown properly, routines failed to close")
|
||||
errDispatcherUUIDNotFoundInRouteList = errors.New("dispatcher uuid not found in route list")
|
||||
errTypeAssertionFailure = errors.New("type assertion failure")
|
||||
errChannelNotFoundInUUIDRef = errors.New("dispatcher channel not found in uuid reference slice")
|
||||
errUUIDCollision = errors.New("dispatcher collision detected, uuid already exists")
|
||||
errDispatcherJobsAtLimit = errors.New("dispatcher jobs at limit")
|
||||
errChannelIsNil = errors.New("channel is nil")
|
||||
errUUIDGeneratorFunctionIsNil = errors.New("UUID generator function is nil")
|
||||
|
||||
limitMessage = "%w [%d] current worker count [%d]. Spawn more workers via --dispatchworkers=x, or increase the jobs limit via --dispatchjobslimit=x"
|
||||
)
|
||||
|
||||
// Name is an exported subsystem name
|
||||
const Name = "dispatch"
|
||||
|
||||
func init() {
|
||||
dispatcher = &Dispatcher{
|
||||
dispatcher = NewDispatcher()
|
||||
}
|
||||
|
||||
// NewDispatcher creates a new Dispatcher for relaying data.
|
||||
func NewDispatcher() *Dispatcher {
|
||||
return &Dispatcher{
|
||||
routes: make(map[uuid.UUID][]chan interface{}),
|
||||
outbound: sync.Pool{
|
||||
New: func() interface{} {
|
||||
// Create unbuffered channel for data pass
|
||||
return make(chan interface{})
|
||||
},
|
||||
New: getChan,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func getChan() interface{} {
|
||||
// Create unbuffered channel for data pass
|
||||
return make(chan interface{})
|
||||
}
|
||||
|
||||
// Start starts the dispatch system by spawning workers and allocating memory
|
||||
func Start(workers, jobsLimit int) error {
|
||||
if dispatcher == nil {
|
||||
return errors.New(errNotInitialised)
|
||||
}
|
||||
|
||||
mtx.Lock()
|
||||
defer mtx.Unlock()
|
||||
return dispatcher.start(workers, jobsLimit)
|
||||
}
|
||||
|
||||
// Stop attempts to stop the dispatch service, this will close all pipe channels
|
||||
// flush job list and drop all workers
|
||||
func Stop() error {
|
||||
if dispatcher == nil {
|
||||
return errors.New(errNotInitialised)
|
||||
}
|
||||
|
||||
log.Debugln(log.DispatchMgr, "Dispatch manager shutting down...")
|
||||
|
||||
mtx.Lock()
|
||||
defer mtx.Unlock()
|
||||
return dispatcher.stop()
|
||||
}
|
||||
|
||||
// IsRunning checks to see if the dispatch service is running
|
||||
func IsRunning() bool {
|
||||
if dispatcher == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return dispatcher.isRunning()
|
||||
}
|
||||
|
||||
// DropWorker drops a worker routine
|
||||
func DropWorker() error {
|
||||
if dispatcher == nil {
|
||||
return errors.New(errNotInitialised)
|
||||
}
|
||||
|
||||
dispatcher.dropWorker()
|
||||
return nil
|
||||
}
|
||||
|
||||
// SpawnWorker starts a new worker routine
|
||||
func SpawnWorker() error {
|
||||
if dispatcher == nil {
|
||||
return errors.New(errNotInitialised)
|
||||
}
|
||||
return dispatcher.spawnWorker()
|
||||
}
|
||||
|
||||
// start compares atomic running value, sets defaults, overides with
|
||||
// configuration, then spawns workers
|
||||
func (d *Dispatcher) start(workers, channelCapacity int) error {
|
||||
if atomic.LoadUint32(&d.running) == 1 {
|
||||
return errors.New("dispatcher already running")
|
||||
if d == nil {
|
||||
return errDispatcherNotInitialized
|
||||
}
|
||||
|
||||
d.m.Lock()
|
||||
defer d.m.Unlock()
|
||||
|
||||
if d.running {
|
||||
return errDispatcherAlreadyRunning
|
||||
}
|
||||
|
||||
d.running = true
|
||||
|
||||
if workers < 1 {
|
||||
log.Warn(log.DispatchMgr,
|
||||
"Dispatcher: workers cannot be zero, using default values")
|
||||
log.Warnf(log.DispatchMgr,
|
||||
"workers cannot be zero, using default value %d\n",
|
||||
DefaultMaxWorkers)
|
||||
workers = DefaultMaxWorkers
|
||||
}
|
||||
if channelCapacity < 1 {
|
||||
log.Warn(log.DispatchMgr,
|
||||
"Dispatcher: jobs limit cannot be zero, using default values")
|
||||
log.Warnf(log.DispatchMgr,
|
||||
"jobs limit cannot be zero, using default values %d\n",
|
||||
DefaultJobsLimit)
|
||||
channelCapacity = DefaultJobsLimit
|
||||
}
|
||||
d.jobs = make(chan *job, channelCapacity)
|
||||
d.maxWorkers = int32(workers)
|
||||
d.shutdown = make(chan *sync.WaitGroup)
|
||||
d.jobs = make(chan job, channelCapacity)
|
||||
d.maxWorkers = workers
|
||||
d.shutdown = make(chan struct{})
|
||||
|
||||
if atomic.LoadInt32(&d.count) != 0 {
|
||||
return errors.New("dispatcher leaked workers found")
|
||||
for i := 0; i < d.maxWorkers; i++ {
|
||||
d.wg.Add(1)
|
||||
go d.relayer()
|
||||
}
|
||||
|
||||
for i := int32(0); i < d.maxWorkers; i++ {
|
||||
err := d.spawnWorker()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
atomic.SwapUint32(&d.running, 1)
|
||||
return nil
|
||||
}
|
||||
|
||||
// stop stops the service and shuts down all worker routines
|
||||
func (d *Dispatcher) stop() error {
|
||||
if !atomic.CompareAndSwapUint32(&d.running, 1, 0) {
|
||||
if d == nil {
|
||||
return errDispatcherNotInitialized
|
||||
}
|
||||
|
||||
d.m.Lock()
|
||||
defer d.m.Unlock()
|
||||
|
||||
if !d.running {
|
||||
return ErrNotRunning
|
||||
}
|
||||
|
||||
d.running = false
|
||||
|
||||
// Stop all jobs
|
||||
close(d.jobs)
|
||||
|
||||
// Release finished workers
|
||||
close(d.shutdown)
|
||||
ch := make(chan struct{})
|
||||
timer := time.NewTimer(1 * time.Second)
|
||||
defer func() {
|
||||
if !timer.Stop() {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
}
|
||||
|
||||
d.rMtx.Lock()
|
||||
for key, pipes := range d.routes {
|
||||
for i := range pipes {
|
||||
// Boot off receivers waiting on pipes.
|
||||
close(pipes[i])
|
||||
}
|
||||
}()
|
||||
go func(ch chan struct{}) { d.wg.Wait(); ch <- struct{}{} }(ch)
|
||||
// Flush all pipes, re-subscription will need to occur.
|
||||
d.routes[key] = nil
|
||||
}
|
||||
d.rMtx.Unlock()
|
||||
|
||||
ch := make(chan struct{})
|
||||
timer := time.NewTimer(time.Second)
|
||||
go func(ch chan<- struct{}) { d.wg.Wait(); ch <- struct{}{} }(ch)
|
||||
select {
|
||||
case <-ch:
|
||||
// close all routes
|
||||
for key := range d.routes {
|
||||
for i := range d.routes[key] {
|
||||
close(d.routes[key][i])
|
||||
}
|
||||
|
||||
d.routes[key] = nil
|
||||
}
|
||||
|
||||
for len(d.jobs) != 0 { // drain jobs channel for old data
|
||||
<-d.jobs
|
||||
}
|
||||
|
||||
log.Debugln(log.DispatchMgr, "Dispatch manager shutdown.")
|
||||
|
||||
return nil
|
||||
case <-timer.C:
|
||||
return errors.New(errShutdownRoutines)
|
||||
return errDispatchShutdown
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,80 +155,29 @@ func (d *Dispatcher) isRunning() bool {
|
||||
if d == nil {
|
||||
return false
|
||||
}
|
||||
return atomic.LoadUint32(&d.running) == 1
|
||||
}
|
||||
|
||||
// dropWorker deallocates a worker routine
|
||||
func (d *Dispatcher) dropWorker() {
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
d.shutdown <- &wg
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// spawnWorker allocates a new worker for job processing
|
||||
func (d *Dispatcher) spawnWorker() error {
|
||||
if atomic.LoadInt32(&d.count) >= d.maxWorkers {
|
||||
return errors.New("dispatcher cannot spawn more workers; ceiling reached")
|
||||
}
|
||||
var spawnWg sync.WaitGroup
|
||||
spawnWg.Add(1)
|
||||
go d.relayer(&spawnWg)
|
||||
spawnWg.Wait()
|
||||
return nil
|
||||
d.m.RLock()
|
||||
defer d.m.RUnlock()
|
||||
return d.running
|
||||
}
|
||||
|
||||
// relayer routine relays communications across the defined routes
|
||||
func (d *Dispatcher) relayer(i *sync.WaitGroup) {
|
||||
atomic.AddInt32(&d.count, 1)
|
||||
d.wg.Add(1)
|
||||
timeout := time.NewTimer(0)
|
||||
i.Done()
|
||||
func (d *Dispatcher) relayer() {
|
||||
for {
|
||||
select {
|
||||
case j := <-d.jobs:
|
||||
d.rMtx.RLock()
|
||||
if _, ok := d.routes[j.ID]; !ok {
|
||||
d.rMtx.RUnlock()
|
||||
continue
|
||||
}
|
||||
// Channel handshake timeout feature if a channel is blocked for any
|
||||
// period of time due to an issue with the receiving routine.
|
||||
// This will wait on channel then fall over to the next route when
|
||||
// the timer actuates and continue over the route list. Have to
|
||||
// iterate across full length of routes so every routine can get
|
||||
// their new info, cannot be buffered as we dont want to have an old
|
||||
// orderbook etc contained in a buffered channel when a routine
|
||||
// actually is ready for a receive.
|
||||
// TODO: Need to consider optimal timer length
|
||||
for i := range d.routes[j.ID] {
|
||||
if !timeout.Stop() { // Stop timer before reset
|
||||
// Drain channel if timer has already actuated
|
||||
if pipes, ok := d.routes[j.ID]; ok {
|
||||
for i := range pipes {
|
||||
select {
|
||||
case <-timeout.C:
|
||||
case pipes[i] <- j.Data:
|
||||
default:
|
||||
// no receiver; don't wait. This limits complexity.
|
||||
}
|
||||
}
|
||||
|
||||
timeout.Reset(DefaultHandshakeTimeout)
|
||||
select {
|
||||
case d.routes[j.ID][i] <- j.Data:
|
||||
case <-timeout.C:
|
||||
}
|
||||
}
|
||||
d.rMtx.RUnlock()
|
||||
|
||||
case v := <-d.shutdown:
|
||||
if !timeout.Stop() {
|
||||
select {
|
||||
case <-timeout.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
atomic.AddInt32(&d.count, -1)
|
||||
if v != nil {
|
||||
v.Done()
|
||||
}
|
||||
case <-d.shutdown:
|
||||
d.wg.Done()
|
||||
return
|
||||
}
|
||||
@@ -244,131 +186,151 @@ func (d *Dispatcher) relayer(i *sync.WaitGroup) {
|
||||
|
||||
// publish relays data to the subscribed subsystems
|
||||
func (d *Dispatcher) publish(id uuid.UUID, data interface{}) error {
|
||||
if d == nil {
|
||||
return errDispatcherNotInitialized
|
||||
}
|
||||
|
||||
if id.IsNil() {
|
||||
return errIDNotSet
|
||||
}
|
||||
|
||||
if data == nil {
|
||||
return errors.New("dispatcher data cannot be nil")
|
||||
return errNoData
|
||||
}
|
||||
|
||||
if id == (uuid.UUID{}) {
|
||||
return errors.New("dispatcher uuid not set")
|
||||
}
|
||||
d.m.RLock()
|
||||
defer d.m.RUnlock()
|
||||
|
||||
if atomic.LoadUint32(&d.running) == 0 {
|
||||
if !d.running {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create a new job to publish
|
||||
newJob := &job{
|
||||
Data: data,
|
||||
ID: id,
|
||||
}
|
||||
|
||||
// Push job on stack here
|
||||
select {
|
||||
case d.jobs <- newJob:
|
||||
case d.jobs <- job{data, id}: // Push job into job channel.
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("dispatcher jobs at limit [%d] current worker count [%d]. Spawn more workers via --dispatchworkers=x"+
|
||||
", or increase the jobs limit via --dispatchjobslimit=x",
|
||||
return fmt.Errorf(limitMessage,
|
||||
errDispatcherJobsAtLimit,
|
||||
len(d.jobs),
|
||||
atomic.LoadInt32(&d.count))
|
||||
d.maxWorkers)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Subscribe subscribes a system and returns a communication chan, this does not
|
||||
// ensure initial push. If your routine is out of sync with heartbeat and the
|
||||
// system does not get a change, its up to you to in turn get initial state.
|
||||
func (d *Dispatcher) subscribe(id uuid.UUID) (chan interface{}, error) {
|
||||
if atomic.LoadUint32(&d.running) == 0 {
|
||||
return nil, errors.New(errNotInitialised)
|
||||
// ensure initial push.
|
||||
func (d *Dispatcher) subscribe(id uuid.UUID) (<-chan interface{}, error) {
|
||||
if d == nil {
|
||||
return nil, errDispatcherNotInitialized
|
||||
}
|
||||
|
||||
// Read lock to read route list
|
||||
d.rMtx.RLock()
|
||||
if _, ok := d.routes[id]; !ok {
|
||||
d.rMtx.RUnlock()
|
||||
return nil, errors.New("dispatcher uuid not found in route list")
|
||||
if id.IsNil() {
|
||||
return nil, errIDNotSet
|
||||
}
|
||||
|
||||
d.m.RLock()
|
||||
defer d.m.RUnlock()
|
||||
|
||||
if !d.running {
|
||||
return nil, ErrNotRunning
|
||||
}
|
||||
|
||||
d.rMtx.Lock()
|
||||
defer d.rMtx.Unlock()
|
||||
if _, ok := d.routes[id]; !ok {
|
||||
return nil, errDispatcherUUIDNotFoundInRouteList
|
||||
}
|
||||
d.rMtx.RUnlock()
|
||||
|
||||
// Get an unused channel from the channel pool
|
||||
unusedChan, ok := d.outbound.Get().(chan interface{})
|
||||
ch, ok := d.outbound.Get().(chan interface{})
|
||||
if !ok {
|
||||
return nil, errors.New("unable to type assert unusedChan")
|
||||
return nil, errTypeAssertionFailure
|
||||
}
|
||||
|
||||
// Lock for writing to the route list
|
||||
d.rMtx.Lock()
|
||||
d.routes[id] = append(d.routes[id], unusedChan)
|
||||
d.rMtx.Unlock()
|
||||
|
||||
return unusedChan, nil
|
||||
d.routes[id] = append(d.routes[id], ch)
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
// Unsubscribe unsubs a routine from the dispatcher
|
||||
func (d *Dispatcher) unsubscribe(id uuid.UUID, usedChan chan interface{}) error {
|
||||
if atomic.LoadUint32(&d.running) == 0 {
|
||||
func (d *Dispatcher) unsubscribe(id uuid.UUID, usedChan <-chan interface{}) error {
|
||||
if d == nil {
|
||||
return errDispatcherNotInitialized
|
||||
}
|
||||
|
||||
if id.IsNil() {
|
||||
return errIDNotSet
|
||||
}
|
||||
|
||||
if usedChan == nil {
|
||||
return errChannelIsNil
|
||||
}
|
||||
|
||||
d.m.RLock()
|
||||
defer d.m.RUnlock()
|
||||
|
||||
if !d.running {
|
||||
// reference will already be released in the stop function
|
||||
return nil
|
||||
}
|
||||
|
||||
// Read lock to read route list
|
||||
d.rMtx.RLock()
|
||||
if _, ok := d.routes[id]; !ok {
|
||||
d.rMtx.RUnlock()
|
||||
return errors.New("dispatcher uuid does not reference any channels")
|
||||
}
|
||||
d.rMtx.RUnlock()
|
||||
|
||||
// Lock for write to delete references
|
||||
d.rMtx.Lock()
|
||||
for i := range d.routes[id] {
|
||||
if d.routes[id][i] != usedChan {
|
||||
defer d.rMtx.Unlock()
|
||||
pipes, ok := d.routes[id]
|
||||
if !ok {
|
||||
return errDispatcherUUIDNotFoundInRouteList
|
||||
}
|
||||
|
||||
for i := range pipes {
|
||||
if pipes[i] != usedChan {
|
||||
continue
|
||||
}
|
||||
// Delete individual reference
|
||||
d.routes[id][i] = d.routes[id][len(d.routes[id])-1]
|
||||
d.routes[id][len(d.routes[id])-1] = nil
|
||||
d.routes[id] = d.routes[id][:len(d.routes[id])-1]
|
||||
|
||||
d.rMtx.Unlock()
|
||||
pipes[i] = pipes[len(pipes)-1]
|
||||
pipes[len(pipes)-1] = nil
|
||||
d.routes[id] = pipes[:len(pipes)-1]
|
||||
|
||||
// Drain and put the used chan back in pool; only if it is not closed.
|
||||
select {
|
||||
case _, ok := <-usedChan:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
case _, ok = <-usedChan:
|
||||
default:
|
||||
}
|
||||
|
||||
d.outbound.Put(usedChan)
|
||||
if ok {
|
||||
d.outbound.Put(usedChan)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
d.rMtx.Unlock()
|
||||
return errors.New("dispatcher channel not found in uuid reference slice")
|
||||
return errChannelNotFoundInUUIDRef
|
||||
}
|
||||
|
||||
// GetNewID returns a new ID
|
||||
func (d *Dispatcher) getNewID() (uuid.UUID, error) {
|
||||
func (d *Dispatcher) getNewID(genFn func() (uuid.UUID, error)) (uuid.UUID, error) {
|
||||
if d == nil {
|
||||
return uuid.Nil, errDispatcherNotInitialized
|
||||
}
|
||||
|
||||
if genFn == nil {
|
||||
return uuid.Nil, errUUIDGeneratorFunctionIsNil
|
||||
}
|
||||
|
||||
// Continue to allow the generation, input and return of UUIDs even if
|
||||
// service is not currently enabled.
|
||||
|
||||
d.m.RLock()
|
||||
defer d.m.RUnlock()
|
||||
|
||||
// Generate new uuid
|
||||
newID, err := uuid.NewV4()
|
||||
newID, err := genFn()
|
||||
if err != nil {
|
||||
return uuid.UUID{}, err
|
||||
return uuid.Nil, err
|
||||
}
|
||||
|
||||
// Check to see if it already exists
|
||||
d.rMtx.RLock()
|
||||
if _, ok := d.routes[newID]; ok {
|
||||
d.rMtx.RUnlock()
|
||||
return newID, errors.New("dispatcher collision detected, uuid already exists")
|
||||
}
|
||||
d.rMtx.RUnlock()
|
||||
|
||||
// Write the key into system
|
||||
d.rMtx.Lock()
|
||||
defer d.rMtx.Unlock()
|
||||
// Check to see if it already exists
|
||||
if _, ok := d.routes[newID]; ok {
|
||||
return uuid.Nil, errUUIDCollision
|
||||
}
|
||||
// Write the key into system
|
||||
d.routes[newID] = nil
|
||||
d.rMtx.Unlock()
|
||||
|
||||
return newID, nil
|
||||
}
|
||||
|
||||
@@ -1,233 +1,452 @@
|
||||
package dispatch
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/gofrs/uuid"
|
||||
)
|
||||
|
||||
var mux *Mux
|
||||
var (
|
||||
errTest = errors.New("test error")
|
||||
nonEmptyUUID = [uuid.Size]byte{108, 105, 99, 107, 77, 121, 72, 97, 105, 114, 121, 66, 97, 108, 108, 115}
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
err := Start(DefaultMaxWorkers, 0)
|
||||
func TestGlobalDispatcher(t *testing.T) {
|
||||
err := Start(0, 0)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
cpyDispatch = dispatcher
|
||||
mux = GetNewMux()
|
||||
cpyMux = mux
|
||||
os.Exit(m.Run())
|
||||
}
|
||||
|
||||
var cpyDispatch *Dispatcher
|
||||
var cpyMux *Mux
|
||||
|
||||
func TestDispatcher(t *testing.T) {
|
||||
dispatcher = nil
|
||||
err := Stop()
|
||||
if err == nil {
|
||||
t.Error("error cannot be nil")
|
||||
}
|
||||
|
||||
err = Start(10, 0)
|
||||
if err == nil {
|
||||
t.Error("error cannot be nil")
|
||||
}
|
||||
if IsRunning() {
|
||||
t.Error("should be false")
|
||||
}
|
||||
|
||||
err = DropWorker()
|
||||
if err == nil {
|
||||
t.Error("error cannot be nil")
|
||||
}
|
||||
|
||||
err = SpawnWorker()
|
||||
if err == nil {
|
||||
t.Error("error cannot be nil")
|
||||
}
|
||||
|
||||
dispatcher = cpyDispatch
|
||||
|
||||
if !IsRunning() {
|
||||
t.Error("should be true")
|
||||
}
|
||||
|
||||
err = Start(10, 0)
|
||||
if err == nil {
|
||||
t.Error("error cannot be nil")
|
||||
}
|
||||
|
||||
err = DropWorker()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
err = DropWorker()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
err = SpawnWorker()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
err = SpawnWorker()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
err = SpawnWorker()
|
||||
if err == nil {
|
||||
t.Error("error cannot be nil")
|
||||
running := IsRunning()
|
||||
if !running {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", IsRunning(), true)
|
||||
}
|
||||
|
||||
err = Stop()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
err = Stop()
|
||||
if err == nil {
|
||||
t.Error("error cannot be nil")
|
||||
}
|
||||
|
||||
err = Start(0, 20)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if cap(dispatcher.jobs) != 20 {
|
||||
t.Errorf("Expected jobs limit to be %v, is %v", 20, cap(dispatcher.jobs))
|
||||
}
|
||||
payload := "something"
|
||||
|
||||
err = dispatcher.publish(uuid.UUID{}, &payload)
|
||||
if err == nil {
|
||||
t.Error("error cannot be nil")
|
||||
}
|
||||
|
||||
err = dispatcher.publish(uuid.UUID{}, nil)
|
||||
if err == nil {
|
||||
t.Error("error cannot be nil")
|
||||
}
|
||||
|
||||
id, err := dispatcher.getNewID()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
err = dispatcher.publish(id, &payload)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
err = dispatcher.stop()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
err = dispatcher.publish(id, &payload)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
_, err = dispatcher.subscribe(id)
|
||||
if err == nil {
|
||||
t.Error("error cannot be nil")
|
||||
}
|
||||
|
||||
err = dispatcher.start(10, -1)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if cap(dispatcher.jobs) != DefaultJobsLimit {
|
||||
t.Errorf("Expected jobs limit to be %v, is %v", DefaultJobsLimit, cap(dispatcher.jobs))
|
||||
}
|
||||
someID, err := uuid.NewV4()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
_, err = dispatcher.subscribe(someID)
|
||||
if err == nil {
|
||||
t.Error("error cannot be nil")
|
||||
}
|
||||
|
||||
randomChan := make(chan interface{})
|
||||
err = dispatcher.unsubscribe(someID, randomChan)
|
||||
if err == nil {
|
||||
t.Error("Expected error")
|
||||
}
|
||||
|
||||
err = dispatcher.unsubscribe(id, randomChan)
|
||||
if err == nil {
|
||||
t.Error("Expected error")
|
||||
}
|
||||
|
||||
close(randomChan)
|
||||
err = dispatcher.unsubscribe(id, randomChan)
|
||||
if err == nil {
|
||||
t.Error("Expected error")
|
||||
running = IsRunning()
|
||||
if running {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", IsRunning(), false)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMux(t *testing.T) {
|
||||
mux = nil
|
||||
_, err := mux.Subscribe(uuid.UUID{})
|
||||
if err == nil {
|
||||
t.Error("error cannot be nil")
|
||||
func TestStartStop(t *testing.T) {
|
||||
t.Parallel()
|
||||
var d *Dispatcher
|
||||
|
||||
if d.isRunning() {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", d.isRunning(), false)
|
||||
}
|
||||
|
||||
err = mux.Unsubscribe(uuid.UUID{}, nil)
|
||||
if err == nil {
|
||||
t.Error("error cannot be nil")
|
||||
err := d.stop()
|
||||
if !errors.Is(err, errDispatcherNotInitialized) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherNotInitialized)
|
||||
}
|
||||
|
||||
err = mux.Publish(nil, nil)
|
||||
if err == nil {
|
||||
t.Error("error cannot be nil")
|
||||
err = d.start(10, 0)
|
||||
if !errors.Is(err, errDispatcherNotInitialized) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherNotInitialized)
|
||||
}
|
||||
|
||||
_, err = mux.GetID()
|
||||
if err == nil {
|
||||
t.Error("error cannot be nil")
|
||||
}
|
||||
mux = cpyMux
|
||||
d = NewDispatcher()
|
||||
|
||||
err = mux.Publish(nil, nil)
|
||||
if err == nil {
|
||||
t.Error("error cannot be nil")
|
||||
err = d.stop()
|
||||
if !errors.Is(err, ErrNotRunning) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, ErrNotRunning)
|
||||
}
|
||||
|
||||
payload := "string"
|
||||
id, err := uuid.NewV4()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
if d.isRunning() {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", d.isRunning(), false)
|
||||
}
|
||||
|
||||
err = mux.Publish([]uuid.UUID{id}, &payload)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
err = d.start(1, 100)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
_, err = mux.Subscribe(uuid.UUID{})
|
||||
if err == nil {
|
||||
t.Error("error cannot be nil")
|
||||
if !d.isRunning() {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", d.isRunning(), true)
|
||||
}
|
||||
|
||||
_, err = mux.Subscribe(id)
|
||||
if err == nil {
|
||||
t.Error("error cannot be nil")
|
||||
err = d.start(0, 0)
|
||||
if !errors.Is(err, errDispatcherAlreadyRunning) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherAlreadyRunning)
|
||||
}
|
||||
|
||||
// Add route option
|
||||
id, err := d.getNewID(uuid.NewV4)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
// Add pipe
|
||||
_, err = d.subscribe(id)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
// Max out jobs channel
|
||||
for x := 0; x < 99; x++ {
|
||||
err = d.publish(id, "woah-nelly")
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
}
|
||||
|
||||
err = d.stop()
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
if d.isRunning() {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", d.isRunning(), false)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubscribe(t *testing.T) {
|
||||
t.Parallel()
|
||||
var d *Dispatcher
|
||||
_, err := d.subscribe(uuid.Nil)
|
||||
if !errors.Is(err, errDispatcherNotInitialized) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherNotInitialized)
|
||||
}
|
||||
|
||||
d = NewDispatcher()
|
||||
|
||||
_, err = d.subscribe(uuid.Nil)
|
||||
if !errors.Is(err, errIDNotSet) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errIDNotSet)
|
||||
}
|
||||
|
||||
_, err = d.subscribe(nonEmptyUUID)
|
||||
if !errors.Is(err, ErrNotRunning) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, ErrNotRunning)
|
||||
}
|
||||
|
||||
err = d.start(0, 0)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
id, err := d.getNewID(uuid.NewV4)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
_, err = d.subscribe(nonEmptyUUID)
|
||||
if !errors.Is(err, errDispatcherUUIDNotFoundInRouteList) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherUUIDNotFoundInRouteList)
|
||||
}
|
||||
|
||||
d.outbound.New = func() interface{} { return "omg" }
|
||||
_, err = d.subscribe(id)
|
||||
if !errors.Is(err, errTypeAssertionFailure) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errTypeAssertionFailure)
|
||||
}
|
||||
|
||||
d.outbound.New = getChan
|
||||
ch, err := d.subscribe(id)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
if ch == nil {
|
||||
t.Fatal("expected channel value")
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnsubscribe(t *testing.T) {
|
||||
t.Parallel()
|
||||
var d *Dispatcher
|
||||
|
||||
err := d.unsubscribe(uuid.Nil, nil)
|
||||
if !errors.Is(err, errDispatcherNotInitialized) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherNotInitialized)
|
||||
}
|
||||
|
||||
d = NewDispatcher()
|
||||
|
||||
err = d.unsubscribe(uuid.Nil, nil)
|
||||
if !errors.Is(err, errIDNotSet) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errIDNotSet)
|
||||
}
|
||||
|
||||
err = d.unsubscribe(nonEmptyUUID, nil)
|
||||
if !errors.Is(err, errChannelIsNil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errChannelIsNil)
|
||||
}
|
||||
|
||||
// will return nil if not running
|
||||
err = d.unsubscribe(nonEmptyUUID, make(<-chan interface{}))
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
err = d.start(0, 0)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
err = d.unsubscribe(nonEmptyUUID, make(<-chan interface{}))
|
||||
if !errors.Is(err, errDispatcherUUIDNotFoundInRouteList) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherUUIDNotFoundInRouteList)
|
||||
}
|
||||
|
||||
id, err := d.getNewID(uuid.NewV4)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
err = d.unsubscribe(id, make(<-chan interface{}))
|
||||
if !errors.Is(err, errChannelNotFoundInUUIDRef) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errChannelNotFoundInUUIDRef)
|
||||
}
|
||||
|
||||
// Skip over this when matching pipes
|
||||
_, err = d.subscribe(id)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
ch, err := d.subscribe(id)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
err = d.unsubscribe(id, ch)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPublish(t *testing.T) {
|
||||
t.Parallel()
|
||||
var d *Dispatcher
|
||||
|
||||
err := d.publish(uuid.Nil, nil)
|
||||
if !errors.Is(err, errDispatcherNotInitialized) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherNotInitialized)
|
||||
}
|
||||
|
||||
d = NewDispatcher()
|
||||
|
||||
err = d.publish(nonEmptyUUID, "lol")
|
||||
if !errors.Is(err, nil) { // If not running, don't send back an error.
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
err = d.start(2, 10)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
err = d.publish(uuid.Nil, nil)
|
||||
if !errors.Is(err, errIDNotSet) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errIDNotSet)
|
||||
}
|
||||
|
||||
err = d.publish(nonEmptyUUID, nil)
|
||||
if !errors.Is(err, errNoData) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errNoData)
|
||||
}
|
||||
|
||||
// max out worker processing
|
||||
for x := 0; x < 100; x++ {
|
||||
err2 := d.publish(nonEmptyUUID, "lol")
|
||||
if !errors.Is(err2, nil) {
|
||||
err = err2
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !errors.Is(err, errDispatcherJobsAtLimit) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherJobsAtLimit)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPublishReceive(t *testing.T) {
|
||||
t.Parallel()
|
||||
d := NewDispatcher()
|
||||
if err := d.start(0, 0); !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
id, err := d.getNewID(uuid.NewV4)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
incoming, err := d.subscribe(id)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
go func(d *Dispatcher, id uuid.UUID) {
|
||||
for x := 0; x < 10; x++ {
|
||||
err2 := d.publish(id, "WOW")
|
||||
if !errors.Is(err2, nil) {
|
||||
panic(err2)
|
||||
}
|
||||
}
|
||||
}(d, id)
|
||||
|
||||
data, ok := (<-incoming).(string)
|
||||
if !ok {
|
||||
t.Fatal("type assertion failure expected string")
|
||||
}
|
||||
|
||||
if data != "WOW" {
|
||||
t.Fatal("unexpected value")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetNewID(t *testing.T) {
|
||||
t.Parallel()
|
||||
var d *Dispatcher
|
||||
|
||||
_, err := d.getNewID(uuid.NewV4)
|
||||
if !errors.Is(err, errDispatcherNotInitialized) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherNotInitialized)
|
||||
}
|
||||
|
||||
d = NewDispatcher()
|
||||
|
||||
err = d.start(0, 0)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
_, err = d.getNewID(nil)
|
||||
if !errors.Is(err, errUUIDGeneratorFunctionIsNil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errUUIDGeneratorFunctionIsNil)
|
||||
}
|
||||
|
||||
_, err = d.getNewID(func() (uuid.UUID, error) { return uuid.Nil, errTest })
|
||||
if !errors.Is(err, errTest) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errTest)
|
||||
}
|
||||
|
||||
_, err = d.getNewID(func() (uuid.UUID, error) { return [uuid.Size]byte{254}, nil })
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
_, err = d.getNewID(func() (uuid.UUID, error) { return [uuid.Size]byte{254}, nil })
|
||||
if !errors.Is(err, errUUIDCollision) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errUUIDCollision)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMux(t *testing.T) {
|
||||
t.Parallel()
|
||||
var mux *Mux
|
||||
_, err := mux.Subscribe(uuid.Nil)
|
||||
if !errors.Is(err, errMuxIsNil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errMuxIsNil)
|
||||
}
|
||||
|
||||
err = mux.Unsubscribe(uuid.Nil, nil)
|
||||
if !errors.Is(err, errMuxIsNil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errMuxIsNil)
|
||||
}
|
||||
|
||||
err = mux.Publish(nil)
|
||||
if !errors.Is(err, errMuxIsNil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errMuxIsNil)
|
||||
}
|
||||
|
||||
_, err = mux.GetID()
|
||||
if !errors.Is(err, errMuxIsNil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errMuxIsNil)
|
||||
}
|
||||
|
||||
d := NewDispatcher()
|
||||
err = d.start(0, 0)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
mux = GetNewMux(d)
|
||||
|
||||
err = mux.Publish(nil)
|
||||
if !errors.Is(err, errNoData) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errNoData)
|
||||
}
|
||||
|
||||
err = mux.Publish("lol")
|
||||
if !errors.Is(err, errNoIDs) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errNoIDs)
|
||||
}
|
||||
|
||||
id, err := mux.GetID()
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
_, err = mux.Subscribe(uuid.Nil)
|
||||
if !errors.Is(err, errIDNotSet) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, errIDNotSet)
|
||||
}
|
||||
|
||||
pipe, err := mux.Subscribe(id)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
|
||||
var errChan = make(chan error)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
// Makes sure receiver is waiting for update
|
||||
go func(ch <-chan interface{}, errChan chan error, wg *sync.WaitGroup) {
|
||||
wg.Done()
|
||||
response, ok := (<-ch).(string)
|
||||
if !ok {
|
||||
errChan <- errors.New("type assertion failure")
|
||||
return
|
||||
}
|
||||
|
||||
if response != "string" {
|
||||
errChan <- errors.New("unexpected return")
|
||||
return
|
||||
}
|
||||
errChan <- nil
|
||||
}(pipe.C, errChan, &wg)
|
||||
|
||||
wg.Wait()
|
||||
|
||||
payload := "string"
|
||||
go func(payload string) {
|
||||
err2 := mux.Publish(payload, id)
|
||||
if err2 != nil {
|
||||
fmt.Println(err2)
|
||||
}
|
||||
}(payload)
|
||||
|
||||
err = <-errChan
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = pipe.Release()
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMuxSubscribe(t *testing.T) {
|
||||
t.Parallel()
|
||||
d := NewDispatcher()
|
||||
err := d.start(0, 0)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
mux := GetNewMux(d)
|
||||
itemID, err := mux.GetID()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -250,7 +469,14 @@ func TestSubscribe(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestPublish(t *testing.T) {
|
||||
func TestMuxPublish(t *testing.T) {
|
||||
t.Parallel()
|
||||
d := NewDispatcher()
|
||||
err := d.start(0, 0)
|
||||
if !errors.Is(err, nil) {
|
||||
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
mux := GetNewMux(d)
|
||||
itemID, err := mux.GetID()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -261,41 +487,32 @@ func TestPublish(t *testing.T) {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func(wg *sync.WaitGroup) {
|
||||
wg.Done()
|
||||
for {
|
||||
_, ok := <-pipe.C
|
||||
if !ok {
|
||||
pErr := pipe.Release()
|
||||
if pErr != nil {
|
||||
t.Error(pErr)
|
||||
}
|
||||
wg.Done()
|
||||
return
|
||||
go func(mux *Mux) {
|
||||
for i := 0; i < 100; i++ {
|
||||
errMux := mux.Publish(i, itemID)
|
||||
if errMux != nil {
|
||||
t.Error(errMux)
|
||||
}
|
||||
}
|
||||
}(&wg)
|
||||
wg.Wait()
|
||||
wg.Add(1)
|
||||
mainPayload := "PAYLOAD"
|
||||
for i := 0; i < 100; i++ {
|
||||
errMux := mux.Publish([]uuid.UUID{itemID}, &mainPayload)
|
||||
if errMux != nil {
|
||||
t.Error(errMux)
|
||||
}
|
||||
}
|
||||
}(mux)
|
||||
|
||||
<-pipe.C
|
||||
|
||||
// Shut down dispatch system
|
||||
err = Stop()
|
||||
err = d.stop()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// 2363419 468.7 ns/op 142 B/op 1 allocs/op
|
||||
func BenchmarkSubscribe(b *testing.B) {
|
||||
d := NewDispatcher()
|
||||
err := d.start(0, 0)
|
||||
if !errors.Is(err, nil) {
|
||||
b.Fatalf("received: '%v' but expected: '%v'", err, nil)
|
||||
}
|
||||
mux := GetNewMux(d)
|
||||
newID, err := mux.GetID()
|
||||
if err != nil {
|
||||
b.Error(err)
|
||||
|
||||
@@ -17,14 +17,10 @@ const (
|
||||
// DefaultHandshakeTimeout defines a workers max length of time to wait on a
|
||||
// an unbuffered channel for a receiver before moving on to next route
|
||||
DefaultHandshakeTimeout = 200 * time.Nanosecond
|
||||
|
||||
errNotInitialised = "dispatcher not initialised"
|
||||
errShutdownRoutines = "dispatcher did not shutdown properly, routines failed to close"
|
||||
)
|
||||
|
||||
// dispatcher is our main in memory instance with a stop/start mtx below
|
||||
var dispatcher *Dispatcher
|
||||
var mtx sync.Mutex
|
||||
|
||||
// Dispatcher defines an internal subsystem communication/change state publisher
|
||||
type Dispatcher struct {
|
||||
@@ -33,30 +29,30 @@ type Dispatcher struct {
|
||||
// then publish the data across the full registered channels for that uuid.
|
||||
// See relayer() method below.
|
||||
routes map[uuid.UUID][]chan interface{}
|
||||
|
||||
// rMtx protects the routes variable ensuring acceptable read/write access
|
||||
rMtx sync.RWMutex
|
||||
|
||||
// Persistent buffered job queue for relayers
|
||||
jobs chan *job
|
||||
jobs chan job
|
||||
|
||||
// Dynamic channel pool; returns an unbuffered channel for routes map
|
||||
outbound sync.Pool
|
||||
|
||||
// MaxWorkers defines max worker ceiling
|
||||
maxWorkers int32
|
||||
// Atomic values -----------------------
|
||||
// Worker counter
|
||||
count int32
|
||||
maxWorkers int
|
||||
|
||||
// Dispatch status
|
||||
running uint32
|
||||
running bool
|
||||
|
||||
// Unbufferd shutdown chan, sync wg for ensuring concurrency when only
|
||||
// dropping a single relayer routine
|
||||
shutdown chan *sync.WaitGroup
|
||||
shutdown chan struct{}
|
||||
|
||||
// Relayer shutdown tracking
|
||||
wg sync.WaitGroup
|
||||
|
||||
// dispatcher write protection
|
||||
m sync.RWMutex
|
||||
}
|
||||
|
||||
// job defines a relaying job associated with a ticket which allows routing to
|
||||
@@ -76,7 +72,7 @@ type Mux struct {
|
||||
// Pipe defines an outbound object to the desired routine
|
||||
type Pipe struct {
|
||||
// Channel to get all our lovely informations
|
||||
C chan interface{}
|
||||
C <-chan interface{}
|
||||
// ID to tracked system
|
||||
id uuid.UUID
|
||||
// Reference to multiplexer
|
||||
|
||||
@@ -2,25 +2,35 @@ package dispatch
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"reflect"
|
||||
|
||||
"github.com/gofrs/uuid"
|
||||
)
|
||||
|
||||
// GetNewMux returns a new multiplexer to track subsystem updates
|
||||
func GetNewMux() *Mux {
|
||||
return &Mux{d: dispatcher}
|
||||
var (
|
||||
errMuxIsNil = errors.New("mux is nil")
|
||||
errIDNotSet = errors.New("id not set")
|
||||
errNoData = errors.New("data payload is nil")
|
||||
errNoIDs = errors.New("no IDs to publish data to")
|
||||
)
|
||||
|
||||
// GetNewMux returns a new multiplexer to track subsystem updates, if nil
|
||||
// dispatcher provided it will default to the global Dispatcher.
|
||||
func GetNewMux(d *Dispatcher) *Mux {
|
||||
if d == nil {
|
||||
d = dispatcher
|
||||
}
|
||||
return &Mux{d: d}
|
||||
}
|
||||
|
||||
// Subscribe takes in a package defined signature element pointing to an ID set
|
||||
// and returns the associated pipe
|
||||
func (m *Mux) Subscribe(id uuid.UUID) (Pipe, error) {
|
||||
if m == nil {
|
||||
return Pipe{}, errors.New("mux is nil")
|
||||
return Pipe{}, errMuxIsNil
|
||||
}
|
||||
|
||||
if id == (uuid.UUID{}) {
|
||||
return Pipe{}, errors.New("id not set")
|
||||
if id.IsNil() {
|
||||
return Pipe{}, errIDNotSet
|
||||
}
|
||||
|
||||
ch, err := m.d.subscribe(id)
|
||||
@@ -32,29 +42,30 @@ func (m *Mux) Subscribe(id uuid.UUID) (Pipe, error) {
|
||||
}
|
||||
|
||||
// Unsubscribe returns channel to the pool for the full signature set
|
||||
func (m *Mux) Unsubscribe(id uuid.UUID, ch chan interface{}) error {
|
||||
func (m *Mux) Unsubscribe(id uuid.UUID, ch <-chan interface{}) error {
|
||||
if m == nil {
|
||||
return errors.New("mux is nil")
|
||||
return errMuxIsNil
|
||||
}
|
||||
return m.d.unsubscribe(id, ch)
|
||||
}
|
||||
|
||||
// Publish takes in a persistent memory address and dispatches changes to
|
||||
// required pipes. Data should be of *type.
|
||||
func (m *Mux) Publish(ids []uuid.UUID, data interface{}) error {
|
||||
// required pipes.
|
||||
func (m *Mux) Publish(data interface{}, ids ...uuid.UUID) error {
|
||||
if m == nil {
|
||||
return errors.New("mux is nil")
|
||||
return errMuxIsNil
|
||||
}
|
||||
|
||||
if data == nil {
|
||||
return errors.New("data payload is nil")
|
||||
return errNoData
|
||||
}
|
||||
|
||||
cpy := reflect.ValueOf(data).Elem().Interface()
|
||||
if len(ids) == 0 {
|
||||
return errNoIDs
|
||||
}
|
||||
|
||||
for i := range ids {
|
||||
// Create copy to not interfere with stored value
|
||||
err := m.d.publish(ids[i], &cpy)
|
||||
err := m.d.publish(ids[i], data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -65,9 +76,9 @@ func (m *Mux) Publish(ids []uuid.UUID, data interface{}) error {
|
||||
// GetID a new unique ID to track routing information in the dispatch system
|
||||
func (m *Mux) GetID() (uuid.UUID, error) {
|
||||
if m == nil {
|
||||
return uuid.UUID{}, errors.New("mux is nil")
|
||||
return uuid.UUID{}, errMuxIsNil
|
||||
}
|
||||
return m.d.getNewID()
|
||||
return m.d.getNewID(uuid.NewV4)
|
||||
}
|
||||
|
||||
// Release returns the channel to the communications pool to be reused
|
||||
|
||||
Reference in New Issue
Block a user