Merge pull request #2919 from Molecule-AI/staging

staging → main: auto-promote ae22a55
This commit is contained in:
molecule-ai[bot] 2026-05-05 13:08:45 -07:00 committed by GitHub
commit 184ce7ae4e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 1458 additions and 448 deletions

View File

@ -1,111 +0,0 @@
# Team Expansion (Recursive Workspaces)
When a workspace is expanded into a team, it gains sub-workspaces while its own agent remains as the **team lead** (coordinator). This is recursive — sub-workspaces can themselves be expanded into teams, infinitely deep.
## How It Works
When Developer PM is expanded into a team:
```
Business Core
|
+-- Developer PM (agent stays, becomes coordinator)
|
+-- Frontend Agent (sub-workspace, private scope)
+-- Backend Agent (sub-workspace, private scope)
+-- QA Agent (sub-workspace, private scope)
```
- Developer PM's agent **still exists** and acts as coordinator
- Developer PM receives incoming A2A messages from Business Core
- Developer PM's agent decides how to delegate to sub-workspaces
- Sub-workspaces talk to Developer PM and to each other (same level)
- Sub-workspaces **cannot** talk to Business Core or any workspace outside the team
## Communication Rules
| Direction | Allowed? | Example |
|-----------|----------|---------|
| Parent level -> team lead | Yes | Business Core -> Developer PM |
| Team lead -> sub-workspaces | Yes | Developer PM -> Frontend Agent |
| Sub-workspace -> team lead | Yes | Frontend Agent -> Developer PM |
| Sub-workspace <-> sibling | Yes | Frontend Agent <-> Backend Agent |
| Outside -> sub-workspace directly | No (403) | Business Core -> Frontend Agent |
| Sub-workspace -> outside directly | No | Frontend Agent -> Business Core |
The team lead (Developer PM) is the **only** bridge between the team's internal world and the outside.
## Scoped Registry
Sub-workspaces register in the platform registry but with a **private scope**. The registry knows about them but enforces access control.
```
Registry:
Business Core :8001 scope: public
Developer PM :8002 scope: public
Frontend Agent :8010 scope: private, parent=Developer PM
Backend Agent :8011 scope: private, parent=Developer PM
QA Agent :8012 scope: private, parent=Developer PM
```
- The platform can always discover any workspace (for provisioning, monitoring)
- The parent workspace can discover its sub-workspaces
- Sub-workspaces can discover their siblings (same parent)
- Outside workspaces get a **403 Forbidden** if they try to discover a private sub-workspace
## How to Expand
Expansion is triggered via `POST /workspaces/:id/expand`. The platform reads the `sub_workspaces` list from the workspace's config and provisions each one. On the canvas, users right-click a workspace node and select "Expand into team."
Collapsing is the inverse: `POST /workspaces/:id/collapse`. Sub-workspaces are stopped and removed.
## What Happens on Expansion
When Developer PM is expanded into a team, the hierarchy changes but the outside view doesn't. Business Core's parent/child relationship to Developer PM is unaffected — Developer PM still responds to the same A2A endpoint.
The events fired:
- `WORKSPACE_EXPANDED` with the new `sub_workspace_ids` in the payload
- `WORKSPACE_PROVISIONING` for each new sub-workspace
- `WORKSPACE_ONLINE` for each sub-workspace as they come up
Communication rules are automatically derived from the new hierarchy — no manual wiring needed.
## Canvas Behavior
- Children render as embedded mini-cards (`TeamMemberChip`) inside the parent node, not as separate canvas nodes
- Each mini-card shows full status: gradient bar, name, tier badge, skills pills, active tasks, descendant count
- **Recursive rendering** up to 3 levels deep (`MAX_NESTING_DEPTH = 3`) — sub-cards can contain their own "Team" sections
- Parent node dynamically resizes: 210-280px (no children), 320-450px (children), 400-560px (grandchildren)
- Eject button (sky-blue arrow icon) on hover extracts a child from the team
- "Extract from Team" also available in the right-click context menu
- Double-click a team node to zoom/fit to the parent area
- The parent workspace node shows a badge with total descendant count
## Collapsing a Team
The inverse of expansion, triggered via `POST /workspaces/:id/collapse`:
1. Each sub-workspace agent wraps up current work and writes a handoff document to memory
2. Sub-workspaces are stopped and removed
3. The team lead's agent goes back to handling everything directly
4. A `WORKSPACE_COLLAPSED` event fires
Sub-workspace memory is cleaned up based on backend (see [Memory — Cleanup](../architecture/memory.md#cleanup-on-workspace-deletion)).
## Deleting a Team Workspace
When a team workspace is deleted:
1. Platform shows a warning listing all sub-workspaces that will be deleted
2. User can **drag sub-workspaces out** of the team before confirming (promotes them to the parent level)
3. On confirmation, cascade delete removes the parent and all remaining sub-workspaces
4. `WORKSPACE_REMOVED` events fire for each deleted workspace
## Related Docs
- [Communication Rules](../api-protocol/communication-rules.md) — Full access control model
- [Core Concepts](../product/core-concepts.md) — Workspace fundamentals
- [System Prompt Structure](./system-prompt-structure.md) — How peer capabilities are injected
- [Provisioner](../architecture/provisioner.md) — How sub-workspaces are deployed
- [Registry & Heartbeat](../api-protocol/registry-and-heartbeat.md) — How registration works
- [Event Log](../architecture/event-log.md) — Events fired during expansion
- [Canvas UI](../frontend/canvas.md) — Visual behavior of teams

View File

@ -41,8 +41,6 @@ Full contract: `docs/runbooks/admin-auth.md`.
| GET | /admin/workspaces/:id/test-token | admin_test_token.go — mint a fresh bearer token for E2E scripts; returns 404 unless `MOLECULE_ENV != production` or `MOLECULE_ENABLE_TEST_TOKENS=1` |
| GET/POST/DELETE | /admin/secrets[/:key] | secrets.go — legacy aliases for /settings/secrets |
| WS | /workspaces/:id/terminal | terminal.go |
| POST | /workspaces/:id/expand | team.go |
| POST | /workspaces/:id/collapse | team.go |
| POST/GET | /workspaces/:id/approvals | approvals.go |
| POST | /workspaces/:id/approvals/:id/decide | approvals.go |
| GET | /approvals/pending | approvals.go |

View File

@ -336,8 +336,6 @@ This same logic governs: A2A delegation, memory scope enforcement, activity visi
| Method | Endpoint | Purpose |
|--------|----------|---------|
| `POST` | `/workspaces/:id/expand` | Expand workspace into team (become coordinator) |
| `POST` | `/workspaces/:id/collapse` | Collapse team back to single workspace |
### Files, Terminal, Templates, Bundles (8 endpoints)

View File

@ -186,4 +186,3 @@ So the UI now exposes more operational failure state directly instead of silentl
- [Quickstart](../quickstart.md)
- [Platform API](../api-protocol/platform-api.md)
- [Workspace Runtime](../agent-runtime/workspace-runtime.md)
- [Team Expansion](../agent-runtime/team-expansion.md)

View File

@ -18,7 +18,7 @@ lands in the watch list with a colliding term, add a row here.
| **plugin** | A directory under `plugins/` packaging one or more skills or an MCP server wrapper, installable per-workspace via `POST /workspaces/:id/plugins`. Governed by `plugin.yaml`. | **Langflow**: a visual UI node / component in a flowchart. **CrewAI**: a Python-importable callable registered as a capability. |
| **agent** | A persistent containerized workspace running continuously — an identity with memory, a role, and a schedule. Not a one-shot invocation. | Most frameworks (AutoGPT, LangChain agents, OpenAI Assistants): a stateless function-call loop. No persistence between invocations unless explicitly checkpointed. |
| **flow** | A task execution within a workspace — a request enters, the agent runs tools, emits a response, logs activity. No explicit graph abstraction. | **Langflow**: a directed graph of nodes you author visually. **LangGraph**: a stateful graph of callable nodes. Our "flow" is an imperative timeline, not a graph. |
| **team** | A named cluster of workspaces under a PM (org template `expand_team`). Used for role grouping in Canvas. | **CrewAI**: a "crew" is a sequence of agents that pass a task through a declared order. Our "team" is an org-chart abstraction, not an execution order. |
| **team** | A named cluster of workspaces under a PM . Used for role grouping in Canvas. | **CrewAI**: a "crew" is a sequence of agents that pass a task through a declared order. Our "team" is an org-chart abstraction, not an execution order. |
| **skill** | A directory with `SKILL.md` that an agent invokes via the `Skill` tool. Skills are documentation + optional scripts that teach an agent a recipe. | **Anthropic Skills API**: nearly identical. **CrewAI tool**: closer to our plugin's MCP tool, not our skill. |
| **channel** | An outbound/inbound social integration (Telegram, Slack, …) per-workspace, wired in `workspace_channels`. | Slack's "channel": the container for messages. We use "channel" for the adapter + credentials, not the conversation itself. |
| **runtime** | The execution engine image tag for a workspace: one of `langgraph`, `claude-code`, `openclaw`, `crewai`, `autogen`, `deepagents`, `hermes`. | **LangGraph runtime**: the Python process running the graph. We use "runtime" for the Docker image + adapter pairing, not the inner process. |

View File

@ -166,8 +166,6 @@ list_workspaces
| MCP Tool | API Route | Method | Description |
|----------|-----------|--------|-------------|
| `expand_team` | `/workspaces/:id/expand` | POST | Expand team node |
| `collapse_team` | `/workspaces/:id/collapse` | POST | Collapse team node |
### Templates & Bundles

View File

@ -63,21 +63,30 @@ fi
# Memory v2 sidecar (built-in postgres plugin). Co-located with the
# main server so operators flipping MEMORY_V2_CUTOVER=true don't need
# to provision a separate service. Stays inert at the protocol layer
# until that env var is set — the workspace-server's wiring.go skips
# building the client without MEMORY_PLUGIN_URL, so the running plugin
# is a no-op for traffic.
# to provision a separate service.
#
# Env defaults:
# Spawn-gating: only start the sidecar when the operator has indicated
# they want it — either MEMORY_V2_CUTOVER=true OR MEMORY_PLUGIN_URL set.
# Without that signal, the sidecar adds zero value (the platform's
# wiring.go skips building the client too) but pays a real cost: the
# plugin's first migration runs `CREATE EXTENSION vector`, which fails
# on tenant Postgres without pgvector preinstalled and aborts container
# boot via the 30s health gate. Caught on staging redeploy 2026-05-05.
#
# Env defaults (when sidecar IS spawned):
# MEMORY_PLUGIN_DATABASE_URL = $DATABASE_URL (share existing Postgres;
# plugin's `memory_namespaces` / `memory_records` tables coexist
# with `agent_memories` and the rest of the platform schema —
# no conflicts. Operator can override with a separate URL.)
# MEMORY_PLUGIN_LISTEN_ADDR = :9100
# MEMORY_PLUGIN_LISTEN_ADDR = 127.0.0.1:9100
#
# Set MEMORY_PLUGIN_DISABLE=1 to skip launching the sidecar entirely
# (e.g. an operator running the plugin externally on a separate host).
if [ -z "$MEMORY_PLUGIN_DISABLE" ] && [ -n "$DATABASE_URL" ]; then
# Set MEMORY_PLUGIN_DISABLE=1 to force-skip the sidecar even with
# cutover env set (e.g. running the plugin externally on a separate host).
memory_plugin_wanted=""
if [ "$MEMORY_V2_CUTOVER" = "true" ] || [ -n "$MEMORY_PLUGIN_URL" ]; then
memory_plugin_wanted=1
fi
if [ -z "$MEMORY_PLUGIN_DISABLE" ] && [ -n "$memory_plugin_wanted" ] && [ -n "$DATABASE_URL" ]; then
: "${MEMORY_PLUGIN_DATABASE_URL:=$DATABASE_URL}"
: "${MEMORY_PLUGIN_LISTEN_ADDR:=:9100}"
export MEMORY_PLUGIN_DATABASE_URL MEMORY_PLUGIN_LISTEN_ADDR

View File

@ -10,6 +10,7 @@ package main
import (
"context"
"database/sql"
"embed"
"errors"
"fmt"
"log"
@ -17,6 +18,7 @@ import (
"net/http"
"os"
"os/signal"
"sort"
"strings"
"syscall"
"time"
@ -26,6 +28,16 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/pgplugin"
)
// migrationsFS bundles the .up.sql files into the binary at build time
// so the prebuilt image doesn't need the source tree at runtime. The
// prior `os.ReadDir("cmd/memory-plugin-postgres/migrations")` path
// only resolved during `go test` from the repo root — in the published
// image the path didn't exist and boot failed after the 30s health gate
// (caught on staging redeploy 2026-05-05 after PR #2906).
//
//go:embed migrations/*.up.sql
var migrationsFS embed.FS
const (
envDatabaseURL = "MEMORY_PLUGIN_DATABASE_URL"
envListenAddr = "MEMORY_PLUGIN_LISTEN_ADDR"
@ -149,32 +161,71 @@ func openDB(databaseURL string) (*sql.DB, error) {
return db, nil
}
// runMigrations applies the schema migrations bundled at
// cmd/memory-plugin-postgres/migrations/. Idempotent on repeat boot.
// runMigrations applies the schema migrations bundled into the binary
// via go:embed (see migrationsFS at the top of this file). Idempotent
// on repeat boot — every migration file uses CREATE … IF NOT EXISTS.
//
// Implementation note: rather than embedding the full migrate engine,
// we read the migration files at boot from a known relative path. The
// down migrations are deliberately NOT applied here — that's a manual
// operator action. This keeps the binary tiny and avoids dragging in
// golang-migrate's drivers.
// The down migrations are deliberately NOT applied here — that's a
// manual operator action. This keeps the binary tiny and avoids
// dragging in golang-migrate's drivers.
//
// MEMORY_PLUGIN_MIGRATIONS_DIR (filesystem path) is honored as an
// override for operators who need to ship custom migrations alongside
// the binary without rebuilding. When unset (the common case) we read
// from the embedded FS.
func runMigrations(db *sql.DB) error {
// Find the migrations directory. In `go run` mode it's relative
// to the cmd dir; in the prebuilt binary case it's expected next
// to the binary OR via env var override.
dir := os.Getenv("MEMORY_PLUGIN_MIGRATIONS_DIR")
if dir == "" {
// Best-effort: try the cwd-relative path that works for `go test`.
dir = "cmd/memory-plugin-postgres/migrations"
if dir := strings.TrimSpace(os.Getenv("MEMORY_PLUGIN_MIGRATIONS_DIR")); dir != "" {
return runMigrationsFromDisk(db, dir)
}
entries, err := os.ReadDir(dir)
return runMigrationsFromEmbed(db)
}
// runMigrationsFromEmbed applies the *.up.sql files bundled into the
// binary at build time. Order is alphabetical (matches the on-disk
// behavior of os.ReadDir on Linux for the same set of names).
func runMigrationsFromEmbed(db *sql.DB) error {
entries, err := migrationsFS.ReadDir("migrations")
if err != nil {
return fmt.Errorf("read migrations dir %q: %w", dir, err)
return fmt.Errorf("read embedded migrations: %w", err)
}
names := make([]string, 0, len(entries))
for _, e := range entries {
if e.IsDir() || !strings.HasSuffix(e.Name(), ".up.sql") {
continue
}
path := dir + "/" + e.Name()
names = append(names, e.Name())
}
sort.Strings(names)
for _, name := range names {
data, err := migrationsFS.ReadFile("migrations/" + name)
if err != nil {
return fmt.Errorf("read embedded %q: %w", name, err)
}
if _, err := db.Exec(string(data)); err != nil {
return fmt.Errorf("apply %q: %w", name, err)
}
log.Printf("applied embedded migration %s", name)
}
return nil
}
// runMigrationsFromDisk preserves the legacy filesystem-path mode for
// operator-supplied custom migrations.
func runMigrationsFromDisk(db *sql.DB, dir string) error {
entries, err := os.ReadDir(dir)
if err != nil {
return fmt.Errorf("read migrations dir %q: %w", dir, err)
}
names := make([]string, 0, len(entries))
for _, e := range entries {
if e.IsDir() || !strings.HasSuffix(e.Name(), ".up.sql") {
continue
}
names = append(names, e.Name())
}
sort.Strings(names)
for _, name := range names {
path := dir + "/" + name
data, err := os.ReadFile(path)
if err != nil {
return fmt.Errorf("read %q: %w", path, err)
@ -182,7 +233,7 @@ func runMigrations(db *sql.DB) error {
if _, err := db.Exec(string(data)); err != nil {
return fmt.Errorf("apply %q: %w", path, err)
}
log.Printf("applied migration %s", e.Name())
log.Printf("applied disk migration %s (from %s)", name, dir)
}
return nil
}

View File

@ -0,0 +1,72 @@
package main
import (
"strings"
"testing"
)
// TestMigrationsEmbedded_ContainsCreateTable pins that the migrations
// are bundled into the binary at build time, NOT loaded from a
// filesystem path that doesn't exist at runtime in the published image.
//
// Pre-fix: PR #2906 shipped the binary without the migrations dir;
// `os.ReadDir("cmd/memory-plugin-postgres/migrations")` errored on every
// tenant boot, the 30s health gate aborted the container, and the
// staging redeploy fleet job marked all tenants as failed. Embedding
// the migrations into the binary removes the runtime path entirely.
func TestMigrationsEmbedded_ContainsCreateTable(t *testing.T) {
entries, err := migrationsFS.ReadDir("migrations")
if err != nil {
t.Fatalf("embedded migrations dir unreadable: %v", err)
}
if len(entries) == 0 {
t.Fatal("embedded migrations dir is empty — go:embed pattern matched no files")
}
var seenUp bool
for _, e := range entries {
if e.IsDir() || !strings.HasSuffix(e.Name(), ".up.sql") {
continue
}
seenUp = true
data, err := migrationsFS.ReadFile("migrations/" + e.Name())
if err != nil {
t.Errorf("read embedded %q: %v", e.Name(), err)
continue
}
if !strings.Contains(string(data), "CREATE TABLE") {
t.Errorf("embedded %q has no CREATE TABLE — wrong file embedded?", e.Name())
}
}
if !seenUp {
t.Fatal("no *.up.sql in embedded migrations — runtime would have no schema to apply")
}
}
// TestRunMigrationsFromEmbed_OrderingIsAlphabetic pins that we apply
// migrations in deterministic alphabetical order, not in whatever
// arbitrary order migrationsFS.ReadDir happens to return. With one
// migration today this is moot, but a future second migration ('002_…')
// MUST run after '001_…' or the schema is broken.
//
// We can't easily exercise db.Exec here (no test DB); instead pin the
// sort step on the directory listing itself.
func TestRunMigrationsFromEmbed_OrderingIsAlphabetic(t *testing.T) {
entries, err := migrationsFS.ReadDir("migrations")
if err != nil {
t.Fatalf("embedded migrations dir unreadable: %v", err)
}
var names []string
for _, e := range entries {
if e.IsDir() || !strings.HasSuffix(e.Name(), ".up.sql") {
continue
}
names = append(names, e.Name())
}
for i := 1; i < len(names); i++ {
if names[i-1] > names[i] {
t.Errorf("ReadDir returned non-sorted names; runMigrationsFromEmbed must sort. "+
"Got %q before %q", names[i-1], names[i])
}
}
}

View File

@ -21,14 +21,23 @@ PORT=3000 HOSTNAME=0.0.0.0 node server.js &
CANVAS_PID=$!
# Memory v2 sidecar (built-in postgres plugin). See Dockerfile entrypoint
# comment for rationale. Stays inert at the protocol layer until the
# operator sets MEMORY_V2_CUTOVER=true; running it is cheap.
# comment for rationale.
#
# Defaults the plugin's DATABASE_URL to the tenant's DATABASE_URL so
# operators don't need to configure two of them. Plugin tables coexist
# with the platform schema.
# Spawn-gating: only start the sidecar when the operator has indicated
# they want it (MEMORY_V2_CUTOVER=true OR MEMORY_PLUGIN_URL set).
# Without that signal, the sidecar adds zero value and risks aborting
# tenant boot via the 30s health gate when the tenant Postgres lacks
# pgvector. Caught on staging redeploy 2026-05-05:
# pq: extension "vector" is not available
#
# Defaults (when sidecar IS spawned): MEMORY_PLUGIN_DATABASE_URL
# falls back to the tenant's DATABASE_URL.
MEMORY_PLUGIN_PID=""
if [ -z "$MEMORY_PLUGIN_DISABLE" ] && [ -n "$DATABASE_URL" ]; then
memory_plugin_wanted=""
if [ "$MEMORY_V2_CUTOVER" = "true" ] || [ -n "$MEMORY_PLUGIN_URL" ]; then
memory_plugin_wanted=1
fi
if [ -z "$MEMORY_PLUGIN_DISABLE" ] && [ -n "$memory_plugin_wanted" ] && [ -n "$DATABASE_URL" ]; then
: "${MEMORY_PLUGIN_DATABASE_URL:=$DATABASE_URL}"
: "${MEMORY_PLUGIN_LISTEN_ADDR:=:9100}"
export MEMORY_PLUGIN_DATABASE_URL MEMORY_PLUGIN_LISTEN_ADDR

View File

@ -21,6 +21,7 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provlog"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/scheduler"
"github.com/google/uuid"
)
@ -96,6 +97,16 @@ func (h *OrgHandler) createWorkspaceTree(ws OrgWorkspace, parentID *string, absX
}
if existing {
log.Printf("Org import: %q already exists (id=%s) — skipping create+provision, recursing into children for partial-match", ws.Name, existingID)
parentRef := ""
if parentID != nil {
parentRef = *parentID
}
provlog.Event("provision.skip_existing", map[string]any{
"name": ws.Name,
"existing_id": existingID,
"parent_id": parentRef,
"tier": tier,
})
*results = append(*results, map[string]interface{}{
"id": existingID,
"name": ws.Name,

View File

@ -0,0 +1,112 @@
package handlers
// provlog_emit_test.go — pins that the structured-logging emit sites
// added for #2867 PR-D actually fire when their boundary is crossed.
//
// These are call-site contract tests, not provlog package tests (those
// live next to the helper). The assertion is "this dispatcher path
// emits this event name" — if a refactor moves the call out of the
// boundary helper, the gate fails. Fields are NOT pinned here on
// purpose; the field set is convenience for ops, not contract for the
// emit point. Pinning fields would block additive evolution of the
// payload (see also feedback_behavior_based_ast_gates.md).
import (
"bytes"
"context"
"log"
"strings"
"sync"
"testing"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
)
// captureProvLog redirects the global logger to a buffer for the test
// duration. provlog.Event uses log.Printf, so this is the only seam.
// Returned mutex protects against concurrent reads from the goroutine
// fired by provisionWorkspaceAuto (the goroutine never returns in
// these tests because Start() is stubbed, but the buffer can still be
// touched by it racing the assertion).
func captureProvLog(t *testing.T) (read func() string) {
t.Helper()
var buf bytes.Buffer
var mu sync.Mutex
prevWriter := log.Writer()
prevFlags := log.Flags()
log.SetFlags(0)
log.SetOutput(&safeWriter{buf: &buf, mu: &mu})
t.Cleanup(func() {
log.SetOutput(prevWriter)
log.SetFlags(prevFlags)
})
return func() string {
mu.Lock()
defer mu.Unlock()
return buf.String()
}
}
// TestProvisionWorkspaceAutoSync_EmitsProvisionStart — sync variant is
// chosen for the assertion path because it returns once the (stubbed)
// Start() has been called, so we know the emit has flushed. The async
// variant would race a goroutine.
func TestProvisionWorkspaceAutoSync_EmitsProvisionStart(t *testing.T) {
read := captureProvLog(t)
h := &WorkspaceHandler{cpProv: &trackingCPProv{}}
// Best-effort: the body will hit DB code under provisionWorkspaceCP
// — we only need the emit at the entry, which fires unconditionally
// before the dispatch. Recovering from any later panic keeps the
// test focused.
defer func() { _ = recover() }()
h.provisionWorkspaceAutoSync("ws-test-1", "tmpl", nil, models.CreateWorkspacePayload{
Name: "n", Tier: 4, Runtime: "claude-code",
})
got := read()
if !strings.Contains(got, "evt: provision.start ") {
t.Fatalf("expected provision.start emit, got log:\n%s", got)
}
if !strings.Contains(got, `"workspace_id":"ws-test-1"`) {
t.Errorf("workspace_id not in payload: %s", got)
}
if !strings.Contains(got, `"sync":true`) {
t.Errorf("sync flag not pinned for sync dispatcher: %s", got)
}
}
// TestStopForRestart_EmitsRestartPreStop — emit fires before the actual
// Stop call, so the trackingCPProv stub doesn't need to be wired for
// real Stop semantics. Backend label "cp" pinned because that's the
// SaaS path; we don't pin "docker" or "none" branches here (separate
// tests would only re-test the trivial branch label switch).
func TestStopForRestart_EmitsRestartPreStop(t *testing.T) {
read := captureProvLog(t)
h := &WorkspaceHandler{cpProv: &trackingCPProv{}}
defer func() { _ = recover() }()
h.stopForRestart(context.Background(), "ws-restart-1")
got := read()
if !strings.Contains(got, "evt: restart.pre_stop ") {
t.Fatalf("expected restart.pre_stop emit, got log:\n%s", got)
}
if !strings.Contains(got, `"workspace_id":"ws-restart-1"`) {
t.Errorf("workspace_id not in payload: %s", got)
}
if !strings.Contains(got, `"backend":"cp"`) {
t.Errorf("backend label missing or wrong: %s", got)
}
}
// TestStopForRestart_EmitsBackendNoneWhenUnwired — pin the no-backend
// branch so a future refactor that drops the label switch is caught.
// This is the silent-Stop case (workspace_dispatchers.go:StopWorkspaceAuto
// returns nil for unwired backends); the emit ensures the operator can
// still see the boundary in the log.
func TestStopForRestart_EmitsBackendNoneWhenUnwired(t *testing.T) {
read := captureProvLog(t)
h := &WorkspaceHandler{} // both nil
h.stopForRestart(context.Background(), "ws-restart-2")
got := read()
if !strings.Contains(got, `"backend":"none"`) {
t.Fatalf("expected backend=none for unwired handler: %s", got)
}
}

View File

@ -1,132 +0,0 @@
package handlers
import (
"encoding/json"
"log"
"net/http"
"os"
"path/filepath"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
"github.com/gin-gonic/gin"
"gopkg.in/yaml.v3"
)
// TeamHandler now hosts only Collapse — the visual "expand" action is
// canvas-side and creating children goes through the regular
// WorkspaceHandler.Create path with parent_id set, like any other
// workspace. Every workspace can have children; "team" is just the
// state of having children. The old Expand handler bulk-created
// children by reading sub_workspaces from a parent's config and was
// non-idempotent — calling it N times leaked N×children EC2s, which
// is how tenant-hongming accumulated 72 stale workspaces.
type TeamHandler struct {
wh *WorkspaceHandler
b *events.Broadcaster
}
// NewTeamHandler constructs a TeamHandler. wh is used by Collapse to
// route StopWorkspaceAuto through the backend dispatcher.
func NewTeamHandler(b *events.Broadcaster, wh *WorkspaceHandler, platformURL, configsDir string) *TeamHandler {
return &TeamHandler{wh: wh, b: b}
}
// Collapse handles POST /workspaces/:id/collapse
// Stops and removes all child workspaces.
func (h *TeamHandler) Collapse(c *gin.Context) {
parentID := c.Param("id")
ctx := c.Request.Context()
// Find children
rows, err := db.DB.QueryContext(ctx,
`SELECT id, name FROM workspaces WHERE parent_id = $1 AND status != 'removed'`, parentID)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to query children"})
return
}
defer rows.Close()
removed := make([]string, 0)
for rows.Next() {
var childID, childName string
if rows.Scan(&childID, &childName) != nil {
continue
}
// Stop the workload via the backend dispatcher (CP for SaaS,
// Docker for self-hosted). Pre-2026-05-05 this was
// `if h.provisioner != nil { h.provisioner.Stop(...) }`, which
// silently skipped on every SaaS tenant — child EC2s kept running
// after team-collapse until the orphan sweeper caught them
// (issue #2813).
if err := h.wh.StopWorkspaceAuto(ctx, childID); err != nil {
log.Printf("Team collapse: stop %s failed: %v — orphan sweeper will reconcile", childID, err)
}
// Mark as removed
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2`, models.StatusRemoved, childID); err != nil {
log.Printf("Team collapse: failed to remove workspace %s: %v", childID, err)
}
if _, err := db.DB.ExecContext(ctx,
`DELETE FROM canvas_layouts WHERE workspace_id = $1`, childID); err != nil {
log.Printf("Team collapse: failed to delete layout for %s: %v", childID, err)
}
h.b.RecordAndBroadcast(ctx, "WORKSPACE_REMOVED", childID, map[string]interface{}{})
removed = append(removed, childName)
}
h.b.RecordAndBroadcast(ctx, "WORKSPACE_COLLAPSED", parentID, map[string]interface{}{
"removed_children": removed,
})
c.JSON(http.StatusOK, gin.H{
"status": "collapsed",
"removed": removed,
})
}
// findTemplateDirByName resolves a workspace name to its template
// directory. Kept here because callers outside this package may use
// it, even though the in-package consumer (Expand) is gone.
//
// TODO: relocate alongside the templates handler if no other callers
// surface, or delete entirely after a deprecation cycle.
func findTemplateDirByName(configsDir, name string) string {
normalized := normalizeName(name)
candidate := filepath.Join(configsDir, normalized)
if _, err := os.Stat(filepath.Join(candidate, "config.yaml")); err == nil {
return candidate
}
// Fall back to scanning all dirs
entries, err := os.ReadDir(configsDir)
if err != nil {
return ""
}
for _, e := range entries {
if !e.IsDir() {
continue
}
cfgPath := filepath.Join(configsDir, e.Name(), "config.yaml")
data, err := os.ReadFile(cfgPath)
if err != nil {
continue
}
var cfg struct {
Name string `yaml:"name"`
}
if json.Unmarshal(data, &cfg) == nil && cfg.Name == name {
return filepath.Join(configsDir, e.Name())
}
if yaml.Unmarshal(data, &cfg) == nil && cfg.Name == name {
return filepath.Join(configsDir, e.Name())
}
}
return ""
}

View File

@ -1,130 +0,0 @@
package handlers
import (
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// ---------- TeamHandler: Collapse ----------
func TestTeamCollapse_NoChildren(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewTeamHandler(broadcaster, NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()), "http://localhost:8080", "/tmp/configs")
// No children
mock.ExpectQuery("SELECT id, name FROM workspaces WHERE parent_id").
WithArgs("ws-parent").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}))
// WORKSPACE_COLLAPSED broadcast
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-parent"}}
c.Request = httptest.NewRequest("POST", "/", nil)
handler.Collapse(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["status"] != "collapsed" {
t.Errorf("expected status 'collapsed', got %v", resp["status"])
}
}
func TestTeamCollapse_WithChildren(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewTeamHandler(broadcaster, NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()), "http://localhost:8080", "/tmp/configs")
// Two children
mock.ExpectQuery("SELECT id, name FROM workspaces WHERE parent_id").
WithArgs("ws-parent").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("child-1", "Worker A").
AddRow("child-2", "Worker B"))
// UPDATE + DELETE + broadcast for child-1
mock.ExpectExec("UPDATE workspaces SET status =").
WithArgs("child-1").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("DELETE FROM canvas_layouts").
WithArgs("child-1").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
// UPDATE + DELETE + broadcast for child-2
mock.ExpectExec("UPDATE workspaces SET status =").
WithArgs("child-2").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("DELETE FROM canvas_layouts").
WithArgs("child-2").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
// WORKSPACE_COLLAPSED broadcast for parent
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-parent"}}
c.Request = httptest.NewRequest("POST", "/", nil)
handler.Collapse(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
removed, ok := resp["removed"].([]interface{})
if !ok || len(removed) != 2 {
t.Errorf("expected 2 removed children, got %v", resp["removed"])
}
}
// ---------- findTemplateDirByName helper ----------
func TestFindTemplateDirByName_DirectMatch(t *testing.T) {
dir := t.TempDir()
subDir := filepath.Join(dir, "mybot")
os.MkdirAll(subDir, 0755)
os.WriteFile(filepath.Join(subDir, "config.yaml"), []byte("name: MyBot"), 0644)
result := findTemplateDirByName(dir, "mybot")
if result != subDir {
t.Errorf("expected %s, got %s", subDir, result)
}
}
func TestFindTemplateDirByName_NotFound(t *testing.T) {
dir := t.TempDir()
result := findTemplateDirByName(dir, "nonexistent")
if result != "" {
t.Errorf("expected empty string, got %s", result)
}
}
func TestFindTemplateDirByName_InvalidConfigsDir(t *testing.T) {
result := findTemplateDirByName("/nonexistent/path", "anything")
if result != "" {
t.Errorf("expected empty string for invalid dir, got %s", result)
}
}

View File

@ -35,6 +35,7 @@ import (
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provlog"
)
// HasProvisioner reports whether either backend (CP or local Docker) is
@ -101,6 +102,14 @@ func (h *WorkspaceHandler) DefaultTier() int {
// lives in prepareProvisionContext (shared by both per-backend
// goroutines).
func (h *WorkspaceHandler) provisionWorkspaceAuto(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) bool {
provlog.Event("provision.start", map[string]any{
"workspace_id": workspaceID,
"name": payload.Name,
"tier": payload.Tier,
"runtime": payload.Runtime,
"template": payload.Template,
"sync": false,
})
if h.cpProv != nil {
go h.provisionWorkspaceCP(workspaceID, templatePath, configFiles, payload)
return true
@ -136,6 +145,14 @@ func (h *WorkspaceHandler) provisionWorkspaceAuto(workspaceID, templatePath stri
// Keep these two helpers in sync — when one grows a new arm (third
// backend, retry semantics), the other should too.
func (h *WorkspaceHandler) provisionWorkspaceAutoSync(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) bool {
provlog.Event("provision.start", map[string]any{
"workspace_id": workspaceID,
"name": payload.Name,
"tier": payload.Tier,
"runtime": payload.Runtime,
"template": payload.Template,
"sync": true,
})
if h.cpProv != nil {
h.provisionWorkspaceCP(workspaceID, templatePath, configFiles, payload)
return true

View File

@ -12,6 +12,7 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provlog"
"github.com/gin-gonic/gin"
)
@ -431,6 +432,16 @@ func coalesceRestart(workspaceID string, cycle func()) {
// NPE'd before reaching the reprovision step — which is why every SaaS dead-
// agent incident pre-this-fix required manual restart from canvas.
func (h *WorkspaceHandler) stopForRestart(ctx context.Context, workspaceID string) {
backend := "none"
if h.provisioner != nil {
backend = "docker"
} else if h.cpProv != nil {
backend = "cp"
}
provlog.Event("restart.pre_stop", map[string]any{
"workspace_id": workspaceID,
"backend": backend,
})
if h.provisioner != nil {
h.provisioner.Stop(ctx, workspaceID)
return

View File

@ -14,6 +14,7 @@ import (
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provlog"
)
// CPProvisionerAPI is the contract WorkspaceHandler uses to talk to the
@ -214,6 +215,13 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
}
log.Printf("CP provisioner: workspace %s → EC2 instance %s (%s)", cfg.WorkspaceID, result.InstanceID, result.State)
provlog.Event("provision.ec2_started", map[string]any{
"workspace_id": cfg.WorkspaceID,
"instance_id": result.InstanceID,
"state": result.State,
"tier": cfg.Tier,
"runtime": cfg.Runtime,
})
return result.InstanceID, nil
}
@ -273,6 +281,10 @@ func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error {
return fmt.Errorf("cp provisioner: stop %s: unexpected %d: %s",
workspaceID, resp.StatusCode, strings.TrimSpace(string(body)))
}
provlog.Event("provision.ec2_stopped", map[string]any{
"workspace_id": workspaceID,
"instance_id": instanceID,
})
return nil
}

View File

@ -0,0 +1,48 @@
// Package provlog emits structured, single-line JSON log records for
// provisioning-lifecycle boundaries (workspace create, EC2 start/stop,
// restart, idempotency skips). Records share a stable `evt:` prefix and
// JSON payload so a future grep|jq pipeline (or a Loki/Datadog ingest)
// can reconstruct the per-workspace timeline without parsing the
// human-prose log lines that already exist.
//
// Existing log.Printf lines are intentionally NOT replaced — they
// remain the operator-facing message. Event() emits a paired structured
// record alongside, additive only.
//
// Event taxonomy (extend by appending; never rename):
//
// provision.start — workspace row inserted, EC2 about to launch
// provision.skip_existing — idempotency hit, no new EC2
// provision.ec2_started — RunInstances returned an instance id
// provision.ec2_stopped — TerminateInstances acknowledged
// restart.pre_stop — Restart handler about to call Stop
//
// Required fields per event are documented at each call site.
package provlog
import (
"encoding/json"
"log"
)
// Event writes a single line of the form:
//
// evt: <name> {"k":"v",...}
//
// to the standard logger. JSON encoding errors are silently swallowed —
// a logging helper must never panic the request path. fields may be
// nil; the empty payload `{}` is still useful to mark an event boundary.
func Event(name string, fields map[string]any) {
if fields == nil {
fields = map[string]any{}
}
payload, err := json.Marshal(fields)
if err != nil {
// Fall back to a static payload so the event boundary still
// appears in the log. The marshal error itself is recorded
// on a best-effort basis.
log.Printf("evt: %s {\"_marshal_err\":%q}", name, err.Error())
return
}
log.Printf("evt: %s %s", name, payload)
}

View File

@ -0,0 +1,97 @@
package provlog
import (
"bytes"
"encoding/json"
"log"
"strings"
"testing"
)
// captureLog redirects the default logger to a buffer for the duration
// of fn and returns whatever was written.
func captureLog(t *testing.T, fn func()) string {
t.Helper()
var buf bytes.Buffer
prevWriter := log.Writer()
prevFlags := log.Flags()
log.SetOutput(&buf)
log.SetFlags(0) // strip date/time so assertions stay deterministic
t.Cleanup(func() {
log.SetOutput(prevWriter)
log.SetFlags(prevFlags)
})
fn()
return buf.String()
}
func TestEvent_EmitsEvtPrefixAndJSONPayload(t *testing.T) {
out := captureLog(t, func() {
Event("provision.start", map[string]any{
"workspace_id": "ws-123",
"tier": 4,
"runtime": "claude-code",
})
})
out = strings.TrimSpace(out)
if !strings.HasPrefix(out, "evt: provision.start ") {
t.Fatalf("expected evt-prefixed line, got %q", out)
}
jsonPart := strings.TrimPrefix(out, "evt: provision.start ")
var got map[string]any
if err := json.Unmarshal([]byte(jsonPart), &got); err != nil {
t.Fatalf("payload not valid JSON: %v (raw=%q)", err, jsonPart)
}
if got["workspace_id"] != "ws-123" {
t.Errorf("workspace_id field lost: %+v", got)
}
// JSON unmarshal turns numbers into float64 — exact-equal compare.
if got["tier"].(float64) != 4 {
t.Errorf("tier field lost: %+v", got)
}
if got["runtime"] != "claude-code" {
t.Errorf("runtime field lost: %+v", got)
}
}
func TestEvent_NilFieldsEmitsEmptyObject(t *testing.T) {
out := captureLog(t, func() {
Event("restart.pre_stop", nil)
})
if !strings.Contains(out, "evt: restart.pre_stop {}") {
t.Fatalf("nil fields should emit empty object, got %q", out)
}
}
func TestEvent_PreservesEventBoundaryOnUnmarshalableValue(t *testing.T) {
// A channel cannot be marshaled by encoding/json — verify we still
// emit the event boundary with a recorded marshal error. This is
// the structural guarantee: the call site never sees a panic, and
// the event name is always present in the log.
out := captureLog(t, func() {
Event("provision.ec2_started", map[string]any{
"chan": make(chan int),
})
})
if !strings.Contains(out, "evt: provision.ec2_started ") {
t.Fatalf("event boundary missing on marshal error: %q", out)
}
if !strings.Contains(out, "_marshal_err") {
t.Fatalf("expected _marshal_err sentinel, got %q", out)
}
}
func TestEvent_SingleLineOutput(t *testing.T) {
// Log aggregators line-split on \n. A multi-line emit would silently
// fragment the JSON across two records — pin single-line shape.
out := captureLog(t, func() {
Event("provision.skip_existing", map[string]any{
"existing_id": "ws-abc",
"name": "child-1",
})
})
trimmed := strings.TrimRight(out, "\n")
if strings.Contains(trimmed, "\n") {
t.Fatalf("event line must be single-line, got %q", out)
}
}

View File

@ -243,13 +243,15 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// entire platform. Gated behind AdminAuth (issue #180).
r.GET("/approvals/pending", middleware.AdminAuth(db.DB), apph.ListAll)
// Team handlers — Collapse only. The bulk-Expand path is gone:
// every workspace can have children via the regular CreateWorkspace
// flow with parent_id set, so a separate handler that bulk-creates
// from sub_workspaces (and was non-idempotent — calling it twice
// duplicated the team) earned its way out.
teamh := handlers.NewTeamHandler(broadcaster, wh, platformURL, configsDir)
wsAuth.POST("/collapse", teamh.Collapse)
// (TeamHandler is gone — #2864.) The visual canvas Collapse
// button calls PATCH /workspaces/:id { collapsed: true/false }
// (presentational toggle on canvas_layouts), NOT the destructive
// POST /collapse that stopped + removed children. The
// destructive route had zero UI callers (verified via grep
// across canvas/, scripts/, and the MCP tool registry — only
// docs referenced it). team.go + team_test.go + the route
// + helpers (findTemplateDirByName, NewTeamHandler) are
// deleted; visual collapse is unaffected.
// Agents
ah := handlers.NewAgentHandler(broadcaster)

View File

@ -553,10 +553,26 @@ def _poll_once(
# Imported lazily at use-site so a runtime that never sees an
# upload-receive row never imports the module. Cheap on the hot
# path because Python caches the import.
from inbox_uploads import is_chat_upload_row, fetch_and_stage
from inbox_uploads import is_chat_upload_row, BatchFetcher
new_count = 0
last_id: str | None = None
# ``batch_fetcher`` is lazy: a poll batch with no upload rows pays
# zero overhead. Once the first upload row appears we open one
# BatchFetcher and submit every subsequent upload row to its thread
# pool; before processing the FIRST non-upload row we drain the
# pool (wait_all) so the URI cache is hot when message rewriting
# runs. Without the barrier, the chat message that references the
# upload would arrive at the agent with the un-rewritten
# platform-pending: URI.
batch_fetcher: BatchFetcher | None = None
def _drain_uploads(bf: BatchFetcher | None) -> None:
if bf is None:
return
bf.wait_all()
bf.close()
for row in rows:
if not isinstance(row, dict):
continue
@ -570,14 +586,21 @@ def _poll_once(
# message_from_activity. We DO advance the cursor past
# this row so a permanent network outage on /content
# doesn't stall the cursor and block real chat traffic.
fetch_and_stage(
row,
platform_url=platform_url,
workspace_id=workspace_id,
headers=headers,
)
if batch_fetcher is None:
batch_fetcher = BatchFetcher(
platform_url=platform_url,
workspace_id=workspace_id,
headers=headers,
)
batch_fetcher.submit(row)
last_id = str(row.get("id", "")) or last_id
continue
# Non-upload row: drain any pending uploads first so the URI
# cache is populated before we run rewrite_request_body /
# message_from_activity on a row that may reference one.
if batch_fetcher is not None:
_drain_uploads(batch_fetcher)
batch_fetcher = None
if _is_self_notify_row(row):
# The workspace-server's `/notify` handler writes the agent's
# own send_message_to_user POSTs to activity_logs with
@ -612,6 +635,13 @@ def _poll_once(
last_id = message.activity_id
new_count += 1
# Drain any uploads still in flight if the batch ended with upload
# rows (no chat-message row to trigger the inline drain). Without
# this, a future poll that picks up the chat-message row first
# would race with the still-running fetches.
if batch_fetcher is not None:
_drain_uploads(batch_fetcher)
if last_id is not None:
state.save_cursor(last_id, cursor_key)
return new_count
@ -654,6 +684,7 @@ def start_poller_thread(
platform_url: str,
workspace_id: str,
interval: float = POLL_INTERVAL_SECONDS,
stop_event: threading.Event | None = None,
) -> threading.Thread:
"""Spawn the poller as a daemon thread. Returns the Thread handle.
@ -665,13 +696,18 @@ def start_poller_thread(
operator running ``ps -eL`` or eyeballing ``threading.enumerate()``
can tell which thread is which without reverse-engineering it from
crash tracebacks.
Pass ``stop_event`` to enable graceful shutdown used by tests so
the daemon thread doesn't outlive the test that started it and race
with later tests' httpx patches. Production code passes None and
relies on the daemon flag for process-exit cleanup.
"""
name = "molecule-mcp-inbox-poller"
if workspace_id:
name = f"{name}-{workspace_id[:8]}"
t = threading.Thread(
target=_poll_loop,
args=(state, platform_url, workspace_id, interval),
args=(state, platform_url, workspace_id, interval, stop_event),
name=name,
daemon=True,
)

View File

@ -37,6 +37,7 @@ read another tenant's bytes even if a token is misrouted.
"""
from __future__ import annotations
import concurrent.futures
import logging
import mimetypes
import os
@ -68,6 +69,24 @@ MAX_FILE_BYTES = 25 * 1024 * 1024
# 10s default for /activity calls — both are user-perceived latency.
DEFAULT_FETCH_TIMEOUT = 60.0
# Concurrency cap for ``BatchFetcher``. Four workers is enough headroom
# for the realistic "user dragged 3-4 files into chat at once" case
# while bounding the platform's per-workspace fan-out. The cap matters
# because the platform's /content endpoint reads bytea from Postgres in
# a single round-trip per request — N workers = N concurrent DB reads
# of up to 25 MB each, so a higher cap could pressure platform memory
# without much UX win (network bandwidth is the bottleneck once the
# bytes are buffered).
DEFAULT_BATCH_FETCH_WORKERS = 4
# Upper bound on how long ``BatchFetcher.wait_all`` blocks the inbox
# poll loop before giving up on still-in-flight fetches. Aligned with
# DEFAULT_FETCH_TIMEOUT so a single hung fetch can't stall the loop
# longer than its own deadline. A timeout fires only if a worker thread
# is stuck past the underlying httpx timeout — pathological case;
# normal completion is bounded by per-fetch timeout × ceil(N/W).
DEFAULT_BATCH_WAIT_TIMEOUT = DEFAULT_FETCH_TIMEOUT + 5.0
# Cap on the URI cache. A long-lived workspace handling thousands of
# uploads shouldn't grow without bound; an LRU cap of 1024 keeps the
# entries-needed-for-a-typical-conversation well within memory.
@ -275,6 +294,7 @@ def fetch_and_stage(
workspace_id: str,
headers: dict[str, str],
timeout_secs: float = DEFAULT_FETCH_TIMEOUT,
client: Any = None,
) -> str | None:
"""Fetch the row's bytes, stage them under chat-uploads, and ack.
@ -289,6 +309,11 @@ def fetch_and_stage(
On success, the URI cache is updated so a subsequent chat message
referencing the same ``platform-pending:`` URI is rewritten before
the agent sees it.
Pass ``client`` to reuse a shared ``httpx.Client`` for both GET and
POST ack (saves one TLS handshake per row vs. constructing one
per-call). ``BatchFetcher`` does this across an entire poll batch so
N concurrent fetches share one connection pool.
"""
body = _request_body_dict(row)
if body is None:
@ -317,25 +342,58 @@ def fetch_and_stage(
if not isinstance(filename, str):
filename = "file"
# Lazy httpx import: the standalone MCP path uses httpx; an in-
# container caller that imports this module by accident shouldn't
# explode at import time.
try:
import httpx # noqa: WPS433
except ImportError:
logger.error("inbox_uploads: httpx not installed; cannot fetch %s", file_id)
return None
# Caller-supplied client: reuse for both GET + POST ack. Otherwise
# build a one-shot client and close it on the way out. Lazy httpx
# import keeps the standalone MCP path's optional dep optional.
own_client = client is None
if own_client:
try:
import httpx # noqa: WPS433
except ImportError:
logger.error("inbox_uploads: httpx not installed; cannot fetch %s", file_id)
return None
client = httpx.Client(timeout=timeout_secs)
try:
return _fetch_and_stage_with_client(
client,
platform_url=platform_url,
workspace_id=workspace_id,
headers=headers,
file_id=file_id,
pending_uri=pending_uri,
filename=filename,
body=body,
)
finally:
if own_client:
try:
client.close()
except Exception: # noqa: BLE001 — close should never crash the caller
pass
def _fetch_and_stage_with_client(
client: Any,
*,
platform_url: str,
workspace_id: str,
headers: dict[str, str],
file_id: str,
pending_uri: str,
filename: str,
body: dict[str, Any],
) -> str | None:
"""Inner body of fetch_and_stage. Always uses the supplied client for
both GET and POST so the connection pool is shared across the call.
"""
content_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/content"
ack_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/ack"
try:
with httpx.Client(timeout=timeout_secs) as client:
resp = client.get(content_url, headers=headers)
resp = client.get(content_url, headers=headers)
except Exception as exc: # noqa: BLE001
logger.warning(
"inbox_uploads: GET %s failed: %s", content_url, exc
)
logger.warning("inbox_uploads: GET %s failed: %s", content_url, exc)
return None
if resp.status_code == 404:
@ -403,8 +461,7 @@ def fetch_and_stage(
# back the on-disk file — the platform's sweep will clean up
# eventually.
try:
with httpx.Client(timeout=timeout_secs) as client:
ack_resp = client.post(ack_url, headers=headers)
ack_resp = client.post(ack_url, headers=headers)
if ack_resp.status_code >= 400:
logger.warning(
"inbox_uploads: ack %s returned %d: %s",
@ -418,6 +475,198 @@ def fetch_and_stage(
return local_uri
# ---------------------------------------------------------------------------
# BatchFetcher — concurrent fetch across a single poll batch
# ---------------------------------------------------------------------------
class BatchFetcher:
"""Fetch + stage + ack a batch of upload-receive rows concurrently.
Why this exists: the inbox poll loop used to call ``fetch_and_stage``
serially per row. With N upload rows in a batch (a user dragging
multiple files into chat at once), the loop blocked for
``N × per_fetch_latency`` before processing the chat message that
referenced them a 4-file upload at 5s each = 20s of stall
before the agent saw the user's prompt. ``BatchFetcher`` runs the
fetches on a small thread pool (default 4 workers) so the stall is
bounded by ``ceil(N/W) × per_fetch_latency`` instead.
Connection reuse: one ``httpx.Client`` is shared across every fetch
in the batch. httpx clients carry a connection pool, so a second
fetch to the same platform host reuses the TCP+TLS handshake from
the first measurable win when fetches happen back-to-back.
Correctness invariant the caller MUST preserve: the inbox loop is
expected to call ``wait_all()`` before processing the chat-message
activity row that REFERENCES one of these uploads. Without the
barrier, the URI cache is empty when ``rewrite_request_body`` runs
and the agent sees the un-rewritten ``platform-pending:`` URI. The
caller-side test ``test_poll_once_waits_for_uploads_before_messages``
pins this end-to-end.
Use as a context manager so the executor + client are torn down
even if the caller raises mid-batch.
"""
def __init__(
self,
*,
platform_url: str,
workspace_id: str,
headers: dict[str, str],
timeout_secs: float = DEFAULT_FETCH_TIMEOUT,
max_workers: int = DEFAULT_BATCH_FETCH_WORKERS,
client: Any = None,
):
self._platform_url = platform_url
self._workspace_id = workspace_id
self._headers = dict(headers) # copy so caller mutations don't leak in
self._timeout_secs = timeout_secs
# Caller can inject a client (tests do this); production callers
# let us build one. Track ownership so we only close ours.
self._own_client = client is None
if self._own_client:
try:
import httpx # noqa: WPS433
except ImportError:
# Match fetch_and_stage's behavior: log + degrade rather
# than raising at construction time. submit() will then
# return None for every row.
logger.error("inbox_uploads: httpx not installed; BatchFetcher inert")
self._client: Any = None
else:
self._client = httpx.Client(timeout=timeout_secs)
else:
self._client = client
self._executor = concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers,
thread_name_prefix="upload-fetch",
)
self._futures: list[concurrent.futures.Future[Any]] = []
self._closed = False
# Flipped to True by wait_all when the timeout fires; close()
# reads this to decide between drain-and-wait vs cancel-queued.
self._timed_out = False
def submit(self, row: dict[str, Any]) -> concurrent.futures.Future[Any] | None:
"""Submit ``row`` for fetch + stage + ack. Non-blocking — the
worker thread runs ``fetch_and_stage`` with the shared client.
Returns the Future so a caller that wants per-row outcome can
await it; ``None`` if the BatchFetcher is in a degraded state
(httpx missing).
"""
if self._closed:
raise RuntimeError("BatchFetcher: submit after close")
if self._client is None:
return None
fut = self._executor.submit(
fetch_and_stage,
row,
platform_url=self._platform_url,
workspace_id=self._workspace_id,
headers=self._headers,
timeout_secs=self._timeout_secs,
client=self._client,
)
self._futures.append(fut)
return fut
def wait_all(self, timeout: float | None = DEFAULT_BATCH_WAIT_TIMEOUT) -> None:
"""Block until every submitted future completes (or times out).
Per-future exceptions are logged + swallowed ``fetch_and_stage``
already converts every error path to ``return None``, so a real
exception propagating up to here is unexpected and we don't want
one bad fetch to abort the whole batch.
Timeouts are also logged + swallowed AND record the timed-out
futures on ``self._timed_out`` so ``close`` can cancel them
without paying their full latency. Without this hand-off,
``close()``'s ``shutdown(wait=True)`` would block on the leaked
workers and undo the user-facing timeout the inbox poll loop
would stall indefinitely on a hung /content fetch.
"""
if not self._futures:
return
try:
done, not_done = concurrent.futures.wait(
self._futures,
timeout=timeout,
return_when=concurrent.futures.ALL_COMPLETED,
)
except Exception as exc: # noqa: BLE001 — concurrent.futures shouldn't raise here
logger.warning("inbox_uploads: BatchFetcher.wait_all crashed: %s", exc)
return
for fut in done:
exc = fut.exception()
if exc is not None:
logger.warning(
"inbox_uploads: BatchFetcher worker raised: %s", exc
)
if not_done:
logger.warning(
"inbox_uploads: BatchFetcher.wait_all left %d in-flight after %ss timeout",
len(not_done),
timeout,
)
# Mark these futures so close() knows to cancel-not-wait. We
# cancel queued-but-not-started ones immediately; futures
# already running can't be cancelled (Python's threading
# model), but close() will pass cancel_futures=True so any
# remaining queued items don't run.
for fut in not_done:
fut.cancel()
self._timed_out = True
def close(self) -> None:
"""Tear down the executor + (if owned) the httpx client.
Idempotent. After close, ``submit`` raises and the BatchFetcher
cannot be reused construct a fresh one for the next poll.
If ``wait_all`` reported a timeout, shutdown skips the
``wait=True`` drain and instead asks the executor to drop queued
futures (``cancel_futures=True``). Currently-running workers
can't be interrupted by Python's threading model, but the poll
loop returns immediately rather than blocking on a hung fetch.
"""
if self._closed:
return
self._closed = True
timed_out = getattr(self, "_timed_out", False)
try:
if timed_out:
# cancel_futures landed in Python 3.9 — guarded for older
# interpreters via a TypeError fallback. Drop queued
# tasks; running ones will exit when their httpx call
# eventually returns or the daemon thread dies.
try:
self._executor.shutdown(wait=False, cancel_futures=True)
except TypeError:
self._executor.shutdown(wait=False)
else:
# Healthy path: wait for in-flight work so we don't
# interrupt a fetch mid-write.
self._executor.shutdown(wait=True)
except Exception as exc: # noqa: BLE001
logger.warning("inbox_uploads: executor shutdown error: %s", exc)
if self._own_client and self._client is not None:
try:
self._client.close()
except Exception as exc: # noqa: BLE001
logger.warning("inbox_uploads: client close error: %s", exc)
def __enter__(self) -> "BatchFetcher":
return self
def __exit__(self, exc_type, exc, tb) -> None:
self.close()
# ---------------------------------------------------------------------------
# URI rewrite for incoming chat messages
# ---------------------------------------------------------------------------

View File

@ -555,16 +555,34 @@ def test_poll_once_self_notify_does_not_fire_notification(state: inbox.InboxStat
def test_start_poller_thread_is_daemon(state: inbox.InboxState):
"""Daemon flag is required so the poller dies with the parent
process; a non-daemon poller would leak across `claude` restarts
and write to a stale workspace."""
and write to a stale workspace.
Stop_event is plumbed so the thread cleans up at the end of the
test instead of leaking into later tests. Without cleanup, the
daemon's ~10ms tick races with later tests that patch httpx.Client
the leaked thread sees their patched response and runs an
unwanted iteration of _poll_once that double-counts mocked calls
(caught when test_batch_fetcher_owns_client_when_not_supplied
surfaced this on Python 3.11 CI but not 3.13 local).
"""
resp = _make_response(200, [])
p, _ = _patch_httpx(resp)
stop_event = threading.Event()
with p, patch("platform_auth.auth_headers", return_value={}):
# Use a very short interval so the loop body runs at least once
# before we exit the test.
t = inbox.start_poller_thread(state, "http://platform", "ws-1", interval=0.01)
t = inbox.start_poller_thread(
state, "http://platform", "ws-1", interval=0.01, stop_event=stop_event
)
time.sleep(0.05)
assert t.daemon is True
assert t.is_alive()
assert t.daemon is True
assert t.is_alive()
# Signal shutdown + wait for the thread to actually exit before
# we leave the test scope. Without this join, the leaked thread
# races with later tests' httpx patches.
stop_event.set()
t.join(timeout=2.0)
assert not t.is_alive(), "poller thread did not exit on stop_event"
# ---------------------------------------------------------------------------
@ -577,6 +595,219 @@ def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path):
assert inbox.default_cursor_path() == tmp_path / ".mcp_inbox_cursor"
# ---------------------------------------------------------------------------
# Phase 5b — BatchFetcher integration with the poll loop
# ---------------------------------------------------------------------------
#
# These tests pin the cross-module contract between inbox._poll_once and
# inbox_uploads.BatchFetcher: chat_upload_receive rows must be submitted
# to a single BatchFetcher AND drained (URI cache populated) before any
# subsequent message row is processed. Without the drain, the
# rewrite_request_body path inside message_from_activity surfaces the
# un-rewritten ``platform-pending:`` URI to the agent.
def _upload_row(act_id: str, file_id: str) -> dict:
return {
"id": act_id,
"source_id": None,
"method": "chat_upload_receive",
"summary": f"chat_upload_receive: {file_id}.pdf",
"request_body": {
"file_id": file_id,
"name": f"{file_id}.pdf",
"uri": f"platform-pending:ws-1/{file_id}",
"mimeType": "application/pdf",
"size": 3,
},
"created_at": "2026-05-04T10:00:00Z",
}
def _message_row_referencing(act_id: str, file_id: str) -> dict:
return {
"id": act_id,
"source_id": None,
"method": "message/send",
"summary": None,
"request_body": {
"params": {
"message": {
"parts": [
{"kind": "text", "text": "have a look"},
{
"kind": "file",
"file": {
"uri": f"platform-pending:ws-1/{file_id}",
"name": f"{file_id}.pdf",
},
},
]
}
}
},
"created_at": "2026-05-04T10:00:01Z",
}
def _patch_httpx_routing(activity_rows: list[dict], upload_bytes: bytes = b"PDF"):
"""Replace ``httpx.Client`` so:
- GET /activity returns ``activity_rows``
- GET /workspaces/.../content returns ``upload_bytes`` with content-type
- POST /ack returns 200
Returns the patch context manager; tests use ``with p:``. Each new
Client(...) gets a fresh MagicMock so the test can verify
constructor-count expectations without pinning singletons.
"""
def _client_factory(*args, **kwargs):
c = MagicMock()
c.__enter__ = MagicMock(return_value=c)
c.__exit__ = MagicMock(return_value=False)
def _get(url, params=None, headers=None):
if "/activity" in url:
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = activity_rows
resp.text = ""
return resp
if "/pending-uploads/" in url and "/content" in url:
resp = MagicMock()
resp.status_code = 200
resp.content = upload_bytes
resp.headers = {"content-type": "application/pdf"}
resp.text = ""
return resp
resp = MagicMock()
resp.status_code = 404
resp.text = ""
return resp
def _post(url, headers=None):
resp = MagicMock()
resp.status_code = 200
resp.text = ""
return resp
c.get = MagicMock(side_effect=_get)
c.post = MagicMock(side_effect=_post)
c.close = MagicMock()
return c
return patch("httpx.Client", side_effect=_client_factory)
def test_poll_once_drains_uploads_before_processing_message_row(state: inbox.InboxState, tmp_path):
"""The chat-message row's file.uri MUST be rewritten to the local
workspace: URI by the time it lands in the InboxState queue. This
requires BatchFetcher.wait_all() to run before message_from_activity
on the second row.
"""
import inbox_uploads
inbox_uploads.get_cache().clear()
# Sandbox the on-disk staging dir so the test can't pollute the
# workspace's real chat-uploads.
real_dir = inbox_uploads.CHAT_UPLOAD_DIR
inbox_uploads.CHAT_UPLOAD_DIR = str(tmp_path / "chat-uploads")
try:
rows = [
_upload_row("act-1", "file-A"),
_message_row_referencing("act-2", "file-A"),
]
state.save_cursor("act-old")
with _patch_httpx_routing(rows, upload_bytes=b"PDF-bytes"):
n = inbox._poll_once(state, "http://platform", "ws-1", {})
finally:
inbox_uploads.CHAT_UPLOAD_DIR = real_dir
inbox_uploads.get_cache().clear()
assert n == 1, "exactly one message row should be enqueued (the upload row is a side-effect, not a message)"
queued = state.peek(10)
assert len(queued) == 1
# The contract this test exists to pin: the platform-pending: URI
# was rewritten to workspace: BEFORE the message landed in the
# state queue. message_from_activity mutates row['request_body']
# in-place, so the rewritten URI is observable on the row dict
# we passed in.
rewritten_part = rows[1]["request_body"]["params"]["message"]["parts"][1]
assert rewritten_part["file"]["uri"].startswith("workspace:"), (
f"upload barrier broken: file.uri = {rewritten_part['file']['uri']!r}; "
"rewrite_request_body ran before BatchFetcher.wait_all populated the cache"
)
# Cursor advanced past BOTH rows — upload-receive (act-1) is
# acknowledged via the inbox cursor regardless of fetch outcome.
assert state.load_cursor() == "act-2"
def test_poll_once_with_only_upload_rows_drains_at_loop_end(state: inbox.InboxState, tmp_path):
"""End-of-batch drain: a poll that contains ONLY upload rows (no
chat-message row to trigger the inline drain) must still drain the
BatchFetcher before _poll_once returns. Otherwise a future poll
that picks up the corresponding chat-message row would race with
in-flight fetches from the previous batch.
"""
import inbox_uploads
inbox_uploads.get_cache().clear()
real_dir = inbox_uploads.CHAT_UPLOAD_DIR
inbox_uploads.CHAT_UPLOAD_DIR = str(tmp_path / "chat-uploads")
try:
rows = [_upload_row("act-1", "file-A"), _upload_row("act-2", "file-B")]
state.save_cursor("act-old")
with _patch_httpx_routing(rows, upload_bytes=b"PDF"):
n = inbox._poll_once(state, "http://platform", "ws-1", {})
# By the time _poll_once returned, the URI cache must be hot
# for both file_ids — proves the end-of-loop drain ran.
assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-A") is not None
assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-B") is not None
finally:
inbox_uploads.CHAT_UPLOAD_DIR = real_dir
inbox_uploads.get_cache().clear()
# Upload rows are NOT message rows; queue stays empty.
assert n == 0
# Cursor advances past both upload rows.
assert state.load_cursor() == "act-2"
def test_poll_once_no_uploads_does_not_construct_batch_fetcher(state: inbox.InboxState):
"""A batch with no upload-receive rows must not pay the BatchFetcher
construction cost the executor + httpx client allocation is
deferred until the first upload row appears.
"""
import inbox_uploads
constructed: list[Any] = []
def _patched_init(self, **kwargs):
constructed.append(kwargs)
# Don't actually run __init__; we never hit submit/wait_all.
self._closed = False
self._futures = []
self._executor = MagicMock()
self._client = MagicMock()
self._own_client = False
rows = [
{
"id": "act-1",
"source_id": None,
"method": "message/send",
"summary": None,
"request_body": {"parts": [{"type": "text", "text": "hi"}]},
"created_at": "2026-04-30T22:00:00Z",
},
]
state.save_cursor("act-old")
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with patch.object(inbox_uploads.BatchFetcher, "__init__", _patched_init), p:
n = inbox._poll_once(state, "http://platform", "ws-1", {})
assert n == 1
assert constructed == [], "BatchFetcher must not be constructed when no upload rows are present"
def test_default_cursor_path_falls_back_to_default(tmp_path, monkeypatch):
"""When CONFIGS_DIR is unset, the cursor path resolves through
configs_dir.resolve() /configs in-container, ~/.molecule-workspace

View File

@ -695,3 +695,426 @@ def test_rewrite_request_body_handles_non_list_parts():
def test_rewrite_request_body_handles_non_dict_file():
body = {"parts": [{"kind": "file", "file": "not a dict"}]}
inbox_uploads.rewrite_request_body(body) # must not raise
# ---------------------------------------------------------------------------
# fetch_and_stage with shared client — Phase 5b client-reuse contract
# ---------------------------------------------------------------------------
#
# When a caller passes ``client=`` to fetch_and_stage, that client must be
# used for BOTH the GET /content and the POST /ack — no fresh
# ``httpx.Client(...)`` constructions should happen. The pre-Phase-5b
# implementation made one new client for GET and another for ack; the new
# shape lets BatchFetcher share one connection pool across an entire batch.
def test_fetch_and_stage_with_supplied_client_does_not_construct_new_client(monkeypatch):
row = _row(uri="platform-pending:ws-1/file-1")
get_resp = _make_resp(200, content=b"PDF", content_type="application/pdf")
ack_resp = _make_resp(200)
supplied = MagicMock()
supplied.get = MagicMock(return_value=get_resp)
supplied.post = MagicMock(return_value=ack_resp)
# Sentinel: any code path that constructs httpx.Client when one was
# already supplied is a regression — count constructions.
constructed: list[Any] = []
class _ShouldNotBeCalled:
def __init__(self, *a, **kw):
constructed.append((a, kw))
monkeypatch.setattr("httpx.Client", _ShouldNotBeCalled)
local_uri = inbox_uploads.fetch_and_stage(
row,
platform_url="http://plat",
workspace_id="ws-1",
headers={"Authorization": "Bearer t"},
client=supplied,
)
assert local_uri is not None
assert constructed == [], "supplied client must be reused; no new Client should be constructed"
# GET + POST ack both went through the supplied client.
supplied.get.assert_called_once()
supplied.post.assert_called_once()
# Caller-owned client must NOT be closed by fetch_and_stage; the
# batch fetcher (or test) closes it once the whole batch is done.
supplied.close.assert_not_called()
def test_fetch_and_stage_without_supplied_client_constructs_and_closes_one(monkeypatch):
row = _row(uri="platform-pending:ws-1/file-1")
get_resp = _make_resp(200, content=b"PDF", content_type="application/pdf")
ack_resp = _make_resp(200)
built: list[MagicMock] = []
def _factory(*args, **kwargs):
c = MagicMock()
c.get = MagicMock(return_value=get_resp)
c.post = MagicMock(return_value=ack_resp)
built.append(c)
return c
monkeypatch.setattr("httpx.Client", _factory)
local_uri = inbox_uploads.fetch_and_stage(
row, platform_url="http://plat", workspace_id="ws-1", headers={}
)
assert local_uri is not None
# Pre-Phase-5b built TWO clients (one for GET, one for ack); now exactly one.
assert len(built) == 1, f"expected 1 httpx.Client construction, got {len(built)}"
# Same client must serve BOTH calls.
built[0].get.assert_called_once()
built[0].post.assert_called_once()
# Owned client must be closed by fetch_and_stage on the way out.
built[0].close.assert_called_once()
def test_fetch_and_stage_with_supplied_client_does_not_close_caller_client():
# Even on failure the supplied client must not be closed — the
# BatchFetcher owns the lifecycle for the whole batch.
row = _row(uri="platform-pending:ws-1/file-1")
supplied = MagicMock()
supplied.get = MagicMock(side_effect=RuntimeError("network down"))
supplied.post = MagicMock() # should not be reached on GET failure
inbox_uploads.fetch_and_stage(
row,
platform_url="http://plat",
workspace_id="ws-1",
headers={},
client=supplied,
)
supplied.close.assert_not_called()
supplied.post.assert_not_called()
# ---------------------------------------------------------------------------
# BatchFetcher — concurrent fetch + URI cache barrier
# ---------------------------------------------------------------------------
def _row_with_id(act_id: str, file_id: str) -> dict:
"""Helper: an upload-receive row with a distinct activity id + file id."""
return {
"id": act_id,
"method": "chat_upload_receive",
"request_body": {
"file_id": file_id,
"name": f"{file_id}.pdf",
"uri": f"platform-pending:ws-1/{file_id}",
"mimeType": "application/pdf",
"size": 1,
},
}
def _stub_client_for_batch(get_responses: dict[str, MagicMock]) -> MagicMock:
"""Build one MagicMock client that returns per-file_id responses
based on the file_id segment of the URL.
"""
client = MagicMock()
def _get(url: str, headers: dict[str, str] | None = None) -> MagicMock:
for fid, resp in get_responses.items():
if f"/pending-uploads/{fid}/content" in url:
return resp
return _make_resp(404)
def _post(url: str, headers: dict[str, str] | None = None) -> MagicMock:
return _make_resp(200)
client.get = MagicMock(side_effect=_get)
client.post = MagicMock(side_effect=_post)
return client
def test_batch_fetcher_runs_submitted_rows_concurrently():
# Three rows whose .get() blocks for ~120ms each. With 4 workers the
# batch should complete in ~120ms (parallel), not ~360ms (serial).
# The 250ms ceiling accommodates CI scheduler jitter while still
# discriminating concurrent (~120ms) from serial (~360ms).
import time
barrier_start = [0.0]
def _slow_get(url: str, headers: dict[str, str] | None = None) -> MagicMock:
time.sleep(0.12)
for fid in ("a", "b", "c"):
if f"/pending-uploads/{fid}/content" in url:
return _make_resp(200, content=b"X", content_type="text/plain")
return _make_resp(404)
client = MagicMock()
client.get = MagicMock(side_effect=_slow_get)
client.post = MagicMock(return_value=_make_resp(200))
bf = inbox_uploads.BatchFetcher(
platform_url="http://plat",
workspace_id="ws-1",
headers={},
client=client,
max_workers=4,
)
barrier_start[0] = time.time()
for fid in ("a", "b", "c"):
bf.submit(_row_with_id(f"act-{fid}", fid))
bf.wait_all()
elapsed = time.time() - barrier_start[0]
bf.close()
assert elapsed < 0.25, (
f"3 rows × 120ms with 4 workers should finish in <250ms; got {elapsed:.3f}s "
"(suggests serial execution — Phase 5b regression)"
)
assert client.get.call_count == 3
assert client.post.call_count == 3
def test_batch_fetcher_wait_all_blocks_until_uri_cache_populated():
"""Pin the correctness invariant: when wait_all returns, the URI
cache is hot for every submitted row. Without this barrier the
inbox loop would process the chat-message row before its uploads
were staged, and rewrite_request_body would surface the un-rewritten
platform-pending: URI to the agent.
"""
import time
def _slow_get(url: str, headers: dict[str, str] | None = None) -> MagicMock:
time.sleep(0.05)
return _make_resp(200, content=b"data", content_type="text/plain")
client = MagicMock()
client.get = MagicMock(side_effect=_slow_get)
client.post = MagicMock(return_value=_make_resp(200))
inbox_uploads.get_cache().clear()
with inbox_uploads.BatchFetcher(
platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
) as bf:
bf.submit(_row_with_id("act-a", "a"))
bf.submit(_row_with_id("act-b", "b"))
bf.wait_all()
# Cache must be hot for BOTH rows by the time wait_all returns.
assert inbox_uploads.get_cache().get("platform-pending:ws-1/a") is not None
assert inbox_uploads.get_cache().get("platform-pending:ws-1/b") is not None
def test_batch_fetcher_isolates_per_row_failure():
"""One failing fetch must not abort siblings. Sibling rows complete,
URI cache populates for them; the bad row's cache entry stays absent.
"""
def _get(url: str, headers: dict[str, str] | None = None) -> MagicMock:
if "/pending-uploads/bad/content" in url:
return _make_resp(500, text="upstream broken")
return _make_resp(200, content=b"ok", content_type="text/plain")
client = MagicMock()
client.get = MagicMock(side_effect=_get)
client.post = MagicMock(return_value=_make_resp(200))
inbox_uploads.get_cache().clear()
with inbox_uploads.BatchFetcher(
platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
) as bf:
bf.submit(_row_with_id("act-1", "good1"))
bf.submit(_row_with_id("act-2", "bad"))
bf.submit(_row_with_id("act-3", "good2"))
bf.wait_all()
cache = inbox_uploads.get_cache()
assert cache.get("platform-pending:ws-1/good1") is not None
assert cache.get("platform-pending:ws-1/good2") is not None
assert cache.get("platform-pending:ws-1/bad") is None
def test_batch_fetcher_reuses_one_client_across_all_submits():
"""Every row in the batch must share the same client instance. This
is the connection-pool-reuse leg of the perf win: a second fetch
to the same host reuses the TCP+TLS handshake from the first.
"""
client = MagicMock()
client.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain"))
client.post = MagicMock(return_value=_make_resp(200))
with inbox_uploads.BatchFetcher(
platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
) as bf:
for fid in ("a", "b", "c"):
bf.submit(_row_with_id(f"act-{fid}", fid))
bf.wait_all()
# 3 GETs + 3 POST acks all on the same client — no per-row Client
# construction.
assert client.get.call_count == 3
assert client.post.call_count == 3
def test_batch_fetcher_close_idempotent():
client = MagicMock()
bf = inbox_uploads.BatchFetcher(
platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
)
bf.close()
bf.close() # second call must not raise
def test_batch_fetcher_submit_after_close_raises():
client = MagicMock()
bf = inbox_uploads.BatchFetcher(
platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
)
bf.close()
with pytest.raises(RuntimeError, match="submit after close"):
bf.submit(_row_with_id("act-x", "x"))
def test_batch_fetcher_owns_client_when_not_supplied(monkeypatch):
built: list[MagicMock] = []
def _factory(*args, **kwargs):
c = MagicMock()
c.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain"))
c.post = MagicMock(return_value=_make_resp(200))
built.append(c)
return c
monkeypatch.setattr("httpx.Client", _factory)
bf = inbox_uploads.BatchFetcher(
platform_url="http://plat", workspace_id="ws-1", headers={}
)
bf.submit(_row_with_id("act-a", "a"))
bf.wait_all()
bf.close()
assert len(built) == 1, "expected one owned client per BatchFetcher"
built[0].close.assert_called_once()
def test_batch_fetcher_does_not_close_supplied_client():
client = MagicMock()
client.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain"))
client.post = MagicMock(return_value=_make_resp(200))
with inbox_uploads.BatchFetcher(
platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
) as bf:
bf.submit(_row_with_id("act-a", "a"))
bf.wait_all()
# Supplied client survives the BatchFetcher's close — caller's lifecycle.
client.close.assert_not_called()
def test_batch_fetcher_wait_all_no_op_on_empty_batch():
client = MagicMock()
with inbox_uploads.BatchFetcher(
platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
) as bf:
bf.wait_all() # nothing submitted; must not block, must not raise
client.get.assert_not_called()
client.post.assert_not_called()
def test_batch_fetcher_httpx_missing_makes_submit_a_noop(monkeypatch):
# No client supplied + httpx import fails → BatchFetcher degrades
# gracefully: submit() returns None and the row is silently skipped.
import sys
real_httpx = sys.modules.pop("httpx", None)
monkeypatch.setitem(sys.modules, "httpx", None)
try:
bf = inbox_uploads.BatchFetcher(
platform_url="http://plat", workspace_id="ws-1", headers={}
)
result = bf.submit(_row_with_id("act-a", "a"))
bf.wait_all()
bf.close()
finally:
if real_httpx is not None:
sys.modules["httpx"] = real_httpx
else:
sys.modules.pop("httpx", None)
assert result is None
def test_batch_fetcher_close_after_timeout_does_not_block_on_running_workers():
"""The deadline contract: when wait_all times out, close() must NOT
block waiting for the leaked worker threads. Otherwise the inbox
poll loop stalls indefinitely on a hung /content fetch undoing
the user-facing timeout.
Strategy: build a client whose .get() blocks on a threading.Event
that the test never sets. Submit a row, wait_all with a tiny
timeout, then time close(). If close() drained-and-waited it would
block until we set the event (i.e., forever in this test).
"""
import threading
import time
blocker = threading.Event() # never set — workers stay running
def _hang_get(url, headers=None):
# Wait at most ~5s so a buggy implementation eventually unblocks
# the test instead of timing out the whole pytest run, but
# nothing legitimate should reach this fallback.
blocker.wait(timeout=5.0)
return _make_resp(200, content=b"x", content_type="text/plain")
client = MagicMock()
client.get = MagicMock(side_effect=_hang_get)
client.post = MagicMock(return_value=_make_resp(200))
bf = inbox_uploads.BatchFetcher(
platform_url="http://plat",
workspace_id="ws-1",
headers={},
client=client,
max_workers=1, # serialize so submitting 1 keeps the worker busy
)
bf.submit(_row_with_id("act-a", "a"))
# Tiny timeout — wait_all must report the future as not_done.
bf.wait_all(timeout=0.05)
t0 = time.time()
bf.close()
elapsed = time.time() - t0
# Unblock the lingering worker so it doesn't pollute later tests.
blocker.set()
# Without the cancel-on-timeout fix, close() would block until
# blocker.set() — i.e., the full ~5s. With the fix it returns
# immediately because shutdown(wait=False) doesn't drain.
assert elapsed < 1.0, (
f"close() blocked for {elapsed:.2f}s after wait_all timeout — "
"cancel-on-timeout regression: close() is draining instead of bailing"
)
def test_batch_fetcher_close_without_timeout_still_drains():
"""Negative leg of the timeout contract: when wait_all completes
cleanly (no timeout), close() must KEEP its drain-and-wait
behavior so a still-queued ack POST isn't dropped mid-write.
"""
import time
def _slow_get(url, headers=None):
time.sleep(0.05)
return _make_resp(200, content=b"x", content_type="text/plain")
client = MagicMock()
client.get = MagicMock(side_effect=_slow_get)
client.post = MagicMock(return_value=_make_resp(200))
bf = inbox_uploads.BatchFetcher(
platform_url="http://plat",
workspace_id="ws-1",
headers={},
client=client,
max_workers=2,
)
bf.submit(_row_with_id("act-a", "a"))
bf.submit(_row_with_id("act-b", "b"))
bf.wait_all() # generous default timeout — should not fire
bf.close()
# All 2 GETs + 2 ACK POSTs ran to completion via drain-and-wait.
assert client.get.call_count == 2
assert client.post.call_count == 2