diff --git a/workspace-server/internal/pendinguploads/export_test.go b/workspace-server/internal/pendinguploads/export_test.go index c758b629..b34d655d 100644 --- a/workspace-server/internal/pendinguploads/export_test.go +++ b/workspace-server/internal/pendinguploads/export_test.go @@ -6,12 +6,23 @@ import ( ) // StartSweeperWithIntervalForTest exposes startSweeperWithInterval to // the external test package. The production code uses StartSweeper // (which pins the canonical SweepInterval); tests pin a short interval // to exercise the ticker-driven cycle without burning real wall-clock // time. The Go convention `export_test.go` keeps this seam OUT of the // production binary — files ending in _test.go are stripped at build // time, so this re-export only exists during `go test`. func StartSweeperWithIntervalForTest(ctx context.Context, storage Storage, ackRetention, interval time.Duration) { - startSweeperWithInterval(ctx, storage, ackRetention, interval) + startSweeperWithInterval(ctx, storage, ackRetention, interval, nil) +} + +// StartSweeperForTest starts the sweeper and returns a done channel +// that is closed exactly once when the loop exits. Tests MUST receive +// from done before returning so the goroutine has fully terminated and +// the shared metric counters are stable for the next test's baseline +// capture (issue #86). 
+func StartSweeperForTest(ctx context.Context, storage Storage, ackRetention time.Duration) <-chan struct{} { + done := make(chan struct{}) + go startSweeperWithInterval(ctx, storage, ackRetention, SweepInterval, done) + return done } diff --git a/workspace-server/internal/pendinguploads/sweeper.go b/workspace-server/internal/pendinguploads/sweeper.go index b29a87ad..31a1920c 100644 --- a/workspace-server/internal/pendinguploads/sweeper.go +++ b/workspace-server/internal/pendinguploads/sweeper.go @@ -66,15 +66,21 @@ const sweepDeadline = 30 * time.Second // to exercise the ticker-driven sweep path without burning real wall- // clock time. func StartSweeper(ctx context.Context, storage Storage, ackRetention time.Duration) { - startSweeperWithInterval(ctx, storage, ackRetention, SweepInterval) + startSweeperWithInterval(ctx, storage, ackRetention, SweepInterval, nil) } // startSweeperWithInterval is the test-friendly variant of StartSweeper // — same loop, but the cadence is caller-specified. Production code // should use StartSweeper to keep the SweepInterval constant pinned. -func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention, interval time.Duration) { +// If done is non-nil it is closed exactly once when the loop exits, +// allowing tests to wait for the goroutine to fully terminate before +// asserting on shared metric counters (issue #86). 
+func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention, interval time.Duration, done chan struct{}) { if storage == nil { log.Println("pendinguploads sweeper: storage is nil — sweeper disabled") + if done != nil { + close(done) + } return } if ackRetention == 0 { @@ -86,6 +92,12 @@ func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention ) ticker := time.NewTicker(interval) defer ticker.Stop() + defer func() { + log.Println("pendinguploads sweeper: shutdown") + if done != nil { + close(done) + } + }() // Run once immediately so a platform restart cleans up any rows // that became eligible while we were down — don't make the // operator wait 5 minutes for the first sweep. @@ -93,9 +105,16 @@ func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention for { select { case <-ctx.Done(): - log.Println("pendinguploads sweeper: shutdown") return case <-ticker.C: + // Guard: ctx may already be cancelled when this case runs — if + // ctx.Done() and ticker.C are both ready, select picks one at random. + // Calling sweepOnce with an already-cancelled ctx would increment + // the error counter on shutdown — polluting the next test's + // baseline (issue #86 full-suite failure). 
+ if ctx.Err() != nil { + return + } sweepOnce(ctx, storage, ackRetention) } } } diff --git a/workspace-server/internal/pendinguploads/sweeper_test.go b/workspace-server/internal/pendinguploads/sweeper_test.go index 8095e83d..b1a723a6 100644 --- a/workspace-server/internal/pendinguploads/sweeper_test.go +++ b/workspace-server/internal/pendinguploads/sweeper_test.go @@ -136,7 +136,7 @@ func TestStartSweeper_RunsImmediatelyAndOnTick(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go pendinguploads.StartSweeper(ctx, store, time.Hour) + done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour) store.waitForCycle(t, 1, 2*time.Second) if got := store.calls.Load(); got < 1 { t.Errorf("expected at least one immediate sweep, got %d", got) @@ -145,6 +145,10 @@ func TestStartSweeper_RunsImmediatelyAndOnTick(t *testing.T) { if store.gotRetention.Load() != 3600 { t.Errorf("retention seconds = %d, want 3600", store.gotRetention.Load()) } + // #86 fix: ensure goroutine has exited before the next test's + // metricDelta() baseline capture. + cancel() + <-done } func TestStartSweeper_ZeroAckRetentionUsesDefault(t *testing.T) { @@ -152,23 +156,22 @@ func TestStartSweeper_ZeroAckRetentionUsesDefault(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go pendinguploads.StartSweeper(ctx, store, 0) + done := pendinguploads.StartSweeperForTest(ctx, store, 0) store.waitForCycle(t, 1, 2*time.Second) want := int64(pendinguploads.DefaultAckRetention.Seconds()) if store.gotRetention.Load() != want { t.Errorf("retention = %d, want default %d", store.gotRetention.Load(), want) } + // #86 fix. 
+ cancel() + <-done } func TestStartSweeper_ContextCancelStopsLoop(t *testing.T) { store := newFakeSweepStorage([]pendinguploads.SweepResult{{}}, nil) ctx, cancel := context.WithCancel(context.Background()) - done := make(chan struct{}) - go func() { - pendinguploads.StartSweeper(ctx, store, time.Second) - close(done) - }() + done := pendinguploads.StartSweeperForTest(ctx, store, time.Second) store.waitForCycle(t, 1, 2*time.Second) cancel() @@ -187,14 +190,23 @@ func TestStartSweeperWithInterval_TickerFiresAdditionalCycles(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go pendinguploads.StartSweeperWithIntervalForTest(ctx, store, time.Hour, 30*time.Millisecond) - + // Keep the 30ms interval seam: StartSweeperForTest pins the production + // SweepInterval, so a tick-driven cycle is not expected within the 2s wait below. + done := make(chan struct{}) + go func() { + defer close(done) + pendinguploads.StartSweeperWithIntervalForTest(ctx, store, time.Hour, 30*time.Millisecond) + }() // Immediate cycle + at least one tick-driven cycle. store.waitForCycle(t, 2, 2*time.Second) if got := store.calls.Load(); got < 2 { t.Errorf("expected ≥2 cycles (immediate + 1 tick), got %d", got) } + // #86 fix: drain the done channel so the goroutine is fully gone + // before the next test's metricDelta() baseline capture. + cancel() + <-done } func TestStartSweeper_TransientErrorDoesNotCrashLoop(t *testing.T) { @@ -217,7 +229,7 @@ func TestStartSweeper_TransientErrorDoesNotCrashLoop(t *testing.T) { // waitForCycle is too early. _, _, deltaError := metricDelta(t) - go pendinguploads.StartSweeper(ctx, store, time.Hour) + done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour) // Wait for the first (errored) cycle. store.waitForCycle(t, 1, 2*time.Second) @@ -226,11 +238,13 @@ func TestStartSweeper_TransientErrorDoesNotCrashLoop(t *testing.T) { // stops the loop on the next select pass with no in-flight metric // writes outstanding. waitForMetricDelta(t, deltaError, 1, 2*time.Second) - // Cancel — the goroutine returns cleanly, proving the error path - // didn't crash the loop. 
Without this fix the goroutine would have - // either panicked (process abort visible at exit) or stuck (this - // cancel + done-channel pattern would deadlock instead). + // Cancel and wait for the goroutine to fully exit (#86 fix). + // Without the done-channel wait, the goroutine races with the next + // test's metricDelta() baseline capture — the next test may see + // error=1 from this test still "in flight", throwing off its + // deltaError assertion. cancel() + <-done } // metricDelta returns a function that, when called, returns how much @@ -265,7 +273,7 @@ func TestStartSweeper_RecordsMetricsOnSuccess(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go pendinguploads.StartSweeper(ctx, store, time.Hour) + done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour) store.waitForCycle(t, 1, 2*time.Second) // Poll for the success counters to settle — closes the cycleDone- @@ -278,6 +286,15 @@ func TestStartSweeper_RecordsMetricsOnSuccess(t *testing.T) { if got := deltaError(); got != 0 { t.Errorf("error counter delta = %d, want 0", got) } + + // #86 fix: wait on the done channel so the goroutine is fully gone + // before the next test's metricDelta() baseline capture. Without this, + // this test's goroutine could still be mid-Sweep (blocked on + // the fake's results channel) and its eventual return would mutate + // the shared error/acked counters after the next test has already + // captured its baseline. 
+ cancel() + <-done } func TestStartSweeper_RecordsMetricsOnError(t *testing.T) { @@ -290,7 +307,7 @@ func TestStartSweeper_RecordsMetricsOnError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go pendinguploads.StartSweeper(ctx, store, time.Hour) + done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour) store.waitForCycle(t, 1, 2*time.Second) // Poll for the error counter to settle — cycleDone fires inside @@ -300,4 +317,9 @@ func TestStartSweeper_RecordsMetricsOnError(t *testing.T) { // though the metric WILL be 1 a few ms later. See // waitForMetricDelta comment. waitForMetricDelta(t, deltaError, 1, 2*time.Second) + + // #86 fix: ensure the goroutine has fully exited before the next + // test's metricDelta() baseline capture. + cancel() + <-done }