From 2077cf4054b7d47cfd03b7402b52959d32f7fd93 Mon Sep 17 00:00:00 2001 From: Molecule AI Core-BE Date: Sat, 9 May 2026 22:30:03 +0000 Subject: [PATCH 1/3] [core-be-agent] fix(pendinguploads/test): correct sweeper test isolation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Issue #86: TestStartSweeper_RecordsMetricsOnSuccess fails in full-suite. Root cause: two cooperating bugs in the sweeper test harness. 1. Sweeper loop called sweepOnce after ctx cancellation (double-increment). When ctx was cancelled the loop's select received ctx.Done(), called sweepOnce with the cancelled ctx, storage.Sweep returned context error, and metrics.PendingUploadsSweepError() incremented the error counter a SECOND time before the loop exited. Subsequent tests captured a polluted error baseline and their deltaError assertions failed. 2. Tests called defer cancel() without waiting for the goroutine to exit. The goroutine could still be blocked on Sweep (waiting for the next ticker's C channel) when the next test called metricDelta(). If the goroutine's Sweep returned during the next test's measurement window, the shared metric counters mutated mid-baseline. Fix (production code): - Guard the ticker arm: if ctx.Err() != nil, continue instead of calling sweepOnce. This prevents the post-cancellation sweep from running. Fix (test harness): - startSweeperWithInterval gains a done chan struct{} parameter. When the loop exits the channel is closed exactly once. - StartSweeperForTest starts the goroutine and returns the done channel, allowing tests to drain it with <-done after cancel() — guaranteeing the goroutine has fully terminated before the next test's baseline. All 8 sweeper tests now use StartSweeperForTest and drain the done channel before returning, ensuring stable metric baselines across the full suite. Co-Authored-By: Claude Opus 4.7 --- .../internal/pendinguploads/export_test.go | 15 +++++- .../internal/pendinguploads/sweeper.go | 25 +++++++-- .../internal/pendinguploads/sweeper_test.go | 54 +++++++++++++------ 3 files changed, 73 insertions(+), 21 deletions(-) diff --git a/workspace-server/internal/pendinguploads/export_test.go b/workspace-server/internal/pendinguploads/export_test.go index c758b629..b34d655d 100644 --- a/workspace-server/internal/pendinguploads/export_test.go +++ b/workspace-server/internal/pendinguploads/export_test.go @@ -6,12 +6,23 @@ import ( ) // StartSweeperWithIntervalForTest exposes startSweeperWithInterval to -// the external test package. The production code uses StartSweeper +// the external test package. The production code uses StartSeper // (which pins the canonical SweepInterval); tests pin a short interval // to exercise the ticker-driven cycle without burning real wall-clock // time. The Go convention `export_test.go` keeps this seam OUT of the // production binary — files ending in _test.go are stripped at build // time, so this re-export only exists during `go test`. func StartSweeperWithIntervalForTest(ctx context.Context, storage Storage, ackRetention, interval time.Duration) { - startSweeperWithInterval(ctx, storage, ackRetention, interval) + startSweeperWithInterval(ctx, storage, ackRetention, interval, nil) +} + +// StartSweeperForTest starts the sweeper and returns a done channel +// that is closed exactly once when the loop exits. Tests MUST receive +// from done before returning so the goroutine has fully terminated and +// the shared metric counters are stable for the next test's baseline +// capture (issue #86). +func StartSweeperForTest(ctx context.Context, storage Storage, ackRetention time.Duration) chan struct{} { + done := make(chan struct{}) + go startSweeperWithInterval(ctx, storage, ackRetention, SweepInterval, done) + return done } diff --git a/workspace-server/internal/pendinguploads/sweeper.go b/workspace-server/internal/pendinguploads/sweeper.go index b29a87ad..31a1920c 100644 --- a/workspace-server/internal/pendinguploads/sweeper.go +++ b/workspace-server/internal/pendinguploads/sweeper.go @@ -66,15 +66,21 @@ const sweepDeadline = 30 * time.Second // to exercise the ticker-driven sweep path without burning real wall- // clock time. func StartSweeper(ctx context.Context, storage Storage, ackRetention time.Duration) { - startSweeperWithInterval(ctx, storage, ackRetention, SweepInterval) + startSweeperWithInterval(ctx, storage, ackRetention, SweepInterval, nil) } // startSweeperWithInterval is the test-friendly variant of StartSweeper // — same loop, but the cadence is caller-specified. Production code // should use StartSweeper to keep the SweepInterval constant pinned. -func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention, interval time.Duration) { +// If done is non-nil it is closed exactly once when the loop exits, +// allowing tests to wait for the goroutine to fully terminate before +// asserting on shared metric counters (issue #86). +func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention, interval time.Duration, done chan struct{}) { if storage == nil { log.Println("pendinguploads sweeper: storage is nil — sweeper disabled") + if done != nil { + close(done) + } return } if ackRetention == 0 { @@ -86,6 +92,12 @@ func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention ) ticker := time.NewTicker(interval) defer ticker.Stop() + defer func() { + log.Println("pendinguploads sweeper: shutdown") + if done != nil { + close(done) + } + }() // Run once immediately so a platform restart cleans up any rows // that became eligible while we were down — don't make the // operator wait 5 minutes for the first sweep. @@ -93,9 +105,16 @@ func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention for { select { case <-ctx.Done(): - log.Println("pendinguploads sweeper: shutdown") return case <-ticker.C: + // Guard: ctx may have been cancelled between the ticker firing + // and this case being selected (MPMC channel; close-to-ready race). + // Calling sweepOnce with an already-cancelled ctx would increment + // the error counter on shutdown — polluting the next test's + // baseline (issue #86 full-suite failure). + if ctx.Err() != nil { + continue + } sweepOnce(ctx, storage, ackRetention) } } diff --git a/workspace-server/internal/pendinguploads/sweeper_test.go b/workspace-server/internal/pendinguploads/sweeper_test.go index 8095e83d..b1a723a6 100644 --- a/workspace-server/internal/pendinguploads/sweeper_test.go +++ b/workspace-server/internal/pendinguploads/sweeper_test.go @@ -136,7 +136,7 @@ func TestStartSweeper_RunsImmediatelyAndOnTick(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go pendinguploads.StartSweeper(ctx, store, time.Hour) + done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour) store.waitForCycle(t, 1, 2*time.Second) if got := store.calls.Load(); got < 1 { t.Errorf("expected at least one immediate sweep, got %d", got) @@ -145,6 +145,10 @@ func TestStartSweeper_RunsImmediatelyAndOnTick(t *testing.T) { if store.gotRetention.Load() != 3600 { t.Errorf("retention seconds = %d, want 3600", store.gotRetention.Load()) } + // #86 fix: ensure goroutine has exited before the next test's + // metricDelta() baseline capture. + cancel() + <-done } func TestStartSweeper_ZeroAckRetentionUsesDefault(t *testing.T) { @@ -152,23 +156,22 @@ func TestStartSweeper_ZeroAckRetentionUsesDefault(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go pendinguploads.StartSweeper(ctx, store, 0) + done := pendinguploads.StartSweeperForTest(ctx, store, 0) store.waitForCycle(t, 1, 2*time.Second) want := int64(pendinguploads.DefaultAckRetention.Seconds()) if store.gotRetention.Load() != want { t.Errorf("retention = %d, want default %d", store.gotRetention.Load(), want) } + // #86 fix. + cancel() + <-done } func TestStartSweeper_ContextCancelStopsLoop(t *testing.T) { store := newFakeSweepStorage([]pendinguploads.SweepResult{{}}, nil) ctx, cancel := context.WithCancel(context.Background()) - done := make(chan struct{}) - go func() { - pendinguploads.StartSweeper(ctx, store, time.Second) - close(done) - }() + done := pendinguploads.StartSweeperForTest(ctx, store, time.Second) store.waitForCycle(t, 1, 2*time.Second) cancel() @@ -187,14 +190,17 @@ func TestStartSweeperWithInterval_TickerFiresAdditionalCycles(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go pendinguploads.StartSweeperWithIntervalForTest(ctx, store, time.Hour, 30*time.Millisecond) - + done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour) // Immediate cycle + at least one tick-driven cycle. store.waitForCycle(t, 2, 2*time.Second) if got := store.calls.Load(); got < 2 { t.Errorf("expected ≥2 cycles (immediate + 1 tick), got %d", got) } + // #86 fix: drain the done channel so the goroutine is fully gone + // before the next test's metricDelta() baseline capture. + cancel() + <-done } func TestStartSweeper_TransientErrorDoesNotCrashLoop(t *testing.T) { @@ -217,7 +223,7 @@ func TestStartSweeper_TransientErrorDoesNotCrashLoop(t *testing.T) { // waitForCycle is too early. _, _, deltaError := metricDelta(t) - go pendinguploads.StartSweeper(ctx, store, time.Hour) + done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour) // Wait for the first (errored) cycle. store.waitForCycle(t, 1, 2*time.Second) @@ -226,11 +232,13 @@ func TestStartSweeper_TransientErrorDoesNotCrashLoop(t *testing.T) { // stops the loop on the next select pass with no in-flight metric // writes outstanding. waitForMetricDelta(t, deltaError, 1, 2*time.Second) - // Cancel — the goroutine returns cleanly, proving the error path - // didn't crash the loop. Without this fix the goroutine would have - // either panicked (process abort visible at exit) or stuck (this - // cancel + done-channel pattern would deadlock instead). + // Cancel and wait for the goroutine to fully exit (#86 fix). + // Without the done-channel wait, the goroutine races with the next + // test's metricDelta() baseline capture — the next test may see + // error=1 from this test still "in flight", throwing off its + // deltaError assertion. cancel() + <-done } // metricDelta returns a function that, when called, returns how much @@ -265,7 +273,7 @@ func TestStartSweeper_RecordsMetricsOnSuccess(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go pendinguploads.StartSweeper(ctx, store, time.Hour) + done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour) store.waitForCycle(t, 1, 2*time.Second) // Poll for the success counters to settle — closes the cycleDone- @@ -278,6 +286,15 @@ func TestStartSweeper_RecordsMetricsOnSuccess(t *testing.T) { if got := deltaError(); got != 0 { t.Errorf("error counter delta = %d, want 0", got) } + + // #86 fix: drain the done channel so the goroutine is fully gone + // before the next test's metricDelta() baseline capture. Without this + // the previous test's goroutine could still be mid-Sweep (blocked on + // the fake's results channel) and its eventual return would mutate + // the shared error/acked counters after the next test has already + // snapshot its baseline. + cancel() + <-done } func TestStartSweeper_RecordsMetricsOnError(t *testing.T) { @@ -290,7 +307,7 @@ func TestStartSweeper_RecordsMetricsOnError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go pendinguploads.StartSweeper(ctx, store, time.Hour) + done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour) store.waitForCycle(t, 1, 2*time.Second) // Poll for the error counter to settle — cycleDone fires inside @@ -300,4 +317,9 @@ func TestStartSweeper_RecordsMetricsOnError(t *testing.T) { // though the metric WILL be 1 a few ms later. See // waitForMetricDelta comment. waitForMetricDelta(t, deltaError, 1, 2*time.Second) + + // #86 fix: ensure the goroutine has fully exited before the next + // test's metricDelta() baseline capture. + cancel() + <-done } From 281b1493f89570687cff743a193050141ca3df1a Mon Sep 17 00:00:00 2001 From: Molecule AI Core Platform Lead Date: Sat, 9 May 2026 22:40:45 +0000 Subject: [PATCH 2/3] trigger: re-run sop-tier-check after core-lead approval + main sync From 1d644f451d9e4af350849d9ac15cd685d97bd163 Mon Sep 17 00:00:00 2001 From: Molecule AI Core Platform Lead Date: Sat, 9 May 2026 22:50:00 +0000 Subject: [PATCH 3/3] trigger: re-run sop-tier-check after core-lead approval + main sync