diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 323f196d..ec5f1df4 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -180,11 +180,9 @@ func (d *Dispatcher) relayer() { continue } for i := range pipes { - select { - case pipes[i] <- j.Data: - default: - // no receiver; don't wait. This limits complexity. - } + go func(p chan any) { + p <- j.Data + }(pipes[i]) } d.rMtx.RUnlock() case <-d.shutdown: diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 81443ec1..472a8640 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -3,10 +3,13 @@ package dispatch import ( "errors" "fmt" + "runtime" "sync" "testing" + "time" "github.com/gofrs/uuid" + "github.com/stretchr/testify/assert" ) var ( @@ -485,78 +488,70 @@ func TestMuxPublish(t *testing.T) { t.Parallel() d := NewDispatcher() err := d.start(0, 0) - if !errors.Is(err, nil) { - t.Fatalf("received: '%v' but expected: '%v'", err, nil) - } + assert.NoError(t, err, "start should not error") + mux := GetNewMux(d) itemID, err := mux.GetID() - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err, "GetID should not error") - // demonstrate that jobs do not get published when the limit should be reached - // but there is no listener associated with job - for x := 0; x < 200; x++ { - err2 := mux.Publish("test", itemID) - if !errors.Is(err2, nil) { - err = err2 + overloadCeiling := DefaultMaxWorkers * DefaultJobsLimit * 2 + + for i := 0; i < overloadCeiling; i++ { + err = mux.Publish("test", itemID) + if !assert.NoError(t, err, "Publish should not error when over limit but no listeners") { break } } - if !errors.Is(err, nil) { - t.Fatalf("received: '%v' but expected: '%v'", err, nil) - } + ready := make(chan any) + demux := make(chan any, 1) pipe, err := mux.Subscribe(itemID) - if err != nil { - t.Error(err) - } + assert.NoError(t, err, "Subscribe should not error") - go func(mux *Mux) { + // Subscribers must be actively selecting in order to receive anything + go func() { + close(ready) + i := <-pipe.c + demux <- i + close(demux) + }() + + go func() { + <-ready // Ensure listener is ready before starting for i := 0; i < 100; i++ { errMux := mux.Publish(i, itemID) - if errMux != nil { - t.Error(errMux) + if !assert.NoError(t, errMux, "Publish should not error within limits") { + return } } - }(mux) + }() - <-pipe.Channel() + assert.Eventually(t, func() bool { return len(demux) >= 1 }, time.Second, time.Millisecond*10, "Subscriber should eventually get at least one message") // demonstrate that jobs can be limited when subscribed - for x := 0; x < 200; x++ { - err2 := mux.Publish("test", itemID) - if !errors.Is(err2, nil) { - err = err2 + // Published data gets consumed from .jobs to the worker channels, so we're looking to push more than it's consumed and prevent the select reading them too quickly + runtime.LockOSThread() + for i := 0; i < overloadCeiling; i++ { + if err = mux.Publish("test", itemID); err != nil { break } } - if !errors.Is(err, errDispatcherJobsAtLimit) { - t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherJobsAtLimit) - } + assert.ErrorIs(t, err, errDispatcherJobsAtLimit, "Publish should error when more published than expected") + runtime.UnlockOSThread() - // demonstrate that jobs go back to not being sent after unsubscribing err = mux.Unsubscribe(itemID, pipe.c) - if !errors.Is(err, nil) { - t.Fatalf("received: '%v' but expected: '%v'", err, nil) - } + assert.NoError(t, err, "Unsubscribe should not error") - for x := 0; x < 200; x++ { - err2 := mux.Publish("test", itemID) - if !errors.Is(err2, nil) { - err = err2 + for i := 0; i < overloadCeiling; i++ { + if err = mux.Publish("test", itemID); err != nil { break } } - if !errors.Is(err, nil) { - t.Fatalf("received: '%v' but expected: '%v'", err, nil) - } + assert.NoError(t, err, "Publish should not error after Unsubscribe when over limit") // Shut down dispatch system err = d.stop() - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err, "stop should not error") } // 13636467 84.26 ns/op 141 B/op 1 allocs/op