dispatch: Fix intermittent TestMuxPublish issue (#1412)

* Dispatch: Assertify TestMuxPublish

* Dispatch: Fix errDispatcherJobsAtLimit test fails

This test would fail intermittently when the jobs queue drained quickly
enough.
This sets the overload ceiling based on the default settings, and seems
a safe way of ensuring we get an error every time.

It adds a done channel guard around the goro test because otherwise
we'll get a panic occassionally when the goro outlives TestMaxPublish

* Dispatch: Add test for Publish receiving data

* Dispatch: Publish to all subscribers
This commit is contained in:
Gareth Kirwan
2023-12-28 04:54:36 +01:00
committed by GitHub
parent 0a9fdbcdad
commit 9986e80e2d
2 changed files with 42 additions and 49 deletions

View File

@@ -180,11 +180,9 @@ func (d *Dispatcher) relayer() {
continue
}
for i := range pipes {
select {
case pipes[i] <- j.Data:
default:
// no receiver; don't wait. This limits complexity.
}
go func(p chan any) {
p <- j.Data
}(pipes[i])
}
d.rMtx.RUnlock()
case <-d.shutdown:

View File

@@ -3,10 +3,13 @@ package dispatch
import (
"errors"
"fmt"
"runtime"
"sync"
"testing"
"time"
"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
)
var (
@@ -485,78 +488,70 @@ func TestMuxPublish(t *testing.T) {
t.Parallel()
d := NewDispatcher()
err := d.start(0, 0)
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}
assert.NoError(t, err, "start should not error")
mux := GetNewMux(d)
itemID, err := mux.GetID()
if err != nil {
t.Fatal(err)
}
assert.NoError(t, err, "GetID should not error")
// demonstrate that jobs do not get published when the limit should be reached
// but there is no listener associated with job
for x := 0; x < 200; x++ {
err2 := mux.Publish("test", itemID)
if !errors.Is(err2, nil) {
err = err2
overloadCeiling := DefaultMaxWorkers * DefaultJobsLimit * 2
for i := 0; i < overloadCeiling; i++ {
err = mux.Publish("test", itemID)
if !assert.NoError(t, err, "Publish should not error when over limit but no listeners") {
break
}
}
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}
ready := make(chan any)
demux := make(chan any, 1)
pipe, err := mux.Subscribe(itemID)
if err != nil {
t.Error(err)
}
assert.NoError(t, err, "Subscribe should not error")
go func(mux *Mux) {
// Subscribers must be actively selecting in order to receive anything
go func() {
close(ready)
i := <-pipe.c
demux <- i
close(demux)
}()
go func() {
<-ready // Ensure listener is ready before starting
for i := 0; i < 100; i++ {
errMux := mux.Publish(i, itemID)
if errMux != nil {
t.Error(errMux)
if !assert.NoError(t, errMux, "Publish should not error within limits") {
return
}
}
}(mux)
}()
<-pipe.Channel()
assert.Eventually(t, func() bool { return len(demux) >= 1 }, time.Second, time.Millisecond*10, "Subscriber should eventually get at least one message")
// demonstrate that jobs can be limited when subscribed
for x := 0; x < 200; x++ {
err2 := mux.Publish("test", itemID)
if !errors.Is(err2, nil) {
err = err2
// Published data gets consumed from .jobs to the worker channels, so we're looking to push more than it's consumed and prevent the select reading them too quickly
runtime.LockOSThread()
for i := 0; i < overloadCeiling; i++ {
if err = mux.Publish("test", itemID); err != nil {
break
}
}
if !errors.Is(err, errDispatcherJobsAtLimit) {
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherJobsAtLimit)
}
assert.ErrorIs(t, err, errDispatcherJobsAtLimit, "Publish should error when more published than expected")
runtime.UnlockOSThread()
// demonstrate that jobs go back to not being sent after unsubscribing
err = mux.Unsubscribe(itemID, pipe.c)
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}
assert.NoError(t, err, "Unsubscribe should not error")
for x := 0; x < 200; x++ {
err2 := mux.Publish("test", itemID)
if !errors.Is(err2, nil) {
err = err2
for i := 0; i < overloadCeiling; i++ {
if err = mux.Publish("test", itemID); err != nil {
break
}
}
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}
assert.NoError(t, err, "Publish should not error after Unsubscribe when over limit")
// Shut down dispatch system
err = d.stop()
if err != nil {
t.Fatal(err)
}
assert.NoError(t, err, "stop should not error")
}
// 13636467 84.26 ns/op 141 B/op 1 allocs/op