Commit Graph

7 Commits

Author SHA1 Message Date
dev-lead
9e18ab4620 fix(pendinguploads): wait for error metric before test exit
Some checks failed
CodeQL / Analyze (${{ matrix.language }}) (javascript-typescript) (pull_request) Successful in 0s
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 5s
CodeQL / Analyze (${{ matrix.language }}) (go) (pull_request) Successful in 0s
CodeQL / Analyze (${{ matrix.language }}) (python) (pull_request) Successful in 1s
Retarget main PRs to staging / Retarget to staging (pull_request) Has been skipped
CI / Detect changes (pull_request) Successful in 8s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 6s
E2E API Smoke Test / detect-changes (pull_request) Successful in 8s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 7s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 8s
Harness Replays / detect-changes (pull_request) Successful in 7s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 7s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 3s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 3s
CI / Python Lint & Test (pull_request) Successful in 5s
CI / Canvas (Next.js) (pull_request) Successful in 5s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 4s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 5s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
Harness Replays / Harness Replays (pull_request) Failing after 6s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m0s
CI / Platform (Go) (pull_request) Successful in 4m34s
TestStartSweeper_TransientErrorDoesNotCrashLoop leaks an in-flight
metric write across the test boundary: cycleDone fires inside the
fake's Sweep defer (before Sweep returns), waitForCycle returns
immediately after, cancel() lands, but the goroutine still has
metrics.PendingUploadsSweepError() to execute. Whether that write
happens before or after the next test's metricDelta() baseline read
is a coin-flip on slow CI hosts.

Outcome: TestStartSweeper_RecordsMetricsOnSuccess fails with
"error counter delta = 1, want 0" — looks like a real bug, isn't.
Instrumented analysis (per the file's existing waitForMetricDelta
docstring covering the same shape) confirms the metric IS getting
recorded, just AFTER the next test reads its baseline.

The Records* tests already use waitForMetricDelta to close this race
on their own assertions. This change extends the same shape to
TransientErrorDoesNotCrashLoop so it doesn't poison subsequent tests'
baselines.

Verified by running `go test -race -count=20 ./internal/pendinguploads/...`
locally — passes deterministically.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 07:37:45 -07:00
Hongming Wang
b759548822 fix(chat-uploads): activity rows commit atomically with PutBatch
Closes #149.

uploadPollMode for poll-mode chat uploads previously committed N
pending_uploads rows in one Tx (PutBatch), then wrote N activity_logs
rows individually outside any Tx. A per-row failure on activity row K
left rows 1..K-1 committed and pending_uploads orphaned until the 24h
TTL — not data-loss because the platform's fetcher handled the
half-state cleanly, but the user never saw file K in the canvas and
the inconsistency surfaced as an "uploaded but invisible" complaint
class.

Thread one Tx through PutBatchTx + N × LogActivityTx + Commit so all
or none commit. Broadcasts are deferred until after Commit — emitting
an ACTIVITY_LOGGED event for a row that ends up rolled back would
paint a ghost message into the canvas's optimistic UI. A new
LogActivityTx returns a commitHook the caller invokes post-Commit;
the existing fire-and-forget LogActivity is unchanged for the 4 other
production callers (a2a_proxy_helpers + activity.go report path).

Storage interface gains PutBatchTx; PostgresStorage.PutBatch is
refactored to share the validation + insert path. inMemStorage and
fakeSweepStorage delegate or no-op for PutBatchTx (the in-mem fake
can't model Tx state — DB-level atomicity is verified by the existing
real-Postgres integration test for PutBatch + the new unit test
asserting the Go handler calls Rollback on activity-insert failure).

Tests:
- TestPollUpload_AtomicRollbackOnActivityInsertFailure pins the new
  contract via sqlmock — second activity insert errors → Rollback
  expected, Commit must NOT be called.
- TestLogActivityTx_DefersBroadcastUntilCommitHook +
  _InsertError_NoHook_NoBroadcast + _NilTx_Errors cover the new API.
- TestPutBatchTx_HappyPath / _EmptyItems / _ValidationFails /
  _PerRowErrorPropagates cover Tx-aware storage layer.
- 7 existing TestPollUpload_* tests updated to mock Begin + Commit
  (or Begin + Rollback for failure paths) since the handler now
  opens a Tx around PutBatch + activity inserts.

All workspace-server tests pass; integration tag also clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 21:34:28 -07:00
Hongming Wang
27db090d3d
Merge pull request #2907 from Molecule-AI/feat/poll-mode-chat-upload-phase5a
feat(poll-upload): phase 5a — atomic batch insert + acked-index + mime hardening
2026-05-05 11:16:56 -07:00
Hongming Wang
9991057ad1 feat(poll-upload): phase 5a — atomic batch insert + acked-index + mime hardening
Resolves four of six findings from the retrospective code review of Phases
1–4 (poll-mode chat upload). Bundled because every change is in the
platform's pending_uploads layer or the multi-file handler that reads it.

Findings resolved:

