diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index a8d3f051..323f196d 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "github.com/gofrs/uuid" @@ -166,14 +167,23 @@ func (d *Dispatcher) relayer() { for { select { case j := <-d.jobs: + if j.ID.IsNil() { + // empty jobs from `channelCapacity` length are sent upon shutdown + // every real job created has an ID set + continue + } d.rMtx.RLock() - if pipes, ok := d.routes[j.ID]; ok { - for i := range pipes { - select { - case pipes[i] <- j.Data: - default: - // no receiver; don't wait. This limits complexity. - } + pipes, ok := d.routes[j.ID] + if !ok { + log.Warnf(log.DispatchMgr, "%v: %v\n", errDispatcherUUIDNotFoundInRouteList, j.ID) + d.rMtx.RUnlock() + continue + } + for i := range pipes { + select { + case pipes[i] <- j.Data: + default: + // no receiver; don't wait. This limits complexity. } } d.rMtx.RUnlock() @@ -247,6 +257,7 @@ func (d *Dispatcher) subscribe(id uuid.UUID) (chan interface{}, error) { } d.routes[id] = append(d.routes[id], ch) + atomic.AddInt32(&d.subscriberCount, 1) return ch, nil } @@ -287,6 +298,7 @@ func (d *Dispatcher) unsubscribe(id uuid.UUID, usedChan chan interface{}) error pipes[i] = pipes[len(pipes)-1] pipes[len(pipes)-1] = nil d.routes[id] = pipes[:len(pipes)-1] + atomic.AddInt32(&d.subscriberCount, -1) // Drain and put the used chan back in pool; only if it is not closed. select { diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 566d9df5..36a09118 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -246,7 +246,7 @@ func TestPublish(t *testing.T) { d = NewDispatcher() - err = d.publish(nonEmptyUUID, "lol") + err = d.publish(nonEmptyUUID, "test") if !errors.Is(err, nil) { // If not running, don't send back an error. t.Fatalf("received: '%v' but expected: '%v'", err, nil) } @@ -266,15 +266,17 @@ func TestPublish(t *testing.T) { t.Fatalf("received: '%v' but expected: '%v'", err, errNoData) } - // max out worker processing - for x := 0; x < 100; x++ { - err2 := d.publish(nonEmptyUUID, "lol") + // demonstrate job limit error + d.routes[nonEmptyUUID] = []chan interface{}{ + make(chan interface{}), + } + for x := 0; x < 200; x++ { + err2 := d.publish(nonEmptyUUID, "test") if !errors.Is(err2, nil) { err = err2 break } } - if !errors.Is(err, errDispatcherJobsAtLimit) { t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherJobsAtLimit) } @@ -492,6 +494,19 @@ func TestMuxPublish(t *testing.T) { t.Fatal(err) } + // demonstrate that jobs do not get published when the limit should be reached + // but there is no listener associated with job + for x := 0; x < 200; x++ { + err2 := mux.Publish("test", itemID) + if !errors.Is(err2, nil) { + err = err2 + break + } + } + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } + pipe, err := mux.Subscribe(itemID) if err != nil { t.Error(err) @@ -508,6 +523,35 @@ func TestMuxPublish(t *testing.T) { <-pipe.Channel() + // demonstrate that jobs can be limited when subscribed + for x := 0; x < 200; x++ { + err2 := mux.Publish("test", itemID) + if !errors.Is(err2, nil) { + err = err2 + break + } + } + if !errors.Is(err, errDispatcherJobsAtLimit) { + t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherJobsAtLimit) + } + + // demonstrate that jobs go back to not being sent after unsubscribing + err = mux.Unsubscribe(itemID, pipe.C) + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } + + for x := 0; x < 200; x++ { + err2 := mux.Publish("test", itemID) + if !errors.Is(err2, nil) { + err = err2 + break + } + } + if !errors.Is(err, nil) { + t.Fatalf("received: '%v' but expected: '%v'", err, nil) + } + // Shut down dispatch system err = d.stop() if err != nil { @@ -515,7 +559,7 @@ func TestMuxPublish(t *testing.T) { } } -// 2363419 468.7 ns/op 142 B/op 1 allocs/op +// 13636467 84.26 ns/op 141 B/op 1 allocs/op func BenchmarkSubscribe(b *testing.B) { d := NewDispatcher() err := d.start(0, 0) diff --git a/dispatch/dispatch_types.go b/dispatch/dispatch_types.go index 32f64167..779c28f3 100644 --- a/dispatch/dispatch_types.go +++ b/dispatch/dispatch_types.go @@ -53,6 +53,9 @@ type Dispatcher struct { // dispatcher write protection m sync.RWMutex + // subscriberCount atomically stores the amount of subscription endpoints + // to verify whether to send out any jobs + subscriberCount int32 } // job defines a relaying job associated with a ticket which allows routing to diff --git a/dispatch/mux.go b/dispatch/mux.go index 516f20f0..6bb451ec 100644 --- a/dispatch/mux.go +++ b/dispatch/mux.go @@ -2,6 +2,7 @@ package dispatch import ( "errors" + "sync/atomic" "github.com/gofrs/uuid" ) @@ -63,6 +64,9 @@ func (m *Mux) Publish(data interface{}, ids ...uuid.UUID) error { if len(ids) == 0 { return errNoIDs } + if atomic.LoadInt32(&m.d.subscriberCount) == 0 { + return nil + } for i := range ids { err := m.d.publish(ids[i], data)