diff --git a/docs/agent-runtime/team-expansion.md b/docs/agent-runtime/team-expansion.md deleted file mode 100644 index 5785dd13..00000000 --- a/docs/agent-runtime/team-expansion.md +++ /dev/null @@ -1,111 +0,0 @@ -# Team Expansion (Recursive Workspaces) - -When a workspace is expanded into a team, it gains sub-workspaces while its own agent remains as the **team lead** (coordinator). This is recursive — sub-workspaces can themselves be expanded into teams, infinitely deep. - -## How It Works - -When Developer PM is expanded into a team: - -``` -Business Core - | - +-- Developer PM (agent stays, becomes coordinator) - | - +-- Frontend Agent (sub-workspace, private scope) - +-- Backend Agent (sub-workspace, private scope) - +-- QA Agent (sub-workspace, private scope) -``` - -- Developer PM's agent **still exists** and acts as coordinator -- Developer PM receives incoming A2A messages from Business Core -- Developer PM's agent decides how to delegate to sub-workspaces -- Sub-workspaces talk to Developer PM and to each other (same level) -- Sub-workspaces **cannot** talk to Business Core or any workspace outside the team - -## Communication Rules - -| Direction | Allowed? | Example | -|-----------|----------|---------| -| Parent level -> team lead | Yes | Business Core -> Developer PM | -| Team lead -> sub-workspaces | Yes | Developer PM -> Frontend Agent | -| Sub-workspace -> team lead | Yes | Frontend Agent -> Developer PM | -| Sub-workspace <-> sibling | Yes | Frontend Agent <-> Backend Agent | -| Outside -> sub-workspace directly | No (403) | Business Core -> Frontend Agent | -| Sub-workspace -> outside directly | No | Frontend Agent -> Business Core | - -The team lead (Developer PM) is the **only** bridge between the team's internal world and the outside. - -## Scoped Registry - -Sub-workspaces register in the platform registry but with a **private scope**. The registry knows about them but enforces access control. - -``` -Registry: - Business Core :8001 scope: public - Developer PM :8002 scope: public - Frontend Agent :8010 scope: private, parent=Developer PM - Backend Agent :8011 scope: private, parent=Developer PM - QA Agent :8012 scope: private, parent=Developer PM -``` - -- The platform can always discover any workspace (for provisioning, monitoring) -- The parent workspace can discover its sub-workspaces -- Sub-workspaces can discover their siblings (same parent) -- Outside workspaces get a **403 Forbidden** if they try to discover a private sub-workspace - -## How to Expand - -Expansion is triggered via `POST /workspaces/:id/expand`. The platform reads the `sub_workspaces` list from the workspace's config and provisions each one. On the canvas, users right-click a workspace node and select "Expand into team." - -Collapsing is the inverse: `POST /workspaces/:id/collapse`. Sub-workspaces are stopped and removed. - -## What Happens on Expansion - -When Developer PM is expanded into a team, the hierarchy changes but the outside view doesn't. Business Core's parent/child relationship to Developer PM is unaffected — Developer PM still responds to the same A2A endpoint. - -The events fired: -- `WORKSPACE_EXPANDED` with the new `sub_workspace_ids` in the payload -- `WORKSPACE_PROVISIONING` for each new sub-workspace -- `WORKSPACE_ONLINE` for each sub-workspace as they come up - -Communication rules are automatically derived from the new hierarchy — no manual wiring needed. 
- -## Canvas Behavior - -- Children render as embedded mini-cards (`TeamMemberChip`) inside the parent node, not as separate canvas nodes -- Each mini-card shows full status: gradient bar, name, tier badge, skills pills, active tasks, descendant count -- **Recursive rendering** up to 3 levels deep (`MAX_NESTING_DEPTH = 3`) — sub-cards can contain their own "Team" sections -- Parent node dynamically resizes: 210-280px (no children), 320-450px (children), 400-560px (grandchildren) -- Eject button (sky-blue arrow icon) on hover extracts a child from the team -- "Extract from Team" also available in the right-click context menu -- Double-click a team node to zoom/fit to the parent area -- The parent workspace node shows a badge with total descendant count - -## Collapsing a Team - -The inverse of expansion, triggered via `POST /workspaces/:id/collapse`: - -1. Each sub-workspace agent wraps up current work and writes a handoff document to memory -2. Sub-workspaces are stopped and removed -3. The team lead's agent goes back to handling everything directly -4. A `WORKSPACE_COLLAPSED` event fires - -Sub-workspace memory is cleaned up based on backend (see [Memory — Cleanup](../architecture/memory.md#cleanup-on-workspace-deletion)). - -## Deleting a Team Workspace - -When a team workspace is deleted: -1. Platform shows a warning listing all sub-workspaces that will be deleted -2. User can **drag sub-workspaces out** of the team before confirming (promotes them to the parent level) -3. On confirmation, cascade delete removes the parent and all remaining sub-workspaces -4. `WORKSPACE_REMOVED` events fire for each deleted workspace - -## Related Docs - -- [Communication Rules](../api-protocol/communication-rules.md) — Full access control model -- [Core Concepts](../product/core-concepts.md) — Workspace fundamentals -- [System Prompt Structure](./system-prompt-structure.md) — How peer capabilities are injected -- [Provisioner](../architecture/provisioner.md) — How sub-workspaces are deployed -- [Registry & Heartbeat](../api-protocol/registry-and-heartbeat.md) — How registration works -- [Event Log](../architecture/event-log.md) — Events fired during expansion -- [Canvas UI](../frontend/canvas.md) — Visual behavior of teams diff --git a/docs/api-reference.md b/docs/api-reference.md index e1a75668..12e94a3c 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -41,8 +41,6 @@ Full contract: `docs/runbooks/admin-auth.md`. 
 | GET | /admin/workspaces/:id/test-token | admin_test_token.go — mint a fresh bearer token for E2E scripts; returns 404 unless `MOLECULE_ENV != production` or `MOLECULE_ENABLE_TEST_TOKENS=1` |
 | GET/POST/DELETE | /admin/secrets[/:key] | secrets.go — legacy aliases for /settings/secrets |
 | WS | /workspaces/:id/terminal | terminal.go |
-| POST | /workspaces/:id/expand | team.go |
-| POST | /workspaces/:id/collapse | team.go |
 | POST/GET | /workspaces/:id/approvals | approvals.go |
 | POST | /workspaces/:id/approvals/:id/decide | approvals.go |
 | GET | /approvals/pending | approvals.go |
diff --git a/docs/architecture/molecule-technical-doc.md b/docs/architecture/molecule-technical-doc.md
index 0d9c653c..cd3dc957 100644
--- a/docs/architecture/molecule-technical-doc.md
+++ b/docs/architecture/molecule-technical-doc.md
@@ -336,8 +336,6 @@ This same logic governs: A2A delegation, memory scope enforcement, activity visi
 | Method | Endpoint | Purpose |
 |--------|----------|---------|
-| `POST` | `/workspaces/:id/expand` | Expand workspace into team (become coordinator) |
-| `POST` | `/workspaces/:id/collapse` | Collapse team back to single workspace |
 
 ### Files, Terminal, Templates, Bundles (8 endpoints)
 
diff --git a/docs/frontend/canvas.md b/docs/frontend/canvas.md
index 8d59c80f..fc103bd6 100644
--- a/docs/frontend/canvas.md
+++ b/docs/frontend/canvas.md
@@ -186,4 +186,3 @@ So the UI now exposes more operational failure state directly instead of silentl
 - [Quickstart](../quickstart.md)
 - [Platform API](../api-protocol/platform-api.md)
 - [Workspace Runtime](../agent-runtime/workspace-runtime.md)
-- [Team Expansion](../agent-runtime/team-expansion.md)
diff --git a/docs/glossary.md b/docs/glossary.md
index f0343a38..b3535ae8 100644
--- a/docs/glossary.md
+++ b/docs/glossary.md
@@ -18,7 +18,7 @@ lands in the watch list with a colliding term, add a row here.
 | **plugin** | A directory under `plugins/` packaging one or more skills or an MCP server wrapper, installable per-workspace via `POST /workspaces/:id/plugins`. Governed by `plugin.yaml`. | **Langflow**: a visual UI node / component in a flowchart. **CrewAI**: a Python-importable callable registered as a capability. |
 | **agent** | A persistent containerized workspace running continuously — an identity with memory, a role, and a schedule. Not a one-shot invocation. | Most frameworks (AutoGPT, LangChain agents, OpenAI Assistants): a stateless function-call loop. No persistence between invocations unless explicitly checkpointed. |
 | **flow** | A task execution within a workspace — a request enters, the agent runs tools, emits a response, logs activity. No explicit graph abstraction. | **Langflow**: a directed graph of nodes you author visually. **LangGraph**: a stateful graph of callable nodes. Our "flow" is an imperative timeline, not a graph. |
-| **team** | A named cluster of workspaces under a PM (org template `expand_team`). Used for role grouping in Canvas. | **CrewAI**: a "crew" is a sequence of agents that pass a task through a declared order. Our "team" is an org-chart abstraction, not an execution order. |
+| **team** | A named cluster of workspaces under a PM. Used for role grouping in Canvas. | **CrewAI**: a "crew" is a sequence of agents that pass a task through a declared order. Our "team" is an org-chart abstraction, not an execution order. |
 | **skill** | A directory with `SKILL.md` that an agent invokes via the `Skill` tool. Skills are documentation + optional scripts that teach an agent a recipe. | **Anthropic Skills API**: nearly identical. **CrewAI tool**: closer to our plugin's MCP tool, not our skill. |
 | **channel** | An outbound/inbound social integration (Telegram, Slack, …) per-workspace, wired in `workspace_channels`. | Slack's "channel": the container for messages. We use "channel" for the adapter + credentials, not the conversation itself. |
 | **runtime** | The execution engine image tag for a workspace: one of `langgraph`, `claude-code`, `openclaw`, `crewai`, `autogen`, `deepagents`, `hermes`. | **LangGraph runtime**: the Python process running the graph. We use "runtime" for the Docker image + adapter pairing, not the inner process. |
diff --git a/docs/guides/mcp-server-setup.md b/docs/guides/mcp-server-setup.md
index aacc554a..5539ba97 100644
--- a/docs/guides/mcp-server-setup.md
+++ b/docs/guides/mcp-server-setup.md
@@ -166,8 +166,6 @@ list_workspaces
 
 | MCP Tool | API Route | Method | Description |
 |----------|-----------|--------|-------------|
-| `expand_team` | `/workspaces/:id/expand` | POST | Expand team node |
-| `collapse_team` | `/workspaces/:id/collapse` | POST | Collapse team node |
 
 ### Templates & Bundles
 
diff --git a/workspace-server/Dockerfile b/workspace-server/Dockerfile
index ecf43fab..d6754312 100644
--- a/workspace-server/Dockerfile
+++ b/workspace-server/Dockerfile
@@ -63,21 +63,30 @@ fi
 
 # Memory v2 sidecar (built-in postgres plugin). Co-located with the
 # main server so operators flipping MEMORY_V2_CUTOVER=true don't need
-# to provision a separate service. Stays inert at the protocol layer
-# until that env var is set — the workspace-server's wiring.go skips
-# building the client without MEMORY_PLUGIN_URL, so the running plugin
-# is a no-op for traffic.
+# to provision a separate service.
 #
-# Env defaults:
+# Spawn-gating: only start the sidecar when the operator has indicated
+# they want it — either MEMORY_V2_CUTOVER=true OR MEMORY_PLUGIN_URL set.
+# Without that signal, the sidecar adds zero value (the platform's
+# wiring.go skips building the client too) but pays a real cost: the
+# plugin's first migration runs `CREATE EXTENSION vector`, which fails
+# on tenant Postgres without pgvector preinstalled and aborts container
+# boot via the 30s health gate. Caught on staging redeploy 2026-05-05.
+#
+# Env defaults (when sidecar IS spawned):
 #   MEMORY_PLUGIN_DATABASE_URL = $DATABASE_URL (share existing Postgres;
 #       plugin's `memory_namespaces` / `memory_records` tables coexist
 #       with `agent_memories` and the rest of the platform schema —
 #       no conflicts. Operator can override with a separate URL.)
-#   MEMORY_PLUGIN_LISTEN_ADDR  = :9100
+#   MEMORY_PLUGIN_LISTEN_ADDR  = :9100 (matches the default set below;
+#       narrow to 127.0.0.1:9100 for a loopback-only bind)
 #
-# Set MEMORY_PLUGIN_DISABLE=1 to skip launching the sidecar entirely
-# (e.g. an operator running the plugin externally on a separate host).
-if [ -z "$MEMORY_PLUGIN_DISABLE" ] && [ -n "$DATABASE_URL" ]; then
+# Set MEMORY_PLUGIN_DISABLE=1 to force-skip the sidecar even with
+# cutover env set (e.g. running the plugin externally on a separate host).
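+#
+# Gating matrix (restates the conditions in the `if` below; every
+# spawn row additionally requires DATABASE_URL to be set):
+#
+#   MEMORY_V2_CUTOVER  MEMORY_PLUGIN_URL  MEMORY_PLUGIN_DISABLE  sidecar
+#   true               any                unset                  spawned
+#   unset              set                unset                  spawned
+#   unset              unset              unset                  skipped
+#   any                any                set (non-empty)        skipped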
+memory_plugin_wanted="" +if [ "$MEMORY_V2_CUTOVER" = "true" ] || [ -n "$MEMORY_PLUGIN_URL" ]; then + memory_plugin_wanted=1 +fi +if [ -z "$MEMORY_PLUGIN_DISABLE" ] && [ -n "$memory_plugin_wanted" ] && [ -n "$DATABASE_URL" ]; then : "${MEMORY_PLUGIN_DATABASE_URL:=$DATABASE_URL}" : "${MEMORY_PLUGIN_LISTEN_ADDR:=:9100}" export MEMORY_PLUGIN_DATABASE_URL MEMORY_PLUGIN_LISTEN_ADDR diff --git a/workspace-server/cmd/memory-plugin-postgres/main.go b/workspace-server/cmd/memory-plugin-postgres/main.go index 148c1dd4..2a1b2dee 100644 --- a/workspace-server/cmd/memory-plugin-postgres/main.go +++ b/workspace-server/cmd/memory-plugin-postgres/main.go @@ -10,6 +10,7 @@ package main import ( "context" "database/sql" + "embed" "errors" "fmt" "log" @@ -17,6 +18,7 @@ import ( "net/http" "os" "os/signal" + "sort" "strings" "syscall" "time" @@ -26,6 +28,16 @@ import ( "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/pgplugin" ) +// migrationsFS bundles the .up.sql files into the binary at build time +// so the prebuilt image doesn't need the source tree at runtime. The +// prior `os.ReadDir("cmd/memory-plugin-postgres/migrations")` path +// only resolved during `go test` from the repo root — in the published +// image the path didn't exist and boot failed after the 30s health gate +// (caught on staging redeploy 2026-05-05 after PR #2906). +// +//go:embed migrations/*.up.sql +var migrationsFS embed.FS + const ( envDatabaseURL = "MEMORY_PLUGIN_DATABASE_URL" envListenAddr = "MEMORY_PLUGIN_LISTEN_ADDR" @@ -149,32 +161,71 @@ func openDB(databaseURL string) (*sql.DB, error) { return db, nil } -// runMigrations applies the schema migrations bundled at -// cmd/memory-plugin-postgres/migrations/. Idempotent on repeat boot. +// runMigrations applies the schema migrations bundled into the binary +// via go:embed (see migrationsFS at the top of this file). Idempotent +// on repeat boot — every migration file uses CREATE … IF NOT EXISTS. // -// Implementation note: rather than embedding the full migrate engine, -// we read the migration files at boot from a known relative path. The -// down migrations are deliberately NOT applied here — that's a manual -// operator action. This keeps the binary tiny and avoids dragging in -// golang-migrate's drivers. +// The down migrations are deliberately NOT applied here — that's a +// manual operator action. This keeps the binary tiny and avoids +// dragging in golang-migrate's drivers. +// +// MEMORY_PLUGIN_MIGRATIONS_DIR (filesystem path) is honored as an +// override for operators who need to ship custom migrations alongside +// the binary without rebuilding. When unset (the common case) we read +// from the embedded FS. func runMigrations(db *sql.DB) error { - // Find the migrations directory. In `go run` mode it's relative - // to the cmd dir; in the prebuilt binary case it's expected next - // to the binary OR via env var override. - dir := os.Getenv("MEMORY_PLUGIN_MIGRATIONS_DIR") - if dir == "" { - // Best-effort: try the cwd-relative path that works for `go test`. - dir = "cmd/memory-plugin-postgres/migrations" + if dir := strings.TrimSpace(os.Getenv("MEMORY_PLUGIN_MIGRATIONS_DIR")); dir != "" { + return runMigrationsFromDisk(db, dir) } - entries, err := os.ReadDir(dir) + return runMigrationsFromEmbed(db) +} + +// runMigrationsFromEmbed applies the *.up.sql files bundled into the +// binary at build time. Order is alphabetical (matches the on-disk +// behavior of os.ReadDir on Linux for the same set of names). 
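+//
+// Illustrative ordering (hypothetical file names): with
+// migrations/001_init.up.sql and migrations/002_add_index.up.sql both
+// embedded, 001 is applied before 002 regardless of the order the
+// directory listing returned them in.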
+func runMigrationsFromEmbed(db *sql.DB) error { + entries, err := migrationsFS.ReadDir("migrations") if err != nil { - return fmt.Errorf("read migrations dir %q: %w", dir, err) + return fmt.Errorf("read embedded migrations: %w", err) } + names := make([]string, 0, len(entries)) for _, e := range entries { if e.IsDir() || !strings.HasSuffix(e.Name(), ".up.sql") { continue } - path := dir + "/" + e.Name() + names = append(names, e.Name()) + } + sort.Strings(names) + for _, name := range names { + data, err := migrationsFS.ReadFile("migrations/" + name) + if err != nil { + return fmt.Errorf("read embedded %q: %w", name, err) + } + if _, err := db.Exec(string(data)); err != nil { + return fmt.Errorf("apply %q: %w", name, err) + } + log.Printf("applied embedded migration %s", name) + } + return nil +} + +// runMigrationsFromDisk preserves the legacy filesystem-path mode for +// operator-supplied custom migrations. +func runMigrationsFromDisk(db *sql.DB, dir string) error { + entries, err := os.ReadDir(dir) + if err != nil { + return fmt.Errorf("read migrations dir %q: %w", dir, err) + } + names := make([]string, 0, len(entries)) + for _, e := range entries { + if e.IsDir() || !strings.HasSuffix(e.Name(), ".up.sql") { + continue + } + names = append(names, e.Name()) + } + sort.Strings(names) + for _, name := range names { + path := dir + "/" + name data, err := os.ReadFile(path) if err != nil { return fmt.Errorf("read %q: %w", path, err) @@ -182,7 +233,7 @@ func runMigrations(db *sql.DB) error { if _, err := db.Exec(string(data)); err != nil { return fmt.Errorf("apply %q: %w", path, err) } - log.Printf("applied migration %s", e.Name()) + log.Printf("applied disk migration %s (from %s)", name, dir) } return nil } diff --git a/workspace-server/cmd/memory-plugin-postgres/migrations_embed_test.go b/workspace-server/cmd/memory-plugin-postgres/migrations_embed_test.go new file mode 100644 index 00000000..f2f0b785 --- /dev/null +++ b/workspace-server/cmd/memory-plugin-postgres/migrations_embed_test.go @@ -0,0 +1,72 @@ +package main + +import ( + "strings" + "testing" +) + +// TestMigrationsEmbedded_ContainsCreateTable pins that the migrations +// are bundled into the binary at build time, NOT loaded from a +// filesystem path that doesn't exist at runtime in the published image. +// +// Pre-fix: PR #2906 shipped the binary without the migrations dir; +// `os.ReadDir("cmd/memory-plugin-postgres/migrations")` errored on every +// tenant boot, the 30s health gate aborted the container, and the +// staging redeploy fleet job marked all tenants as failed. Embedding +// the migrations into the binary removes the runtime path entirely. 
+func TestMigrationsEmbedded_ContainsCreateTable(t *testing.T) { + entries, err := migrationsFS.ReadDir("migrations") + if err != nil { + t.Fatalf("embedded migrations dir unreadable: %v", err) + } + if len(entries) == 0 { + t.Fatal("embedded migrations dir is empty — go:embed pattern matched no files") + } + + var seenUp bool + for _, e := range entries { + if e.IsDir() || !strings.HasSuffix(e.Name(), ".up.sql") { + continue + } + seenUp = true + data, err := migrationsFS.ReadFile("migrations/" + e.Name()) + if err != nil { + t.Errorf("read embedded %q: %v", e.Name(), err) + continue + } + if !strings.Contains(string(data), "CREATE TABLE") { + t.Errorf("embedded %q has no CREATE TABLE — wrong file embedded?", e.Name()) + } + } + if !seenUp { + t.Fatal("no *.up.sql in embedded migrations — runtime would have no schema to apply") + } +} + +// TestRunMigrationsFromEmbed_OrderingIsAlphabetic pins that we apply +// migrations in deterministic alphabetical order, not in whatever +// arbitrary order migrationsFS.ReadDir happens to return. With one +// migration today this is moot, but a future second migration ('002_…') +// MUST run after '001_…' or the schema is broken. +// +// We can't easily exercise db.Exec here (no test DB); instead pin the +// sort step on the directory listing itself. +func TestRunMigrationsFromEmbed_OrderingIsAlphabetic(t *testing.T) { + entries, err := migrationsFS.ReadDir("migrations") + if err != nil { + t.Fatalf("embedded migrations dir unreadable: %v", err) + } + var names []string + for _, e := range entries { + if e.IsDir() || !strings.HasSuffix(e.Name(), ".up.sql") { + continue + } + names = append(names, e.Name()) + } + for i := 1; i < len(names); i++ { + if names[i-1] > names[i] { + t.Errorf("ReadDir returned non-sorted names; runMigrationsFromEmbed must sort. "+ + "Got %q before %q", names[i-1], names[i]) + } + } +} diff --git a/workspace-server/entrypoint-tenant.sh b/workspace-server/entrypoint-tenant.sh index 8059cc1c..0f2d6dde 100644 --- a/workspace-server/entrypoint-tenant.sh +++ b/workspace-server/entrypoint-tenant.sh @@ -21,14 +21,23 @@ PORT=3000 HOSTNAME=0.0.0.0 node server.js & CANVAS_PID=$! # Memory v2 sidecar (built-in postgres plugin). See Dockerfile entrypoint -# comment for rationale. Stays inert at the protocol layer until the -# operator sets MEMORY_V2_CUTOVER=true; running it is cheap. +# comment for rationale. # -# Defaults the plugin's DATABASE_URL to the tenant's DATABASE_URL so -# operators don't need to configure two of them. Plugin tables coexist -# with the platform schema. +# Spawn-gating: only start the sidecar when the operator has indicated +# they want it (MEMORY_V2_CUTOVER=true OR MEMORY_PLUGIN_URL set). +# Without that signal, the sidecar adds zero value and risks aborting +# tenant boot via the 30s health gate when the tenant Postgres lacks +# pgvector. Caught on staging redeploy 2026-05-05: +# pq: extension "vector" is not available +# +# Defaults (when sidecar IS spawned): MEMORY_PLUGIN_DATABASE_URL +# falls back to the tenant's DATABASE_URL. 
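+#
+# Gating matrix (mirrors the Dockerfile entrypoint; restates the `if`
+# below, and every spawn row additionally requires DATABASE_URL):
+#   MEMORY_V2_CUTOVER=true                -> sidecar spawned
+#   MEMORY_PLUGIN_URL set (cutover unset) -> sidecar spawned
+#   neither signal                        -> sidecar skipped
+#   MEMORY_PLUGIN_DISABLE set             -> sidecar skipped (wins)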
 MEMORY_PLUGIN_PID=""
-if [ -z "$MEMORY_PLUGIN_DISABLE" ] && [ -n "$DATABASE_URL" ]; then
+memory_plugin_wanted=""
+if [ "$MEMORY_V2_CUTOVER" = "true" ] || [ -n "$MEMORY_PLUGIN_URL" ]; then
+  memory_plugin_wanted=1
+fi
+if [ -z "$MEMORY_PLUGIN_DISABLE" ] && [ -n "$memory_plugin_wanted" ] && [ -n "$DATABASE_URL" ]; then
   : "${MEMORY_PLUGIN_DATABASE_URL:=$DATABASE_URL}"
   : "${MEMORY_PLUGIN_LISTEN_ADDR:=:9100}"
   export MEMORY_PLUGIN_DATABASE_URL MEMORY_PLUGIN_LISTEN_ADDR
diff --git a/workspace-server/internal/handlers/org_import.go b/workspace-server/internal/handlers/org_import.go
index 8f4d9a07..639c8ba9 100644
--- a/workspace-server/internal/handlers/org_import.go
+++ b/workspace-server/internal/handlers/org_import.go
@@ -21,6 +21,7 @@ import (
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/provlog"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/scheduler"
 	"github.com/google/uuid"
 )
@@ -96,6 +97,16 @@ func (h *OrgHandler) createWorkspaceTree(ws OrgWorkspace, parentID *string, absX
 	}
 	if existing {
 		log.Printf("Org import: %q already exists (id=%s) — skipping create+provision, recursing into children for partial-match", ws.Name, existingID)
+		parentRef := ""
+		if parentID != nil {
+			parentRef = *parentID
+		}
+		provlog.Event("provision.skip_existing", map[string]any{
+			"name":        ws.Name,
+			"existing_id": existingID,
+			"parent_id":   parentRef,
+			"tier":        tier,
+		})
 		*results = append(*results, map[string]interface{}{
 			"id":   existingID,
 			"name": ws.Name,
diff --git a/workspace-server/internal/handlers/provlog_emit_test.go b/workspace-server/internal/handlers/provlog_emit_test.go
new file mode 100644
index 00000000..6681c203
--- /dev/null
+++ b/workspace-server/internal/handlers/provlog_emit_test.go
@@ -0,0 +1,112 @@
+package handlers
+
+// provlog_emit_test.go — pins that the structured-logging emit sites
+// added for #2867 PR-D actually fire when their boundary is crossed.
+//
+// These are call-site contract tests, not provlog package tests (those
+// live next to the helper). The assertion is "this dispatcher path
+// emits this event name" — if a refactor moves the call out of the
+// boundary helper, the gate fails. Fields are NOT pinned here on
+// purpose; the field set is convenience for ops, not contract for the
+// emit point. Pinning fields would block additive evolution of the
+// payload (see also feedback_behavior_based_ast_gates.md).
+
+import (
+	"bytes"
+	"context"
+	"log"
+	"strings"
+	"sync"
+	"testing"
+
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
+)
+
+// captureProvLog redirects the global logger to a buffer for the test
+// duration. provlog.Event uses log.Printf, so this is the only seam.
+// A mutex shared by the buffer's writer and the returned read func
+// guards against concurrent access from the goroutine fired by
+// provisionWorkspaceAuto (the goroutine never returns in these tests
+// because Start() is stubbed, but the buffer can still be touched by
+// it racing the assertion).
+func captureProvLog(t *testing.T) (read func() string) { + t.Helper() + var buf bytes.Buffer + var mu sync.Mutex + prevWriter := log.Writer() + prevFlags := log.Flags() + log.SetFlags(0) + log.SetOutput(&safeWriter{buf: &buf, mu: &mu}) + t.Cleanup(func() { + log.SetOutput(prevWriter) + log.SetFlags(prevFlags) + }) + return func() string { + mu.Lock() + defer mu.Unlock() + return buf.String() + } +} + +// TestProvisionWorkspaceAutoSync_EmitsProvisionStart — sync variant is +// chosen for the assertion path because it returns once the (stubbed) +// Start() has been called, so we know the emit has flushed. The async +// variant would race a goroutine. +func TestProvisionWorkspaceAutoSync_EmitsProvisionStart(t *testing.T) { + read := captureProvLog(t) + h := &WorkspaceHandler{cpProv: &trackingCPProv{}} + // Best-effort: the body will hit DB code under provisionWorkspaceCP + // — we only need the emit at the entry, which fires unconditionally + // before the dispatch. Recovering from any later panic keeps the + // test focused. + defer func() { _ = recover() }() + h.provisionWorkspaceAutoSync("ws-test-1", "tmpl", nil, models.CreateWorkspacePayload{ + Name: "n", Tier: 4, Runtime: "claude-code", + }) + got := read() + if !strings.Contains(got, "evt: provision.start ") { + t.Fatalf("expected provision.start emit, got log:\n%s", got) + } + if !strings.Contains(got, `"workspace_id":"ws-test-1"`) { + t.Errorf("workspace_id not in payload: %s", got) + } + if !strings.Contains(got, `"sync":true`) { + t.Errorf("sync flag not pinned for sync dispatcher: %s", got) + } +} + +// TestStopForRestart_EmitsRestartPreStop — emit fires before the actual +// Stop call, so the trackingCPProv stub doesn't need to be wired for +// real Stop semantics. Backend label "cp" pinned because that's the +// SaaS path; we don't pin "docker" or "none" branches here (separate +// tests would only re-test the trivial branch label switch). +func TestStopForRestart_EmitsRestartPreStop(t *testing.T) { + read := captureProvLog(t) + h := &WorkspaceHandler{cpProv: &trackingCPProv{}} + defer func() { _ = recover() }() + h.stopForRestart(context.Background(), "ws-restart-1") + got := read() + if !strings.Contains(got, "evt: restart.pre_stop ") { + t.Fatalf("expected restart.pre_stop emit, got log:\n%s", got) + } + if !strings.Contains(got, `"workspace_id":"ws-restart-1"`) { + t.Errorf("workspace_id not in payload: %s", got) + } + if !strings.Contains(got, `"backend":"cp"`) { + t.Errorf("backend label missing or wrong: %s", got) + } +} + +// TestStopForRestart_EmitsBackendNoneWhenUnwired — pin the no-backend +// branch so a future refactor that drops the label switch is caught. +// This is the silent-Stop case (workspace_dispatchers.go:StopWorkspaceAuto +// returns nil for unwired backends); the emit ensures the operator can +// still see the boundary in the log. 
+func TestStopForRestart_EmitsBackendNoneWhenUnwired(t *testing.T) { + read := captureProvLog(t) + h := &WorkspaceHandler{} // both nil + h.stopForRestart(context.Background(), "ws-restart-2") + got := read() + if !strings.Contains(got, `"backend":"none"`) { + t.Fatalf("expected backend=none for unwired handler: %s", got) + } +} diff --git a/workspace-server/internal/handlers/team.go b/workspace-server/internal/handlers/team.go deleted file mode 100644 index 0c536020..00000000 --- a/workspace-server/internal/handlers/team.go +++ /dev/null @@ -1,132 +0,0 @@ -package handlers - -import ( - "encoding/json" - "log" - "net/http" - "os" - "path/filepath" - - "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" - "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" - "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" - "github.com/gin-gonic/gin" - "gopkg.in/yaml.v3" -) - -// TeamHandler now hosts only Collapse — the visual "expand" action is -// canvas-side and creating children goes through the regular -// WorkspaceHandler.Create path with parent_id set, like any other -// workspace. Every workspace can have children; "team" is just the -// state of having children. The old Expand handler bulk-created -// children by reading sub_workspaces from a parent's config and was -// non-idempotent — calling it N times leaked N×children EC2s, which -// is how tenant-hongming accumulated 72 stale workspaces. -type TeamHandler struct { - wh *WorkspaceHandler - b *events.Broadcaster -} - -// NewTeamHandler constructs a TeamHandler. wh is used by Collapse to -// route StopWorkspaceAuto through the backend dispatcher. -func NewTeamHandler(b *events.Broadcaster, wh *WorkspaceHandler, platformURL, configsDir string) *TeamHandler { - return &TeamHandler{wh: wh, b: b} -} - -// Collapse handles POST /workspaces/:id/collapse -// Stops and removes all child workspaces. -func (h *TeamHandler) Collapse(c *gin.Context) { - parentID := c.Param("id") - ctx := c.Request.Context() - - // Find children - rows, err := db.DB.QueryContext(ctx, - `SELECT id, name FROM workspaces WHERE parent_id = $1 AND status != 'removed'`, parentID) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to query children"}) - return - } - defer rows.Close() - - removed := make([]string, 0) - for rows.Next() { - var childID, childName string - if rows.Scan(&childID, &childName) != nil { - continue - } - - // Stop the workload via the backend dispatcher (CP for SaaS, - // Docker for self-hosted). Pre-2026-05-05 this was - // `if h.provisioner != nil { h.provisioner.Stop(...) }`, which - // silently skipped on every SaaS tenant — child EC2s kept running - // after team-collapse until the orphan sweeper caught them - // (issue #2813). 
- if err := h.wh.StopWorkspaceAuto(ctx, childID); err != nil { - log.Printf("Team collapse: stop %s failed: %v — orphan sweeper will reconcile", childID, err) - } - - // Mark as removed - if _, err := db.DB.ExecContext(ctx, - `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2`, models.StatusRemoved, childID); err != nil { - log.Printf("Team collapse: failed to remove workspace %s: %v", childID, err) - } - if _, err := db.DB.ExecContext(ctx, - `DELETE FROM canvas_layouts WHERE workspace_id = $1`, childID); err != nil { - log.Printf("Team collapse: failed to delete layout for %s: %v", childID, err) - } - - h.b.RecordAndBroadcast(ctx, "WORKSPACE_REMOVED", childID, map[string]interface{}{}) - - removed = append(removed, childName) - } - - h.b.RecordAndBroadcast(ctx, "WORKSPACE_COLLAPSED", parentID, map[string]interface{}{ - "removed_children": removed, - }) - - c.JSON(http.StatusOK, gin.H{ - "status": "collapsed", - "removed": removed, - }) -} - -// findTemplateDirByName resolves a workspace name to its template -// directory. Kept here because callers outside this package may use -// it, even though the in-package consumer (Expand) is gone. -// -// TODO: relocate alongside the templates handler if no other callers -// surface, or delete entirely after a deprecation cycle. -func findTemplateDirByName(configsDir, name string) string { - normalized := normalizeName(name) - - candidate := filepath.Join(configsDir, normalized) - if _, err := os.Stat(filepath.Join(candidate, "config.yaml")); err == nil { - return candidate - } - - // Fall back to scanning all dirs - entries, err := os.ReadDir(configsDir) - if err != nil { - return "" - } - for _, e := range entries { - if !e.IsDir() { - continue - } - cfgPath := filepath.Join(configsDir, e.Name(), "config.yaml") - data, err := os.ReadFile(cfgPath) - if err != nil { - continue - } - var cfg struct { - Name string `yaml:"name"` - } - if json.Unmarshal(data, &cfg) == nil && cfg.Name == name { - return filepath.Join(configsDir, e.Name()) - } - if yaml.Unmarshal(data, &cfg) == nil && cfg.Name == name { - return filepath.Join(configsDir, e.Name()) - } - } - return "" -} diff --git a/workspace-server/internal/handlers/team_test.go b/workspace-server/internal/handlers/team_test.go deleted file mode 100644 index e87a92ae..00000000 --- a/workspace-server/internal/handlers/team_test.go +++ /dev/null @@ -1,130 +0,0 @@ -package handlers - -import ( - "encoding/json" - "net/http" - "net/http/httptest" - "os" - "path/filepath" - "testing" - - "github.com/DATA-DOG/go-sqlmock" - "github.com/gin-gonic/gin" -) - -// ---------- TeamHandler: Collapse ---------- - -func TestTeamCollapse_NoChildren(t *testing.T) { - mock := setupTestDB(t) - setupTestRedis(t) - broadcaster := newTestBroadcaster() - handler := NewTeamHandler(broadcaster, NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()), "http://localhost:8080", "/tmp/configs") - - // No children - mock.ExpectQuery("SELECT id, name FROM workspaces WHERE parent_id"). - WithArgs("ws-parent"). - WillReturnRows(sqlmock.NewRows([]string{"id", "name"})) - - // WORKSPACE_COLLAPSED broadcast - mock.ExpectExec("INSERT INTO structure_events"). 
- WillReturnResult(sqlmock.NewResult(0, 1)) - - w := httptest.NewRecorder() - c, _ := gin.CreateTestContext(w) - c.Params = gin.Params{{Key: "id", Value: "ws-parent"}} - c.Request = httptest.NewRequest("POST", "/", nil) - - handler.Collapse(c) - - if w.Code != http.StatusOK { - t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) - } - var resp map[string]interface{} - json.Unmarshal(w.Body.Bytes(), &resp) - if resp["status"] != "collapsed" { - t.Errorf("expected status 'collapsed', got %v", resp["status"]) - } -} - -func TestTeamCollapse_WithChildren(t *testing.T) { - mock := setupTestDB(t) - setupTestRedis(t) - broadcaster := newTestBroadcaster() - handler := NewTeamHandler(broadcaster, NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()), "http://localhost:8080", "/tmp/configs") - - // Two children - mock.ExpectQuery("SELECT id, name FROM workspaces WHERE parent_id"). - WithArgs("ws-parent"). - WillReturnRows(sqlmock.NewRows([]string{"id", "name"}). - AddRow("child-1", "Worker A"). - AddRow("child-2", "Worker B")) - - // UPDATE + DELETE + broadcast for child-1 - mock.ExpectExec("UPDATE workspaces SET status ="). - WithArgs("child-1"). - WillReturnResult(sqlmock.NewResult(0, 1)) - mock.ExpectExec("DELETE FROM canvas_layouts"). - WithArgs("child-1"). - WillReturnResult(sqlmock.NewResult(0, 1)) - mock.ExpectExec("INSERT INTO structure_events"). - WillReturnResult(sqlmock.NewResult(0, 1)) - - // UPDATE + DELETE + broadcast for child-2 - mock.ExpectExec("UPDATE workspaces SET status ="). - WithArgs("child-2"). - WillReturnResult(sqlmock.NewResult(0, 1)) - mock.ExpectExec("DELETE FROM canvas_layouts"). - WithArgs("child-2"). - WillReturnResult(sqlmock.NewResult(0, 1)) - mock.ExpectExec("INSERT INTO structure_events"). - WillReturnResult(sqlmock.NewResult(0, 1)) - - // WORKSPACE_COLLAPSED broadcast for parent - mock.ExpectExec("INSERT INTO structure_events"). 
- WillReturnResult(sqlmock.NewResult(0, 1)) - - w := httptest.NewRecorder() - c, _ := gin.CreateTestContext(w) - c.Params = gin.Params{{Key: "id", Value: "ws-parent"}} - c.Request = httptest.NewRequest("POST", "/", nil) - - handler.Collapse(c) - - if w.Code != http.StatusOK { - t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) - } - var resp map[string]interface{} - json.Unmarshal(w.Body.Bytes(), &resp) - removed, ok := resp["removed"].([]interface{}) - if !ok || len(removed) != 2 { - t.Errorf("expected 2 removed children, got %v", resp["removed"]) - } -} -// ---------- findTemplateDirByName helper ---------- - -func TestFindTemplateDirByName_DirectMatch(t *testing.T) { - dir := t.TempDir() - subDir := filepath.Join(dir, "mybot") - os.MkdirAll(subDir, 0755) - os.WriteFile(filepath.Join(subDir, "config.yaml"), []byte("name: MyBot"), 0644) - - result := findTemplateDirByName(dir, "mybot") - if result != subDir { - t.Errorf("expected %s, got %s", subDir, result) - } -} - -func TestFindTemplateDirByName_NotFound(t *testing.T) { - dir := t.TempDir() - result := findTemplateDirByName(dir, "nonexistent") - if result != "" { - t.Errorf("expected empty string, got %s", result) - } -} - -func TestFindTemplateDirByName_InvalidConfigsDir(t *testing.T) { - result := findTemplateDirByName("/nonexistent/path", "anything") - if result != "" { - t.Errorf("expected empty string for invalid dir, got %s", result) - } -} diff --git a/workspace-server/internal/handlers/workspace_dispatchers.go b/workspace-server/internal/handlers/workspace_dispatchers.go index 18ede255..3df25877 100644 --- a/workspace-server/internal/handlers/workspace_dispatchers.go +++ b/workspace-server/internal/handlers/workspace_dispatchers.go @@ -35,6 +35,7 @@ import ( "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/provlog" ) // HasProvisioner reports whether either backend (CP or local Docker) is @@ -101,6 +102,14 @@ func (h *WorkspaceHandler) DefaultTier() int { // lives in prepareProvisionContext (shared by both per-backend // goroutines). func (h *WorkspaceHandler) provisionWorkspaceAuto(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) bool { + provlog.Event("provision.start", map[string]any{ + "workspace_id": workspaceID, + "name": payload.Name, + "tier": payload.Tier, + "runtime": payload.Runtime, + "template": payload.Template, + "sync": false, + }) if h.cpProv != nil { go h.provisionWorkspaceCP(workspaceID, templatePath, configFiles, payload) return true @@ -136,6 +145,14 @@ func (h *WorkspaceHandler) provisionWorkspaceAuto(workspaceID, templatePath stri // Keep these two helpers in sync — when one grows a new arm (third // backend, retry semantics), the other should too. 
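+//
+// For reference, the sync variant below emits a record of this shape
+// (illustrative values, the same ones provlog_emit_test.go passes;
+// that test leaves payload.Template unset, and json.Marshal sorts map
+// keys alphabetically):
+//
+//	evt: provision.start {"name":"n","runtime":"claude-code","sync":true,"template":"","tier":4,"workspace_id":"ws-test-1"}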
func (h *WorkspaceHandler) provisionWorkspaceAutoSync(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) bool { + provlog.Event("provision.start", map[string]any{ + "workspace_id": workspaceID, + "name": payload.Name, + "tier": payload.Tier, + "runtime": payload.Runtime, + "template": payload.Template, + "sync": true, + }) if h.cpProv != nil { h.provisionWorkspaceCP(workspaceID, templatePath, configFiles, payload) return true diff --git a/workspace-server/internal/handlers/workspace_restart.go b/workspace-server/internal/handlers/workspace_restart.go index 3b3097c4..c5712be5 100644 --- a/workspace-server/internal/handlers/workspace_restart.go +++ b/workspace-server/internal/handlers/workspace_restart.go @@ -12,6 +12,7 @@ import ( "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/provlog" "github.com/gin-gonic/gin" ) @@ -431,6 +432,16 @@ func coalesceRestart(workspaceID string, cycle func()) { // NPE'd before reaching the reprovision step — which is why every SaaS dead- // agent incident pre-this-fix required manual restart from canvas. func (h *WorkspaceHandler) stopForRestart(ctx context.Context, workspaceID string) { + backend := "none" + if h.provisioner != nil { + backend = "docker" + } else if h.cpProv != nil { + backend = "cp" + } + provlog.Event("restart.pre_stop", map[string]any{ + "workspace_id": workspaceID, + "backend": backend, + }) if h.provisioner != nil { h.provisioner.Stop(ctx, workspaceID) return diff --git a/workspace-server/internal/provisioner/cp_provisioner.go b/workspace-server/internal/provisioner/cp_provisioner.go index edc67d9f..bdc5bff7 100644 --- a/workspace-server/internal/provisioner/cp_provisioner.go +++ b/workspace-server/internal/provisioner/cp_provisioner.go @@ -14,6 +14,7 @@ import ( "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/provlog" ) // CPProvisionerAPI is the contract WorkspaceHandler uses to talk to the @@ -214,6 +215,13 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, } log.Printf("CP provisioner: workspace %s → EC2 instance %s (%s)", cfg.WorkspaceID, result.InstanceID, result.State) + provlog.Event("provision.ec2_started", map[string]any{ + "workspace_id": cfg.WorkspaceID, + "instance_id": result.InstanceID, + "state": result.State, + "tier": cfg.Tier, + "runtime": cfg.Runtime, + }) return result.InstanceID, nil } @@ -273,6 +281,10 @@ func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error { return fmt.Errorf("cp provisioner: stop %s: unexpected %d: %s", workspaceID, resp.StatusCode, strings.TrimSpace(string(body))) } + provlog.Event("provision.ec2_stopped", map[string]any{ + "workspace_id": workspaceID, + "instance_id": instanceID, + }) return nil } diff --git a/workspace-server/internal/provlog/provlog.go b/workspace-server/internal/provlog/provlog.go new file mode 100644 index 00000000..4434c238 --- /dev/null +++ b/workspace-server/internal/provlog/provlog.go @@ -0,0 +1,48 @@ +// Package provlog emits structured, single-line JSON log records for +// provisioning-lifecycle boundaries (workspace create, EC2 start/stop, +// restart, idempotency skips). 
+// Records share a stable `evt:` prefix and
+// JSON payload so a future grep|jq pipeline (or a Loki/Datadog ingest)
+// can reconstruct the per-workspace timeline without parsing the
+// human-prose log lines that already exist.
+//
+// Existing log.Printf lines are intentionally NOT replaced — they
+// remain the operator-facing message. Event() emits a paired structured
+// record alongside, additive only.
+//
+// Event taxonomy (extend by appending; never rename):
+//
+//	provision.start         — workspace row inserted, EC2 about to launch
+//	provision.skip_existing — idempotency hit, no new EC2
+//	provision.ec2_started   — RunInstances returned an instance id
+//	provision.ec2_stopped   — TerminateInstances acknowledged
+//	restart.pre_stop        — Restart handler about to call Stop
+//
+// Required fields per event are documented at each call site.
+package provlog
+
+import (
+	"encoding/json"
+	"log"
+)
+
+// Event writes a single line of the form:
+//
+//	evt: {"k":"v",...}
+//
+// to the standard logger. JSON encoding errors never propagate — a
+// logging helper must never panic the request path; on marshal failure
+// a fallback record carrying a `_marshal_err` field is emitted so the
+// event boundary still lands in the log. fields may be nil; the empty
+// payload `{}` is still useful to mark an event boundary.
+func Event(name string, fields map[string]any) {
+	if fields == nil {
+		fields = map[string]any{}
+	}
+	payload, err := json.Marshal(fields)
+	if err != nil {
+		// Fall back to a static payload so the event boundary still
+		// appears in the log. The marshal error itself is recorded
+		// on a best-effort basis.
+		log.Printf("evt: %s {\"_marshal_err\":%q}", name, err.Error())
+		return
+	}
+	log.Printf("evt: %s %s", name, payload)
+}
diff --git a/workspace-server/internal/provlog/provlog_test.go b/workspace-server/internal/provlog/provlog_test.go
new file mode 100644
index 00000000..7d2f5f5f
--- /dev/null
+++ b/workspace-server/internal/provlog/provlog_test.go
@@ -0,0 +1,97 @@
+package provlog
+
+import (
+	"bytes"
+	"encoding/json"
+	"log"
+	"strings"
+	"testing"
+)
+
+// captureLog redirects the default logger to a buffer for the duration
+// of fn and returns whatever was written.
+func captureLog(t *testing.T, fn func()) string {
+	t.Helper()
+	var buf bytes.Buffer
+	prevWriter := log.Writer()
+	prevFlags := log.Flags()
+	log.SetOutput(&buf)
+	log.SetFlags(0) // strip date/time so assertions stay deterministic
+	t.Cleanup(func() {
+		log.SetOutput(prevWriter)
+		log.SetFlags(prevFlags)
+	})
+	fn()
+	return buf.String()
+}
+
+func TestEvent_EmitsEvtPrefixAndJSONPayload(t *testing.T) {
+	out := captureLog(t, func() {
+		Event("provision.start", map[string]any{
+			"workspace_id": "ws-123",
+			"tier":         4,
+			"runtime":      "claude-code",
+		})
+	})
+	out = strings.TrimSpace(out)
+	if !strings.HasPrefix(out, "evt: provision.start ") {
+		t.Fatalf("expected evt-prefixed line, got %q", out)
+	}
+	jsonPart := strings.TrimPrefix(out, "evt: provision.start ")
+	var got map[string]any
+	if err := json.Unmarshal([]byte(jsonPart), &got); err != nil {
+		t.Fatalf("payload not valid JSON: %v (raw=%q)", err, jsonPart)
+	}
+	if got["workspace_id"] != "ws-123" {
+		t.Errorf("workspace_id field lost: %+v", got)
+	}
+	// JSON unmarshal turns numbers into float64 — exact-equal compare.
+ if got["tier"].(float64) != 4 { + t.Errorf("tier field lost: %+v", got) + } + if got["runtime"] != "claude-code" { + t.Errorf("runtime field lost: %+v", got) + } +} + +func TestEvent_NilFieldsEmitsEmptyObject(t *testing.T) { + out := captureLog(t, func() { + Event("restart.pre_stop", nil) + }) + if !strings.Contains(out, "evt: restart.pre_stop {}") { + t.Fatalf("nil fields should emit empty object, got %q", out) + } +} + +func TestEvent_PreservesEventBoundaryOnUnmarshalableValue(t *testing.T) { + // A channel cannot be marshaled by encoding/json — verify we still + // emit the event boundary with a recorded marshal error. This is + // the structural guarantee: the call site never sees a panic, and + // the event name is always present in the log. + out := captureLog(t, func() { + Event("provision.ec2_started", map[string]any{ + "chan": make(chan int), + }) + }) + if !strings.Contains(out, "evt: provision.ec2_started ") { + t.Fatalf("event boundary missing on marshal error: %q", out) + } + if !strings.Contains(out, "_marshal_err") { + t.Fatalf("expected _marshal_err sentinel, got %q", out) + } +} + +func TestEvent_SingleLineOutput(t *testing.T) { + // Log aggregators line-split on \n. A multi-line emit would silently + // fragment the JSON across two records — pin single-line shape. + out := captureLog(t, func() { + Event("provision.skip_existing", map[string]any{ + "existing_id": "ws-abc", + "name": "child-1", + }) + }) + trimmed := strings.TrimRight(out, "\n") + if strings.Contains(trimmed, "\n") { + t.Fatalf("event line must be single-line, got %q", out) + } +} diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go index d6d7b2d7..ae928f2f 100644 --- a/workspace-server/internal/router/router.go +++ b/workspace-server/internal/router/router.go @@ -243,13 +243,15 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi // entire platform. Gated behind AdminAuth (issue #180). r.GET("/approvals/pending", middleware.AdminAuth(db.DB), apph.ListAll) - // Team handlers — Collapse only. The bulk-Expand path is gone: - // every workspace can have children via the regular CreateWorkspace - // flow with parent_id set, so a separate handler that bulk-creates - // from sub_workspaces (and was non-idempotent — calling it twice - // duplicated the team) earned its way out. - teamh := handlers.NewTeamHandler(broadcaster, wh, platformURL, configsDir) - wsAuth.POST("/collapse", teamh.Collapse) + // (TeamHandler is gone — #2864.) The visual canvas Collapse + // button calls PATCH /workspaces/:id { collapsed: true/false } + // (presentational toggle on canvas_layouts), NOT the destructive + // POST /collapse that stopped + removed children. The + // destructive route had zero UI callers (verified via grep + // across canvas/, scripts/, and the MCP tool registry — only + // docs referenced it). team.go + team_test.go + the route + // + helpers (findTemplateDirByName, NewTeamHandler) are + // deleted; visual collapse is unaffected. // Agents ah := handlers.NewAgentHandler(broadcaster) diff --git a/workspace/inbox.py b/workspace/inbox.py index 6c7ea895..cff95c6d 100644 --- a/workspace/inbox.py +++ b/workspace/inbox.py @@ -553,10 +553,26 @@ def _poll_once( # Imported lazily at use-site so a runtime that never sees an # upload-receive row never imports the module. Cheap on the hot # path because Python caches the import. 
- from inbox_uploads import is_chat_upload_row, fetch_and_stage + from inbox_uploads import is_chat_upload_row, BatchFetcher new_count = 0 last_id: str | None = None + # ``batch_fetcher`` is lazy: a poll batch with no upload rows pays + # zero overhead. Once the first upload row appears we open one + # BatchFetcher and submit every subsequent upload row to its thread + # pool; before processing the FIRST non-upload row we drain the + # pool (wait_all) so the URI cache is hot when message rewriting + # runs. Without the barrier, the chat message that references the + # upload would arrive at the agent with the un-rewritten + # platform-pending: URI. + batch_fetcher: BatchFetcher | None = None + + def _drain_uploads(bf: BatchFetcher | None) -> None: + if bf is None: + return + bf.wait_all() + bf.close() + for row in rows: if not isinstance(row, dict): continue @@ -570,14 +586,21 @@ def _poll_once( # message_from_activity. We DO advance the cursor past # this row so a permanent network outage on /content # doesn't stall the cursor and block real chat traffic. - fetch_and_stage( - row, - platform_url=platform_url, - workspace_id=workspace_id, - headers=headers, - ) + if batch_fetcher is None: + batch_fetcher = BatchFetcher( + platform_url=platform_url, + workspace_id=workspace_id, + headers=headers, + ) + batch_fetcher.submit(row) last_id = str(row.get("id", "")) or last_id continue + # Non-upload row: drain any pending uploads first so the URI + # cache is populated before we run rewrite_request_body / + # message_from_activity on a row that may reference one. + if batch_fetcher is not None: + _drain_uploads(batch_fetcher) + batch_fetcher = None if _is_self_notify_row(row): # The workspace-server's `/notify` handler writes the agent's # own send_message_to_user POSTs to activity_logs with @@ -612,6 +635,13 @@ def _poll_once( last_id = message.activity_id new_count += 1 + # Drain any uploads still in flight if the batch ended with upload + # rows (no chat-message row to trigger the inline drain). Without + # this, a future poll that picks up the chat-message row first + # would race with the still-running fetches. + if batch_fetcher is not None: + _drain_uploads(batch_fetcher) + if last_id is not None: state.save_cursor(last_id, cursor_key) return new_count @@ -654,6 +684,7 @@ def start_poller_thread( platform_url: str, workspace_id: str, interval: float = POLL_INTERVAL_SECONDS, + stop_event: threading.Event | None = None, ) -> threading.Thread: """Spawn the poller as a daemon thread. Returns the Thread handle. @@ -665,13 +696,18 @@ def start_poller_thread( operator running ``ps -eL`` or eyeballing ``threading.enumerate()`` can tell which thread is which without reverse-engineering it from crash tracebacks. + + Pass ``stop_event`` to enable graceful shutdown — used by tests so + the daemon thread doesn't outlive the test that started it and race + with later tests' httpx patches. Production code passes None and + relies on the daemon flag for process-exit cleanup. """ name = "molecule-mcp-inbox-poller" if workspace_id: name = f"{name}-{workspace_id[:8]}" t = threading.Thread( target=_poll_loop, - args=(state, platform_url, workspace_id, interval), + args=(state, platform_url, workspace_id, interval, stop_event), name=name, daemon=True, ) diff --git a/workspace/inbox_uploads.py b/workspace/inbox_uploads.py index 798f18de..69fa53aa 100644 --- a/workspace/inbox_uploads.py +++ b/workspace/inbox_uploads.py @@ -37,6 +37,7 @@ read another tenant's bytes even if a token is misrouted. 
""" from __future__ import annotations +import concurrent.futures import logging import mimetypes import os @@ -68,6 +69,24 @@ MAX_FILE_BYTES = 25 * 1024 * 1024 # 10s default for /activity calls — both are user-perceived latency. DEFAULT_FETCH_TIMEOUT = 60.0 +# Concurrency cap for ``BatchFetcher``. Four workers is enough headroom +# for the realistic "user dragged 3-4 files into chat at once" case +# while bounding the platform's per-workspace fan-out. The cap matters +# because the platform's /content endpoint reads bytea from Postgres in +# a single round-trip per request — N workers = N concurrent DB reads +# of up to 25 MB each, so a higher cap could pressure platform memory +# without much UX win (network bandwidth is the bottleneck once the +# bytes are buffered). +DEFAULT_BATCH_FETCH_WORKERS = 4 + +# Upper bound on how long ``BatchFetcher.wait_all`` blocks the inbox +# poll loop before giving up on still-in-flight fetches. Aligned with +# DEFAULT_FETCH_TIMEOUT so a single hung fetch can't stall the loop +# longer than its own deadline. A timeout fires only if a worker thread +# is stuck past the underlying httpx timeout — pathological case; +# normal completion is bounded by per-fetch timeout × ceil(N/W). +DEFAULT_BATCH_WAIT_TIMEOUT = DEFAULT_FETCH_TIMEOUT + 5.0 + # Cap on the URI cache. A long-lived workspace handling thousands of # uploads shouldn't grow without bound; an LRU cap of 1024 keeps the # entries-needed-for-a-typical-conversation well within memory. @@ -275,6 +294,7 @@ def fetch_and_stage( workspace_id: str, headers: dict[str, str], timeout_secs: float = DEFAULT_FETCH_TIMEOUT, + client: Any = None, ) -> str | None: """Fetch the row's bytes, stage them under chat-uploads, and ack. @@ -289,6 +309,11 @@ def fetch_and_stage( On success, the URI cache is updated so a subsequent chat message referencing the same ``platform-pending:`` URI is rewritten before the agent sees it. + + Pass ``client`` to reuse a shared ``httpx.Client`` for both GET and + POST ack (saves one TLS handshake per row vs. constructing one + per-call). ``BatchFetcher`` does this across an entire poll batch so + N concurrent fetches share one connection pool. """ body = _request_body_dict(row) if body is None: @@ -317,25 +342,58 @@ def fetch_and_stage( if not isinstance(filename, str): filename = "file" - # Lazy httpx import: the standalone MCP path uses httpx; an in- - # container caller that imports this module by accident shouldn't - # explode at import time. - try: - import httpx # noqa: WPS433 - except ImportError: - logger.error("inbox_uploads: httpx not installed; cannot fetch %s", file_id) - return None + # Caller-supplied client: reuse for both GET + POST ack. Otherwise + # build a one-shot client and close it on the way out. Lazy httpx + # import keeps the standalone MCP path's optional dep optional. 
+ own_client = client is None + if own_client: + try: + import httpx # noqa: WPS433 + except ImportError: + logger.error("inbox_uploads: httpx not installed; cannot fetch %s", file_id) + return None + client = httpx.Client(timeout=timeout_secs) + try: + return _fetch_and_stage_with_client( + client, + platform_url=platform_url, + workspace_id=workspace_id, + headers=headers, + file_id=file_id, + pending_uri=pending_uri, + filename=filename, + body=body, + ) + finally: + if own_client: + try: + client.close() + except Exception: # noqa: BLE001 — close should never crash the caller + pass + + +def _fetch_and_stage_with_client( + client: Any, + *, + platform_url: str, + workspace_id: str, + headers: dict[str, str], + file_id: str, + pending_uri: str, + filename: str, + body: dict[str, Any], +) -> str | None: + """Inner body of fetch_and_stage. Always uses the supplied client for + both GET and POST so the connection pool is shared across the call. + """ content_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/content" ack_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/ack" try: - with httpx.Client(timeout=timeout_secs) as client: - resp = client.get(content_url, headers=headers) + resp = client.get(content_url, headers=headers) except Exception as exc: # noqa: BLE001 - logger.warning( - "inbox_uploads: GET %s failed: %s", content_url, exc - ) + logger.warning("inbox_uploads: GET %s failed: %s", content_url, exc) return None if resp.status_code == 404: @@ -403,8 +461,7 @@ def fetch_and_stage( # back the on-disk file — the platform's sweep will clean up # eventually. try: - with httpx.Client(timeout=timeout_secs) as client: - ack_resp = client.post(ack_url, headers=headers) + ack_resp = client.post(ack_url, headers=headers) if ack_resp.status_code >= 400: logger.warning( "inbox_uploads: ack %s returned %d: %s", @@ -418,6 +475,198 @@ def fetch_and_stage( return local_uri +# --------------------------------------------------------------------------- +# BatchFetcher — concurrent fetch across a single poll batch +# --------------------------------------------------------------------------- + + +class BatchFetcher: + """Fetch + stage + ack a batch of upload-receive rows concurrently. + + Why this exists: the inbox poll loop used to call ``fetch_and_stage`` + serially per row. With N upload rows in a batch (a user dragging + multiple files into chat at once), the loop blocked for + ``N × per_fetch_latency`` before processing the chat message that + referenced them — a 4-file upload at 5s each = 20s of stall + before the agent saw the user's prompt. ``BatchFetcher`` runs the + fetches on a small thread pool (default 4 workers) so the stall is + bounded by ``ceil(N/W) × per_fetch_latency`` instead. + + Connection reuse: one ``httpx.Client`` is shared across every fetch + in the batch. httpx clients carry a connection pool, so a second + fetch to the same platform host reuses the TCP+TLS handshake from + the first — measurable win when fetches happen back-to-back. + + Correctness invariant the caller MUST preserve: the inbox loop is + expected to call ``wait_all()`` before processing the chat-message + activity row that REFERENCES one of these uploads. Without the + barrier, the URI cache is empty when ``rewrite_request_body`` runs + and the agent sees the un-rewritten ``platform-pending:`` URI. The + caller-side test ``test_poll_once_waits_for_uploads_before_messages`` + pins this end-to-end. 
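+
+    Illustrative poll-loop shape (a sketch, not the exact caller code;
+    see inbox._poll_once for the real sequencing)::
+
+        bf = BatchFetcher(platform_url=url, workspace_id=ws, headers=hdrs)
+        for row in upload_rows:
+            bf.submit(row)
+        bf.wait_all()   # barrier: URI cache hot before message rows run
+        bf.close()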
+
+    Use as a context manager so the executor + client are torn down
+    even if the caller raises mid-batch.
+    """
+
+    def __init__(
+        self,
+        *,
+        platform_url: str,
+        workspace_id: str,
+        headers: dict[str, str],
+        timeout_secs: float = DEFAULT_FETCH_TIMEOUT,
+        max_workers: int = DEFAULT_BATCH_FETCH_WORKERS,
+        client: Any = None,
+    ):
+        self._platform_url = platform_url
+        self._workspace_id = workspace_id
+        self._headers = dict(headers)  # copy so caller mutations don't leak in
+        self._timeout_secs = timeout_secs
+
+        # Caller can inject a client (tests do this); production callers
+        # let us build one. Track ownership so we only close ours.
+        self._own_client = client is None
+        if self._own_client:
+            try:
+                import httpx  # noqa: WPS433
+            except ImportError:
+                # Match fetch_and_stage's behavior: log + degrade rather
+                # than raising at construction time. submit() will then
+                # return None for every row.
+                logger.error("inbox_uploads: httpx not installed; BatchFetcher inert")
+                self._client: Any = None
+            else:
+                self._client = httpx.Client(timeout=timeout_secs)
+        else:
+            self._client = client
+
+        self._executor = concurrent.futures.ThreadPoolExecutor(
+            max_workers=max_workers,
+            thread_name_prefix="upload-fetch",
+        )
+        self._futures: list[concurrent.futures.Future[Any]] = []
+        self._closed = False
+        # Flipped to True by wait_all when the timeout fires; close()
+        # reads this to decide between drain-and-wait vs cancel-queued.
+        self._timed_out = False
+
+    def submit(self, row: dict[str, Any]) -> concurrent.futures.Future[Any] | None:
+        """Submit ``row`` for fetch + stage + ack. Non-blocking — the
+        worker thread runs ``fetch_and_stage`` with the shared client.
+
+        Returns the Future so a caller that wants the per-row outcome
+        can wait on it; ``None`` if the BatchFetcher is in a degraded
+        state (httpx missing).
+        """
+        if self._closed:
+            raise RuntimeError("BatchFetcher: submit after close")
+        if self._client is None:
+            return None
+        fut = self._executor.submit(
+            fetch_and_stage,
+            row,
+            platform_url=self._platform_url,
+            workspace_id=self._workspace_id,
+            headers=self._headers,
+            timeout_secs=self._timeout_secs,
+            client=self._client,
+        )
+        self._futures.append(fut)
+        return fut
+
+    def wait_all(self, timeout: float | None = DEFAULT_BATCH_WAIT_TIMEOUT) -> None:
+        """Block until every submitted future completes (or times out).
+
+        Per-future exceptions are logged + swallowed — ``fetch_and_stage``
+        already converts every error path to ``return None``, so a real
+        exception propagating up to here is unexpected and we don't want
+        one bad fetch to abort the whole batch.
+
+        Timeouts are also logged + swallowed AND flip ``self._timed_out``
+        so ``close`` can cancel the stragglers instead of paying their
+        full latency. Without this hand-off, ``close()``'s
+        ``shutdown(wait=True)`` would block on the leaked workers and
+        undo the user-facing timeout — the inbox poll loop would stall
+        indefinitely on a hung /content fetch.
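+
+        Worked bound under the defaults (illustrative numbers): 4 rows
+        on 4 workers run as a single wave, so a healthy batch finishes
+        within the 60s per-fetch timeout and this 65s cap stays quiet;
+        the cap fires only when a worker hangs past httpx's own deadline.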
+ """ + if not self._futures: + return + try: + done, not_done = concurrent.futures.wait( + self._futures, + timeout=timeout, + return_when=concurrent.futures.ALL_COMPLETED, + ) + except Exception as exc: # noqa: BLE001 — concurrent.futures shouldn't raise here + logger.warning("inbox_uploads: BatchFetcher.wait_all crashed: %s", exc) + return + for fut in done: + exc = fut.exception() + if exc is not None: + logger.warning( + "inbox_uploads: BatchFetcher worker raised: %s", exc + ) + if not_done: + logger.warning( + "inbox_uploads: BatchFetcher.wait_all left %d in-flight after %ss timeout", + len(not_done), + timeout, + ) + # Mark these futures so close() knows to cancel-not-wait. We + # cancel queued-but-not-started ones immediately; futures + # already running can't be cancelled (Python's threading + # model), but close() will pass cancel_futures=True so any + # remaining queued items don't run. + for fut in not_done: + fut.cancel() + self._timed_out = True + + def close(self) -> None: + """Tear down the executor + (if owned) the httpx client. + + Idempotent. After close, ``submit`` raises and the BatchFetcher + cannot be reused — construct a fresh one for the next poll. + + If ``wait_all`` reported a timeout, shutdown skips the + ``wait=True`` drain and instead asks the executor to drop queued + futures (``cancel_futures=True``). Currently-running workers + can't be interrupted by Python's threading model, but the poll + loop returns immediately rather than blocking on a hung fetch. + """ + if self._closed: + return + self._closed = True + timed_out = getattr(self, "_timed_out", False) + try: + if timed_out: + # cancel_futures landed in Python 3.9 — guarded for older + # interpreters via a TypeError fallback. Drop queued + # tasks; running ones will exit when their httpx call + # eventually returns or the daemon thread dies. + try: + self._executor.shutdown(wait=False, cancel_futures=True) + except TypeError: + self._executor.shutdown(wait=False) + else: + # Healthy path: wait for in-flight work so we don't + # interrupt a fetch mid-write. + self._executor.shutdown(wait=True) + except Exception as exc: # noqa: BLE001 + logger.warning("inbox_uploads: executor shutdown error: %s", exc) + if self._own_client and self._client is not None: + try: + self._client.close() + except Exception as exc: # noqa: BLE001 + logger.warning("inbox_uploads: client close error: %s", exc) + + def __enter__(self) -> "BatchFetcher": + return self + + def __exit__(self, exc_type, exc, tb) -> None: + self.close() + + # --------------------------------------------------------------------------- # URI rewrite for incoming chat messages # --------------------------------------------------------------------------- diff --git a/workspace/tests/test_inbox.py b/workspace/tests/test_inbox.py index 162c32c2..cbba9a3b 100644 --- a/workspace/tests/test_inbox.py +++ b/workspace/tests/test_inbox.py @@ -555,16 +555,34 @@ def test_poll_once_self_notify_does_not_fire_notification(state: inbox.InboxStat def test_start_poller_thread_is_daemon(state: inbox.InboxState): """Daemon flag is required so the poller dies with the parent process; a non-daemon poller would leak across `claude` restarts - and write to a stale workspace.""" + and write to a stale workspace. + + Stop_event is plumbed so the thread cleans up at the end of the + test instead of leaking into later tests. 
+    daemon's ~10ms tick races with later tests that patch httpx.Client
+    — the leaked thread sees their patched response and runs an
+    unwanted iteration of _poll_once that double-counts mocked calls
+    (surfaced by test_batch_fetcher_owns_client_when_not_supplied on
+    Python 3.11 CI but not on a local 3.13 run).
+    """
     resp = _make_response(200, [])
     p, _ = _patch_httpx(resp)
+    stop_event = threading.Event()
     with p, patch("platform_auth.auth_headers", return_value={}):
         # Use a very short interval so the loop body runs at least once
         # before we exit the test.
-        t = inbox.start_poller_thread(state, "http://platform", "ws-1", interval=0.01)
+        t = inbox.start_poller_thread(
+            state, "http://platform", "ws-1", interval=0.01, stop_event=stop_event
+        )
         time.sleep(0.05)
-    assert t.daemon is True
-    assert t.is_alive()
+    assert t.daemon is True
+    assert t.is_alive()
+    # Signal shutdown + wait for the thread to actually exit before
+    # we leave the test scope. Without this join, the leaked thread
+    # races with later tests' httpx patches.
+    stop_event.set()
+    t.join(timeout=2.0)
+    assert not t.is_alive(), "poller thread did not exit on stop_event"
 # ---------------------------------------------------------------------------
@@ -577,6 +595,219 @@ def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path):
     assert inbox.default_cursor_path() == tmp_path / ".mcp_inbox_cursor"
+
+# ---------------------------------------------------------------------------
+# Phase 5b — BatchFetcher integration with the poll loop
+# ---------------------------------------------------------------------------
+#
+# These tests pin the cross-module contract between inbox._poll_once and
+# inbox_uploads.BatchFetcher: chat_upload_receive rows must be submitted
+# to a single BatchFetcher AND drained (URI cache populated) before any
+# subsequent message row is processed. Without the drain, the
+# rewrite_request_body path inside message_from_activity surfaces the
+# un-rewritten ``platform-pending:`` URI to the agent.
+
+
+def _upload_row(act_id: str, file_id: str) -> dict:
+    return {
+        "id": act_id,
+        "source_id": None,
+        "method": "chat_upload_receive",
+        "summary": f"chat_upload_receive: {file_id}.pdf",
+        "request_body": {
+            "file_id": file_id,
+            "name": f"{file_id}.pdf",
+            "uri": f"platform-pending:ws-1/{file_id}",
+            "mimeType": "application/pdf",
+            "size": 3,
+        },
+        "created_at": "2026-05-04T10:00:00Z",
+    }
+
+
+def _message_row_referencing(act_id: str, file_id: str) -> dict:
+    return {
+        "id": act_id,
+        "source_id": None,
+        "method": "message/send",
+        "summary": None,
+        "request_body": {
+            "params": {
+                "message": {
+                    "parts": [
+                        {"kind": "text", "text": "have a look"},
+                        {
+                            "kind": "file",
+                            "file": {
+                                "uri": f"platform-pending:ws-1/{file_id}",
+                                "name": f"{file_id}.pdf",
+                            },
+                        },
+                    ]
+                }
+            }
+        },
+        "created_at": "2026-05-04T10:00:01Z",
+    }
+
+
+def _patch_httpx_routing(activity_rows: list[dict], upload_bytes: bytes = b"PDF"):
+    """Replace ``httpx.Client`` so:
+
+    - GET /activity returns ``activity_rows``
+    - GET /workspaces/.../content returns ``upload_bytes`` with content-type
+    - POST /ack returns 200
+
+    Returns the patch context manager; tests use ``with p:``. Each new
+    Client(...) gets a fresh MagicMock so the test can verify
+    constructor-count expectations without pinning singletons.
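+
+    Routing sketch (URL fragments matched exactly as in the factory
+    below):
+
+        GET  .../activity                        -> 200, activity_rows
+        GET  .../pending-uploads/<fid>/content   -> 200, upload_bytes
+        GET  anything else                       -> 404
+        POST any URL (only the ack is exercised) -> 200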
+ """ + def _client_factory(*args, **kwargs): + c = MagicMock() + c.__enter__ = MagicMock(return_value=c) + c.__exit__ = MagicMock(return_value=False) + + def _get(url, params=None, headers=None): + if "/activity" in url: + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = activity_rows + resp.text = "" + return resp + if "/pending-uploads/" in url and "/content" in url: + resp = MagicMock() + resp.status_code = 200 + resp.content = upload_bytes + resp.headers = {"content-type": "application/pdf"} + resp.text = "" + return resp + resp = MagicMock() + resp.status_code = 404 + resp.text = "" + return resp + + def _post(url, headers=None): + resp = MagicMock() + resp.status_code = 200 + resp.text = "" + return resp + + c.get = MagicMock(side_effect=_get) + c.post = MagicMock(side_effect=_post) + c.close = MagicMock() + return c + + return patch("httpx.Client", side_effect=_client_factory) + + +def test_poll_once_drains_uploads_before_processing_message_row(state: inbox.InboxState, tmp_path): + """The chat-message row's file.uri MUST be rewritten to the local + workspace: URI by the time it lands in the InboxState queue. This + requires BatchFetcher.wait_all() to run before message_from_activity + on the second row. + """ + import inbox_uploads + inbox_uploads.get_cache().clear() + # Sandbox the on-disk staging dir so the test can't pollute the + # workspace's real chat-uploads. + real_dir = inbox_uploads.CHAT_UPLOAD_DIR + inbox_uploads.CHAT_UPLOAD_DIR = str(tmp_path / "chat-uploads") + try: + rows = [ + _upload_row("act-1", "file-A"), + _message_row_referencing("act-2", "file-A"), + ] + state.save_cursor("act-old") + with _patch_httpx_routing(rows, upload_bytes=b"PDF-bytes"): + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + finally: + inbox_uploads.CHAT_UPLOAD_DIR = real_dir + inbox_uploads.get_cache().clear() + + assert n == 1, "exactly one message row should be enqueued (the upload row is a side-effect, not a message)" + queued = state.peek(10) + assert len(queued) == 1 + # The contract this test exists to pin: the platform-pending: URI + # was rewritten to workspace: BEFORE the message landed in the + # state queue. message_from_activity mutates row['request_body'] + # in-place, so the rewritten URI is observable on the row dict + # we passed in. + rewritten_part = rows[1]["request_body"]["params"]["message"]["parts"][1] + assert rewritten_part["file"]["uri"].startswith("workspace:"), ( + f"upload barrier broken: file.uri = {rewritten_part['file']['uri']!r}; " + "rewrite_request_body ran before BatchFetcher.wait_all populated the cache" + ) + # Cursor advanced past BOTH rows — upload-receive (act-1) is + # acknowledged via the inbox cursor regardless of fetch outcome. + assert state.load_cursor() == "act-2" + + +def test_poll_once_with_only_upload_rows_drains_at_loop_end(state: inbox.InboxState, tmp_path): + """End-of-batch drain: a poll that contains ONLY upload rows (no + chat-message row to trigger the inline drain) must still drain the + BatchFetcher before _poll_once returns. Otherwise a future poll + that picks up the corresponding chat-message row would race with + in-flight fetches from the previous batch. 
+ """ + import inbox_uploads + inbox_uploads.get_cache().clear() + real_dir = inbox_uploads.CHAT_UPLOAD_DIR + inbox_uploads.CHAT_UPLOAD_DIR = str(tmp_path / "chat-uploads") + try: + rows = [_upload_row("act-1", "file-A"), _upload_row("act-2", "file-B")] + state.save_cursor("act-old") + with _patch_httpx_routing(rows, upload_bytes=b"PDF"): + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + # By the time _poll_once returned, the URI cache must be hot + # for both file_ids — proves the end-of-loop drain ran. + assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-A") is not None + assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-B") is not None + finally: + inbox_uploads.CHAT_UPLOAD_DIR = real_dir + inbox_uploads.get_cache().clear() + # Upload rows are NOT message rows; queue stays empty. + assert n == 0 + # Cursor advances past both upload rows. + assert state.load_cursor() == "act-2" + + +def test_poll_once_no_uploads_does_not_construct_batch_fetcher(state: inbox.InboxState): + """A batch with no upload-receive rows must not pay the BatchFetcher + construction cost — the executor + httpx client allocation is + deferred until the first upload row appears. + """ + import inbox_uploads + + constructed: list[Any] = [] + + def _patched_init(self, **kwargs): + constructed.append(kwargs) + # Don't actually run __init__; we never hit submit/wait_all. + self._closed = False + self._futures = [] + self._executor = MagicMock() + self._client = MagicMock() + self._own_client = False + + rows = [ + { + "id": "act-1", + "source_id": None, + "method": "message/send", + "summary": None, + "request_body": {"parts": [{"type": "text", "text": "hi"}]}, + "created_at": "2026-04-30T22:00:00Z", + }, + ] + state.save_cursor("act-old") + resp = _make_response(200, rows) + p, _ = _patch_httpx(resp) + with patch.object(inbox_uploads.BatchFetcher, "__init__", _patched_init), p: + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + + assert n == 1 + assert constructed == [], "BatchFetcher must not be constructed when no upload rows are present" + + def test_default_cursor_path_falls_back_to_default(tmp_path, monkeypatch): """When CONFIGS_DIR is unset, the cursor path resolves through configs_dir.resolve() — /configs in-container, ~/.molecule-workspace diff --git a/workspace/tests/test_inbox_uploads.py b/workspace/tests/test_inbox_uploads.py index 515616e2..37446760 100644 --- a/workspace/tests/test_inbox_uploads.py +++ b/workspace/tests/test_inbox_uploads.py @@ -695,3 +695,426 @@ def test_rewrite_request_body_handles_non_list_parts(): def test_rewrite_request_body_handles_non_dict_file(): body = {"parts": [{"kind": "file", "file": "not a dict"}]} inbox_uploads.rewrite_request_body(body) # must not raise + + +# --------------------------------------------------------------------------- +# fetch_and_stage with shared client — Phase 5b client-reuse contract +# --------------------------------------------------------------------------- +# +# When a caller passes ``client=`` to fetch_and_stage, that client must be +# used for BOTH the GET /content and the POST /ack — no fresh +# ``httpx.Client(...)`` constructions should happen. The pre-Phase-5b +# implementation made one new client for GET and another for ack; the new +# shape lets BatchFetcher share one connection pool across an entire batch. 
+ + +def test_fetch_and_stage_with_supplied_client_does_not_construct_new_client(monkeypatch): + row = _row(uri="platform-pending:ws-1/file-1") + get_resp = _make_resp(200, content=b"PDF", content_type="application/pdf") + ack_resp = _make_resp(200) + supplied = MagicMock() + supplied.get = MagicMock(return_value=get_resp) + supplied.post = MagicMock(return_value=ack_resp) + # Sentinel: any code path that constructs httpx.Client when one was + # already supplied is a regression — count constructions. + constructed: list[Any] = [] + + class _ShouldNotBeCalled: + def __init__(self, *a, **kw): + constructed.append((a, kw)) + + monkeypatch.setattr("httpx.Client", _ShouldNotBeCalled) + + local_uri = inbox_uploads.fetch_and_stage( + row, + platform_url="http://plat", + workspace_id="ws-1", + headers={"Authorization": "Bearer t"}, + client=supplied, + ) + assert local_uri is not None + assert constructed == [], "supplied client must be reused; no new Client should be constructed" + # GET + POST ack both went through the supplied client. + supplied.get.assert_called_once() + supplied.post.assert_called_once() + # Caller-owned client must NOT be closed by fetch_and_stage; the + # batch fetcher (or test) closes it once the whole batch is done. + supplied.close.assert_not_called() + + +def test_fetch_and_stage_without_supplied_client_constructs_and_closes_one(monkeypatch): + row = _row(uri="platform-pending:ws-1/file-1") + get_resp = _make_resp(200, content=b"PDF", content_type="application/pdf") + ack_resp = _make_resp(200) + built: list[MagicMock] = [] + + def _factory(*args, **kwargs): + c = MagicMock() + c.get = MagicMock(return_value=get_resp) + c.post = MagicMock(return_value=ack_resp) + built.append(c) + return c + + monkeypatch.setattr("httpx.Client", _factory) + + local_uri = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert local_uri is not None + # Pre-Phase-5b built TWO clients (one for GET, one for ack); now exactly one. + assert len(built) == 1, f"expected 1 httpx.Client construction, got {len(built)}" + # Same client must serve BOTH calls. + built[0].get.assert_called_once() + built[0].post.assert_called_once() + # Owned client must be closed by fetch_and_stage on the way out. + built[0].close.assert_called_once() + + +def test_fetch_and_stage_with_supplied_client_does_not_close_caller_client(): + # Even on failure the supplied client must not be closed — the + # BatchFetcher owns the lifecycle for the whole batch. 
+ row = _row(uri="platform-pending:ws-1/file-1") + supplied = MagicMock() + supplied.get = MagicMock(side_effect=RuntimeError("network down")) + supplied.post = MagicMock() # should not be reached on GET failure + inbox_uploads.fetch_and_stage( + row, + platform_url="http://plat", + workspace_id="ws-1", + headers={}, + client=supplied, + ) + supplied.close.assert_not_called() + supplied.post.assert_not_called() + + +# --------------------------------------------------------------------------- +# BatchFetcher — concurrent fetch + URI cache barrier +# --------------------------------------------------------------------------- + + +def _row_with_id(act_id: str, file_id: str) -> dict: + """Helper: an upload-receive row with a distinct activity id + file id.""" + return { + "id": act_id, + "method": "chat_upload_receive", + "request_body": { + "file_id": file_id, + "name": f"{file_id}.pdf", + "uri": f"platform-pending:ws-1/{file_id}", + "mimeType": "application/pdf", + "size": 1, + }, + } + + +def _stub_client_for_batch(get_responses: dict[str, MagicMock]) -> MagicMock: + """Build one MagicMock client that returns per-file_id responses + based on the file_id segment of the URL. + """ + client = MagicMock() + + def _get(url: str, headers: dict[str, str] | None = None) -> MagicMock: + for fid, resp in get_responses.items(): + if f"/pending-uploads/{fid}/content" in url: + return resp + return _make_resp(404) + + def _post(url: str, headers: dict[str, str] | None = None) -> MagicMock: + return _make_resp(200) + + client.get = MagicMock(side_effect=_get) + client.post = MagicMock(side_effect=_post) + return client + + +def test_batch_fetcher_runs_submitted_rows_concurrently(): + # Three rows whose .get() blocks for ~120ms each. With 4 workers the + # batch should complete in ~120ms (parallel), not ~360ms (serial). + # The 250ms ceiling accommodates CI scheduler jitter while still + # discriminating concurrent (~120ms) from serial (~360ms). + import time + + barrier_start = [0.0] + + def _slow_get(url: str, headers: dict[str, str] | None = None) -> MagicMock: + time.sleep(0.12) + for fid in ("a", "b", "c"): + if f"/pending-uploads/{fid}/content" in url: + return _make_resp(200, content=b"X", content_type="text/plain") + return _make_resp(404) + + client = MagicMock() + client.get = MagicMock(side_effect=_slow_get) + client.post = MagicMock(return_value=_make_resp(200)) + + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", + workspace_id="ws-1", + headers={}, + client=client, + max_workers=4, + ) + barrier_start[0] = time.time() + for fid in ("a", "b", "c"): + bf.submit(_row_with_id(f"act-{fid}", fid)) + bf.wait_all() + elapsed = time.time() - barrier_start[0] + bf.close() + + assert elapsed < 0.25, ( + f"3 rows × 120ms with 4 workers should finish in <250ms; got {elapsed:.3f}s " + "(suggests serial execution — Phase 5b regression)" + ) + assert client.get.call_count == 3 + assert client.post.call_count == 3 + + +def test_batch_fetcher_wait_all_blocks_until_uri_cache_populated(): + """Pin the correctness invariant: when wait_all returns, the URI + cache is hot for every submitted row. Without this barrier the + inbox loop would process the chat-message row before its uploads + were staged, and rewrite_request_body would surface the un-rewritten + platform-pending: URI to the agent. 
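+
+    Cache shape being asserted (value form elided; the asserts below
+    only pin presence):
+
+        get_cache().get("platform-pending:ws-1/a")   # None before wait_all
+        get_cache().get("platform-pending:ws-1/a")   # local URI afterwards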
+ """ + import time + + def _slow_get(url: str, headers: dict[str, str] | None = None) -> MagicMock: + time.sleep(0.05) + return _make_resp(200, content=b"data", content_type="text/plain") + + client = MagicMock() + client.get = MagicMock(side_effect=_slow_get) + client.post = MagicMock(return_value=_make_resp(200)) + + inbox_uploads.get_cache().clear() + with inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) as bf: + bf.submit(_row_with_id("act-a", "a")) + bf.submit(_row_with_id("act-b", "b")) + bf.wait_all() + # Cache must be hot for BOTH rows by the time wait_all returns. + assert inbox_uploads.get_cache().get("platform-pending:ws-1/a") is not None + assert inbox_uploads.get_cache().get("platform-pending:ws-1/b") is not None + + +def test_batch_fetcher_isolates_per_row_failure(): + """One failing fetch must not abort siblings. Sibling rows complete, + URI cache populates for them; the bad row's cache entry stays absent. + """ + def _get(url: str, headers: dict[str, str] | None = None) -> MagicMock: + if "/pending-uploads/bad/content" in url: + return _make_resp(500, text="upstream broken") + return _make_resp(200, content=b"ok", content_type="text/plain") + + client = MagicMock() + client.get = MagicMock(side_effect=_get) + client.post = MagicMock(return_value=_make_resp(200)) + + inbox_uploads.get_cache().clear() + with inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) as bf: + bf.submit(_row_with_id("act-1", "good1")) + bf.submit(_row_with_id("act-2", "bad")) + bf.submit(_row_with_id("act-3", "good2")) + bf.wait_all() + + cache = inbox_uploads.get_cache() + assert cache.get("platform-pending:ws-1/good1") is not None + assert cache.get("platform-pending:ws-1/good2") is not None + assert cache.get("platform-pending:ws-1/bad") is None + + +def test_batch_fetcher_reuses_one_client_across_all_submits(): + """Every row in the batch must share the same client instance. This + is the connection-pool-reuse leg of the perf win: a second fetch + to the same host reuses the TCP+TLS handshake from the first. + """ + client = MagicMock() + client.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain")) + client.post = MagicMock(return_value=_make_resp(200)) + + with inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) as bf: + for fid in ("a", "b", "c"): + bf.submit(_row_with_id(f"act-{fid}", fid)) + bf.wait_all() + + # 3 GETs + 3 POST acks all on the same client — no per-row Client + # construction. 
+ assert client.get.call_count == 3 + assert client.post.call_count == 3 + + +def test_batch_fetcher_close_idempotent(): + client = MagicMock() + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) + bf.close() + bf.close() # second call must not raise + + +def test_batch_fetcher_submit_after_close_raises(): + client = MagicMock() + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) + bf.close() + with pytest.raises(RuntimeError, match="submit after close"): + bf.submit(_row_with_id("act-x", "x")) + + +def test_batch_fetcher_owns_client_when_not_supplied(monkeypatch): + built: list[MagicMock] = [] + + def _factory(*args, **kwargs): + c = MagicMock() + c.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain")) + c.post = MagicMock(return_value=_make_resp(200)) + built.append(c) + return c + + monkeypatch.setattr("httpx.Client", _factory) + + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={} + ) + bf.submit(_row_with_id("act-a", "a")) + bf.wait_all() + bf.close() + + assert len(built) == 1, "expected one owned client per BatchFetcher" + built[0].close.assert_called_once() + + +def test_batch_fetcher_does_not_close_supplied_client(): + client = MagicMock() + client.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain")) + client.post = MagicMock(return_value=_make_resp(200)) + with inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) as bf: + bf.submit(_row_with_id("act-a", "a")) + bf.wait_all() + # Supplied client survives the BatchFetcher's close — caller's lifecycle. + client.close.assert_not_called() + + +def test_batch_fetcher_wait_all_no_op_on_empty_batch(): + client = MagicMock() + with inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) as bf: + bf.wait_all() # nothing submitted; must not block, must not raise + client.get.assert_not_called() + client.post.assert_not_called() + + +def test_batch_fetcher_httpx_missing_makes_submit_a_noop(monkeypatch): + # No client supplied + httpx import fails → BatchFetcher degrades + # gracefully: submit() returns None and the row is silently skipped. + import sys + + real_httpx = sys.modules.pop("httpx", None) + monkeypatch.setitem(sys.modules, "httpx", None) + try: + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={} + ) + result = bf.submit(_row_with_id("act-a", "a")) + bf.wait_all() + bf.close() + finally: + if real_httpx is not None: + sys.modules["httpx"] = real_httpx + else: + sys.modules.pop("httpx", None) + assert result is None + + +def test_batch_fetcher_close_after_timeout_does_not_block_on_running_workers(): + """The deadline contract: when wait_all times out, close() must NOT + block waiting for the leaked worker threads. Otherwise the inbox + poll loop stalls indefinitely on a hung /content fetch — undoing + the user-facing timeout. + + Strategy: build a client whose .get() blocks on a threading.Event + that the test never sets. Submit a row, wait_all with a tiny + timeout, then time close(). If close() drained-and-waited it would + block until we set the event (i.e., forever in this test). 
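+
+    Timeline (constants are the test's own):
+
+        t=0.00   submit; the worker blocks inside client.get()
+        t~0.05   wait_all(timeout=0.05) returns and flags the timeout
+        then     close() -> shutdown(wait=False, cancel_futures=True)
+        finally  assert close() returned in under 1.0s (vs ~5s if draining)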
+ """ + import threading + import time + + blocker = threading.Event() # never set — workers stay running + + def _hang_get(url, headers=None): + # Wait at most ~5s so a buggy implementation eventually unblocks + # the test instead of timing out the whole pytest run, but + # nothing legitimate should reach this fallback. + blocker.wait(timeout=5.0) + return _make_resp(200, content=b"x", content_type="text/plain") + + client = MagicMock() + client.get = MagicMock(side_effect=_hang_get) + client.post = MagicMock(return_value=_make_resp(200)) + + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", + workspace_id="ws-1", + headers={}, + client=client, + max_workers=1, # serialize so submitting 1 keeps the worker busy + ) + bf.submit(_row_with_id("act-a", "a")) + # Tiny timeout — wait_all must report the future as not_done. + bf.wait_all(timeout=0.05) + t0 = time.time() + bf.close() + elapsed = time.time() - t0 + # Unblock the lingering worker so it doesn't pollute later tests. + blocker.set() + + # Without the cancel-on-timeout fix, close() would block until + # blocker.set() — i.e., the full ~5s. With the fix it returns + # immediately because shutdown(wait=False) doesn't drain. + assert elapsed < 1.0, ( + f"close() blocked for {elapsed:.2f}s after wait_all timeout — " + "cancel-on-timeout regression: close() is draining instead of bailing" + ) + + +def test_batch_fetcher_close_without_timeout_still_drains(): + """Negative leg of the timeout contract: when wait_all completes + cleanly (no timeout), close() must KEEP its drain-and-wait + behavior so a still-queued ack POST isn't dropped mid-write. + """ + import time + + def _slow_get(url, headers=None): + time.sleep(0.05) + return _make_resp(200, content=b"x", content_type="text/plain") + + client = MagicMock() + client.get = MagicMock(side_effect=_slow_get) + client.post = MagicMock(return_value=_make_resp(200)) + + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", + workspace_id="ws-1", + headers={}, + client=client, + max_workers=2, + ) + bf.submit(_row_with_id("act-a", "a")) + bf.submit(_row_with_id("act-b", "b")) + bf.wait_all() # generous default timeout — should not fire + bf.close() + + # All 2 GETs + 2 ACK POSTs ran to completion via drain-and-wait. + assert client.get.call_count == 2 + assert client.post.call_count == 2