1. Important — Sweep query lacked an index for the acked-retention OR-arm.
   The Phase 1 partial indexes are both `WHERE acked_at IS NULL`, so the
   `(acked_at IS NOT NULL AND acked_at < retention)` half of the WHERE
   clause seq-scanned the table on every cycle. Add a complementary
   partial index on `acked_at WHERE acked_at IS NOT NULL` so both arms
   of the disjunction are index-covered. Disjoint from the existing two
   indexes (no row matches both predicates), so write amplification is
   bounded to ~one index entry per terminal-state row.

2. Important — uploadPollMode partial-failure left orphans. The previous
   per-file Put loop committed rows 1..K-1 and then errored on row K with
   no compensation, so a client retry would double-insert the survivors.
   Refactor the handler into three explicit phases (pre-validate +
   read-into-memory, single atomic PutBatch, per-file activity row) and
   add Storage.PutBatch with all-or-nothing transaction semantics.

3. FYI — pendinguploads.StartSweeperWithInterval was exported only for
   tests. Move it to lower-case startSweeperWithInterval and expose the
   test seam through pendinguploads/export_test.go (Go convention; the
   shim file is stripped from the production binary at build time).

4. Nit — multipart Content-Type was passed verbatim into pending_uploads
   rows and re-served on /content. Add safeMimetype which strips
   parameters, rejects CR/LF/control bytes, and coerces malformed shapes
   to application/octet-stream. The eventual GET /content response can no
   longer be header-split via a crafted Content-Type on the multipart.

Comprehensive tests:

- 10 PutBatch unit tests (sqlmock): happy path, empty input, all four
  pre-validation rejection paths, BeginTx error, per-row error +
  Rollback (no Commit), first-row error, Commit error.
- 4 new PutBatch integration tests (real Postgres): all-rows-commit
  happy path with COUNT(*) verification, atomic-rollback no-leak via
  a NUL-byte filename that lib/pq rejects mid-batch, oversize
  short-circuit no-Tx, idx_pending_uploads_acked existence + partial
  predicate via pg_indexes (planner-shape-independent).
- 3 new chat_files_poll tests: atomic rollback on second-file oversize,
  atomic rollback on PutBatch error, mimetype CRLF/NUL/parameter
  sanitization (8 sub-cases).

The two remaining review findings (inbox_uploads.fetch_and_stage blocks
the poll loop synchronously; two httpx Clients per row) are Python-side
and ship in Phase 5b once this lands on staging.

Test-only export pattern via export_test.go, atomic pre-validation
discipline (validate before Tx), and behavior-based (not name-based)
test assertions follow the standing project conventions.
2026-05-05 11:10:13 -07:00
Hongming Wang
c79ba05ed5 test(pendinguploads): close cycleDone-vs-metric-record race in sweeper tests
TestStartSweeper_RecordsMetricsOnError flaked on every CI rerun under
race detection: `error counter delta = 0, want 1`. Root cause is a
race between two goroutines, not a bug in the production sweeper.

The fake `fakeSweepStorage.Sweep` signals `cycleDone` from inside its
deferred return — that happens BEFORE Sweep's return value is
received by `sweepOnce`, which is what triggers the metric increment.
On slow CI hosts the test goroutine wins the read after `waitForCycle`
unblocks and BEFORE StartSweeper's goroutine has called
`metrics.PendingUploadsSweepError`, so the asserted delta is 0 even
though the metric WILL be 1 a few ms later.

Adds a polling assert helper, `waitForMetricDelta`, that closes the
race deterministically without timing-based sleeps:

- TestStartSweeper_RecordsMetricsOnError uses waitForMetricDelta to
  wait for the error counter to settle at 1.
- TestStartSweeper_RecordsMetricsOnSuccess uses it on the success
  counters (acked, expired) so the error-stayed-zero assertion
  reads after StartSweeper has fully processed the cycle.
- waitForCycle keeps its current shape but documents the caveat in
  its comment so future tests don't repeat the assumption.

Verified: `go test ./internal/pendinguploads/ -race -count 5` passes
all 9 tests across 5 iterations cleanly.

Per memory feedback_question_test_when_unexpected.md: the
"delta=0, want=1" failure looked like a real production bug at first
glance, but instrumented inspection showed the metric DOES increment,
just AFTER the test's read. The fix is the test's wait shape, not
the sweeper.

