From 30fb507165f96b36f274f295ec3dc13f7eb7f274 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 11:26:55 -0700 Subject: [PATCH 1/8] =?UTF-8?q?feat(poll-upload):=20phase=205b=20=E2=80=94?= =?UTF-8?q?=20concurrent=20BatchFetcher=20+=20httpx=20client=20reuse?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolves the two remaining findings from the Phase 1-4 retrospective review (the Python-side counterparts to phase 5a): 1. Important — inbox_uploads.fetch_and_stage blocked the inbox poll loop synchronously per row. A user dragging 4 files into chat at once would stall the poller for 4× per-fetch latency before the chat message reached the agent. Add BatchFetcher: a thread-pool wrapper (default 4 workers) that submits fetches concurrently and exposes wait_all() as the barrier the inbox loop calls before processing the chat-message row that references the uploads. The drain barrier is the correctness invariant: rewrite_request_body must observe a populated URI cache when it walks the chat-message row's parts. _poll_once now drains the BatchFetcher inline before the first non-upload row, AND at end-of-batch (case: batch contains only upload rows; the corresponding chat message arrives in a later poll, but the future-poll-races-current-fetch race is closed). 2. Nit — fetch_and_stage created two httpx.Client instances per row (one for GET /content, one for POST /ack). Refactor so a single client serves both calls. When called from BatchFetcher, the batch-shared client serves every row's GET + ack — so the second fetch reuses the TCP+TLS handshake from the first. Comprehensive tests: - 13 new inbox_uploads tests: - fetch_and_stage with supplied client: zero httpx.Client constructions, GET+POST through the same client, caller's client not closed (lifecycle owned by caller). - fetch_and_stage without supplied client: exactly one httpx.Client constructed (was 2 pre-fix), closed on the way out. - BatchFetcher: 3 rows × 120ms = parallel completion < 250ms (vs. ~360ms serial), URI cache hot when wait_all returns, per-row failure isolation, single-client reuse across all submits, idempotent close, submit-after-close raises, owned-vs-supplied client lifecycle, no-op wait_all on empty batch, graceful httpx-missing degradation. - 3 new inbox tests: - poll_once drains uploads before processing the chat-message row (in-place mutation of row['request_body'] proves the URI was rewritten BEFORE message_from_activity returned). - poll_once with only upload rows still drains at end-of-batch. - poll_once with no upload rows never constructs a BatchFetcher (zero overhead on the no-upload happy path). 133 total inbox + inbox_uploads tests pass; 0 regressions. Closes the chat-upload poll-mode-perf gap end-to-end. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/inbox.py | 45 +++- workspace/inbox_uploads.py | 249 +++++++++++++++++-- workspace/tests/test_inbox.py | 213 ++++++++++++++++ workspace/tests/test_inbox_uploads.py | 339 ++++++++++++++++++++++++++ 4 files changed, 824 insertions(+), 22 deletions(-) diff --git a/workspace/inbox.py b/workspace/inbox.py index 6c7ea895..5e2f02b1 100644 --- a/workspace/inbox.py +++ b/workspace/inbox.py @@ -553,10 +553,26 @@ def _poll_once( # Imported lazily at use-site so a runtime that never sees an # upload-receive row never imports the module. Cheap on the hot # path because Python caches the import. 
- from inbox_uploads import is_chat_upload_row, fetch_and_stage + from inbox_uploads import is_chat_upload_row, BatchFetcher new_count = 0 last_id: str | None = None + # ``batch_fetcher`` is lazy: a poll batch with no upload rows pays + # zero overhead. Once the first upload row appears we open one + # BatchFetcher and submit every subsequent upload row to its thread + # pool; before processing the FIRST non-upload row we drain the + # pool (wait_all) so the URI cache is hot when message rewriting + # runs. Without the barrier, the chat message that references the + # upload would arrive at the agent with the un-rewritten + # platform-pending: URI. + batch_fetcher: BatchFetcher | None = None + + def _drain_uploads(bf: BatchFetcher | None) -> None: + if bf is None: + return + bf.wait_all() + bf.close() + for row in rows: if not isinstance(row, dict): continue @@ -570,14 +586,21 @@ def _poll_once( # message_from_activity. We DO advance the cursor past # this row so a permanent network outage on /content # doesn't stall the cursor and block real chat traffic. - fetch_and_stage( - row, - platform_url=platform_url, - workspace_id=workspace_id, - headers=headers, - ) + if batch_fetcher is None: + batch_fetcher = BatchFetcher( + platform_url=platform_url, + workspace_id=workspace_id, + headers=headers, + ) + batch_fetcher.submit(row) last_id = str(row.get("id", "")) or last_id continue + # Non-upload row: drain any pending uploads first so the URI + # cache is populated before we run rewrite_request_body / + # message_from_activity on a row that may reference one. + if batch_fetcher is not None: + _drain_uploads(batch_fetcher) + batch_fetcher = None if _is_self_notify_row(row): # The workspace-server's `/notify` handler writes the agent's # own send_message_to_user POSTs to activity_logs with @@ -612,6 +635,14 @@ def _poll_once( last_id = message.activity_id new_count += 1 + # Drain any uploads still in flight if the batch ended with upload + # rows (no chat-message row to trigger the inline drain). Without + # this, a future poll that picks up the chat-message row first + # would race with the still-running fetches. + if batch_fetcher is not None: + _drain_uploads(batch_fetcher) + batch_fetcher = None + if last_id is not None: state.save_cursor(last_id, cursor_key) return new_count diff --git a/workspace/inbox_uploads.py b/workspace/inbox_uploads.py index 798f18de..913efdcd 100644 --- a/workspace/inbox_uploads.py +++ b/workspace/inbox_uploads.py @@ -37,6 +37,7 @@ read another tenant's bytes even if a token is misrouted. """ from __future__ import annotations +import concurrent.futures import logging import mimetypes import os @@ -68,6 +69,24 @@ MAX_FILE_BYTES = 25 * 1024 * 1024 # 10s default for /activity calls — both are user-perceived latency. DEFAULT_FETCH_TIMEOUT = 60.0 +# Concurrency cap for ``BatchFetcher``. Four workers is enough headroom +# for the realistic "user dragged 3-4 files into chat at once" case +# while bounding the platform's per-workspace fan-out. The cap matters +# because the platform's /content endpoint reads bytea from Postgres in +# a single round-trip per request — N workers = N concurrent DB reads +# of up to 25 MB each, so a higher cap could pressure platform memory +# without much UX win (network bandwidth is the bottleneck once the +# bytes are buffered). +DEFAULT_BATCH_FETCH_WORKERS = 4 + +# Upper bound on how long ``BatchFetcher.wait_all`` blocks the inbox +# poll loop before giving up on still-in-flight fetches. 
Aligned with +# DEFAULT_FETCH_TIMEOUT so a single hung fetch can't stall the loop +# longer than its own deadline. A timeout fires only if a worker thread +# is stuck past the underlying httpx timeout — pathological case; +# normal completion is bounded by per-fetch timeout × ceil(N/W). +DEFAULT_BATCH_WAIT_TIMEOUT = DEFAULT_FETCH_TIMEOUT + 5.0 + # Cap on the URI cache. A long-lived workspace handling thousands of # uploads shouldn't grow without bound; an LRU cap of 1024 keeps the # entries-needed-for-a-typical-conversation well within memory. @@ -275,6 +294,7 @@ def fetch_and_stage( workspace_id: str, headers: dict[str, str], timeout_secs: float = DEFAULT_FETCH_TIMEOUT, + client: Any = None, ) -> str | None: """Fetch the row's bytes, stage them under chat-uploads, and ack. @@ -289,6 +309,11 @@ def fetch_and_stage( On success, the URI cache is updated so a subsequent chat message referencing the same ``platform-pending:`` URI is rewritten before the agent sees it. + + Pass ``client`` to reuse a shared ``httpx.Client`` for both GET and + POST ack (saves one TLS handshake per row vs. constructing one + per-call). ``BatchFetcher`` does this across an entire poll batch so + N concurrent fetches share one connection pool. """ body = _request_body_dict(row) if body is None: @@ -317,25 +342,58 @@ def fetch_and_stage( if not isinstance(filename, str): filename = "file" - # Lazy httpx import: the standalone MCP path uses httpx; an in- - # container caller that imports this module by accident shouldn't - # explode at import time. - try: - import httpx # noqa: WPS433 - except ImportError: - logger.error("inbox_uploads: httpx not installed; cannot fetch %s", file_id) - return None + # Caller-supplied client: reuse for both GET + POST ack. Otherwise + # build a one-shot client and close it on the way out. Lazy httpx + # import keeps the standalone MCP path's optional dep optional. + own_client = client is None + if own_client: + try: + import httpx # noqa: WPS433 + except ImportError: + logger.error("inbox_uploads: httpx not installed; cannot fetch %s", file_id) + return None + client = httpx.Client(timeout=timeout_secs) + try: + return _fetch_and_stage_with_client( + client, + platform_url=platform_url, + workspace_id=workspace_id, + headers=headers, + file_id=file_id, + pending_uri=pending_uri, + filename=filename, + body=body, + ) + finally: + if own_client: + try: + client.close() + except Exception: # noqa: BLE001 — close should never crash the caller + pass + + +def _fetch_and_stage_with_client( + client: Any, + *, + platform_url: str, + workspace_id: str, + headers: dict[str, str], + file_id: str, + pending_uri: str, + filename: str, + body: dict[str, Any], +) -> str | None: + """Inner body of fetch_and_stage. Always uses the supplied client for + both GET and POST so the connection pool is shared across the call. + """ content_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/content" ack_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/ack" try: - with httpx.Client(timeout=timeout_secs) as client: - resp = client.get(content_url, headers=headers) + resp = client.get(content_url, headers=headers) except Exception as exc: # noqa: BLE001 - logger.warning( - "inbox_uploads: GET %s failed: %s", content_url, exc - ) + logger.warning("inbox_uploads: GET %s failed: %s", content_url, exc) return None if resp.status_code == 404: @@ -403,8 +461,7 @@ def fetch_and_stage( # back the on-disk file — the platform's sweep will clean up # eventually. 
try: - with httpx.Client(timeout=timeout_secs) as client: - ack_resp = client.post(ack_url, headers=headers) + ack_resp = client.post(ack_url, headers=headers) if ack_resp.status_code >= 400: logger.warning( "inbox_uploads: ack %s returned %d: %s", @@ -418,6 +475,168 @@ def fetch_and_stage( return local_uri +# --------------------------------------------------------------------------- +# BatchFetcher — concurrent fetch across a single poll batch +# --------------------------------------------------------------------------- + + +class BatchFetcher: + """Fetch + stage + ack a batch of upload-receive rows concurrently. + + Why this exists: the inbox poll loop used to call ``fetch_and_stage`` + serially per row. With N upload rows in a batch (a user dragging + multiple files into chat at once), the loop blocked for + ``N × per_fetch_latency`` before processing the chat message that + referenced them — a 4-file upload at 5s each = 20s of stall + before the agent saw the user's prompt. ``BatchFetcher`` runs the + fetches on a small thread pool (default 4 workers) so the stall is + bounded by ``ceil(N/W) × per_fetch_latency`` instead. + + Connection reuse: one ``httpx.Client`` is shared across every fetch + in the batch. httpx clients carry a connection pool, so a second + fetch to the same platform host reuses the TCP+TLS handshake from + the first — measurable win when fetches happen back-to-back. + + Correctness invariant the caller MUST preserve: the inbox loop is + expected to call ``wait_all()`` before processing the chat-message + activity row that REFERENCES one of these uploads. Without the + barrier, the URI cache is empty when ``rewrite_request_body`` runs + and the agent sees the un-rewritten ``platform-pending:`` URI. The + caller-side test ``test_poll_once_waits_for_uploads_before_messages`` + pins this end-to-end. + + Use as a context manager so the executor + client are torn down + even if the caller raises mid-batch. + """ + + def __init__( + self, + *, + platform_url: str, + workspace_id: str, + headers: dict[str, str], + timeout_secs: float = DEFAULT_FETCH_TIMEOUT, + max_workers: int = DEFAULT_BATCH_FETCH_WORKERS, + client: Any = None, + ): + self._platform_url = platform_url + self._workspace_id = workspace_id + self._headers = dict(headers) # copy so caller mutations don't leak in + self._timeout_secs = timeout_secs + + # Caller can inject a client (tests do this); production callers + # let us build one. Track ownership so we only close ours. + self._own_client = client is None + if self._own_client: + try: + import httpx # noqa: WPS433 + except ImportError: + # Match fetch_and_stage's behavior: log + degrade rather + # than raising at construction time. submit() will then + # return None for every row. + logger.error("inbox_uploads: httpx not installed; BatchFetcher inert") + self._client: Any = None + else: + self._client = httpx.Client(timeout=timeout_secs) + else: + self._client = client + + self._executor = concurrent.futures.ThreadPoolExecutor( + max_workers=max_workers, + thread_name_prefix="upload-fetch", + ) + self._futures: list[concurrent.futures.Future[Any]] = [] + self._closed = False + + def submit(self, row: dict[str, Any]) -> concurrent.futures.Future[Any] | None: + """Submit ``row`` for fetch + stage + ack. Non-blocking — the + worker thread runs ``fetch_and_stage`` with the shared client. + + Returns the Future so a caller that wants per-row outcome can + await it; ``None`` if the BatchFetcher is in a degraded state + (httpx missing). 
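+
+        A minimal caller sketch (the real loop lives in inbox._poll_once;
+        ``url``/``ws``/``headers``/``upload_rows`` are illustrative):
+
+            bf = BatchFetcher(platform_url=url, workspace_id=ws, headers=headers)
+            for row in upload_rows:
+                bf.submit(row)
+            bf.wait_all()  # barrier: URI cache is hot after this returns
+            bf.close()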
+ """ + if self._closed: + raise RuntimeError("BatchFetcher: submit after close") + if self._client is None: + return None + fut = self._executor.submit( + fetch_and_stage, + row, + platform_url=self._platform_url, + workspace_id=self._workspace_id, + headers=self._headers, + timeout_secs=self._timeout_secs, + client=self._client, + ) + self._futures.append(fut) + return fut + + def wait_all(self, timeout: float | None = DEFAULT_BATCH_WAIT_TIMEOUT) -> None: + """Block until every submitted future completes (or times out). + + Per-future exceptions are logged + swallowed — ``fetch_and_stage`` + already converts every error path to ``return None``, so a real + exception propagating up to here is unexpected and we don't want + one bad fetch to abort the whole batch. + + Timeouts are also logged + swallowed; the caller will move on + and the un-acked rows will be retried by the next poll. + """ + if not self._futures: + return + try: + done, not_done = concurrent.futures.wait( + self._futures, + timeout=timeout, + return_when=concurrent.futures.ALL_COMPLETED, + ) + except Exception as exc: # noqa: BLE001 — concurrent.futures shouldn't raise here + logger.warning("inbox_uploads: BatchFetcher.wait_all crashed: %s", exc) + return + for fut in done: + exc = fut.exception() + if exc is not None: + logger.warning( + "inbox_uploads: BatchFetcher worker raised: %s", exc + ) + if not_done: + logger.warning( + "inbox_uploads: BatchFetcher.wait_all left %d in-flight after %ss timeout", + len(not_done), + timeout, + ) + + def close(self) -> None: + """Tear down the executor + (if owned) the httpx client. + + Idempotent. After close, ``submit`` raises and the BatchFetcher + cannot be reused — construct a fresh one for the next poll. + """ + if self._closed: + return + self._closed = True + # Drain remaining futures so worker threads aren't killed mid- + # request. wait=True is the safe default; for an inbox poller a + # 60s tail at shutdown is acceptable since uploads in flight are + # the only thing close() is called between. + try: + self._executor.shutdown(wait=True) + except Exception as exc: # noqa: BLE001 + logger.warning("inbox_uploads: executor shutdown error: %s", exc) + if self._own_client and self._client is not None: + try: + self._client.close() + except Exception as exc: # noqa: BLE001 + logger.warning("inbox_uploads: client close error: %s", exc) + + def __enter__(self) -> "BatchFetcher": + return self + + def __exit__(self, exc_type, exc, tb) -> None: + self.close() + + # --------------------------------------------------------------------------- # URI rewrite for incoming chat messages # --------------------------------------------------------------------------- diff --git a/workspace/tests/test_inbox.py b/workspace/tests/test_inbox.py index 162c32c2..d62b2a0a 100644 --- a/workspace/tests/test_inbox.py +++ b/workspace/tests/test_inbox.py @@ -577,6 +577,219 @@ def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path): assert inbox.default_cursor_path() == tmp_path / ".mcp_inbox_cursor" +# --------------------------------------------------------------------------- +# Phase 5b — BatchFetcher integration with the poll loop +# --------------------------------------------------------------------------- +# +# These tests pin the cross-module contract between inbox._poll_once and +# inbox_uploads.BatchFetcher: chat_upload_receive rows must be submitted +# to a single BatchFetcher AND drained (URI cache populated) before any +# subsequent message row is processed. 
Without the drain, the +# rewrite_request_body path inside message_from_activity surfaces the +# un-rewritten ``platform-pending:`` URI to the agent. + + +def _upload_row(act_id: str, file_id: str) -> dict: + return { + "id": act_id, + "source_id": None, + "method": "chat_upload_receive", + "summary": f"chat_upload_receive: {file_id}.pdf", + "request_body": { + "file_id": file_id, + "name": f"{file_id}.pdf", + "uri": f"platform-pending:ws-1/{file_id}", + "mimeType": "application/pdf", + "size": 3, + }, + "created_at": "2026-05-04T10:00:00Z", + } + + +def _message_row_referencing(act_id: str, file_id: str) -> dict: + return { + "id": act_id, + "source_id": None, + "method": "message/send", + "summary": None, + "request_body": { + "params": { + "message": { + "parts": [ + {"kind": "text", "text": "have a look"}, + { + "kind": "file", + "file": { + "uri": f"platform-pending:ws-1/{file_id}", + "name": f"{file_id}.pdf", + }, + }, + ] + } + } + }, + "created_at": "2026-05-04T10:00:01Z", + } + + +def _patch_httpx_routing(activity_rows: list[dict], upload_bytes: bytes = b"PDF"): + """Replace ``httpx.Client`` so: + + - GET /activity returns ``activity_rows`` + - GET /workspaces/.../content returns ``upload_bytes`` with content-type + - POST /ack returns 200 + + Returns the patch context manager; tests use ``with p:``. Each new + Client(...) gets a fresh MagicMock so the test can verify + constructor-count expectations without pinning singletons. + """ + def _client_factory(*args, **kwargs): + c = MagicMock() + c.__enter__ = MagicMock(return_value=c) + c.__exit__ = MagicMock(return_value=False) + + def _get(url, params=None, headers=None): + if "/activity" in url: + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = activity_rows + resp.text = "" + return resp + if "/pending-uploads/" in url and "/content" in url: + resp = MagicMock() + resp.status_code = 200 + resp.content = upload_bytes + resp.headers = {"content-type": "application/pdf"} + resp.text = "" + return resp + resp = MagicMock() + resp.status_code = 404 + resp.text = "" + return resp + + def _post(url, headers=None): + resp = MagicMock() + resp.status_code = 200 + resp.text = "" + return resp + + c.get = MagicMock(side_effect=_get) + c.post = MagicMock(side_effect=_post) + c.close = MagicMock() + return c + + return patch("httpx.Client", side_effect=_client_factory) + + +def test_poll_once_drains_uploads_before_processing_message_row(state: inbox.InboxState, tmp_path): + """The chat-message row's file.uri MUST be rewritten to the local + workspace: URI by the time it lands in the InboxState queue. This + requires BatchFetcher.wait_all() to run before message_from_activity + on the second row. + """ + import inbox_uploads + inbox_uploads.get_cache().clear() + # Sandbox the on-disk staging dir so the test can't pollute the + # workspace's real chat-uploads. 
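+    # (CHAT_UPLOAD_DIR is module-level state: swap it here and restore
+    # it in the ``finally`` below.)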
+ real_dir = inbox_uploads.CHAT_UPLOAD_DIR + inbox_uploads.CHAT_UPLOAD_DIR = str(tmp_path / "chat-uploads") + try: + rows = [ + _upload_row("act-1", "file-A"), + _message_row_referencing("act-2", "file-A"), + ] + state.save_cursor("act-old") + with _patch_httpx_routing(rows, upload_bytes=b"PDF-bytes"): + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + finally: + inbox_uploads.CHAT_UPLOAD_DIR = real_dir + inbox_uploads.get_cache().clear() + + assert n == 1, "exactly one message row should be enqueued (the upload row is a side-effect, not a message)" + queued = state.peek(10) + assert len(queued) == 1 + # The contract this test exists to pin: the platform-pending: URI + # was rewritten to workspace: BEFORE the message landed in the + # state queue. message_from_activity mutates row['request_body'] + # in-place, so the rewritten URI is observable on the row dict + # we passed in. + rewritten_part = rows[1]["request_body"]["params"]["message"]["parts"][1] + assert rewritten_part["file"]["uri"].startswith("workspace:"), ( + f"upload barrier broken: file.uri = {rewritten_part['file']['uri']!r}; " + "rewrite_request_body ran before BatchFetcher.wait_all populated the cache" + ) + # Cursor advanced past BOTH rows — upload-receive (act-1) is + # acknowledged via the inbox cursor regardless of fetch outcome. + assert state.load_cursor() == "act-2" + + +def test_poll_once_with_only_upload_rows_drains_at_loop_end(state: inbox.InboxState, tmp_path): + """End-of-batch drain: a poll that contains ONLY upload rows (no + chat-message row to trigger the inline drain) must still drain the + BatchFetcher before _poll_once returns. Otherwise a future poll + that picks up the corresponding chat-message row would race with + in-flight fetches from the previous batch. + """ + import inbox_uploads + inbox_uploads.get_cache().clear() + real_dir = inbox_uploads.CHAT_UPLOAD_DIR + inbox_uploads.CHAT_UPLOAD_DIR = str(tmp_path / "chat-uploads") + try: + rows = [_upload_row("act-1", "file-A"), _upload_row("act-2", "file-B")] + state.save_cursor("act-old") + with _patch_httpx_routing(rows, upload_bytes=b"PDF"): + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + # By the time _poll_once returned, the URI cache must be hot + # for both file_ids — proves the end-of-loop drain ran. + assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-A") is not None + assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-B") is not None + finally: + inbox_uploads.CHAT_UPLOAD_DIR = real_dir + inbox_uploads.get_cache().clear() + # Upload rows are NOT message rows; queue stays empty. + assert n == 0 + # Cursor advances past both upload rows. + assert state.load_cursor() == "act-2" + + +def test_poll_once_no_uploads_does_not_construct_batch_fetcher(state: inbox.InboxState): + """A batch with no upload-receive rows must not pay the BatchFetcher + construction cost — the executor + httpx client allocation is + deferred until the first upload row appears. + """ + import inbox_uploads + + constructed: list[Any] = [] + + def _patched_init(self, **kwargs): + constructed.append(kwargs) + # Don't actually run __init__; we never hit submit/wait_all. 
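+        # Seed just the attributes a stray close() would touch, so the
+        # stub can't AttributeError if teardown reaches it.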
+ self._closed = False + self._futures = [] + self._executor = MagicMock() + self._client = MagicMock() + self._own_client = False + + rows = [ + { + "id": "act-1", + "source_id": None, + "method": "message/send", + "summary": None, + "request_body": {"parts": [{"type": "text", "text": "hi"}]}, + "created_at": "2026-04-30T22:00:00Z", + }, + ] + state.save_cursor("act-old") + resp = _make_response(200, rows) + p, _ = _patch_httpx(resp) + with patch.object(inbox_uploads.BatchFetcher, "__init__", _patched_init), p: + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + + assert n == 1 + assert constructed == [], "BatchFetcher must not be constructed when no upload rows are present" + + def test_default_cursor_path_falls_back_to_default(tmp_path, monkeypatch): """When CONFIGS_DIR is unset, the cursor path resolves through configs_dir.resolve() — /configs in-container, ~/.molecule-workspace diff --git a/workspace/tests/test_inbox_uploads.py b/workspace/tests/test_inbox_uploads.py index 515616e2..c13cea70 100644 --- a/workspace/tests/test_inbox_uploads.py +++ b/workspace/tests/test_inbox_uploads.py @@ -695,3 +695,342 @@ def test_rewrite_request_body_handles_non_list_parts(): def test_rewrite_request_body_handles_non_dict_file(): body = {"parts": [{"kind": "file", "file": "not a dict"}]} inbox_uploads.rewrite_request_body(body) # must not raise + + +# --------------------------------------------------------------------------- +# fetch_and_stage with shared client — Phase 5b client-reuse contract +# --------------------------------------------------------------------------- +# +# When a caller passes ``client=`` to fetch_and_stage, that client must be +# used for BOTH the GET /content and the POST /ack — no fresh +# ``httpx.Client(...)`` constructions should happen. The pre-Phase-5b +# implementation made one new client for GET and another for ack; the new +# shape lets BatchFetcher share one connection pool across an entire batch. + + +def test_fetch_and_stage_with_supplied_client_does_not_construct_new_client(monkeypatch): + row = _row(uri="platform-pending:ws-1/file-1") + get_resp = _make_resp(200, content=b"PDF", content_type="application/pdf") + ack_resp = _make_resp(200) + supplied = MagicMock() + supplied.get = MagicMock(return_value=get_resp) + supplied.post = MagicMock(return_value=ack_resp) + # Sentinel: any code path that constructs httpx.Client when one was + # already supplied is a regression — count constructions. + constructed: list[Any] = [] + + class _ShouldNotBeCalled: + def __init__(self, *a, **kw): + constructed.append((a, kw)) + + monkeypatch.setattr("httpx.Client", _ShouldNotBeCalled) + + local_uri = inbox_uploads.fetch_and_stage( + row, + platform_url="http://plat", + workspace_id="ws-1", + headers={"Authorization": "Bearer t"}, + client=supplied, + ) + assert local_uri is not None + assert constructed == [], "supplied client must be reused; no new Client should be constructed" + # GET + POST ack both went through the supplied client. + supplied.get.assert_called_once() + supplied.post.assert_called_once() + # Caller-owned client must NOT be closed by fetch_and_stage; the + # batch fetcher (or test) closes it once the whole batch is done. 
+ supplied.close.assert_not_called() + + +def test_fetch_and_stage_without_supplied_client_constructs_and_closes_one(monkeypatch): + row = _row(uri="platform-pending:ws-1/file-1") + get_resp = _make_resp(200, content=b"PDF", content_type="application/pdf") + ack_resp = _make_resp(200) + built: list[MagicMock] = [] + + def _factory(*args, **kwargs): + c = MagicMock() + c.get = MagicMock(return_value=get_resp) + c.post = MagicMock(return_value=ack_resp) + built.append(c) + return c + + monkeypatch.setattr("httpx.Client", _factory) + + local_uri = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert local_uri is not None + # Pre-Phase-5b built TWO clients (one for GET, one for ack); now exactly one. + assert len(built) == 1, f"expected 1 httpx.Client construction, got {len(built)}" + # Same client must serve BOTH calls. + built[0].get.assert_called_once() + built[0].post.assert_called_once() + # Owned client must be closed by fetch_and_stage on the way out. + built[0].close.assert_called_once() + + +def test_fetch_and_stage_with_supplied_client_does_not_close_caller_client(): + # Even on failure the supplied client must not be closed — the + # BatchFetcher owns the lifecycle for the whole batch. + row = _row(uri="platform-pending:ws-1/file-1") + supplied = MagicMock() + supplied.get = MagicMock(side_effect=RuntimeError("network down")) + supplied.post = MagicMock() # should not be reached on GET failure + inbox_uploads.fetch_and_stage( + row, + platform_url="http://plat", + workspace_id="ws-1", + headers={}, + client=supplied, + ) + supplied.close.assert_not_called() + supplied.post.assert_not_called() + + +# --------------------------------------------------------------------------- +# BatchFetcher — concurrent fetch + URI cache barrier +# --------------------------------------------------------------------------- + + +def _row_with_id(act_id: str, file_id: str) -> dict: + """Helper: an upload-receive row with a distinct activity id + file id.""" + return { + "id": act_id, + "method": "chat_upload_receive", + "request_body": { + "file_id": file_id, + "name": f"{file_id}.pdf", + "uri": f"platform-pending:ws-1/{file_id}", + "mimeType": "application/pdf", + "size": 1, + }, + } + + +def _stub_client_for_batch(get_responses: dict[str, MagicMock]) -> MagicMock: + """Build one MagicMock client that returns per-file_id responses + based on the file_id segment of the URL. + """ + client = MagicMock() + + def _get(url: str, headers: dict[str, str] | None = None) -> MagicMock: + for fid, resp in get_responses.items(): + if f"/pending-uploads/{fid}/content" in url: + return resp + return _make_resp(404) + + def _post(url: str, headers: dict[str, str] | None = None) -> MagicMock: + return _make_resp(200) + + client.get = MagicMock(side_effect=_get) + client.post = MagicMock(side_effect=_post) + return client + + +def test_batch_fetcher_runs_submitted_rows_concurrently(): + # Three rows whose .get() blocks for ~120ms each. With 4 workers the + # batch should complete in ~120ms (parallel), not ~360ms (serial). + # The 250ms ceiling accommodates CI scheduler jitter while still + # discriminating concurrent (~120ms) from serial (~360ms). 
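+    # Worst-case bound with W workers is ceil(N/W) × per_fetch:
+    # ceil(3/4) × 120ms ≈ 120ms concurrent vs. 3 × 120ms = 360ms serial.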
+ import time + + barrier_start = [0.0] + + def _slow_get(url: str, headers: dict[str, str] | None = None) -> MagicMock: + time.sleep(0.12) + for fid in ("a", "b", "c"): + if f"/pending-uploads/{fid}/content" in url: + return _make_resp(200, content=b"X", content_type="text/plain") + return _make_resp(404) + + client = MagicMock() + client.get = MagicMock(side_effect=_slow_get) + client.post = MagicMock(return_value=_make_resp(200)) + + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", + workspace_id="ws-1", + headers={}, + client=client, + max_workers=4, + ) + barrier_start[0] = time.time() + for fid in ("a", "b", "c"): + bf.submit(_row_with_id(f"act-{fid}", fid)) + bf.wait_all() + elapsed = time.time() - barrier_start[0] + bf.close() + + assert elapsed < 0.25, ( + f"3 rows × 120ms with 4 workers should finish in <250ms; got {elapsed:.3f}s " + "(suggests serial execution — Phase 5b regression)" + ) + assert client.get.call_count == 3 + assert client.post.call_count == 3 + + +def test_batch_fetcher_wait_all_blocks_until_uri_cache_populated(): + """Pin the correctness invariant: when wait_all returns, the URI + cache is hot for every submitted row. Without this barrier the + inbox loop would process the chat-message row before its uploads + were staged, and rewrite_request_body would surface the un-rewritten + platform-pending: URI to the agent. + """ + import time + + def _slow_get(url: str, headers: dict[str, str] | None = None) -> MagicMock: + time.sleep(0.05) + return _make_resp(200, content=b"data", content_type="text/plain") + + client = MagicMock() + client.get = MagicMock(side_effect=_slow_get) + client.post = MagicMock(return_value=_make_resp(200)) + + inbox_uploads.get_cache().clear() + with inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) as bf: + bf.submit(_row_with_id("act-a", "a")) + bf.submit(_row_with_id("act-b", "b")) + bf.wait_all() + # Cache must be hot for BOTH rows by the time wait_all returns. + assert inbox_uploads.get_cache().get("platform-pending:ws-1/a") is not None + assert inbox_uploads.get_cache().get("platform-pending:ws-1/b") is not None + + +def test_batch_fetcher_isolates_per_row_failure(): + """One failing fetch must not abort siblings. Sibling rows complete, + URI cache populates for them; the bad row's cache entry stays absent. + """ + def _get(url: str, headers: dict[str, str] | None = None) -> MagicMock: + if "/pending-uploads/bad/content" in url: + return _make_resp(500, text="upstream broken") + return _make_resp(200, content=b"ok", content_type="text/plain") + + client = MagicMock() + client.get = MagicMock(side_effect=_get) + client.post = MagicMock(return_value=_make_resp(200)) + + inbox_uploads.get_cache().clear() + with inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) as bf: + bf.submit(_row_with_id("act-1", "good1")) + bf.submit(_row_with_id("act-2", "bad")) + bf.submit(_row_with_id("act-3", "good2")) + bf.wait_all() + + cache = inbox_uploads.get_cache() + assert cache.get("platform-pending:ws-1/good1") is not None + assert cache.get("platform-pending:ws-1/good2") is not None + assert cache.get("platform-pending:ws-1/bad") is None + + +def test_batch_fetcher_reuses_one_client_across_all_submits(): + """Every row in the batch must share the same client instance. This + is the connection-pool-reuse leg of the perf win: a second fetch + to the same host reuses the TCP+TLS handshake from the first. 
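+
+    (``client`` is a MagicMock here, so the test pins call routing —
+    every GET and POST lands on the one shared instance — rather than
+    real socket reuse; that part is httpx connection-pool behavior.)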
+ """ + client = MagicMock() + client.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain")) + client.post = MagicMock(return_value=_make_resp(200)) + + with inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) as bf: + for fid in ("a", "b", "c"): + bf.submit(_row_with_id(f"act-{fid}", fid)) + bf.wait_all() + + # 3 GETs + 3 POST acks all on the same client — no per-row Client + # construction. + assert client.get.call_count == 3 + assert client.post.call_count == 3 + + +def test_batch_fetcher_close_idempotent(): + client = MagicMock() + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) + bf.close() + bf.close() # second call must not raise + + +def test_batch_fetcher_submit_after_close_raises(): + client = MagicMock() + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) + bf.close() + with pytest.raises(RuntimeError, match="submit after close"): + bf.submit(_row_with_id("act-x", "x")) + + +def test_batch_fetcher_owns_client_when_not_supplied(monkeypatch): + built: list[MagicMock] = [] + + def _factory(*args, **kwargs): + c = MagicMock() + c.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain")) + c.post = MagicMock(return_value=_make_resp(200)) + built.append(c) + return c + + monkeypatch.setattr("httpx.Client", _factory) + + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={} + ) + bf.submit(_row_with_id("act-a", "a")) + bf.wait_all() + bf.close() + + assert len(built) == 1, "expected one owned client per BatchFetcher" + built[0].close.assert_called_once() + + +def test_batch_fetcher_does_not_close_supplied_client(): + client = MagicMock() + client.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain")) + client.post = MagicMock(return_value=_make_resp(200)) + with inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) as bf: + bf.submit(_row_with_id("act-a", "a")) + bf.wait_all() + # Supplied client survives the BatchFetcher's close — caller's lifecycle. + client.close.assert_not_called() + + +def test_batch_fetcher_wait_all_no_op_on_empty_batch(): + client = MagicMock() + with inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) as bf: + bf.wait_all() # nothing submitted; must not block, must not raise + client.get.assert_not_called() + client.post.assert_not_called() + + +def test_batch_fetcher_httpx_missing_makes_submit_a_noop(monkeypatch): + # No client supplied + httpx import fails → BatchFetcher degrades + # gracefully: submit() returns None and the row is silently skipped. 
+ import sys + + real_httpx = sys.modules.pop("httpx", None) + monkeypatch.setitem(sys.modules, "httpx", None) + try: + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={} + ) + result = bf.submit(_row_with_id("act-a", "a")) + bf.wait_all() + bf.close() + finally: + if real_httpx is not None: + sys.modules["httpx"] = real_httpx + else: + sys.modules.pop("httpx", None) + assert result is None From 5b5eacbb2946f475dae92aae6d4f57ee83c2a3b4 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 11:47:14 -0700 Subject: [PATCH 2/8] test(inbox): clean up daemon poller thread to prevent test cross-talk MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit test_start_poller_thread_is_daemon spawned a daemon thread with no stop mechanism — the leaked thread polled every 10ms with the test's patched httpx.Client mock STILL ACTIVE for ~50ms after the test scope. Later tests that re-patched httpx.Client + asserted call counts on fetch_and_stage / Client construction got their assertions inflated by the leaked thread's iterations. Symptoms: test_poll_once_skips_chat_upload_row_from_queue saw fetch_and_stage called twice instead of once on Python 3.11 CI; test_batch_fetcher_owns_client_when_not_supplied saw two Client constructions instead of one in the full local suite. Both surfaced only after Phase 5b's BatchFetcher refactor changed the timing window that allowed the leaked thread to fire mid-test. Fix: extend start_poller_thread with an optional stop_event kwarg (backward compatible — production callers pass None and rely on the daemon flag for process-exit cleanup). The test now signals + joins on stop_event before exiting scope, so the thread is gone before any later test patches httpx. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/inbox.py | 8 +++++++- workspace/tests/test_inbox.py | 26 ++++++++++++++++++++++---- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/workspace/inbox.py b/workspace/inbox.py index 5e2f02b1..6c131175 100644 --- a/workspace/inbox.py +++ b/workspace/inbox.py @@ -685,6 +685,7 @@ def start_poller_thread( platform_url: str, workspace_id: str, interval: float = POLL_INTERVAL_SECONDS, + stop_event: threading.Event | None = None, ) -> threading.Thread: """Spawn the poller as a daemon thread. Returns the Thread handle. @@ -696,13 +697,18 @@ def start_poller_thread( operator running ``ps -eL`` or eyeballing ``threading.enumerate()`` can tell which thread is which without reverse-engineering it from crash tracebacks. + + Pass ``stop_event`` to enable graceful shutdown — used by tests so + the daemon thread doesn't outlive the test that started it and race + with later tests' httpx patches. Production code passes None and + relies on the daemon flag for process-exit cleanup. 
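+
+    Shutdown-handshake sketch this enables (names illustrative):
+
+        stop = threading.Event()
+        t = start_poller_thread(state, url, ws_id, interval=0.01,
+                                stop_event=stop)
+        ...
+        stop.set()
+        t.join(timeout=2.0)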
""" name = "molecule-mcp-inbox-poller" if workspace_id: name = f"{name}-{workspace_id[:8]}" t = threading.Thread( target=_poll_loop, - args=(state, platform_url, workspace_id, interval), + args=(state, platform_url, workspace_id, interval, stop_event), name=name, daemon=True, ) diff --git a/workspace/tests/test_inbox.py b/workspace/tests/test_inbox.py index d62b2a0a..cbba9a3b 100644 --- a/workspace/tests/test_inbox.py +++ b/workspace/tests/test_inbox.py @@ -555,16 +555,34 @@ def test_poll_once_self_notify_does_not_fire_notification(state: inbox.InboxStat def test_start_poller_thread_is_daemon(state: inbox.InboxState): """Daemon flag is required so the poller dies with the parent process; a non-daemon poller would leak across `claude` restarts - and write to a stale workspace.""" + and write to a stale workspace. + + Stop_event is plumbed so the thread cleans up at the end of the + test instead of leaking into later tests. Without cleanup, the + daemon's ~10ms tick races with later tests that patch httpx.Client + — the leaked thread sees their patched response and runs an + unwanted iteration of _poll_once that double-counts mocked calls + (caught when test_batch_fetcher_owns_client_when_not_supplied + surfaced this on Python 3.11 CI but not 3.13 local). + """ resp = _make_response(200, []) p, _ = _patch_httpx(resp) + stop_event = threading.Event() with p, patch("platform_auth.auth_headers", return_value={}): # Use a very short interval so the loop body runs at least once # before we exit the test. - t = inbox.start_poller_thread(state, "http://platform", "ws-1", interval=0.01) + t = inbox.start_poller_thread( + state, "http://platform", "ws-1", interval=0.01, stop_event=stop_event + ) time.sleep(0.05) - assert t.daemon is True - assert t.is_alive() + assert t.daemon is True + assert t.is_alive() + # Signal shutdown + wait for the thread to actually exit before + # we leave the test scope. Without this join, the leaked thread + # races with later tests' httpx patches. + stop_event.set() + t.join(timeout=2.0) + assert not t.is_alive(), "poller thread did not exit on stop_event" # --------------------------------------------------------------------------- From 81e83c05b73bae8cce15979103a941dc51554864 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 11:56:54 -0700 Subject: [PATCH 3/8] fix(inbox): drop unused batch_fetcher = None after end-of-batch drain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lint nit from review bot — _drain_uploads() runs and the function immediately advances to the cursor save + return, so the local re-assign is dead code. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/inbox.py | 1 - 1 file changed, 1 deletion(-) diff --git a/workspace/inbox.py b/workspace/inbox.py index 6c131175..cff95c6d 100644 --- a/workspace/inbox.py +++ b/workspace/inbox.py @@ -641,7 +641,6 @@ def _poll_once( # would race with the still-running fetches. if batch_fetcher is not None: _drain_uploads(batch_fetcher) - batch_fetcher = None if last_id is not None: state.save_cursor(last_id, cursor_key) From 6201d12533fc71215aa54418c66a3a3f2a0770e9 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 11:57:37 -0700 Subject: [PATCH 4/8] fix(memory-plugin): embed migrations into binary via go:embed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #2906 shipped the binary at /memory-plugin without the migrations directory. 
The plugin's runMigrations() resolved a relative path
`cmd/memory-plugin-postgres/migrations` that exists in the build
context but NOT in the runtime image. Every staging tenant boot failed
with:

  memory-plugin-postgres: migrate: read migrations dir
    "cmd/memory-plugin-postgres/migrations": open
    cmd/memory-plugin-postgres/migrations: no such file or directory
  memory-plugin: ❌ /v1/health never returned 200 after 30s — aborting boot

Caught on the staging redeploy fleet job after #2906 merged. Tenants
stayed on the old image (CP redeploy correctly fail-fasted) but the new
image was broken.

Fix: `//go:embed migrations/*.up.sql` bundles the migrations into the
binary at build time. No filesystem path dependency at runtime.

* `embed.FS` embeds the .up.sql files alongside the binary.
* runMigrations() reads from migrationsFS by default;
  MEMORY_PLUGIN_MIGRATIONS_DIR override path preserved for operators
  shipping custom migrations.
* Names sorted alphabetically — pinned by a test so a future
  `002_*.up.sql` is guaranteed to run after `001_*.up.sql`.

Tests:

* TestMigrationsEmbedded_ContainsCreateTable — pins that the embed
  pattern matched files AND those files contain CREATE TABLE (catches
  both empty-pattern and wrong-files-embedded).
* TestRunMigrationsFromEmbed_OrderingIsAlphabetic — pins sorted
  application order.

Verified locally: `go build` succeeds, binary 9.3MB, `strings` shows
the embedded SQL.

Refs RFC #2728. Hotfix for #2906.
---
 .../cmd/memory-plugin-postgres/main.go        | 87 +++++++++++++++----
 .../migrations_embed_test.go                  | 72 +++++++++++++++
 2 files changed, 141 insertions(+), 18 deletions(-)
 create mode 100644 workspace-server/cmd/memory-plugin-postgres/migrations_embed_test.go

diff --git a/workspace-server/cmd/memory-plugin-postgres/main.go b/workspace-server/cmd/memory-plugin-postgres/main.go
index 148c1dd4..2a1b2dee 100644
--- a/workspace-server/cmd/memory-plugin-postgres/main.go
+++ b/workspace-server/cmd/memory-plugin-postgres/main.go
@@ -10,6 +10,7 @@ package main
 import (
 	"context"
 	"database/sql"
+	"embed"
 	"errors"
 	"fmt"
 	"log"
@@ -17,6 +18,7 @@ import (
 	"net/http"
 	"os"
 	"os/signal"
+	"sort"
 	"strings"
 	"syscall"
 	"time"
@@ -26,6 +28,16 @@ import (
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/pgplugin"
 )

+// migrationsFS bundles the .up.sql files into the binary at build time
+// so the prebuilt image doesn't need the source tree at runtime. The
+// prior `os.ReadDir("cmd/memory-plugin-postgres/migrations")` path
+// only resolved during `go test` from the repo root — in the published
+// image the path didn't exist and boot failed after the 30s health gate
+// (caught on staging redeploy 2026-05-05 after PR #2906).
+//
+//go:embed migrations/*.up.sql
+var migrationsFS embed.FS
+
 const (
 	envDatabaseURL = "MEMORY_PLUGIN_DATABASE_URL"
 	envListenAddr  = "MEMORY_PLUGIN_LISTEN_ADDR"
@@ -149,32 +161,71 @@ func openDB(databaseURL string) (*sql.DB, error) {
 	return db, nil
 }

-// runMigrations applies the schema migrations bundled at
-// cmd/memory-plugin-postgres/migrations/. Idempotent on repeat boot.
+// runMigrations applies the schema migrations bundled into the binary
+// via go:embed (see migrationsFS at the top of this file). Idempotent
+// on repeat boot — every migration file uses CREATE … IF NOT EXISTS.
 //
-// Implementation note: rather than embedding the full migrate engine,
-// we read the migration files at boot from a known relative path. The
-// down migrations are deliberately NOT applied here — that's a manual
-// operator action.
This keeps the binary tiny and avoids dragging in
-// golang-migrate's drivers.
+// The down migrations are deliberately NOT applied here — that's a
+// manual operator action. This keeps the binary tiny and avoids
+// dragging in golang-migrate's drivers.
+//
+// MEMORY_PLUGIN_MIGRATIONS_DIR (filesystem path) is honored as an
+// override for operators who need to ship custom migrations alongside
+// the binary without rebuilding. When unset (the common case) we read
+// from the embedded FS.
 func runMigrations(db *sql.DB) error {
-	// Find the migrations directory. In `go run` mode it's relative
-	// to the cmd dir; in the prebuilt binary case it's expected next
-	// to the binary OR via env var override.
-	dir := os.Getenv("MEMORY_PLUGIN_MIGRATIONS_DIR")
-	if dir == "" {
-		// Best-effort: try the cwd-relative path that works for `go test`.
-		dir = "cmd/memory-plugin-postgres/migrations"
+	if dir := strings.TrimSpace(os.Getenv("MEMORY_PLUGIN_MIGRATIONS_DIR")); dir != "" {
+		return runMigrationsFromDisk(db, dir)
 	}
-	entries, err := os.ReadDir(dir)
+	return runMigrationsFromEmbed(db)
+}
+
+// runMigrationsFromEmbed applies the *.up.sql files bundled into the
+// binary at build time. Order is alphabetical — we sort explicitly,
+// which also matches os.ReadDir (it is documented to return entries
+// sorted by filename).
+func runMigrationsFromEmbed(db *sql.DB) error {
+	entries, err := migrationsFS.ReadDir("migrations")
 	if err != nil {
-		return fmt.Errorf("read migrations dir %q: %w", dir, err)
+		return fmt.Errorf("read embedded migrations: %w", err)
 	}
+	names := make([]string, 0, len(entries))
 	for _, e := range entries {
 		if e.IsDir() || !strings.HasSuffix(e.Name(), ".up.sql") {
 			continue
 		}
-		path := dir + "/" + e.Name()
+		names = append(names, e.Name())
+	}
+	sort.Strings(names)
+	for _, name := range names {
+		data, err := migrationsFS.ReadFile("migrations/" + name)
+		if err != nil {
+			return fmt.Errorf("read embedded %q: %w", name, err)
+		}
+		if _, err := db.Exec(string(data)); err != nil {
+			return fmt.Errorf("apply %q: %w", name, err)
+		}
+		log.Printf("applied embedded migration %s", name)
+	}
+	return nil
+}
+
+// runMigrationsFromDisk preserves the legacy filesystem-path mode for
+// operator-supplied custom migrations.
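+// A sketch of the override (path and binary name illustrative):
+//
+//	MEMORY_PLUGIN_MIGRATIONS_DIR=/etc/memory-plugin/migrations ./memory-plugin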
+func runMigrationsFromDisk(db *sql.DB, dir string) error { + entries, err := os.ReadDir(dir) + if err != nil { + return fmt.Errorf("read migrations dir %q: %w", dir, err) + } + names := make([]string, 0, len(entries)) + for _, e := range entries { + if e.IsDir() || !strings.HasSuffix(e.Name(), ".up.sql") { + continue + } + names = append(names, e.Name()) + } + sort.Strings(names) + for _, name := range names { + path := dir + "/" + name data, err := os.ReadFile(path) if err != nil { return fmt.Errorf("read %q: %w", path, err) @@ -182,7 +233,7 @@ func runMigrations(db *sql.DB) error { if _, err := db.Exec(string(data)); err != nil { return fmt.Errorf("apply %q: %w", path, err) } - log.Printf("applied migration %s", e.Name()) + log.Printf("applied disk migration %s (from %s)", name, dir) } return nil } diff --git a/workspace-server/cmd/memory-plugin-postgres/migrations_embed_test.go b/workspace-server/cmd/memory-plugin-postgres/migrations_embed_test.go new file mode 100644 index 00000000..f2f0b785 --- /dev/null +++ b/workspace-server/cmd/memory-plugin-postgres/migrations_embed_test.go @@ -0,0 +1,72 @@ +package main + +import ( + "strings" + "testing" +) + +// TestMigrationsEmbedded_ContainsCreateTable pins that the migrations +// are bundled into the binary at build time, NOT loaded from a +// filesystem path that doesn't exist at runtime in the published image. +// +// Pre-fix: PR #2906 shipped the binary without the migrations dir; +// `os.ReadDir("cmd/memory-plugin-postgres/migrations")` errored on every +// tenant boot, the 30s health gate aborted the container, and the +// staging redeploy fleet job marked all tenants as failed. Embedding +// the migrations into the binary removes the runtime path entirely. +func TestMigrationsEmbedded_ContainsCreateTable(t *testing.T) { + entries, err := migrationsFS.ReadDir("migrations") + if err != nil { + t.Fatalf("embedded migrations dir unreadable: %v", err) + } + if len(entries) == 0 { + t.Fatal("embedded migrations dir is empty — go:embed pattern matched no files") + } + + var seenUp bool + for _, e := range entries { + if e.IsDir() || !strings.HasSuffix(e.Name(), ".up.sql") { + continue + } + seenUp = true + data, err := migrationsFS.ReadFile("migrations/" + e.Name()) + if err != nil { + t.Errorf("read embedded %q: %v", e.Name(), err) + continue + } + if !strings.Contains(string(data), "CREATE TABLE") { + t.Errorf("embedded %q has no CREATE TABLE — wrong file embedded?", e.Name()) + } + } + if !seenUp { + t.Fatal("no *.up.sql in embedded migrations — runtime would have no schema to apply") + } +} + +// TestRunMigrationsFromEmbed_OrderingIsAlphabetic pins that we apply +// migrations in deterministic alphabetical order, not in whatever +// arbitrary order migrationsFS.ReadDir happens to return. With one +// migration today this is moot, but a future second migration ('002_…') +// MUST run after '001_…' or the schema is broken. +// +// We can't easily exercise db.Exec here (no test DB); instead pin the +// sort step on the directory listing itself. 
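+// (io/fs.ReadDir — and therefore embed.FS — is documented to return
+// entries sorted by filename, so this doubles as a belt-and-braces pin
+// on top of the explicit sort.Strings in runMigrationsFromEmbed.)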
+func TestRunMigrationsFromEmbed_OrderingIsAlphabetic(t *testing.T) { + entries, err := migrationsFS.ReadDir("migrations") + if err != nil { + t.Fatalf("embedded migrations dir unreadable: %v", err) + } + var names []string + for _, e := range entries { + if e.IsDir() || !strings.HasSuffix(e.Name(), ".up.sql") { + continue + } + names = append(names, e.Name()) + } + for i := 1; i < len(names); i++ { + if names[i-1] > names[i] { + t.Errorf("ReadDir returned non-sorted names; runMigrationsFromEmbed must sort. "+ + "Got %q before %q", names[i-1], names[i]) + } + } +} From eec4ea2e7d8e15c34fed6398033d5c9f173d6e32 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 11:59:43 -0700 Subject: [PATCH 5/8] chore: delete TeamHandler.Collapse + docs cleanup (closes #2864) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Multi-model retrospective review of #2856 (Phase 1 Expand removal) flagged that TeamHandler.Collapse is unreachable from the canvas UI: the "Collapse Team" button calls PATCH /workspaces/:id { collapsed } (visual flag toggle on canvas_layouts), NOT POST /workspaces/:id/collapse. The destructive POST route — which stops EC2s, marks children removed, and deletes layouts — has zero UI callers (verified via grep across canvas/, scripts/, and the MCP tool registry; only docs referenced it). Two semantically different operations had been sharing the word "Collapse": - Visual collapse (canvas) → PATCH { collapsed: true }. Hides children visually. Reversible. UI-only. - Destructive collapse (POST /collapse) → Stops + marks removed. Irreversible. No caller. Deleting the destructive one + its supporting machinery: - workspace-server/internal/handlers/team.go (entirely) - workspace-server/internal/handlers/team_test.go (entirely) - POST /collapse route + teamh init in router.go - findTemplateDirByName helper (zero non-test callers after Expand was deleted in #2856; package-private so no out-of-package consumers) - NewTeamHandler constructor (no callers after route removed) Plus stale doc references (the most dangerous was the MCP wrapper mapping in mcp-server-setup.md — anyone generating MCP tool wrappers from that table was wiring a 404): - docs/agent-runtime/team-expansion.md (deleted entirely — whole guide taught the deleted flow) - docs/api-reference.md (dropped two team.go rows) - docs/api-protocol/platform-api.md (dropped /expand + /collapse rows) - docs/architecture/molecule-technical-doc.md (dropped /expand + /collapse rows) - docs/guides/mcp-server-setup.md (dropped expand_team + collapse_team MCP wrapper mappings) - docs/glossary.md (dropped "(org template expand_team)" parenthetical) - docs/frontend/canvas.md (dropped broken link to deleted team-expansion.md) Kept: docs/architecture/backends.md mention of "TeamHandler.Expand (#2367) bypassed routing on Start" — correct historical context for the AST gate's existence, no live route reference. Visual-collapse path unaffected: canvas/src/components/ContextMenu.tsx:227 → api.patch — unchanged canvas/src/components/WorkspaceNode.tsx:128 → api.patch — unchanged go vet ./... clean. go test ./internal/handlers/ -count 1 — all green (4.3s, no regression). Net: -388/+10 = ~378 lines removed. 
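
For reviewers, the two operations that had been sharing the word
"Collapse" (shapes as they stood pre-delete; the PATCH one is
unchanged):

    PATCH /workspaces/:id            { "collapsed": true }   # visual, reversible — kept
    POST  /workspaces/:id/collapse                           # destructive, zero callers — deleted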
Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/agent-runtime/team-expansion.md | 111 --------------- docs/api-reference.md | 2 - docs/architecture/molecule-technical-doc.md | 2 - docs/frontend/canvas.md | 1 - docs/glossary.md | 2 +- docs/guides/mcp-server-setup.md | 2 - workspace-server/internal/handlers/team.go | 132 ------------------ .../internal/handlers/team_test.go | 130 ----------------- workspace-server/internal/router/router.go | 16 ++- 9 files changed, 10 insertions(+), 388 deletions(-) delete mode 100644 docs/agent-runtime/team-expansion.md delete mode 100644 workspace-server/internal/handlers/team.go delete mode 100644 workspace-server/internal/handlers/team_test.go diff --git a/docs/agent-runtime/team-expansion.md b/docs/agent-runtime/team-expansion.md deleted file mode 100644 index 5785dd13..00000000 --- a/docs/agent-runtime/team-expansion.md +++ /dev/null @@ -1,111 +0,0 @@ -# Team Expansion (Recursive Workspaces) - -When a workspace is expanded into a team, it gains sub-workspaces while its own agent remains as the **team lead** (coordinator). This is recursive — sub-workspaces can themselves be expanded into teams, infinitely deep. - -## How It Works - -When Developer PM is expanded into a team: - -``` -Business Core - | - +-- Developer PM (agent stays, becomes coordinator) - | - +-- Frontend Agent (sub-workspace, private scope) - +-- Backend Agent (sub-workspace, private scope) - +-- QA Agent (sub-workspace, private scope) -``` - -- Developer PM's agent **still exists** and acts as coordinator -- Developer PM receives incoming A2A messages from Business Core -- Developer PM's agent decides how to delegate to sub-workspaces -- Sub-workspaces talk to Developer PM and to each other (same level) -- Sub-workspaces **cannot** talk to Business Core or any workspace outside the team - -## Communication Rules - -| Direction | Allowed? | Example | -|-----------|----------|---------| -| Parent level -> team lead | Yes | Business Core -> Developer PM | -| Team lead -> sub-workspaces | Yes | Developer PM -> Frontend Agent | -| Sub-workspace -> team lead | Yes | Frontend Agent -> Developer PM | -| Sub-workspace <-> sibling | Yes | Frontend Agent <-> Backend Agent | -| Outside -> sub-workspace directly | No (403) | Business Core -> Frontend Agent | -| Sub-workspace -> outside directly | No | Frontend Agent -> Business Core | - -The team lead (Developer PM) is the **only** bridge between the team's internal world and the outside. - -## Scoped Registry - -Sub-workspaces register in the platform registry but with a **private scope**. The registry knows about them but enforces access control. - -``` -Registry: - Business Core :8001 scope: public - Developer PM :8002 scope: public - Frontend Agent :8010 scope: private, parent=Developer PM - Backend Agent :8011 scope: private, parent=Developer PM - QA Agent :8012 scope: private, parent=Developer PM -``` - -- The platform can always discover any workspace (for provisioning, monitoring) -- The parent workspace can discover its sub-workspaces -- Sub-workspaces can discover their siblings (same parent) -- Outside workspaces get a **403 Forbidden** if they try to discover a private sub-workspace - -## How to Expand - -Expansion is triggered via `POST /workspaces/:id/expand`. The platform reads the `sub_workspaces` list from the workspace's config and provisions each one. On the canvas, users right-click a workspace node and select "Expand into team." - -Collapsing is the inverse: `POST /workspaces/:id/collapse`. 
Sub-workspaces are stopped and removed. - -## What Happens on Expansion - -When Developer PM is expanded into a team, the hierarchy changes but the outside view doesn't. Business Core's parent/child relationship to Developer PM is unaffected — Developer PM still responds to the same A2A endpoint. - -The events fired: -- `WORKSPACE_EXPANDED` with the new `sub_workspace_ids` in the payload -- `WORKSPACE_PROVISIONING` for each new sub-workspace -- `WORKSPACE_ONLINE` for each sub-workspace as they come up - -Communication rules are automatically derived from the new hierarchy — no manual wiring needed. - -## Canvas Behavior - -- Children render as embedded mini-cards (`TeamMemberChip`) inside the parent node, not as separate canvas nodes -- Each mini-card shows full status: gradient bar, name, tier badge, skills pills, active tasks, descendant count -- **Recursive rendering** up to 3 levels deep (`MAX_NESTING_DEPTH = 3`) — sub-cards can contain their own "Team" sections -- Parent node dynamically resizes: 210-280px (no children), 320-450px (children), 400-560px (grandchildren) -- Eject button (sky-blue arrow icon) on hover extracts a child from the team -- "Extract from Team" also available in the right-click context menu -- Double-click a team node to zoom/fit to the parent area -- The parent workspace node shows a badge with total descendant count - -## Collapsing a Team - -The inverse of expansion, triggered via `POST /workspaces/:id/collapse`: - -1. Each sub-workspace agent wraps up current work and writes a handoff document to memory -2. Sub-workspaces are stopped and removed -3. The team lead's agent goes back to handling everything directly -4. A `WORKSPACE_COLLAPSED` event fires - -Sub-workspace memory is cleaned up based on backend (see [Memory — Cleanup](../architecture/memory.md#cleanup-on-workspace-deletion)). - -## Deleting a Team Workspace - -When a team workspace is deleted: -1. Platform shows a warning listing all sub-workspaces that will be deleted -2. User can **drag sub-workspaces out** of the team before confirming (promotes them to the parent level) -3. On confirmation, cascade delete removes the parent and all remaining sub-workspaces -4. `WORKSPACE_REMOVED` events fire for each deleted workspace - -## Related Docs - -- [Communication Rules](../api-protocol/communication-rules.md) — Full access control model -- [Core Concepts](../product/core-concepts.md) — Workspace fundamentals -- [System Prompt Structure](./system-prompt-structure.md) — How peer capabilities are injected -- [Provisioner](../architecture/provisioner.md) — How sub-workspaces are deployed -- [Registry & Heartbeat](../api-protocol/registry-and-heartbeat.md) — How registration works -- [Event Log](../architecture/event-log.md) — Events fired during expansion -- [Canvas UI](../frontend/canvas.md) — Visual behavior of teams diff --git a/docs/api-reference.md b/docs/api-reference.md index e1a75668..12e94a3c 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -41,8 +41,6 @@ Full contract: `docs/runbooks/admin-auth.md`. 
| GET | /admin/workspaces/:id/test-token | admin_test_token.go — mint a fresh bearer token for E2E scripts; returns 404 unless `MOLECULE_ENV != production` or `MOLECULE_ENABLE_TEST_TOKENS=1` |
 | GET/POST/DELETE | /admin/secrets[/:key] | secrets.go — legacy aliases for /settings/secrets |
 | WS | /workspaces/:id/terminal | terminal.go |
-| POST | /workspaces/:id/expand | team.go |
-| POST | /workspaces/:id/collapse | team.go |
 | POST/GET | /workspaces/:id/approvals | approvals.go |
 | POST | /workspaces/:id/approvals/:id/decide | approvals.go |
 | GET | /approvals/pending | approvals.go |
diff --git a/docs/architecture/molecule-technical-doc.md b/docs/architecture/molecule-technical-doc.md
index 0d9c653c..cd3dc957 100644
--- a/docs/architecture/molecule-technical-doc.md
+++ b/docs/architecture/molecule-technical-doc.md
@@ -336,8 +336,6 @@ This same logic governs: A2A delegation, memory scope enforcement, activity visi
 
 | Method | Endpoint | Purpose |
 |--------|----------|---------|
-| `POST` | `/workspaces/:id/expand` | Expand workspace into team (become coordinator) |
-| `POST` | `/workspaces/:id/collapse` | Collapse team back to single workspace |
 
 ### Files, Terminal, Templates, Bundles (8 endpoints)
 
diff --git a/docs/frontend/canvas.md b/docs/frontend/canvas.md
index 8d59c80f..fc103bd6 100644
--- a/docs/frontend/canvas.md
+++ b/docs/frontend/canvas.md
@@ -186,4 +186,3 @@ So the UI now exposes more operational failure state directly instead of silentl
 - [Quickstart](../quickstart.md)
 - [Platform API](../api-protocol/platform-api.md)
 - [Workspace Runtime](../agent-runtime/workspace-runtime.md)
-- [Team Expansion](../agent-runtime/team-expansion.md)
diff --git a/docs/glossary.md b/docs/glossary.md
index f0343a38..b3535ae8 100644
--- a/docs/glossary.md
+++ b/docs/glossary.md
@@ -18,7 +18,7 @@ lands in the watch list with a colliding term, add a row here.
 | **plugin** | A directory under `plugins/` packaging one or more skills or an MCP server wrapper, installable per-workspace via `POST /workspaces/:id/plugins`. Governed by `plugin.yaml`. | **Langflow**: a visual UI node / component in a flowchart. **CrewAI**: a Python-importable callable registered as a capability. |
 | **agent** | A persistent containerized workspace running continuously — an identity with memory, a role, and a schedule. Not a one-shot invocation. | Most frameworks (AutoGPT, LangChain agents, OpenAI Assistants): a stateless function-call loop. No persistence between invocations unless explicitly checkpointed. |
 | **flow** | A task execution within a workspace — a request enters, the agent runs tools, emits a response, logs activity. No explicit graph abstraction. | **Langflow**: a directed graph of nodes you author visually. **LangGraph**: a stateful graph of callable nodes. Our "flow" is an imperative timeline, not a graph. |
-| **team** | A named cluster of workspaces under a PM (org template `expand_team`). Used for role grouping in Canvas. | **CrewAI**: a "crew" is a sequence of agents that pass a task through a declared order. Our "team" is an org-chart abstraction, not an execution order. |
+| **team** | A named cluster of workspaces under a PM. Used for role grouping in Canvas. | **CrewAI**: a "crew" is a sequence of agents that pass a task through a declared order. Our "team" is an org-chart abstraction, not an execution order. |
 | **skill** | A directory with `SKILL.md` that an agent invokes via the `Skill` tool. Skills are documentation + optional scripts that teach an agent a recipe.
| **Anthropic Skills API**: nearly identical. **CrewAI tool**: closer to our plugin's MCP tool, not our skill. | | **channel** | An outbound/inbound social integration (Telegram, Slack, …) per-workspace, wired in `workspace_channels`. | Slack's "channel": the container for messages. We use "channel" for the adapter + credentials, not the conversation itself. | | **runtime** | The execution engine image tag for a workspace: one of `langgraph`, `claude-code`, `openclaw`, `crewai`, `autogen`, `deepagents`, `hermes`. | **LangGraph runtime**: the Python process running the graph. We use "runtime" for the Docker image + adapter pairing, not the inner process. | diff --git a/docs/guides/mcp-server-setup.md b/docs/guides/mcp-server-setup.md index aacc554a..5539ba97 100644 --- a/docs/guides/mcp-server-setup.md +++ b/docs/guides/mcp-server-setup.md @@ -166,8 +166,6 @@ list_workspaces | MCP Tool | API Route | Method | Description | |----------|-----------|--------|-------------| -| `expand_team` | `/workspaces/:id/expand` | POST | Expand team node | -| `collapse_team` | `/workspaces/:id/collapse` | POST | Collapse team node | ### Templates & Bundles diff --git a/workspace-server/internal/handlers/team.go b/workspace-server/internal/handlers/team.go deleted file mode 100644 index 0c536020..00000000 --- a/workspace-server/internal/handlers/team.go +++ /dev/null @@ -1,132 +0,0 @@ -package handlers - -import ( - "encoding/json" - "log" - "net/http" - "os" - "path/filepath" - - "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" - "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" - "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" - "github.com/gin-gonic/gin" - "gopkg.in/yaml.v3" -) - -// TeamHandler now hosts only Collapse — the visual "expand" action is -// canvas-side and creating children goes through the regular -// WorkspaceHandler.Create path with parent_id set, like any other -// workspace. Every workspace can have children; "team" is just the -// state of having children. The old Expand handler bulk-created -// children by reading sub_workspaces from a parent's config and was -// non-idempotent — calling it N times leaked N×children EC2s, which -// is how tenant-hongming accumulated 72 stale workspaces. -type TeamHandler struct { - wh *WorkspaceHandler - b *events.Broadcaster -} - -// NewTeamHandler constructs a TeamHandler. wh is used by Collapse to -// route StopWorkspaceAuto through the backend dispatcher. -func NewTeamHandler(b *events.Broadcaster, wh *WorkspaceHandler, platformURL, configsDir string) *TeamHandler { - return &TeamHandler{wh: wh, b: b} -} - -// Collapse handles POST /workspaces/:id/collapse -// Stops and removes all child workspaces. -func (h *TeamHandler) Collapse(c *gin.Context) { - parentID := c.Param("id") - ctx := c.Request.Context() - - // Find children - rows, err := db.DB.QueryContext(ctx, - `SELECT id, name FROM workspaces WHERE parent_id = $1 AND status != 'removed'`, parentID) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to query children"}) - return - } - defer rows.Close() - - removed := make([]string, 0) - for rows.Next() { - var childID, childName string - if rows.Scan(&childID, &childName) != nil { - continue - } - - // Stop the workload via the backend dispatcher (CP for SaaS, - // Docker for self-hosted). Pre-2026-05-05 this was - // `if h.provisioner != nil { h.provisioner.Stop(...) 
}`, which - // silently skipped on every SaaS tenant — child EC2s kept running - // after team-collapse until the orphan sweeper caught them - // (issue #2813). - if err := h.wh.StopWorkspaceAuto(ctx, childID); err != nil { - log.Printf("Team collapse: stop %s failed: %v — orphan sweeper will reconcile", childID, err) - } - - // Mark as removed - if _, err := db.DB.ExecContext(ctx, - `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2`, models.StatusRemoved, childID); err != nil { - log.Printf("Team collapse: failed to remove workspace %s: %v", childID, err) - } - if _, err := db.DB.ExecContext(ctx, - `DELETE FROM canvas_layouts WHERE workspace_id = $1`, childID); err != nil { - log.Printf("Team collapse: failed to delete layout for %s: %v", childID, err) - } - - h.b.RecordAndBroadcast(ctx, "WORKSPACE_REMOVED", childID, map[string]interface{}{}) - - removed = append(removed, childName) - } - - h.b.RecordAndBroadcast(ctx, "WORKSPACE_COLLAPSED", parentID, map[string]interface{}{ - "removed_children": removed, - }) - - c.JSON(http.StatusOK, gin.H{ - "status": "collapsed", - "removed": removed, - }) -} - -// findTemplateDirByName resolves a workspace name to its template -// directory. Kept here because callers outside this package may use -// it, even though the in-package consumer (Expand) is gone. -// -// TODO: relocate alongside the templates handler if no other callers -// surface, or delete entirely after a deprecation cycle. -func findTemplateDirByName(configsDir, name string) string { - normalized := normalizeName(name) - - candidate := filepath.Join(configsDir, normalized) - if _, err := os.Stat(filepath.Join(candidate, "config.yaml")); err == nil { - return candidate - } - - // Fall back to scanning all dirs - entries, err := os.ReadDir(configsDir) - if err != nil { - return "" - } - for _, e := range entries { - if !e.IsDir() { - continue - } - cfgPath := filepath.Join(configsDir, e.Name(), "config.yaml") - data, err := os.ReadFile(cfgPath) - if err != nil { - continue - } - var cfg struct { - Name string `yaml:"name"` - } - if json.Unmarshal(data, &cfg) == nil && cfg.Name == name { - return filepath.Join(configsDir, e.Name()) - } - if yaml.Unmarshal(data, &cfg) == nil && cfg.Name == name { - return filepath.Join(configsDir, e.Name()) - } - } - return "" -} diff --git a/workspace-server/internal/handlers/team_test.go b/workspace-server/internal/handlers/team_test.go deleted file mode 100644 index e87a92ae..00000000 --- a/workspace-server/internal/handlers/team_test.go +++ /dev/null @@ -1,130 +0,0 @@ -package handlers - -import ( - "encoding/json" - "net/http" - "net/http/httptest" - "os" - "path/filepath" - "testing" - - "github.com/DATA-DOG/go-sqlmock" - "github.com/gin-gonic/gin" -) - -// ---------- TeamHandler: Collapse ---------- - -func TestTeamCollapse_NoChildren(t *testing.T) { - mock := setupTestDB(t) - setupTestRedis(t) - broadcaster := newTestBroadcaster() - handler := NewTeamHandler(broadcaster, NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()), "http://localhost:8080", "/tmp/configs") - - // No children - mock.ExpectQuery("SELECT id, name FROM workspaces WHERE parent_id"). - WithArgs("ws-parent"). - WillReturnRows(sqlmock.NewRows([]string{"id", "name"})) - - // WORKSPACE_COLLAPSED broadcast - mock.ExpectExec("INSERT INTO structure_events"). 
- WillReturnResult(sqlmock.NewResult(0, 1)) - - w := httptest.NewRecorder() - c, _ := gin.CreateTestContext(w) - c.Params = gin.Params{{Key: "id", Value: "ws-parent"}} - c.Request = httptest.NewRequest("POST", "/", nil) - - handler.Collapse(c) - - if w.Code != http.StatusOK { - t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) - } - var resp map[string]interface{} - json.Unmarshal(w.Body.Bytes(), &resp) - if resp["status"] != "collapsed" { - t.Errorf("expected status 'collapsed', got %v", resp["status"]) - } -} - -func TestTeamCollapse_WithChildren(t *testing.T) { - mock := setupTestDB(t) - setupTestRedis(t) - broadcaster := newTestBroadcaster() - handler := NewTeamHandler(broadcaster, NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()), "http://localhost:8080", "/tmp/configs") - - // Two children - mock.ExpectQuery("SELECT id, name FROM workspaces WHERE parent_id"). - WithArgs("ws-parent"). - WillReturnRows(sqlmock.NewRows([]string{"id", "name"}). - AddRow("child-1", "Worker A"). - AddRow("child-2", "Worker B")) - - // UPDATE + DELETE + broadcast for child-1 - mock.ExpectExec("UPDATE workspaces SET status ="). - WithArgs("child-1"). - WillReturnResult(sqlmock.NewResult(0, 1)) - mock.ExpectExec("DELETE FROM canvas_layouts"). - WithArgs("child-1"). - WillReturnResult(sqlmock.NewResult(0, 1)) - mock.ExpectExec("INSERT INTO structure_events"). - WillReturnResult(sqlmock.NewResult(0, 1)) - - // UPDATE + DELETE + broadcast for child-2 - mock.ExpectExec("UPDATE workspaces SET status ="). - WithArgs("child-2"). - WillReturnResult(sqlmock.NewResult(0, 1)) - mock.ExpectExec("DELETE FROM canvas_layouts"). - WithArgs("child-2"). - WillReturnResult(sqlmock.NewResult(0, 1)) - mock.ExpectExec("INSERT INTO structure_events"). - WillReturnResult(sqlmock.NewResult(0, 1)) - - // WORKSPACE_COLLAPSED broadcast for parent - mock.ExpectExec("INSERT INTO structure_events"). 
- WillReturnResult(sqlmock.NewResult(0, 1)) - - w := httptest.NewRecorder() - c, _ := gin.CreateTestContext(w) - c.Params = gin.Params{{Key: "id", Value: "ws-parent"}} - c.Request = httptest.NewRequest("POST", "/", nil) - - handler.Collapse(c) - - if w.Code != http.StatusOK { - t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) - } - var resp map[string]interface{} - json.Unmarshal(w.Body.Bytes(), &resp) - removed, ok := resp["removed"].([]interface{}) - if !ok || len(removed) != 2 { - t.Errorf("expected 2 removed children, got %v", resp["removed"]) - } -} -// ---------- findTemplateDirByName helper ---------- - -func TestFindTemplateDirByName_DirectMatch(t *testing.T) { - dir := t.TempDir() - subDir := filepath.Join(dir, "mybot") - os.MkdirAll(subDir, 0755) - os.WriteFile(filepath.Join(subDir, "config.yaml"), []byte("name: MyBot"), 0644) - - result := findTemplateDirByName(dir, "mybot") - if result != subDir { - t.Errorf("expected %s, got %s", subDir, result) - } -} - -func TestFindTemplateDirByName_NotFound(t *testing.T) { - dir := t.TempDir() - result := findTemplateDirByName(dir, "nonexistent") - if result != "" { - t.Errorf("expected empty string, got %s", result) - } -} - -func TestFindTemplateDirByName_InvalidConfigsDir(t *testing.T) { - result := findTemplateDirByName("/nonexistent/path", "anything") - if result != "" { - t.Errorf("expected empty string for invalid dir, got %s", result) - } -} diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go index d6d7b2d7..ae928f2f 100644 --- a/workspace-server/internal/router/router.go +++ b/workspace-server/internal/router/router.go @@ -243,13 +243,15 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi // entire platform. Gated behind AdminAuth (issue #180). r.GET("/approvals/pending", middleware.AdminAuth(db.DB), apph.ListAll) - // Team handlers — Collapse only. The bulk-Expand path is gone: - // every workspace can have children via the regular CreateWorkspace - // flow with parent_id set, so a separate handler that bulk-creates - // from sub_workspaces (and was non-idempotent — calling it twice - // duplicated the team) earned its way out. - teamh := handlers.NewTeamHandler(broadcaster, wh, platformURL, configsDir) - wsAuth.POST("/collapse", teamh.Collapse) + // (TeamHandler is gone — #2864.) The visual canvas Collapse + // button calls PATCH /workspaces/:id { collapsed: true/false } + // (presentational toggle on canvas_layouts), NOT the destructive + // POST /collapse that stopped + removed children. The + // destructive route had zero UI callers (verified via grep + // across canvas/, scripts/, and the MCP tool registry — only + // docs referenced it). team.go + team_test.go + the route + // + helpers (findTemplateDirByName, NewTeamHandler) are + // deleted; visual collapse is unaffected. // Agents ah := handlers.NewAgentHandler(broadcaster) From 83454e5efd0f571b3c9c9b9b0466c6fb5620538e Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 12:30:11 -0700 Subject: [PATCH 6/8] feat(workspace-server): structured logging at provisioning boundaries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds internal/provlog with a single Event(name, fields) helper that emits JSON-tagged single-line records to the standard logger. 
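For concreteness, one illustrative record as emitted (field values
invented here; the exact shape is pinned by the provlog tests below):

    evt: provision.start {"workspace_id":"ws-01","tier":4,"runtime":"claude-code","sync":false}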
Five boundary sites instrumented for #2867: provision.start — workspace_dispatchers.go (sync + async) provision.skip_existing — org_import.go idempotency hit provision.ec2_started — cp_provisioner.go after RunInstances provision.ec2_stopped — cp_provisioner.go after TerminateInstances ack restart.pre_stop — workspace_restart.go before Stop dispatch These pair with the existing human-prose log.Printf lines (kept). The new records are grep+jq friendly so a future log-aggregation pipeline can reconstruct per-workspace provision timelines without parsing the operator messages — this is the "and debug loggers so it dont happen again" half of the leak-prevention work. Tests: - provlog: emits evt-prefixed JSON, nil-tolerant, marshal-error fallback preserves event boundary, single-line output pinned. - handlers: provlog_emit_test.go pins three call-site contracts: provisionWorkspaceAutoSync emits provision.start with sync=true, stopForRestart emits restart.pre_stop with backend=cp on SaaS, and backend=none when both backends are nil. Field taxonomy is convenience for ops, not contract — payload can grow additively without breaking callers. Behavior gate is the event name + boundary location, per feedback_behavior_based_ast_gates.md. Refs #2867 (PR-D structured logging at provisioning boundaries) Co-Authored-By: Claude Opus 4.7 (1M context) --- .../internal/handlers/org_import.go | 11 ++ .../internal/handlers/provlog_emit_test.go | 112 ++++++++++++++++++ .../handlers/workspace_dispatchers.go | 17 +++ .../internal/handlers/workspace_restart.go | 11 ++ .../internal/provisioner/cp_provisioner.go | 12 ++ workspace-server/internal/provlog/provlog.go | 48 ++++++++ .../internal/provlog/provlog_test.go | 97 +++++++++++++++ 7 files changed, 308 insertions(+) create mode 100644 workspace-server/internal/handlers/provlog_emit_test.go create mode 100644 workspace-server/internal/provlog/provlog.go create mode 100644 workspace-server/internal/provlog/provlog_test.go diff --git a/workspace-server/internal/handlers/org_import.go b/workspace-server/internal/handlers/org_import.go index 8f4d9a07..639c8ba9 100644 --- a/workspace-server/internal/handlers/org_import.go +++ b/workspace-server/internal/handlers/org_import.go @@ -21,6 +21,7 @@ import ( "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" "github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/provlog" "github.com/Molecule-AI/molecule-monorepo/platform/internal/scheduler" "github.com/google/uuid" ) @@ -96,6 +97,16 @@ func (h *OrgHandler) createWorkspaceTree(ws OrgWorkspace, parentID *string, absX } if existing { log.Printf("Org import: %q already exists (id=%s) — skipping create+provision, recursing into children for partial-match", ws.Name, existingID) + parentRef := "" + if parentID != nil { + parentRef = *parentID + } + provlog.Event("provision.skip_existing", map[string]any{ + "name": ws.Name, + "existing_id": existingID, + "parent_id": parentRef, + "tier": tier, + }) *results = append(*results, map[string]interface{}{ "id": existingID, "name": ws.Name, diff --git a/workspace-server/internal/handlers/provlog_emit_test.go b/workspace-server/internal/handlers/provlog_emit_test.go new file mode 100644 index 00000000..6681c203 --- /dev/null +++ b/workspace-server/internal/handlers/provlog_emit_test.go @@ -0,0 +1,112 @@ +package handlers + +// provlog_emit_test.go — pins that the structured-logging 
emit sites
+// added for #2867 PR-D actually fire when their boundary is crossed.
+//
+// These are call-site contract tests, not provlog package tests (those
+// live next to the helper). The assertion is "this dispatcher path
+// emits this event name" — if a refactor moves the call out of the
+// boundary helper, the gate fails. Fields are NOT pinned here on
+// purpose; the field set is convenience for ops, not contract for the
+// emit point. Pinning fields would block additive evolution of the
+// payload (see also feedback_behavior_based_ast_gates.md).
+
+import (
+	"bytes"
+	"context"
+	"log"
+	"strings"
+	"sync"
+	"testing"
+
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
+)
+
+// captureProvLog redirects the global logger to a buffer for the test
+// duration. provlog.Event uses log.Printf, so this is the only seam.
+// A mutex guards the buffer so the returned read closure can't race
+// the goroutine fired by provisionWorkspaceAuto (the goroutine never
+// returns in these tests because Start() is stubbed, but it can still
+// touch the buffer while an assertion reads it).
+func captureProvLog(t *testing.T) (read func() string) {
+	t.Helper()
+	var buf bytes.Buffer
+	var mu sync.Mutex
+	prevWriter := log.Writer()
+	prevFlags := log.Flags()
+	log.SetFlags(0)
+	log.SetOutput(&safeWriter{buf: &buf, mu: &mu})
+	t.Cleanup(func() {
+		log.SetOutput(prevWriter)
+		log.SetFlags(prevFlags)
+	})
+	return func() string {
+		mu.Lock()
+		defer mu.Unlock()
+		return buf.String()
+	}
+}
+
+// TestProvisionWorkspaceAutoSync_EmitsProvisionStart — sync variant is
+// chosen for the assertion path because it returns once the (stubbed)
+// Start() has been called, so we know the emit has flushed. The async
+// variant would race a goroutine.
+func TestProvisionWorkspaceAutoSync_EmitsProvisionStart(t *testing.T) {
+	read := captureProvLog(t)
+	h := &WorkspaceHandler{cpProv: &trackingCPProv{}}
+	// Best-effort: the body will hit DB code under provisionWorkspaceCP
+	// — we only need the emit at the entry, which fires unconditionally
+	// before the dispatch. The call is wrapped in its own function so a
+	// later panic is swallowed WITHOUT skipping the assertions below; a
+	// test-scope defer would let the test return — and pass vacuously —
+	// before any assertion ran.
+	func() {
+		defer func() { _ = recover() }()
+		h.provisionWorkspaceAutoSync("ws-test-1", "tmpl", nil, models.CreateWorkspacePayload{
+			Name: "n", Tier: 4, Runtime: "claude-code",
+		})
+	}()
+	got := read()
+	if !strings.Contains(got, "evt: provision.start ") {
+		t.Fatalf("expected provision.start emit, got log:\n%s", got)
+	}
+	if !strings.Contains(got, `"workspace_id":"ws-test-1"`) {
+		t.Errorf("workspace_id not in payload: %s", got)
+	}
+	if !strings.Contains(got, `"sync":true`) {
+		t.Errorf("sync flag not pinned for sync dispatcher: %s", got)
+	}
+}
+
+// TestStopForRestart_EmitsRestartPreStop — emit fires before the actual
+// Stop call, so the trackingCPProv stub doesn't need to be wired for
+// real Stop semantics. Backend label "cp" pinned because that's the
+// SaaS path; the "docker" branch isn't pinned (it would only re-test
+// the trivial label switch), and the "none" branch gets its own test
+// below because it doubles as the silent-Stop visibility guarantee.
+func TestStopForRestart_EmitsRestartPreStop(t *testing.T) {
+	read := captureProvLog(t)
+	h := &WorkspaceHandler{cpProv: &trackingCPProv{}}
+	// Same wrapping as above: swallow any later panic without letting
+	// the test return before its assertions run.
+	func() {
+		defer func() { _ = recover() }()
+		h.stopForRestart(context.Background(), "ws-restart-1")
+	}()
+	got := read()
+	if !strings.Contains(got, "evt: restart.pre_stop ") {
+		t.Fatalf("expected restart.pre_stop emit, got log:\n%s", got)
+	}
+	if !strings.Contains(got, `"workspace_id":"ws-restart-1"`) {
+		t.Errorf("workspace_id not in payload: %s", got)
+	}
+	if !strings.Contains(got, `"backend":"cp"`) {
+		t.Errorf("backend label missing or wrong: %s", got)
+	}
+}
+
+// TestStopForRestart_EmitsBackendNoneWhenUnwired — pin the no-backend
+// branch so a future refactor that drops the label switch is caught.
+// This is the silent-Stop case (workspace_dispatchers.go:StopWorkspaceAuto
+// returns nil for unwired backends); the emit ensures the operator can
+// still see the boundary in the log.
+func TestStopForRestart_EmitsBackendNoneWhenUnwired(t *testing.T) {
+	read := captureProvLog(t)
+	h := &WorkspaceHandler{} // both nil
+	h.stopForRestart(context.Background(), "ws-restart-2")
+	got := read()
+	if !strings.Contains(got, `"backend":"none"`) {
+		t.Fatalf("expected backend=none for unwired handler: %s", got)
+	}
+}
diff --git a/workspace-server/internal/handlers/workspace_dispatchers.go b/workspace-server/internal/handlers/workspace_dispatchers.go
index 18ede255..3df25877 100644
--- a/workspace-server/internal/handlers/workspace_dispatchers.go
+++ b/workspace-server/internal/handlers/workspace_dispatchers.go
@@ -35,6 +35,7 @@ import (
 	"time"
 
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/provlog"
 )
 
 // HasProvisioner reports whether either backend (CP or local Docker) is
@@ -101,6 +102,14 @@ func (h *WorkspaceHandler) DefaultTier() int {
 // lives in prepareProvisionContext (shared by both per-backend
 // goroutines).
 func (h *WorkspaceHandler) provisionWorkspaceAuto(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) bool {
+	provlog.Event("provision.start", map[string]any{
+		"workspace_id": workspaceID,
+		"name":         payload.Name,
+		"tier":         payload.Tier,
+		"runtime":      payload.Runtime,
+		"template":     payload.Template,
+		"sync":         false,
+	})
 	if h.cpProv != nil {
 		go h.provisionWorkspaceCP(workspaceID, templatePath, configFiles, payload)
 		return true
@@ -136,6 +145,14 @@
 // Keep these two helpers in sync — when one grows a new arm (third
 // backend, retry semantics), the other should too.
func (h *WorkspaceHandler) provisionWorkspaceAutoSync(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) bool { + provlog.Event("provision.start", map[string]any{ + "workspace_id": workspaceID, + "name": payload.Name, + "tier": payload.Tier, + "runtime": payload.Runtime, + "template": payload.Template, + "sync": true, + }) if h.cpProv != nil { h.provisionWorkspaceCP(workspaceID, templatePath, configFiles, payload) return true diff --git a/workspace-server/internal/handlers/workspace_restart.go b/workspace-server/internal/handlers/workspace_restart.go index 3b3097c4..c5712be5 100644 --- a/workspace-server/internal/handlers/workspace_restart.go +++ b/workspace-server/internal/handlers/workspace_restart.go @@ -12,6 +12,7 @@ import ( "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/provlog" "github.com/gin-gonic/gin" ) @@ -431,6 +432,16 @@ func coalesceRestart(workspaceID string, cycle func()) { // NPE'd before reaching the reprovision step — which is why every SaaS dead- // agent incident pre-this-fix required manual restart from canvas. func (h *WorkspaceHandler) stopForRestart(ctx context.Context, workspaceID string) { + backend := "none" + if h.provisioner != nil { + backend = "docker" + } else if h.cpProv != nil { + backend = "cp" + } + provlog.Event("restart.pre_stop", map[string]any{ + "workspace_id": workspaceID, + "backend": backend, + }) if h.provisioner != nil { h.provisioner.Stop(ctx, workspaceID) return diff --git a/workspace-server/internal/provisioner/cp_provisioner.go b/workspace-server/internal/provisioner/cp_provisioner.go index edc67d9f..bdc5bff7 100644 --- a/workspace-server/internal/provisioner/cp_provisioner.go +++ b/workspace-server/internal/provisioner/cp_provisioner.go @@ -14,6 +14,7 @@ import ( "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/provlog" ) // CPProvisionerAPI is the contract WorkspaceHandler uses to talk to the @@ -214,6 +215,13 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, } log.Printf("CP provisioner: workspace %s → EC2 instance %s (%s)", cfg.WorkspaceID, result.InstanceID, result.State) + provlog.Event("provision.ec2_started", map[string]any{ + "workspace_id": cfg.WorkspaceID, + "instance_id": result.InstanceID, + "state": result.State, + "tier": cfg.Tier, + "runtime": cfg.Runtime, + }) return result.InstanceID, nil } @@ -273,6 +281,10 @@ func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error { return fmt.Errorf("cp provisioner: stop %s: unexpected %d: %s", workspaceID, resp.StatusCode, strings.TrimSpace(string(body))) } + provlog.Event("provision.ec2_stopped", map[string]any{ + "workspace_id": workspaceID, + "instance_id": instanceID, + }) return nil } diff --git a/workspace-server/internal/provlog/provlog.go b/workspace-server/internal/provlog/provlog.go new file mode 100644 index 00000000..4434c238 --- /dev/null +++ b/workspace-server/internal/provlog/provlog.go @@ -0,0 +1,48 @@ +// Package provlog emits structured, single-line JSON log records for +// provisioning-lifecycle boundaries (workspace create, EC2 start/stop, +// restart, idempotency skips). 
Records share a stable `evt:` prefix and
+// JSON payload so a future grep|jq pipeline (or a Loki/Datadog ingest)
+// can reconstruct the per-workspace timeline without parsing the
+// human-prose log lines that already exist.
+//
+// Existing log.Printf lines are intentionally NOT replaced — they
+// remain the operator-facing message. Event() emits a paired structured
+// record alongside, additive only.
+//
+// Event taxonomy (extend by appending; never rename):
+//
+//	provision.start — workspace row inserted, EC2 about to launch
+//	provision.skip_existing — idempotency hit, no new EC2
+//	provision.ec2_started — RunInstances returned an instance id
+//	provision.ec2_stopped — TerminateInstances acknowledged
+//	restart.pre_stop — Restart handler about to call Stop
+//
+// Required fields per event are documented at each call site.
+package provlog
+
+import (
+	"encoding/json"
+	"log"
+)
+
+// Event writes a single line of the form:
+//
+//	evt: <name> {"k":"v",...}
+//
+// to the standard logger. JSON encoding errors never escape — a logging
+// helper must never panic the request path — and the event boundary is
+// still emitted, with a `_marshal_err` sentinel payload. fields may be
+// nil; the empty payload `{}` is still useful to mark an event boundary.
+func Event(name string, fields map[string]any) {
+	if fields == nil {
+		fields = map[string]any{}
+	}
+	payload, err := json.Marshal(fields)
+	if err != nil {
+		// Fall back to a minimal payload so the event boundary still
+		// appears in the log. The marshal error itself is recorded
+		// on a best-effort basis.
+		log.Printf("evt: %s {\"_marshal_err\":%q}", name, err.Error())
+		return
+	}
+	log.Printf("evt: %s %s", name, payload)
+}
diff --git a/workspace-server/internal/provlog/provlog_test.go b/workspace-server/internal/provlog/provlog_test.go
new file mode 100644
index 00000000..7d2f5f5f
--- /dev/null
+++ b/workspace-server/internal/provlog/provlog_test.go
@@ -0,0 +1,97 @@
+package provlog
+
+import (
+	"bytes"
+	"encoding/json"
+	"log"
+	"strings"
+	"testing"
+)
+
+// captureLog redirects the default logger to a buffer for the duration
+// of fn and returns whatever was written.
+func captureLog(t *testing.T, fn func()) string {
+	t.Helper()
+	var buf bytes.Buffer
+	prevWriter := log.Writer()
+	prevFlags := log.Flags()
+	log.SetOutput(&buf)
+	log.SetFlags(0) // strip date/time so assertions stay deterministic
+	t.Cleanup(func() {
+		log.SetOutput(prevWriter)
+		log.SetFlags(prevFlags)
+	})
+	fn()
+	return buf.String()
+}
+
+func TestEvent_EmitsEvtPrefixAndJSONPayload(t *testing.T) {
+	out := captureLog(t, func() {
+		Event("provision.start", map[string]any{
+			"workspace_id": "ws-123",
+			"tier":         4,
+			"runtime":      "claude-code",
+		})
+	})
+	out = strings.TrimSpace(out)
+	if !strings.HasPrefix(out, "evt: provision.start ") {
+		t.Fatalf("expected evt-prefixed line, got %q", out)
+	}
+	jsonPart := strings.TrimPrefix(out, "evt: provision.start ")
+	var got map[string]any
+	if err := json.Unmarshal([]byte(jsonPart), &got); err != nil {
+		t.Fatalf("payload not valid JSON: %v (raw=%q)", err, jsonPart)
+	}
+	if got["workspace_id"] != "ws-123" {
+		t.Errorf("workspace_id field lost: %+v", got)
+	}
+	// JSON unmarshal turns numbers into float64 — exact-equal compare.
+ if got["tier"].(float64) != 4 { + t.Errorf("tier field lost: %+v", got) + } + if got["runtime"] != "claude-code" { + t.Errorf("runtime field lost: %+v", got) + } +} + +func TestEvent_NilFieldsEmitsEmptyObject(t *testing.T) { + out := captureLog(t, func() { + Event("restart.pre_stop", nil) + }) + if !strings.Contains(out, "evt: restart.pre_stop {}") { + t.Fatalf("nil fields should emit empty object, got %q", out) + } +} + +func TestEvent_PreservesEventBoundaryOnUnmarshalableValue(t *testing.T) { + // A channel cannot be marshaled by encoding/json — verify we still + // emit the event boundary with a recorded marshal error. This is + // the structural guarantee: the call site never sees a panic, and + // the event name is always present in the log. + out := captureLog(t, func() { + Event("provision.ec2_started", map[string]any{ + "chan": make(chan int), + }) + }) + if !strings.Contains(out, "evt: provision.ec2_started ") { + t.Fatalf("event boundary missing on marshal error: %q", out) + } + if !strings.Contains(out, "_marshal_err") { + t.Fatalf("expected _marshal_err sentinel, got %q", out) + } +} + +func TestEvent_SingleLineOutput(t *testing.T) { + // Log aggregators line-split on \n. A multi-line emit would silently + // fragment the JSON across two records — pin single-line shape. + out := captureLog(t, func() { + Event("provision.skip_existing", map[string]any{ + "existing_id": "ws-abc", + "name": "child-1", + }) + }) + trimmed := strings.TrimRight(out, "\n") + if strings.Contains(trimmed, "\n") { + t.Fatalf("event line must be single-line, got %q", out) + } +} From 39931acd9c21a3e0d450d8d20f9cbf265691e156 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 12:34:41 -0700 Subject: [PATCH 7/8] fix(inbox-uploads): cancel BatchFetcher futures on wait_all timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The deadline contract was incomplete: wait_all logged the timeout but close() then called executor.shutdown(wait=True), which blocked on the leaked workers — undoing the user-facing timeout. The inbox poll loop would stall indefinitely on a hung /content fetch instead of returning to chat-message processing. Fix: wait_all now flips self._timed_out and cancels queued (not-yet- started) futures; close() reads that flag and switches to shutdown(wait=False, cancel_futures=True) on the timeout path. Currently-running workers can't be interrupted by Python's threading model, but they're now detached daemons whose blocking httpx call no longer gates the next poll. Healthy path (no timeout) keeps the existing drain-and-wait so a still-queued ack POST isn't dropped mid-write. Two new tests pin both legs of the contract end-to-end: - close-after-timeout-doesn't-block: hung worker, wait_all(0.05s) fires the timeout, close() returns in <1s instead of waiting ~5s for the worker to come back. - close-without-timeout-still-drains: 2 slow workers, wait_all completes cleanly, close() drains both ack POSTs. Resolves the BatchFetcher timeout-cancellation finding from the post-merge five-axis review of Phase 5b. 
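A minimal standalone sketch of the stdlib semantics the fix leans on
(illustrative names only, not the BatchFetcher API):

    import concurrent.futures
    import time

    ex = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    hung = ex.submit(time.sleep, 2.0)   # stands in for a hung GET /content
    queued = ex.submit(lambda: None)    # waits behind it in the queue

    done, not_done = concurrent.futures.wait({hung, queued}, timeout=0.05)
    for f in not_done:
        f.cancel()  # True for `queued`, False for the already-running `hung`
    # Returns immediately: queued work is dropped and the running worker
    # is left to finish on its own (cancel_futures needs Python 3.9+).
    ex.shutdown(wait=False, cancel_futures=True)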
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 workspace/inbox_uploads.py            | 44 +++++++++++---
 workspace/tests/test_inbox_uploads.py | 84 +++++++++++++++++++++++++++
 2 files changed, 121 insertions(+), 7 deletions(-)

diff --git a/workspace/inbox_uploads.py b/workspace/inbox_uploads.py
index 913efdcd..69fa53aa 100644
--- a/workspace/inbox_uploads.py
+++ b/workspace/inbox_uploads.py
@@ -547,6 +547,9 @@ class BatchFetcher:
         )
         self._futures: list[concurrent.futures.Future[Any]] = []
         self._closed = False
+        # Flipped to True by wait_all when the timeout fires; close()
+        # reads this to decide between drain-and-wait vs cancel-queued.
+        self._timed_out = False
 
     def submit(self, row: dict[str, Any]) -> concurrent.futures.Future[Any] | None:
         """Submit ``row`` for fetch + stage + ack. Non-blocking — the
@@ -580,8 +583,12 @@ class BatchFetcher:
         exception propagating up to here is unexpected and we don't
         want one bad fetch to abort the whole batch.
 
-        Timeouts are also logged + swallowed; the caller will move on
-        and the un-acked rows will be retried by the next poll.
+        Timeouts are also logged + swallowed AND cancel the queued
+        futures, flipping ``self._timed_out`` so ``close`` can bail out
+        instead of paying the leaked workers' full latency. Without this
+        hand-off, ``close()``'s ``shutdown(wait=True)`` would block on
+        the leaked workers and undo the user-facing timeout — the inbox
+        poll loop would stall indefinitely on a hung /content fetch.
         """
         if not self._futures:
             return
@@ -606,22 +613,45 @@
                 len(not_done),
                 timeout,
             )
+            # Cancel queued-but-not-started futures immediately and
+            # flip the flag so close() knows to cancel-not-wait; futures
+            # already running can't be cancelled (Python's threading
+            # model), but close() will pass cancel_futures=True so any
+            # remaining queued items don't run.
+            for fut in not_done:
+                fut.cancel()
+            self._timed_out = True
 
     def close(self) -> None:
         """Tear down the executor + (if owned) the httpx client.
         Idempotent. After close, ``submit`` raises and the BatchFetcher
         cannot be reused — construct a fresh one for the next poll.
+
+        If ``wait_all`` reported a timeout, shutdown skips the
+        ``wait=True`` drain and instead asks the executor to drop queued
+        futures (``cancel_futures=True``). Currently-running workers
+        can't be interrupted by Python's threading model, but the poll
+        loop returns immediately rather than blocking on a hung fetch.
        """
         if self._closed:
             return
         self._closed = True
-        # Drain remaining futures so worker threads aren't killed mid-
-        # request. wait=True is the safe default; for an inbox poller a
-        # 60s tail at shutdown is acceptable since uploads in flight are
-        # the only thing close() is called between.
+        timed_out = getattr(self, "_timed_out", False)
         try:
-            self._executor.shutdown(wait=True)
+            if timed_out:
+                # cancel_futures landed in Python 3.9 — guarded for older
+                # interpreters via a TypeError fallback. Drop queued
+                # tasks; running ones will exit when their httpx call
+                # eventually returns or the daemon thread dies.
+                try:
+                    self._executor.shutdown(wait=False, cancel_futures=True)
+                except TypeError:
+                    self._executor.shutdown(wait=False)
+            else:
+                # Healthy path: wait for in-flight work so we don't
+                # interrupt a fetch mid-write.
+ self._executor.shutdown(wait=True) except Exception as exc: # noqa: BLE001 logger.warning("inbox_uploads: executor shutdown error: %s", exc) if self._own_client and self._client is not None: diff --git a/workspace/tests/test_inbox_uploads.py b/workspace/tests/test_inbox_uploads.py index c13cea70..37446760 100644 --- a/workspace/tests/test_inbox_uploads.py +++ b/workspace/tests/test_inbox_uploads.py @@ -1034,3 +1034,87 @@ def test_batch_fetcher_httpx_missing_makes_submit_a_noop(monkeypatch): else: sys.modules.pop("httpx", None) assert result is None + + +def test_batch_fetcher_close_after_timeout_does_not_block_on_running_workers(): + """The deadline contract: when wait_all times out, close() must NOT + block waiting for the leaked worker threads. Otherwise the inbox + poll loop stalls indefinitely on a hung /content fetch — undoing + the user-facing timeout. + + Strategy: build a client whose .get() blocks on a threading.Event + that the test never sets. Submit a row, wait_all with a tiny + timeout, then time close(). If close() drained-and-waited it would + block until we set the event (i.e., forever in this test). + """ + import threading + import time + + blocker = threading.Event() # never set — workers stay running + + def _hang_get(url, headers=None): + # Wait at most ~5s so a buggy implementation eventually unblocks + # the test instead of timing out the whole pytest run, but + # nothing legitimate should reach this fallback. + blocker.wait(timeout=5.0) + return _make_resp(200, content=b"x", content_type="text/plain") + + client = MagicMock() + client.get = MagicMock(side_effect=_hang_get) + client.post = MagicMock(return_value=_make_resp(200)) + + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", + workspace_id="ws-1", + headers={}, + client=client, + max_workers=1, # serialize so submitting 1 keeps the worker busy + ) + bf.submit(_row_with_id("act-a", "a")) + # Tiny timeout — wait_all must report the future as not_done. + bf.wait_all(timeout=0.05) + t0 = time.time() + bf.close() + elapsed = time.time() - t0 + # Unblock the lingering worker so it doesn't pollute later tests. + blocker.set() + + # Without the cancel-on-timeout fix, close() would block until + # blocker.set() — i.e., the full ~5s. With the fix it returns + # immediately because shutdown(wait=False) doesn't drain. + assert elapsed < 1.0, ( + f"close() blocked for {elapsed:.2f}s after wait_all timeout — " + "cancel-on-timeout regression: close() is draining instead of bailing" + ) + + +def test_batch_fetcher_close_without_timeout_still_drains(): + """Negative leg of the timeout contract: when wait_all completes + cleanly (no timeout), close() must KEEP its drain-and-wait + behavior so a still-queued ack POST isn't dropped mid-write. + """ + import time + + def _slow_get(url, headers=None): + time.sleep(0.05) + return _make_resp(200, content=b"x", content_type="text/plain") + + client = MagicMock() + client.get = MagicMock(side_effect=_slow_get) + client.post = MagicMock(return_value=_make_resp(200)) + + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", + workspace_id="ws-1", + headers={}, + client=client, + max_workers=2, + ) + bf.submit(_row_with_id("act-a", "a")) + bf.submit(_row_with_id("act-b", "b")) + bf.wait_all() # generous default timeout — should not fire + bf.close() + + # All 2 GETs + 2 ACK POSTs ran to completion via drain-and-wait. 
+    assert client.get.call_count == 2
+    assert client.post.call_count == 2

From 412dec0d876d5511e77a237a8f33c57470ce25e5 Mon Sep 17 00:00:00 2001
From: Hongming Wang
Date: Tue, 5 May 2026 12:39:03 -0700
Subject: [PATCH 8/8] fix(memory-plugin): gate sidecar spawn on cutover-active
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

PR #2906 spawned the sidecar unconditionally on every tenant boot.
The plugin's first migration runs `CREATE EXTENSION vector` which
fails on tenant Postgres without pgvector preinstalled — every
staging tenant redeploy aborted at the 30s health gate. CP fail-fast
kept running tenants on the prior image (no outage), but the new
image was DOA. Caught on staging redeploy 2026-05-05 19:23 with
`pq: extension "vector" is not available`.

Fix: only spawn the sidecar when the operator has flipped the
cutover flag — `MEMORY_V2_CUTOVER=true` OR `MEMORY_PLUGIN_URL` is set.

* Aligns the entrypoint to the same opt-in posture wiring.go already
  uses (it skips building the client when MEMORY_PLUGIN_URL is empty).
* Until cutover, the sidecar isn't even running — no migration, no
  health gate, no boot-time pgvector dependency.
* Operators activating cutover already redeploy with the new env vars
  set; that's when the sidecar starts. By definition they've verified
  pgvector is available before flipping.
* MEMORY_PLUGIN_DISABLE=1 escape hatch preserved; harness fix #2915
  becomes belt-and-suspenders (still respected).

Both Dockerfile and entrypoint-tenant.sh updated. Behavior change for
existing deployments: zero (cutover env vars still unset → sidecar
still inert, but now also not running).

Refs RFC #2728. Hotfix for #2906; supersedes the migration-path
fragility class (the sidecar isn't doing migrations on tenants that
won't use it).
---
 workspace-server/Dockerfile           | 27 ++++++++++++++++++---------
 workspace-server/entrypoint-tenant.sh | 21 +++++++++++++++------
 2 files changed, 33 insertions(+), 15 deletions(-)

diff --git a/workspace-server/Dockerfile b/workspace-server/Dockerfile
index ecf43fab..d6754312 100644
--- a/workspace-server/Dockerfile
+++ b/workspace-server/Dockerfile
@@ -63,21 +63,30 @@
 fi
 
 # Memory v2 sidecar (built-in postgres plugin). Co-located with the
 # main server so operators flipping MEMORY_V2_CUTOVER=true don't need
-# to provision a separate service. Stays inert at the protocol layer
-# until that env var is set — the workspace-server's wiring.go skips
-# building the client without MEMORY_PLUGIN_URL, so the running plugin
-# is a no-op for traffic.
+# to provision a separate service.
 #
-# Env defaults:
+# Spawn-gating: only start the sidecar when the operator has indicated
+# they want it — either MEMORY_V2_CUTOVER=true OR MEMORY_PLUGIN_URL set.
+# Without that signal, the sidecar adds zero value (the platform's
+# wiring.go skips building the client too) but pays a real cost: the
+# plugin's first migration runs `CREATE EXTENSION vector`, which fails
+# on tenant Postgres without pgvector preinstalled and aborts container
+# boot via the 30s health gate. Caught on staging redeploy 2026-05-05.
+#
+# Env defaults (when sidecar IS spawned):
 # MEMORY_PLUGIN_DATABASE_URL = $DATABASE_URL (share existing Postgres;
 # plugin's `memory_namespaces` / `memory_records` tables coexist
 # with `agent_memories` and the rest of the platform schema —
 # no conflicts. Operator can override with a separate URL.)
-# MEMORY_PLUGIN_LISTEN_ADDR = :9100 +# MEMORY_PLUGIN_LISTEN_ADDR = 127.0.0.1:9100 # -# Set MEMORY_PLUGIN_DISABLE=1 to skip launching the sidecar entirely -# (e.g. an operator running the plugin externally on a separate host). -if [ -z "$MEMORY_PLUGIN_DISABLE" ] && [ -n "$DATABASE_URL" ]; then +# Set MEMORY_PLUGIN_DISABLE=1 to force-skip the sidecar even with +# cutover env set (e.g. running the plugin externally on a separate host). +memory_plugin_wanted="" +if [ "$MEMORY_V2_CUTOVER" = "true" ] || [ -n "$MEMORY_PLUGIN_URL" ]; then + memory_plugin_wanted=1 +fi +if [ -z "$MEMORY_PLUGIN_DISABLE" ] && [ -n "$memory_plugin_wanted" ] && [ -n "$DATABASE_URL" ]; then : "${MEMORY_PLUGIN_DATABASE_URL:=$DATABASE_URL}" : "${MEMORY_PLUGIN_LISTEN_ADDR:=:9100}" export MEMORY_PLUGIN_DATABASE_URL MEMORY_PLUGIN_LISTEN_ADDR diff --git a/workspace-server/entrypoint-tenant.sh b/workspace-server/entrypoint-tenant.sh index 8059cc1c..0f2d6dde 100644 --- a/workspace-server/entrypoint-tenant.sh +++ b/workspace-server/entrypoint-tenant.sh @@ -21,14 +21,23 @@ PORT=3000 HOSTNAME=0.0.0.0 node server.js & CANVAS_PID=$! # Memory v2 sidecar (built-in postgres plugin). See Dockerfile entrypoint -# comment for rationale. Stays inert at the protocol layer until the -# operator sets MEMORY_V2_CUTOVER=true; running it is cheap. +# comment for rationale. # -# Defaults the plugin's DATABASE_URL to the tenant's DATABASE_URL so -# operators don't need to configure two of them. Plugin tables coexist -# with the platform schema. +# Spawn-gating: only start the sidecar when the operator has indicated +# they want it (MEMORY_V2_CUTOVER=true OR MEMORY_PLUGIN_URL set). +# Without that signal, the sidecar adds zero value and risks aborting +# tenant boot via the 30s health gate when the tenant Postgres lacks +# pgvector. Caught on staging redeploy 2026-05-05: +# pq: extension "vector" is not available +# +# Defaults (when sidecar IS spawned): MEMORY_PLUGIN_DATABASE_URL +# falls back to the tenant's DATABASE_URL. MEMORY_PLUGIN_PID="" -if [ -z "$MEMORY_PLUGIN_DISABLE" ] && [ -n "$DATABASE_URL" ]; then +memory_plugin_wanted="" +if [ "$MEMORY_V2_CUTOVER" = "true" ] || [ -n "$MEMORY_PLUGIN_URL" ]; then + memory_plugin_wanted=1 +fi +if [ -z "$MEMORY_PLUGIN_DISABLE" ] && [ -n "$memory_plugin_wanted" ] && [ -n "$DATABASE_URL" ]; then : "${MEMORY_PLUGIN_DATABASE_URL:=$DATABASE_URL}" : "${MEMORY_PLUGIN_LISTEN_ADDR:=:9100}" export MEMORY_PLUGIN_DATABASE_URL MEMORY_PLUGIN_LISTEN_ADDR