Merge remote-tracking branch 'origin/main' into trig-190

This commit is contained in:
Molecule AI Core Platform Lead 2026-05-09 22:58:08 +00:00
commit a8df558909
3 changed files with 73 additions and 21 deletions

View File

@ -6,12 +6,23 @@ import (
)
// StartSweeperWithIntervalForTest exposes startSweeperWithInterval to
// the external test package. The production code uses StartSweeper
// the external test package. The production code uses StartSweeper
// (which pins the canonical SweepInterval); tests pin a short interval
// to exercise the ticker-driven cycle without burning real wall-clock
// time. The Go convention `export_test.go` keeps this seam OUT of the
// production binary — files ending in _test.go are stripped at build
// time, so this re-export only exists during `go test`.
func StartSweeperWithIntervalForTest(ctx context.Context, storage Storage, ackRetention, interval time.Duration) {
startSweeperWithInterval(ctx, storage, ackRetention, interval)
startSweeperWithInterval(ctx, storage, ackRetention, interval, nil)
}
// StartSweeperForTest launches the sweeper loop on its own goroutine and
// hands back a channel that the loop closes when it exits. The loop ticks
// at the production SweepInterval. Tests MUST cancel ctx and then receive
// from the returned channel before returning, so the goroutine is fully
// gone and the shared metric counters are stable when the next test
// captures its baseline (issue #86).
func StartSweeperForTest(ctx context.Context, storage Storage, ackRetention time.Duration) chan struct{} {
	exited := make(chan struct{})
	go func() {
		startSweeperWithInterval(ctx, storage, ackRetention, SweepInterval, exited)
	}()
	return exited
}

View File