Unblocks every PR currently broken by this flake (#2898 hit it on
two consecutive CI runs; staging-merged PRs from earlier today
(#2877/#2881/#2885/#2886) introduced the test).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 10:46:17 -07:00
Hongming Wang
a327d207da feat(rfc): poll-mode chat upload — phase 3 GC sweep + observability
Phase 3 of the poll-mode chat upload rollout. Stack atop Phase 2.

The platform's pending_uploads table grows once-per-uploaded-file with
no built-in cleanup. Phase 1's hard TTL (expires_at default 24h) makes
expired rows un-fetchable but doesn't actually delete them; Phase 1's
ack stamps acked_at but leaves the row indefinitely. Without a sweep
the table grows unbounded across normal traffic.

This PR adds:

- `Storage.Sweep(ctx, ackRetention)` — a single round-trip CTE that
  deletes acked rows past their retention window plus unacked rows
  past expires_at. Returns `(acked, expired)` deletion counts so
  Phase 3 dashboards can spot the stuck-fetch pattern (high expired,
  low acked) vs healthy churn.
- `pendinguploads.StartSweeper(ctx, storage, ackRetention)` —
  background goroutine that calls Sweep every 5 minutes (default).
  Runs once immediately on startup so a platform restart cleans up
  any rows that became eligible while we were down.
- Prometheus counters `molecule_pending_uploads_swept_total` with
  `outcome={acked,expired,error}` labels. Wired into the existing
  `/metrics` endpoint.
- Wired from cmd/server/main.go via supervised.RunWithRecover —
  one transient panic doesn't take the platform down with it.

Defaults:
  - SweepInterval = 5m (matches the dashboard refresh cadence)
  - DefaultAckRetention = 1h (gives the workspace at-least-once retry
    headroom in case it processed but failed to write the file before
    crashing)

Test coverage: 100% on storage_test.go (extended with sweepSQL pin +
six Sweep test cases including negative-retention clamp + zero-retention
immediate-delete + DB error wrapping) and sweeper_test.go (ticker-driven
+ ctx-cancel + nil-storage + transient-error-doesn't-crash + metric
counter assertions).

Closes the third of four phases tracked on the parent RFC; phase 4 is
the staging E2E test.
2026-05-05 05:00:13 -07:00
Hongming Wang
86fdaad111 feat(rfc): poll-mode chat upload — phase 1 platform staging layer
External-runtime workspaces (registered via molecule connect, behind
NAT, no public callback URL) currently see HTTP 422 "workspace has no
callback URL" on every chat file upload. The only escape is to wrap the
laptop in ngrok / Cloudflare tunnel + re-register push-mode — a tax
that shouldn't exist for a one-line use case.

This phase introduces the platform-side staging layer that lets
canvas → external workspace uploads ride the same poll loop the inbox
already uses for text messages.

Architecture (mirrors inbox poll, SSOT principle):
  Canvas POST /chat/uploads (multipart)
      ↓ delivery_mode=poll
  Platform: chat_files.uploadPollMode
      ↓ pendinguploads.Storage.Put + LogActivity(chat_upload_receive)
  Workspace's existing inbox poller picks up the activity row (Phase 2)
  Workspace fetches: GET /workspaces/:id/pending-uploads/:fid/content
  Workspace acks:    POST /workspaces/:id/pending-uploads/:fid/ack

Pieces in this PR:
  * Migration 20260505100000 — pending_uploads table; partial indexes
    on unacked + expires_at for the workspace fetch + Phase 3 sweep
    hot paths. No FK to workspaces (audit retention), 24h hard TTL.
  * internal/pendinguploads — Storage interface + Postgres impl. Bytes
    inline (bytea) today; the interface lets a future PR replace with
    S3 (RFC #2789) by swapping one constructor. 100% test coverage on
    the Postgres impl via sqlmock-pinned SQL.
  * handlers.PendingUploadsHandler — GET /content + POST /ack endpoints.
    wsAuth-gated; cross-workspace bleed protection via per-row
    workspace_id check (token leak from A can't read B's pending bytes).
    Handler tests pin happy path + every 4xx/5xx mapping including
    cross-workspace + race-with-sweep.
  * chat_files.go — Upload poll-mode branch behind WithPendingUploads
    builder. Push-mode unchanged (regression-tested). Multipart parse
    + per-file sanitize + storage.Put + activity_logs row per file.
  * SanitizeFilename — Go mirror of workspace/internal_chat_uploads.py
    sanitize_filename. Tests pin parity case-by-case so canvas-emitted
    URIs stay identical regardless of which path handles the upload.
  * Comprehensive logging — every state transition (staged, fetch,
    ack, error) emits a structured log line with workspace_id +
    file_id + size + sanitized name. Phase 3 metrics will hook these.

The pendinguploads.Storage wiring is opt-in (WithPendingUploads on
ChatFilesHandler) so a binary deployed without the migration keeps the
pre-existing 422 behavior — no boot-order coupling between code roll
and schema roll.

Phase 2 (separate PR): workspace inbox extension — inbox_uploads.py
fetches via the GET endpoint, writes to /workspace/.molecule/chat-
uploads/, acks, and rewrites the URI from platform-pending: → workspace:
so the agent's existing send-attachments path needs no changes.
Phase 3: GC sweep + dashboards. Phase 4: poll-mode E2E on staging.

Tests:
  * 100% coverage on pendinguploads (sqlmock-pinned SQL drift gate).
  * Functional 100% on new handler code (uncovered branches are
    documented defensive duplicates: uuid re-parse, multipart Open
    error, Writer.Write fail — none reproducible in unit tests).
  * Push-mode + NULL delivery_mode regression tests pin no behavior
    change for existing workspaces.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 04:22:24 -07:00