Issue #86: TestStartSweeper_RecordsMetricsOnSuccess fails in full-suite.

Root cause: two cooperating bugs in the sweeper test harness.

1. Sweeper loop called sweepOnce after ctx cancellation (double-increment).
   When ctx was cancelled while a tick was already pending, the select could
   take the ticker arm and call sweepOnce with the cancelled ctx;
   storage.Sweep returned a context error, and
   metrics.PendingUploadsSweepError() incremented the error counter a SECOND
   time before the loop exited. Subsequent tests captured a polluted error
   baseline and their deltaError assertions failed.

2. Tests called defer cancel() without waiting for the goroutine to exit.
   The goroutine could still be blocked in Sweep (or waiting on the ticker's
   C channel) when the next test called metricDelta(). If the goroutine's
   Sweep returned during the next test's measurement window, the shared
   metric counters mutated mid-baseline.

Fix (production code):
- Guard the ticker arm: if ctx.Err() != nil, continue instead of calling
  sweepOnce. This prevents the post-cancellation sweep from running.

Fix (test harness):
- startSweeperWithInterval gains a done chan struct{} parameter. When the
  loop exits the channel is closed exactly once.
- StartSweeperForTest starts the goroutine and returns the done channel,
  allowing tests to drain it with <-done after cancel() — guaranteeing the
  goroutine has fully terminated before the next test's baseline.

All 8 sweeper tests now use StartSweeperForTest and drain the done channel
before returning, ensuring stable metric baselines across the full suite.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
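
For reference, a minimal Go sketch of the test-harness side described above. The exact StartSweeperForTest signature, the newFakeStorage helper, and the test body are assumptions for illustration, not the repo's actual code:

// StartSweeperForTest (sketch): run the sweeper loop in a goroutine and
// hand the caller the done channel so a test can wait for termination.
func StartSweeperForTest(ctx context.Context, storage Storage, interval time.Duration) <-chan struct{} {
	done := make(chan struct{})
	go startSweeperWithInterval(ctx, storage, 0, interval, done) // 0 → DefaultAckRetention
	return done
}

// Per-test pattern (sketch): cancel, then drain done BEFORE returning,
// so the next test takes its metric baseline on quiescent counters.
func TestSweeper_Example(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	done := StartSweeperForTest(ctx, newFakeStorage(), 10*time.Millisecond)

	// ... drive ticks and assert on metric deltas here ...

	cancel()
	<-done // goroutine has fully exited; counters can no longer mutate
}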
// sweeper.go — periodic GC for the pending_uploads table.
//
// The platform's poll-mode chat-upload handler creates a row in
// pending_uploads for every chat-attached file the canvas sends to a
// poll-mode workspace. The workspace's inbox poller fetches the bytes
// and acks the row, but two failure modes leak rows long-term:
//
//  1. Workspace fetches but never acks (network hiccup between GET
//     /content and POST /ack; workspace crashed between the two).
//     Phase 1's Get refuses to re-serve an acked row, but a never-
//     acked row could in principle be fetched repeatedly until expires_at.
//     Phase 2's workspace-side fetcher is idempotent; the worry is
//     only disk usage on the platform side.
//
//  2. Workspace never fetches at all (workspace was offline when the
//     row was written; the upload's TTL elapsed).
//
// This sweeper handles both. It runs every SweepInterval, deletes rows
// in either category, and emits structured logs + Prometheus counters
// so a stuck-fetch dashboard can spot the leak class.
//
// Failure isolation: a transient DB error must NOT crash the sweeper.
// We log + continue; the next tick retries. ctx cancellation cleanly
// shuts the loop down for graceful shutdown.

package pendinguploads

import (
	"context"
	"log"
	"time"

	"github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics"
)

// SweepInterval is the cadence of the GC loop. 5 minutes is a balance
// between "rows reaped quickly enough that disk usage doesn't surprise
// anyone" and "we don't pay a DELETE round-trip every 30 seconds when
// there are no candidates." Aligned with other low-priority sweepers
// (registry/orphan_sweeper runs at 60s but operates on Docker — much
// more expensive per cycle than a single indexed DELETE).
const SweepInterval = 5 * time.Minute

// DefaultAckRetention is how long an acked row sticks around before the
// sweeper deletes it. 1 hour gives the workspace enough time to retry
// the GET if its first fetch crashed mid-write — at-least-once handoff
// without leaking content for a full 24h after the workspace already
// has a copy.
const DefaultAckRetention = 1 * time.Hour

// sweepDeadline bounds a single sweep cycle. A daemon at the edge of
// timeout shouldn't pile up goroutines; 30s is generous for a single
// indexed DELETE on a table that should rarely have more than a few
// thousand rows in flight.
const sweepDeadline = 30 * time.Second

// StartSweeper runs the GC loop until ctx is cancelled. nil storage
// makes the loop a no-op (matches the handlers' tolerance for an
// unconfigured pendinguploads — some test harnesses run without the
// storage wired).
//
// Pass ackRetention=0 to use DefaultAckRetention. Negative values are
// clamped at the storage layer.
//
// Production callers use SweepInterval (5m). Tests use a short interval
// to exercise the ticker-driven sweep path without burning real wall-
// clock time.
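//
// A typical production call site might look like this (a sketch; the
// daemon's actual wiring and the ctx/store variables are assumptions):
//
//	// Runs until the daemon's root ctx is cancelled at shutdown.
//	go pendinguploads.StartSweeper(ctx, store, 0) // 0 → DefaultAckRetention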
func StartSweeper(ctx context.Context, storage Storage, ackRetention time.Duration) {
	startSweeperWithInterval(ctx, storage, ackRetention, SweepInterval, nil)
}

// startSweeperWithInterval is the test-friendly variant of StartSweeper
// — same loop, but the cadence is caller-specified. Production code
// should use StartSweeper to keep the SweepInterval constant pinned.
// If done is non-nil it is closed exactly once when the loop exits,
// allowing tests to wait for the goroutine to fully terminate before
// asserting on shared metric counters (issue #86).
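//
// The shutdown handshake a test relies on, sketched (the fake storage
// and the 10ms interval are assumptions):
//
//	done := make(chan struct{})
//	go startSweeperWithInterval(ctx, fakeStore, 0, 10*time.Millisecond, done)
//	cancel()
//	<-done // loop has exited; metric counters are quiescent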
func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention, interval time.Duration, done chan struct{}) {
	if storage == nil {
		log.Println("pendinguploads sweeper: storage is nil — sweeper disabled")
		if done != nil {
			close(done)
		}
		return
	}
	if ackRetention == 0 {
		ackRetention = DefaultAckRetention
	}
	log.Printf(
		"pendinguploads sweeper started — sweeping every %s; ack retention %s",
		interval, ackRetention,
	)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	defer func() {
		log.Println("pendinguploads sweeper: shutdown")
		if done != nil {
			close(done)
		}
	}()
	// Run once immediately so a platform restart cleans up any rows
	// that became eligible while we were down — don't make the
	// operator wait 5 minutes for the first sweep.
	sweepOnce(ctx, storage, ackRetention)
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Guard: the ticker may have fired before ctx was cancelled,
			// leaving both cases ready; select picks pseudo-randomly among
			// ready cases, so this arm can win even after ctx.Done() is
			// closed. Calling sweepOnce with an already-cancelled ctx would
			// increment the error counter on shutdown — polluting the next
			// test's baseline (issue #86 full-suite failure).
			if ctx.Err() != nil {
				continue
			}
			sweepOnce(ctx, storage, ackRetention)
		}
	}
}

func sweepOnce(parent context.Context, storage Storage, ackRetention time.Duration) {
	ctx, cancel := context.WithTimeout(parent, sweepDeadline)
	defer cancel()

	res, err := storage.Sweep(ctx, ackRetention)
	if err != nil {
		// Transient errors: log + continue. The next tick retries; if
		// the DB is genuinely down, the rest of the platform is also
		// broken and disk usage is the least of the operator's
		// problems.
		log.Printf("pendinguploads sweeper: Sweep failed: %v", err)
		metrics.PendingUploadsSweepError()
		return
	}
	metrics.PendingUploadsSwept(res.Acked, res.Expired)
	if res.Total() > 0 {
		// Per-cycle structured-ish log (one line per cycle that did
		// something). Quiet by design — most cycles delete zero rows
		// on a healthy system, and a stream of empty-result lines
		// would drown the production log without surfacing a signal.
		log.Printf(
			"pendinguploads sweeper: deleted acked=%d expired=%d total=%d",
			res.Acked, res.Expired, res.Total(),
		)
	}
}