8019231a16
ci-arm64-advisory / fast-checks (push) Waiting to run
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (push) Successful in 8s
Block internal-flavored paths / Block forbidden paths (push) Successful in 8s
CI / Detect changes (push) Successful in 9s
CI / Python Lint & Test (push) Successful in 5s
E2E API Smoke Test / detect-changes (push) Successful in 9s
E2E Chat / detect-changes (push) Successful in 8s
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (local) (push) Successful in 49s
E2E Staging Canvas (Playwright) / detect-changes (push) Successful in 12s
publish-workspace-server-image / build-and-push (push) Successful in 3m12s
E2E Staging SaaS (full lifecycle) / pr-validate (push) Successful in 39s
Handlers Postgres Integration / detect-changes (push) Successful in 4s
Harness Replays / detect-changes (push) Successful in 5s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (push) Successful in 6s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (push) Successful in 4s
Lint no tenant GITEA or GITHUB token write / Scan for repo-host token write into tenant workspace surface (push) Successful in 3s
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (push) Successful in 3s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (push) Successful in 1m6s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 14s
CI / Canvas (Next.js) (push) Successful in 3s
CI / Shellcheck (E2E scripts) (push) Successful in 2s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (push) Successful in 1m25s
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (push) Successful in 5m19s
E2E Staging External Runtime / E2E Staging External Runtime (push) Successful in 5m30s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 2m23s
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (push) Successful in 6m5s
E2E Chat / E2E Chat (push) Successful in 4m6s
CI / Platform (Go) (push) Successful in 5m0s
CI / all-required (push) Successful in 9m45s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Successful in 2s
publish-workspace-server-image / Production auto-deploy (push) Successful in 8m32s
Harness Replays / Harness Replays (push) Successful in 12s
CI / Canvas Deploy Reminder (push) Successful in 2s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 1m37s
Sweep stale Cloudflare Tunnels / Sweep CF tunnels (push) Successful in 8s
Sweep stale e2e-* orgs (staging) / Sweep e2e orgs (push) Successful in 12s
Staging SaaS smoke (every 30 min) / Staging SaaS smoke (push) Successful in 5m9s
main-red-watchdog / watchdog (push) Successful in 32s
gate-check-v3 / gate-check (push) Successful in 25s
Continuous synthetic E2E (staging) / Synthetic E2E against staging (push) Successful in 6m10s
CTO-bypass merge 2026-05-24: #1760 Go module rename to git.moleculesai.app path
149 lines
5.7 KiB
Go
149 lines
5.7 KiB
Go
// sweeper.go — periodic GC for the pending_uploads table.
|
|
//
|
|
// The platform's poll-mode chat-upload handler creates a row in
|
|
// pending_uploads for every chat-attached file the canvas sends to a
|
|
// poll-mode workspace. The workspace's inbox poller fetches the bytes
|
|
// and acks the row, but two failure modes leak rows long-term:
|
|
//
|
|
// 1. Workspace fetches but never acks (network hiccup between GET
|
|
// /content and POST /ack; workspace crashed between the two).
|
|
// Phase 1's Get refuses to re-serve an acked row, but a never-
|
|
// acked row could in principle be fetched repeatedly until expires_at.
|
|
// Phase 2's workspace-side fetcher is idempotent; the worry is
|
|
// only disk usage on the platform side.
|
|
//
|
|
// 2. Workspace never fetches at all (workspace was offline when the
|
|
// row was written; the upload's TTL elapsed).
|
|
//
|
|
// This sweeper handles both. It runs every SweepInterval, deletes rows
|
|
// in either category, and emits structured logs + Prometheus counters
|
|
// so a stuck-fetch dashboard can spot the leak class.
|
|
//
|
|
// Failure isolation: a transient DB error must NOT crash the sweeper.
|
|
// We log + continue; the next tick retries. ctx cancellation cleanly
|
|
// shuts the loop down for graceful shutdown.
|
|
|
|
package pendinguploads
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
"time"
|
|
|
|
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/metrics"
|
|
)
|
|
|
|
// Tunables for the pending_uploads GC loop.
const (
	// SweepInterval is how often the production GC loop fires. Five
	// minutes trades off two concerns: rows should be reaped soon enough
	// that platform disk usage stays unsurprising, yet an idle table
	// should not cost a DELETE round-trip every few seconds. Other
	// low-priority sweepers follow the same reasoning
	// (registry/orphan_sweeper ticks at 60s, but each of its cycles
	// talks to Docker and is far costlier than one indexed DELETE).
	SweepInterval = 5 * time.Minute

	// DefaultAckRetention is the grace period an acked row is kept
	// before it becomes eligible for deletion. One hour leaves the
	// workspace room to re-issue the GET when its first fetch died
	// mid-write (at-least-once handoff) without holding the content for
	// a full 24h after the workspace already has its copy.
	DefaultAckRetention = 1 * time.Hour

	// sweepDeadline caps the duration of one sweep cycle so a slow or
	// wedged database cannot stall the loop indefinitely. Thirty seconds
	// is generous for a single indexed DELETE against a table expected
	// to hold at most a few thousand in-flight rows.
	sweepDeadline = 30 * time.Second
)
|
|
|
|
// StartSweeper runs the GC loop until ctx is cancelled. nil storage
|
|
// makes the loop a no-op (matches the handlers' tolerance for an
|
|
// unconfigured pendinguploads — some test harnesses run without the
|
|
// storage wired).
|
|
//
|
|
// Pass ackRetention=0 to use DefaultAckRetention. Negative values are
|
|
// clamped at the storage layer.
|
|
//
|
|
// Production callers use SweepInterval (5m). Tests use a short interval
|
|
// to exercise the ticker-driven sweep path without burning real wall-
|
|
// clock time.
|
|
func StartSweeper(ctx context.Context, storage Storage, ackRetention time.Duration) {
|
|
startSweeperWithInterval(ctx, storage, ackRetention, SweepInterval, nil)
|
|
}
|
|
|
|
// startSweeperWithInterval is the test-friendly variant of StartSweeper
|
|
// — same loop, but the cadence is caller-specified. Production code
|
|
// should use StartSweeper to keep the SweepInterval constant pinned.
|
|
// If done is non-nil it is closed exactly once when the loop exits,
|
|
// allowing tests to wait for the goroutine to fully terminate before
|
|
// asserting on shared metric counters (issue #86).
|
|
func startSweeperWithInterval(ctx context.Context, storage Storage, ackRetention, interval time.Duration, done chan struct{}) {
|
|
if storage == nil {
|
|
log.Println("pendinguploads sweeper: storage is nil — sweeper disabled")
|
|
if done != nil {
|
|
close(done)
|
|
}
|
|
return
|
|
}
|
|
if ackRetention == 0 {
|
|
ackRetention = DefaultAckRetention
|
|
}
|
|
log.Printf(
|
|
"pendinguploads sweeper started — sweeping every %s; ack retention %s",
|
|
interval, ackRetention,
|
|
)
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
defer func() {
|
|
log.Println("pendinguploads sweeper: shutdown")
|
|
if done != nil {
|
|
close(done)
|
|
}
|
|
}()
|
|
// Run once immediately so a platform restart cleans up any rows
|
|
// that became eligible while we were down — don't make the
|
|
// operator wait 5 minutes for the first sweep.
|
|
sweepOnce(ctx, storage, ackRetention)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
// Guard: ctx may have been cancelled between the ticker firing
|
|
// and this case being selected (MPMC channel; close-to-ready race).
|
|
// Calling sweepOnce with an already-cancelled ctx would increment
|
|
// the error counter on shutdown — polluting the next test's
|
|
// baseline (issue #86 full-suite failure).
|
|
if ctx.Err() != nil {
|
|
continue
|
|
}
|
|
sweepOnce(ctx, storage, ackRetention)
|
|
}
|
|
}
|
|
}
|
|
|
|
func sweepOnce(parent context.Context, storage Storage, ackRetention time.Duration) {
|
|
ctx, cancel := context.WithTimeout(parent, sweepDeadline)
|
|
defer cancel()
|
|
|
|
res, err := storage.Sweep(ctx, ackRetention)
|
|
if err != nil {
|
|
// Transient errors: log + continue. The next tick retries; if
|
|
// the DB is genuinely down, the rest of the platform is also
|
|
// broken and disk usage is the least of the operator's
|
|
// problems.
|
|
log.Printf("pendinguploads sweeper: Sweep failed: %v", err)
|
|
metrics.PendingUploadsSweepError()
|
|
return
|
|
}
|
|
metrics.PendingUploadsSwept(res.Acked, res.Expired)
|
|
if res.Total() > 0 {
|
|
// Per-cycle structured-ish log (one line per cycle that did
|
|
// something). Quiet by design — most cycles delete zero rows
|
|
// on a healthy system, and a stream of empty-result lines
|
|
// would drown the production log without surfacing a signal.
|
|
log.Printf(
|
|
"pendinguploads sweeper: deleted acked=%d expired=%d total=%d",
|
|
res.Acked, res.Expired, res.Total(),
|
|
)
|
|
}
|
|
}
|