forked from molecule-ai/molecule-core
Resolves four of six findings from the retrospective code review of Phases 1–4 (poll-mode chat upload). Bundled because every change is in the platform's pending_uploads layer or the multi-file handler that reads it.

Findings resolved:

1. Important — Sweep query lacked an index for the acked-retention OR-arm. The Phase 1 partial indexes are both `WHERE acked_at IS NULL`, so the `(acked_at IS NOT NULL AND acked_at < retention)` half of the WHERE clause seq-scanned the table on every cycle. Add a complementary partial index on `acked_at WHERE acked_at IS NOT NULL` so both arms of the disjunction are index-covered. The new index is disjoint from the existing two (no row matches both predicates), so write amplification is bounded to roughly one index entry per terminal-state row.
2. Important — uploadPollMode partial failure left orphans. The previous per-file Put loop committed rows 1..K-1 and then errored on row K with no compensation, so a client retry would double-insert the survivors. Refactor the handler into three explicit phases (pre-validate + read into memory, single atomic PutBatch, per-file activity row) and add Storage.PutBatch with all-or-nothing transaction semantics.
3. FYI — pendinguploads.StartSweeperWithInterval was exported only for tests. Move it to the unexported startSweeperWithInterval and expose the test seam through pendinguploads/export_test.go (Go convention; _test.go files are never compiled into the production binary).
4. Nit — the multipart Content-Type was passed verbatim into pending_uploads rows and re-served on /content. Add safeMimetype, which strips parameters, rejects CR/LF/control bytes, and coerces malformed shapes to application/octet-stream. The eventual GET /content response can no longer be header-split via a crafted Content-Type on the multipart.
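Finding 4's sanitizer might look like the following minimal sketch. The real safeMimetype lives in the handler and its exact rules are not shown here; this version is an assumption that only matches the behavior described above (parameters stripped, CR/LF/control bytes rejected, malformed shapes coerced to application/octet-stream):

```go
package main

import (
	"fmt"
	"strings"
)

// fallbackMimetype is what every rejected or malformed value is
// coerced to, per the review finding.
const fallbackMimetype = "application/octet-stream"

// safeMimetype (sketch): returns a Content-Type that is safe to store
// and later re-serve verbatim in a response header.
func safeMimetype(ct string) string {
	// Strip parameters: "text/html; charset=utf-8" -> "text/html".
	if i := strings.IndexByte(ct, ';'); i >= 0 {
		ct = ct[:i]
	}
	ct = strings.ToLower(strings.TrimSpace(ct))
	// Reject control bytes (incl. CR/LF header splitting) and spaces.
	for i := 0; i < len(ct); i++ {
		if ct[i] <= 0x20 || ct[i] == 0x7f {
			return fallbackMimetype
		}
	}
	// Require exactly one "/" with a non-empty type and subtype.
	slash := strings.IndexByte(ct, '/')
	if slash <= 0 || slash == len(ct)-1 || strings.Count(ct, "/") != 1 {
		return fallbackMimetype
	}
	return ct
}

func main() {
	fmt.Println(safeMimetype("text/html; charset=utf-8"))
	fmt.Println(safeMimetype("evil/x\r\nSet-Cookie: a=b"))
	fmt.Println(safeMimetype("no-slash"))
}
```

Checking the full parsed shape (rather than only blacklisting CR/LF) is what makes the coercion path cover "malformed shapes" as well as header-splitting attempts.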
Comprehensive tests:

- 10 PutBatch unit tests (sqlmock): happy path, empty input, all four pre-validation rejection paths, BeginTx error, per-row error + Rollback (no Commit), first-row error, Commit error.
- 4 new PutBatch integration tests (real Postgres): all-rows-commit happy path with COUNT(*) verification, atomic-rollback no-leak via a NUL-byte filename that lib/pq rejects mid-batch, oversize short-circuit with no Tx opened, idx_pending_uploads_acked existence + partial predicate via pg_indexes (planner-shape-independent).
- 3 new chat_files_poll tests: atomic rollback on second-file oversize, atomic rollback on PutBatch error, mimetype CRLF/NUL/parameter sanitization (8 sub-cases).

The two remaining review findings (inbox_uploads.fetch_and_stage blocks the poll loop synchronously; two httpx Clients per row) are Python-side and ship in Phase 5b once this lands on staging.

Test-only export pattern via export_test.go, atomic pre-validation discipline (validate before Tx), and behavior-based (not name-based) test assertions follow the standing project conventions.
130 lines
5.0 KiB
Go
// sweeper.go — periodic GC for the pending_uploads table.
//
// The platform's poll-mode chat-upload handler creates a row in
// pending_uploads for every chat-attached file the canvas sends to a
// poll-mode workspace. The workspace's inbox poller fetches the bytes
// and acks the row, but two failure modes leak rows long-term:
//
// 1. Workspace fetches but never acks (network hiccup between GET
//    /content and POST /ack; workspace crashed between the two).
//    Phase 1's Get refuses to re-serve an acked row, but a never-
//    acked row could in principle be fetched repeatedly until expires_at.
//    Phase 2's workspace-side fetcher is idempotent; the worry is
//    only disk usage on the platform side.
//
// 2. Workspace never fetches at all (workspace was offline when the
//    row was written; the upload's TTL elapsed).
//
// This sweeper handles both. It runs every SweepInterval, deletes rows
// in either category, and emits structured logs + Prometheus counters
// so a stuck-fetch dashboard can spot the leak class.
//
// Failure isolation: a transient DB error must NOT crash the sweeper.
// We log + continue; the next tick retries. ctx cancellation cleanly
// shuts the loop down for graceful shutdown.

package pendinguploads

import (
    "context"
    "log"
    "time"

    "github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics"
)

// SweepInterval is the cadence of the GC loop. 5 minutes is a balance
// between "rows reaped quickly enough that disk usage doesn't surprise
// anyone" and "we don't pay a DELETE round-trip every 30 seconds when
// there are no candidates." Aligned with other low-priority sweepers
// (registry/orphan_sweeper runs at 60s but operates on Docker — much
// more expensive per cycle than a single indexed DELETE).
const SweepInterval = 5 * time.Minute

// DefaultAckRetention is how long an acked row sticks around before the
// sweeper deletes it. 1 hour gives the workspace enough time to retry
// the GET if its first fetch crashed mid-write — at-least-once handoff
// without leaking content for a full 24h after the workspace already
// has a copy.
const DefaultAckRetention = 1 * time.Hour

// sweepDeadline bounds a single sweep cycle. A daemon at the edge of
// timeout shouldn't pile up goroutines; 30s is generous for a single
// indexed DELETE on a table that should rarely have more than a few
// thousand rows in flight.
const sweepDeadline = 30 * time.Second

// StartSweeper runs the GC loop until ctx is cancelled. A nil storage
// makes the loop a no-op (matches the handlers' tolerance for an
// unconfigured pendinguploads — some test harnesses run without the
// storage wired).
//
// Pass ackRetention=0 to use DefaultAckRetention. Negative values are
// clamped at the storage layer.
//
// Production callers use SweepInterval (5m). Tests use a short interval
// to exercise the ticker-driven sweep path without burning real wall-
// clock time.
func StartSweeper(ctx context.Context, storage Storage, ackRetention time.Duration) {
    startSweeperWithInterval(ctx, storage, ackRetention, SweepInterval)
}

// startSweeperWithInterval is the test-friendly variant of StartSweeper
// — same loop, but the cadence is caller-specified. Production code
// should use StartSweeper to keep the SweepInterval constant pinned.
func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention, interval time.Duration) {
    if storage == nil {
        log.Println("pendinguploads sweeper: storage is nil — sweeper disabled")
        return
    }
    if ackRetention == 0 {
        ackRetention = DefaultAckRetention
    }
    log.Printf(
        "pendinguploads sweeper started — sweeping every %s; ack retention %s",
        interval, ackRetention,
    )
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    // Run once immediately so a platform restart cleans up any rows
    // that became eligible while we were down — don't make the
    // operator wait 5 minutes for the first sweep.
    sweepOnce(ctx, storage, ackRetention)
    for {
        select {
        case <-ctx.Done():
            log.Println("pendinguploads sweeper: shutdown")
            return
        case <-ticker.C:
            sweepOnce(ctx, storage, ackRetention)
        }
    }
}

func sweepOnce(parent context.Context, storage Storage, ackRetention time.Duration) {
    ctx, cancel := context.WithTimeout(parent, sweepDeadline)
    defer cancel()

    res, err := storage.Sweep(ctx, ackRetention)
    if err != nil {
        // Transient errors: log + continue. The next tick retries; if
        // the DB is genuinely down, the rest of the platform is also
        // broken and disk usage is the least of the operator's
        // problems.
        log.Printf("pendinguploads sweeper: Sweep failed: %v", err)
        metrics.PendingUploadsSweepError()
        return
    }
    metrics.PendingUploadsSwept(res.Acked, res.Expired)
    if res.Total() > 0 {
        // Per-cycle structured-ish log (one line per cycle that did
        // something). Quiet by design — most cycles delete zero rows
        // on a healthy system, and a stream of empty-result lines
        // would drown the production log without surfacing a signal.
        log.Printf(
            "pendinguploads sweeper: deleted acked=%d expired=%d total=%d",
            res.Acked, res.Expired, res.Total(),
        )
    }
}