@ -66,15 +66,21 @@ const sweepDeadline = 30 * time.Second
// to exercise the ticker-driven sweep path without burning real wall-
// clock time.
func StartSweeper(ctx context.Context, storage Storage, ackRetention time.Duration) {
startSweeperWithInterval(ctx, storage, ackRetention, SweepInterval)
startSweeperWithInterval(ctx, storage, ackRetention, SweepInterval, nil)
}
// startSweeperWithInterval is the test-friendly variant of StartSweeper
// — same loop, but the cadence is caller-specified. Production code
// should use StartSweeper to keep the SweepInterval constant pinned.
func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention, interval time.Duration) {
// If done is non-nil it is closed exactly once when the loop exits,
// allowing tests to wait for the goroutine to fully terminate before
// asserting on shared metric counters (issue #86).
func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention, interval time.Duration, done chan struct{}) {
if storage == nil {
log.Println("pendinguploads sweeper: storage is nil — sweeper disabled")
if done != nil {
close(done)
}
return
}
if ackRetention == 0 {
@ -86,6 +92,12 @@ func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention
)
ticker := time.NewTicker(interval)
defer ticker.Stop()
defer func() {
log.Println("pendinguploads sweeper: shutdown")
if done != nil {
close(done)
}
}()
// Run once immediately so a platform restart cleans up any rows
// that became eligible while we were down — don't make the
// operator wait 5 minutes for the first sweep.
@ -93,9 +105,16 @@ func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention
for {
select {
case <-ctx.Done():
log.Println("pendinguploads sweeper: shutdown")
return
case <-ticker.C:
// Guard: when ctx is cancelled at about the same time the ticker
// fires, both cases are ready and select picks one at random, so
// this branch can run with an already-cancelled ctx. Calling
// sweepOnce in that state would increment the error counter on
// shutdown — polluting the next test's baseline (issue #86
// full-suite failure).
if ctx.Err() != nil {
continue
}
sweepOnce(ctx, storage, ackRetention)
}
}

View File

@ -136,7 +136,7 @@ func TestStartSweeper_RunsImmediatelyAndOnTick(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go pendinguploads.StartSweeper(ctx, store, time.Hour)
done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour)
store.waitForCycle(t, 1, 2*time.Second)
if got := store.calls.Load(); got < 1 {
t.Errorf("expected at least one immediate sweep, got %d", got)
@ -145,6 +145,10 @@ func TestStartSweeper_RunsImmediatelyAndOnTick(t *testing.T) {
if store.gotRetention.Load() != 3600 {
t.Errorf("retention seconds = %d, want 3600", store.gotRetention.Load())
}
// #86 fix: ensure goroutine has exited before the next test's
// metricDelta() baseline capture.
cancel()
<-done
}
func TestStartSweeper_ZeroAckRetentionUsesDefault(t *testing.T) {
@ -152,23 +156,22 @@ func TestStartSweeper_ZeroAckRetentionUsesDefault(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go pendinguploads.StartSweeper(ctx, store, 0)
done := pendinguploads.StartSweeperForTest(ctx, store, 0)
store.waitForCycle(t, 1, 2*time.Second)
want := int64(pendinguploads.DefaultAckRetention.Seconds())
if store.gotRetention.Load() != want {
t.Errorf("retention = %d, want default %d", store.gotRetention.Load(), want)
}
// #86 fix.
cancel()
<-done
}
func TestStartSweeper_ContextCancelStopsLoop(t *testing.T) {
store := newFakeSweepStorage([]pendinguploads.SweepResult{{}}, nil)
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
pendinguploads.StartSweeper(ctx, store, time.Second)
close(done)
}()
done := pendinguploads.StartSweeperForTest(ctx, store, time.Second)
store.waitForCycle(t, 1, 2*time.Second)
cancel()
@ -187,14 +190,17 @@ func TestStartSweeperWithInterval_TickerFiresAdditionalCycles(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go pendinguploads.StartSweeperWithIntervalForTest(ctx, store, time.Hour, 30*time.Millisecond)
done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour)
// Immediate cycle + at least one tick-driven cycle.
store.waitForCycle(t, 2, 2*time.Second)
if got := store.calls.Load(); got < 2 {
t.Errorf("expected ≥2 cycles (immediate + 1 tick), got %d", got)
}
// #86 fix: drain the done channel so the goroutine is fully gone
// before the next test's metricDelta() baseline capture.
cancel()
<-done
}
func TestStartSweeper_TransientErrorDoesNotCrashLoop(t *testing.T) {
@ -217,7 +223,7 @@ func TestStartSweeper_TransientErrorDoesNotCrashLoop(t *testing.T) {
// waitForCycle is too early.
_, _, deltaError := metricDelta(t)
go pendinguploads.StartSweeper(ctx, store, time.Hour)
done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour)
// Wait for the first (errored) cycle.
store.waitForCycle(t, 1, 2*time.Second)
@ -226,11 +232,13 @@ func TestStartSweeper_TransientErrorDoesNotCrashLoop(t *testing.T) {
// stops the loop on the next select pass with no in-flight metric
// writes outstanding.
waitForMetricDelta(t, deltaError, 1, 2*time.Second)
// Cancel — the goroutine returns cleanly, proving the error path
// didn't crash the loop. Without this fix the goroutine would have
// either panicked (process abort visible at exit) or stuck (this
// cancel + done-channel pattern would deadlock instead).
// Cancel and wait for the goroutine to fully exit (#86 fix).
// Without the done-channel wait, the goroutine races with the next
// test's metricDelta() baseline capture — the next test may see
// error=1 from this test still "in flight", throwing off its
// deltaError assertion.
cancel()
<-done
}
// metricDelta returns a function that, when called, returns how much
@ -265,7 +273,7 @@ func TestStartSweeper_RecordsMetricsOnSuccess(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go pendinguploads.StartSweeper(ctx, store, time.Hour)
done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour)
store.waitForCycle(t, 1, 2*time.Second)
// Poll for the success counters to settle — closes the cycleDone-
@ -278,6 +286,15 @@ func TestStartSweeper_RecordsMetricsOnSuccess(t *testing.T) {
if got := deltaError(); got != 0 {
t.Errorf("error counter delta = %d, want 0", got)
}
// #86 fix: cancel and wait on the done channel so this test's
// goroutine is fully gone before the next test's metricDelta()
// baseline capture. Without this wait, this test's goroutine could
// still be mid-Sweep (blocked on the fake's results channel) and its
// eventual return would mutate the shared error/acked counters after
// the next test has already snapshotted its baseline.
cancel()
<-done
}
func TestStartSweeper_RecordsMetricsOnError(t *testing.T) {
@ -290,7 +307,7 @@ func TestStartSweeper_RecordsMetricsOnError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go pendinguploads.StartSweeper(ctx, store, time.Hour)
done := pendinguploads.StartSweeperForTest(ctx, store, time.Hour)
store.waitForCycle(t, 1, 2*time.Second)
// Poll for the error counter to settle — cycleDone fires inside
@ -300,4 +317,9 @@ func TestStartSweeper_RecordsMetricsOnError(t *testing.T) {
// though the metric WILL be 1 a few ms later. See
// waitForMetricDelta comment.
waitForMetricDelta(t, deltaError, 1, 2*time.Second)
// #86 fix: ensure the goroutine has fully exited before the next
// test's metricDelta() baseline capture.
cancel()
<-done
}