Merge remote-tracking branch 'origin/main' into trig-187f
This commit is contained in:
commit
b33f372085
@ -6,12 +6,23 @@ import (
|
||||
)
|
||||
|
||||
// StartSweeperWithIntervalForTest exposes startSweeperWithInterval to
|
||||
// the external test package. The production code uses StartSweeper
|
||||
// the external test package. The production code uses StartSeper
|
||||
// (which pins the canonical SweepInterval); tests pin a short interval
|
||||
// to exercise the ticker-driven cycle without burning real wall-clock
|
||||
// time. The Go convention `export_test.go` keeps this seam OUT of the
|
||||
// production binary — files ending in _test.go are stripped at build
|
||||
// time, so this re-export only exists during `go test`.
|
||||
func StartSweeperWithIntervalForTest(ctx context.Context, storage Storage, ackRetention, interval time.Duration) {
|
||||
startSweeperWithInterval(ctx, storage, ackRetention, interval)
|
||||
startSweeperWithInterval(ctx, storage, ackRetention, interval, nil)
|
||||
}
|
||||
|
||||
// StartSweeperForTest starts the sweeper and returns a done channel
|
||||
// that is closed exactly once when the loop exits. Tests MUST receive
|
||||
// from done before returning so the goroutine has fully terminated and
|
||||
// the shared metric counters are stable for the next test's baseline
|
||||
// capture (issue #86).
|
||||
func StartSweeperForTest(ctx context.Context, storage Storage, ackRetention time.Duration) chan struct{} {
|
||||
done := make(chan struct{})
|
||||
go startSweeperWithInterval(ctx, storage, ackRetention, SweepInterval, done)
|
||||
return done
|
||||
}
|
||||
|
||||
@ -66,15 +66,21 @@ const sweepDeadline = 30 * time.Second
|
||||
// to exercise the ticker-driven sweep path without burning real wall-
|
||||
// clock time.
|
||||
func StartSweeper(ctx context.Context, storage Storage, ackRetention time.Duration) {
|
||||
startSweeperWithInterval(ctx, storage, ackRetention, SweepInterval)
|
||||
startSweeperWithInterval(ctx, storage, ackRetention, SweepInterval, nil)
|
||||
}
|
||||
|
||||
// startSweeperWithInterval is the test-friendly variant of StartSweeper
|
||||
// — same loop, but the cadence is caller-specified. Production code
|
||||
// should use StartSweeper to keep the SweepInterval constant pinned.
|
||||
func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention, interval time.Duration) {
|
||||
// If done is non-nil it is closed exactly once when the loop exits,
|
||||
// allowing tests to wait for the goroutine to fully terminate before
|
||||
// asserting on shared metric counters (issue #86).
|
||||
func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention, interval time.Duration, done chan struct{}) {
|
||||
if storage == nil {
|
||||
log.Println("pendinguploads sweeper: storage is nil — sweeper disabled")
|
||||
if done != nil {
|
||||
close(done)
|
||||
}
|
||||
return
|
||||
}
|
||||
if ackRetention == 0 {
|
||||
@ -86,6 +92,12 @@ func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention
|
||||
)
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
defer func() {
|
||||
log.Println("pendinguploads sweeper: shutdown")
|
||||
if done != nil {
|
||||
close(done)
|
||||
}
|
||||
}()
|
||||
// Run once immediately so a platform restart cleans up any rows
|
||||
// that became eligible while we were down — don't make the
|
||||
// operator wait 5 minutes for the first sweep.
|
||||
@ -93,9 +105,16 @@ func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Println("pendinguploads sweeper: shutdown")
|
||||
return
|
||||
case <-ticker.C:
|
||||
// Guard: ctx may have been cancelled between the ticker firing
|
||||
// and this case being selected (MPMC channel; close-to-ready race).
|
||||
// Calling sweepOnce with an already-cancelled ctx would increment
|
||||
// the error counter on shutdown — polluting the next test's
|
||||
// baseline (issue #86 full-suite failure).
|
||||
if ctx.Err() != nil {
|
||||
continue
|
||||
}
|
||||
sweepOnce(ctx, storage, ackRetention)
|
||||
}
|
||||
}
|
||||
|
||||
@ -136,7 +136,7 @@ func TestStartSweeper_RunsImmediatelyAndOnTick(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
go pendinguploads.StartSweeper(ctx, store, time.Hour)
|
||||
done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour)
|
||||
store.waitForCycle(t, 1, 2*time.Second)
|
||||
if got := store.calls.Load(); got < 1 {
|
||||
t.Errorf("expected at least one immediate sweep, got %d", got)
|
||||
@ -145,6 +145,10 @@ func TestStartSweeper_RunsImmediatelyAndOnTick(t *testing.T) {
|
||||
if store.gotRetention.Load() != 3600 {
|
||||
t.Errorf("retention seconds = %d, want 3600", store.gotRetention.Load())
|
||||
}
|
||||
// #86 fix: ensure goroutine has exited before the next test's
|
||||
// metricDelta() baseline capture.
|
||||
cancel()
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestStartSweeper_ZeroAckRetentionUsesDefault(t *testing.T) {
|
||||
@ -152,23 +156,22 @@ func TestStartSweeper_ZeroAckRetentionUsesDefault(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
go pendinguploads.StartSweeper(ctx, store, 0)
|
||||
done := pendinguploads.StartSweeperForTest(ctx, store, 0)
|
||||
store.waitForCycle(t, 1, 2*time.Second)
|
||||
want := int64(pendinguploads.DefaultAckRetention.Seconds())
|
||||
if store.gotRetention.Load() != want {
|
||||
t.Errorf("retention = %d, want default %d", store.gotRetention.Load(), want)
|
||||
}
|
||||
// #86 fix.
|
||||
cancel()
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestStartSweeper_ContextCancelStopsLoop(t *testing.T) {
|
||||
store := newFakeSweepStorage([]pendinguploads.SweepResult{{}}, nil)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
pendinguploads.StartSweeper(ctx, store, time.Second)
|
||||
close(done)
|
||||
}()
|
||||
done := pendinguploads.StartSweeperForTest(ctx, store, time.Second)
|
||||
store.waitForCycle(t, 1, 2*time.Second)
|
||||
cancel()
|
||||
|
||||
@ -187,14 +190,17 @@ func TestStartSweeperWithInterval_TickerFiresAdditionalCycles(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
go pendinguploads.StartSweeperWithIntervalForTest(ctx, store, time.Hour, 30*time.Millisecond)
|
||||
|
||||
done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour)
|
||||
// Immediate cycle + at least one tick-driven cycle.
|
||||
store.waitForCycle(t, 2, 2*time.Second)
|
||||
|
||||
if got := store.calls.Load(); got < 2 {
|
||||
t.Errorf("expected ≥2 cycles (immediate + 1 tick), got %d", got)
|
||||
}
|
||||
// #86 fix: drain the done channel so the goroutine is fully gone
|
||||
// before the next test's metricDelta() baseline capture.
|
||||
cancel()
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestStartSweeper_TransientErrorDoesNotCrashLoop(t *testing.T) {
|
||||
@ -217,7 +223,7 @@ func TestStartSweeper_TransientErrorDoesNotCrashLoop(t *testing.T) {
|
||||
// waitForCycle is too early.
|
||||
_, _, deltaError := metricDelta(t)
|
||||
|
||||
go pendinguploads.StartSweeper(ctx, store, time.Hour)
|
||||
done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour)
|
||||
|
||||
// Wait for the first (errored) cycle.
|
||||
store.waitForCycle(t, 1, 2*time.Second)
|
||||
@ -226,11 +232,13 @@ func TestStartSweeper_TransientErrorDoesNotCrashLoop(t *testing.T) {
|
||||
// stops the loop on the next select pass with no in-flight metric
|
||||
// writes outstanding.
|
||||
waitForMetricDelta(t, deltaError, 1, 2*time.Second)
|
||||
// Cancel — the goroutine returns cleanly, proving the error path
|
||||
// didn't crash the loop. Without this fix the goroutine would have
|
||||
// either panicked (process abort visible at exit) or stuck (this
|
||||
// cancel + done-channel pattern would deadlock instead).
|
||||
// Cancel and wait for the goroutine to fully exit (#86 fix).
|
||||
// Without the done-channel wait, the goroutine races with the next
|
||||
// test's metricDelta() baseline capture — the next test may see
|
||||
// error=1 from this test still "in flight", throwing off its
|
||||
// deltaError assertion.
|
||||
cancel()
|
||||
<-done
|
||||
}
|
||||
|
||||
// metricDelta returns a function that, when called, returns how much
|
||||
@ -265,7 +273,7 @@ func TestStartSweeper_RecordsMetricsOnSuccess(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
go pendinguploads.StartSweeper(ctx, store, time.Hour)
|
||||
done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour)
|
||||
store.waitForCycle(t, 1, 2*time.Second)
|
||||
|
||||
// Poll for the success counters to settle — closes the cycleDone-
|
||||
@ -278,6 +286,15 @@ func TestStartSweeper_RecordsMetricsOnSuccess(t *testing.T) {
|
||||
if got := deltaError(); got != 0 {
|
||||
t.Errorf("error counter delta = %d, want 0", got)
|
||||
}
|
||||
|
||||
// #86 fix: drain the done channel so the goroutine is fully gone
|
||||
// before the next test's metricDelta() baseline capture. Without this
|
||||
// the previous test's goroutine could still be mid-Sweep (blocked on
|
||||
// the fake's results channel) and its eventual return would mutate
|
||||
// the shared error/acked counters after the next test has already
|
||||
// snapshot its baseline.
|
||||
cancel()
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestStartSweeper_RecordsMetricsOnError(t *testing.T) {
|
||||
@ -290,7 +307,7 @@ func TestStartSweeper_RecordsMetricsOnError(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
go pendinguploads.StartSweeper(ctx, store, time.Hour)
|
||||
done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour)
|
||||
store.waitForCycle(t, 1, 2*time.Second)
|
||||
|
||||
// Poll for the error counter to settle — cycleDone fires inside
|
||||
@ -300,4 +317,9 @@ func TestStartSweeper_RecordsMetricsOnError(t *testing.T) {
|
||||
// though the metric WILL be 1 a few ms later. See
|
||||
// waitForMetricDelta comment.
|
||||
waitForMetricDelta(t, deltaError, 1, 2*time.Second)
|
||||
|
||||
// #86 fix: ensure the goroutine has fully exited before the next
|
||||
// test's metricDelta() baseline capture.
|
||||
cancel()
|
||||
<-done
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user