forked from molecule-ai/molecule-core
Merge pull request #2902 from Molecule-AI/fix-pendinguploads-sweeper-test-race
test(pendinguploads): close cycleDone-vs-metric-record race in sweeper tests
This commit is contained in:
commit
f5613bf099
@ -65,6 +65,15 @@ func (f *fakeSweepStorage) Sweep(_ context.Context, ackRetention time.Duration)
|
||||
|
||||
// waitForCycle blocks until at least one Sweep completes, with a deadline.
|
||||
// Tests use this instead of time.Sleep to avoid flakes on slow CI hosts.
|
||||
//
|
||||
// CAVEAT: cycleDone fires from inside fakeSweepStorage.Sweep's defer,
|
||||
// which runs as Sweep returns its result — BEFORE the StartSweeper
|
||||
// loop has processed the (result, error) tuple and called the
|
||||
// metric recorders. Tests that assert on metric counters must NOT
|
||||
// rely on this wait alone; use waitForMetricDelta instead so the
|
||||
// metric increment race (Sweep returns → cycleDone fires → test
|
||||
// reads counter → only then does StartSweeper's loop call
|
||||
// metrics.PendingUploadsSweepError) doesn't produce a flake.
|
||||
func (f *fakeSweepStorage) waitForCycle(t *testing.T, n int, timeout time.Duration) {
|
||||
t.Helper()
|
||||
deadline := time.NewTimer(timeout)
|
||||
@ -78,6 +87,33 @@ func (f *fakeSweepStorage) waitForCycle(t *testing.T, n int, timeout time.Durati
|
||||
}
|
||||
}
|
||||
|
||||
// waitForMetricDelta polls the supplied delta function until it returns
|
||||
// `want` or the timeout elapses. Use after waitForCycle when the test
|
||||
// asserts on a metric counter — closes the race between cycleDone
|
||||
// (signalled inside fakeSweepStorage.Sweep's defer, BEFORE Sweep
|
||||
// returns to StartSweeper) and the metric recording (which happens in
|
||||
// StartSweeper's loop AFTER Sweep returns). On a slow CI host the test
|
||||
// goroutine wins the read before StartSweeper's goroutine writes the
|
||||
// counter; the polling assert preserves the determinism of "the metric
|
||||
// MUST be N" without timing-based flakes.
|
||||
//
|
||||
// Per memory feedback_question_test_when_unexpected.md: the failure
|
||||
// mode "delta=0, want=1" looked like a real bug at first glance —
|
||||
// "metric never incremented" — but instrumented analysis showed the
|
||||
// metric DID increment, just AFTER the test's read. The fix is the
|
||||
// test's wait shape, not the production code.
|
||||
func waitForMetricDelta(t *testing.T, delta func() int64, want int64, timeout time.Duration) {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(timeout)
|
||||
for time.Now().Before(deadline) {
|
||||
if delta() == want {
|
||||
return
|
||||
}
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("waited %s for metric delta=%d, last seen %d", timeout, want, delta())
|
||||
}
|
||||
|
||||
func TestStartSweeper_NilStorageDoesNotPanic(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
@ -220,12 +256,13 @@ func TestStartSweeper_RecordsMetricsOnSuccess(t *testing.T) {
|
||||
go pendinguploads.StartSweeper(ctx, store, time.Hour)
|
||||
store.waitForCycle(t, 1, 2*time.Second)
|
||||
|
||||
if got := deltaAcked(); got != 3 {
|
||||
t.Errorf("acked counter delta = %d, want 3", got)
|
||||
}
|
||||
if got := deltaExpired(); got != 5 {
|
||||
t.Errorf("expired counter delta = %d, want 5", got)
|
||||
}
|
||||
// Poll for the success counters to settle — closes the cycleDone-
|
||||
// vs-metric-record race (see waitForMetricDelta comment).
|
||||
waitForMetricDelta(t, deltaAcked, 3, 2*time.Second)
|
||||
waitForMetricDelta(t, deltaExpired, 5, 2*time.Second)
|
||||
// Error counter MUST stay at zero on the success path. Read after
|
||||
// the success counters have settled — once those are correct,
|
||||
// StartSweeper has fully processed this cycle's result.
|
||||
if got := deltaError(); got != 0 {
|
||||
t.Errorf("error counter delta = %d, want 0", got)
|
||||
}
|
||||
@ -244,7 +281,11 @@ func TestStartSweeper_RecordsMetricsOnError(t *testing.T) {
|
||||
go pendinguploads.StartSweeper(ctx, store, time.Hour)
|
||||
store.waitForCycle(t, 1, 2*time.Second)
|
||||
|
||||
if got := deltaError(); got != 1 {
|
||||
t.Errorf("error counter delta = %d, want 1", got)
|
||||
}
|
||||
// Poll for the error counter to settle — cycleDone fires inside
|
||||
// the fake's Sweep defer, BEFORE StartSweeper's loop receives the
|
||||
// returned error and calls metrics.PendingUploadsSweepError. On
|
||||
// slow CI hosts a direct deltaError() read here returns 0 even
|
||||
// though the metric WILL be 1 a few ms later. See
|
||||
// waitForMetricDelta comment.
|
||||
waitForMetricDelta(t, deltaError, 1, 2*time.Second)
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user