From 475da5b64c831df3d7a543b791f8e68f71aa7909 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 14:28:58 -0700 Subject: [PATCH] refactor(workspace): extract inbox tools from a2a_tools.py (RFC #2873 iter 4e) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Continues the OSS-shape refactor. After iters 4a-4d (rbac, delegation, memory, messaging) the only behavior left in ``a2a_tools.py`` was ``report_activity`` plus three thin inbox-tool wrappers and the ``_enrich_inbound_for_agent`` helper. This iter extracts the inbox slice to ``a2a_tools_inbox.py`` so the kitchen-sink module shrinks from 280 LOC to ~165 LOC of imports + report_activity + back-compat re-export blocks. Extracted symbols: - ``_INBOX_NOT_ENABLED_MSG`` (sentinel) - ``_enrich_inbound_for_agent`` (poll-path peer enrichment helper) - ``tool_inbox_peek`` - ``tool_inbox_pop`` - ``tool_wait_for_message`` Re-exports (`from a2a_tools_inbox import …`) preserve the public ``a2a_tools.tool_inbox_*`` surface so existing tests + call sites continue to resolve unchanged. New tests in test_a2a_tools_inbox_split.py: 1. **Drift gate (5)** — every previously-public symbol on a2a_tools is the EXACT same object as a2a_tools_inbox.foo (`is`, not `==`), catches a future "wrap with logging" refactor that silently loses existing test coverage. 2. **Import contract (1)** — a2a_tools_inbox does NOT eagerly import a2a_tools at module load. Pins the layered architecture: the extracted slice depends on ``inbox`` + a lazy ``a2a_client`` import, never on the kitchen-sink that re-exports it. 3. **_enrich_inbound_for_agent branches (5)** — peer_id-empty (canvas_user) returns dict unchanged; missing peer_id key same; a2a_client unavailable (test harness, partial install) degrades gracefully with a bare envelope; registry hit populates peer_name + peer_role + agent_card_url; registry miss still surfaces agent_card_url (constructable from peer_id alone). The full timeout-clamp / validation / JSON-shape behavior matrix for the three wrappers stays in test_a2a_tools_inbox_wrappers.py — those tests pass identically against both the alias and the underlying impl. Wiring updates: - ``scripts/build_runtime_package.py``: add ``a2a_tools_inbox`` to ``TOP_LEVEL_MODULES`` so it ships in the runtime wheel and the drift gate doesn't fail the next publish. - ``.github/workflows/ci.yml``: add ``a2a_tools_inbox.py`` to ``CRITICAL_FILES`` so the 75% MCP/inbox/auth per-file floor applies — this is now where the inbox-delivery code actually lives. --- .github/workflows/ci.yml | 1 + scripts/build_runtime_package.py | 1 + workspace/a2a_tools.py | 134 ++----------- workspace/a2a_tools_inbox.py | 140 ++++++++++++++ workspace/tests/test_a2a_tools_inbox_split.py | 181 ++++++++++++++++++ 5 files changed, 334 insertions(+), 123 deletions(-) create mode 100644 workspace/a2a_tools_inbox.py create mode 100644 workspace/tests/test_a2a_tools_inbox_split.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f2512cb9..3fda3fac 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -387,6 +387,7 @@ jobs: "a2a_mcp_server.py" "mcp_cli.py" "a2a_tools.py" + "a2a_tools_inbox.py" "inbox.py" "platform_auth.py" ) diff --git a/scripts/build_runtime_package.py b/scripts/build_runtime_package.py index e4ced302..30bac7e8 100755 --- a/scripts/build_runtime_package.py +++ b/scripts/build_runtime_package.py @@ -56,6 +56,7 @@ TOP_LEVEL_MODULES = { "a2a_mcp_server", "a2a_tools", "a2a_tools_delegation", + "a2a_tools_inbox", "a2a_tools_memory", "a2a_tools_messaging", "a2a_tools_rbac", diff --git a/workspace/a2a_tools.py b/workspace/a2a_tools.py index 9e923222..1b1ef267 100644 --- a/workspace/a2a_tools.py +++ b/workspace/a2a_tools.py @@ -154,127 +154,15 @@ from a2a_tools_memory import ( # noqa: E402 (import after the top-of-module im ) -# --------------------------------------------------------------------------- -# Inbox tools — inbound delivery for the standalone molecule-mcp path. -# --------------------------------------------------------------------------- -# -# The InboxState singleton is set by mcp_cli before the MCP server starts -# (see workspace/inbox.py for the rationale). In-container runtimes never -# call ``inbox.activate(...)``, so ``inbox.get_state()`` returns None and -# these tools surface an informational error rather than raising. -# -# When-to-use guidance (mirrored in platform_tools/registry.py): agents -# in standalone-runtime mode should call ``wait_for_message`` to block -# on the next inbound message after they've emitted a reply, forming -# the loop ``wait → respond → wait``. ``inbox_peek`` is for inspecting -# the queue without consuming; ``inbox_pop`` removes a handled message. - -_INBOX_NOT_ENABLED_MSG = ( - "Error: inbox polling is not enabled in this runtime. The standalone " - "molecule-mcp wrapper activates it; in-container runtimes receive " - "messages via push delivery and do not need these tools." +# Inbox tool handlers — extracted to a2a_tools_inbox (RFC #2873 iter 4e). +# Re-imported here so call sites + tests that reference +# ``a2a_tools.tool_inbox_peek`` / ``tool_inbox_pop`` / ``tool_wait_for_message`` +# / ``_enrich_inbound_for_agent`` / ``_INBOX_NOT_ENABLED_MSG`` keep +# resolving identically. +from a2a_tools_inbox import ( # noqa: E402 (import after the top-of-module imports) + _INBOX_NOT_ENABLED_MSG, + _enrich_inbound_for_agent, + tool_inbox_peek, + tool_inbox_pop, + tool_wait_for_message, ) - - -def _enrich_inbound_for_agent(d: dict) -> dict: - """Add peer_name / peer_role / agent_card_url to a poll-path message. - - The PUSH path (a2a_mcp_server._build_channel_notification) already - enriches the meta dict with these fields, so a Claude Code host - with channel-push sees them. The POLL path goes through - InboxMessage.to_dict, which is intentionally identity-free (the - storage layer doesn't know about the registry cache). Without this - helper, every non-Claude-Code MCP client that uses inbox_peek / - wait_for_message gets a plain message and the receiving agent - can't tell who's writing — breaking the contract documented in - a2a_mcp_server.py:303-345 ("In both paths the same fields apply"). - - Cache-first non-blocking enrichment (same shape as push): on cache - miss the helper returns the bare message; the next call within the - 5-min TTL hits the warm cache. Failure to enrich is non-fatal — - the agent still gets text + peer_id + kind + activity_id, just - without the friendly identity. - """ - peer_id = d.get("peer_id") or "" - if not peer_id: - # canvas_user — no peer to enrich; helper returns the plain - # message unchanged so the canvas reply path still works. - return d - try: - from a2a_client import ( # local import — avoid module-load cycle - _agent_card_url_for, - enrich_peer_metadata_nonblocking, - ) - except Exception: # noqa: BLE001 - # If a2a_client is unavailable (test harness, partial install), - # degrade gracefully — agent still gets the bare envelope. - return d - record = enrich_peer_metadata_nonblocking(peer_id) - if record is not None: - if name := record.get("name"): - d["peer_name"] = name - if role := record.get("role"): - d["peer_role"] = role - # agent_card_url is constructable from peer_id alone — surface it - # even when registry enrichment misses, so the receiving agent has - # a single endpoint to hit for the peer's full capability list. - d["agent_card_url"] = _agent_card_url_for(peer_id) - return d - -async def tool_inbox_peek(limit: int = 10) -> str: - """Return up to ``limit`` pending inbound messages without removing them.""" - import inbox # local import — avoids a circular dep at module load - - state = inbox.get_state() - if state is None: - return _INBOX_NOT_ENABLED_MSG - messages = state.peek(limit=limit if isinstance(limit, int) else 10) - return json.dumps([_enrich_inbound_for_agent(m.to_dict()) for m in messages]) - - -async def tool_inbox_pop(activity_id: str) -> str: - """Remove a message from the inbox queue by activity_id.""" - import inbox - - state = inbox.get_state() - if state is None: - return _INBOX_NOT_ENABLED_MSG - if not isinstance(activity_id, str) or not activity_id: - return "Error: activity_id is required." - removed = state.pop(activity_id) - if removed is None: - return json.dumps({"removed": False, "activity_id": activity_id}) - return json.dumps({"removed": True, "activity_id": activity_id}) - - -async def tool_wait_for_message(timeout_secs: float = 60.0) -> str: - """Block until a new message arrives or ``timeout_secs`` elapses. - - Returns the head message non-destructively; the agent decides - whether to ``inbox_pop`` it after acting. - """ - import asyncio - - import inbox - - state = inbox.get_state() - if state is None: - return _INBOX_NOT_ENABLED_MSG - - try: - timeout = float(timeout_secs) - except (TypeError, ValueError): - timeout = 60.0 - # Cap at 300s — Claude Code's default tool timeout is ~10min, and - # blocking longer than 5min wastes the prompt cache window for - # nothing useful. Operators who want longer can call repeatedly. - timeout = max(0.0, min(timeout, 300.0)) - - # The threading.Event-based wait would block the asyncio loop. - # Run it on the default executor so the MCP server can keep - # processing other JSON-RPC requests while we sleep. - loop = asyncio.get_running_loop() - message = await loop.run_in_executor(None, state.wait, timeout) - if message is None: - return json.dumps({"timeout": True, "timeout_secs": timeout}) - return json.dumps(_enrich_inbound_for_agent(message.to_dict())) diff --git a/workspace/a2a_tools_inbox.py b/workspace/a2a_tools_inbox.py new file mode 100644 index 00000000..36f4406c --- /dev/null +++ b/workspace/a2a_tools_inbox.py @@ -0,0 +1,140 @@ +"""Inbox tool handlers — single-concern slice of the a2a_tools surface. + +Standalone-runtime path for inbound-message delivery (push-mode runtimes +get messages via the channel-tag synthesis in a2a_mcp_server). The +``InboxState`` singleton is set by ``mcp_cli`` before the MCP server +starts; in-container runtimes never call ``inbox.activate(...)`` so +``inbox.get_state()`` returns None and these tools surface an +informational error instead of raising. + +When-to-use guidance for agents (mirrored in +``platform_tools/registry.py``): + - ``wait_for_message``: block until a new inbound message arrives, then + decide what to do with it; forms the loop ``wait → respond → wait``. + - ``inbox_peek``: inspect the queue non-destructively. + - ``inbox_pop``: remove a handled message by activity_id. + +Extracted from ``a2a_tools.py`` in RFC #2873 iter 4e so the kitchen-sink +module shrinks to a back-compat shim. The extraction also makes the +``_enrich_inbound_for_agent`` helper unit-testable in isolation — +previously it was buried in ``a2a_tools`` and only exercised through +the inbox wrappers, leaving its peer-id-empty / cache-miss / registry- +unavailable branches under-covered. +""" +from __future__ import annotations + +import asyncio +import json + + +# Surfaced when the inbox subsystem is not initialised. Returned by the +# three inbox tool wrappers below so the agent gets a clear "this +# runtime delivers via push" message instead of a NameError. +_INBOX_NOT_ENABLED_MSG = ( + "Error: inbox polling is not enabled in this runtime. The standalone " + "molecule-mcp wrapper activates it; in-container runtimes receive " + "messages via push delivery and do not need these tools." +) + + +def _enrich_inbound_for_agent(d: dict) -> dict: + """Add peer_name / peer_role / agent_card_url to a poll-path message. + + The PUSH path (a2a_mcp_server._build_channel_notification) already + enriches the meta dict with these fields, so a Claude Code host + with channel-push sees them. The POLL path goes through + InboxMessage.to_dict, which is intentionally identity-free (the + storage layer doesn't know about the registry cache). Without this + helper, every non-Claude-Code MCP client that uses inbox_peek / + wait_for_message gets a plain message and the receiving agent + can't tell who's writing — breaking the contract documented in + a2a_mcp_server.py:303-345 ("In both paths the same fields apply"). + + Cache-first non-blocking enrichment (same shape as push): on cache + miss the helper returns the bare message; the next call within the + 5-min TTL hits the warm cache. Failure to enrich is non-fatal — + the agent still gets text + peer_id + kind + activity_id, just + without the friendly identity. + """ + peer_id = d.get("peer_id") or "" + if not peer_id: + # canvas_user — no peer to enrich; helper returns the plain + # message unchanged so the canvas reply path still works. + return d + try: + from a2a_client import ( # local import — avoid module-load cycle + _agent_card_url_for, + enrich_peer_metadata_nonblocking, + ) + except Exception: # noqa: BLE001 + # If a2a_client is unavailable (test harness, partial install), + # degrade gracefully — agent still gets the bare envelope. + return d + record = enrich_peer_metadata_nonblocking(peer_id) + if record is not None: + if name := record.get("name"): + d["peer_name"] = name + if role := record.get("role"): + d["peer_role"] = role + # agent_card_url is constructable from peer_id alone — surface it + # even when registry enrichment misses, so the receiving agent has + # a single endpoint to hit for the peer's full capability list. + d["agent_card_url"] = _agent_card_url_for(peer_id) + return d + + +async def tool_inbox_peek(limit: int = 10) -> str: + """Return up to ``limit`` pending inbound messages without removing them.""" + import inbox # local import — avoids a circular dep at module load + + state = inbox.get_state() + if state is None: + return _INBOX_NOT_ENABLED_MSG + messages = state.peek(limit=limit if isinstance(limit, int) else 10) + return json.dumps([_enrich_inbound_for_agent(m.to_dict()) for m in messages]) + + +async def tool_inbox_pop(activity_id: str) -> str: + """Remove a message from the inbox queue by activity_id.""" + import inbox + + state = inbox.get_state() + if state is None: + return _INBOX_NOT_ENABLED_MSG + if not isinstance(activity_id, str) or not activity_id: + return "Error: activity_id is required." + removed = state.pop(activity_id) + if removed is None: + return json.dumps({"removed": False, "activity_id": activity_id}) + return json.dumps({"removed": True, "activity_id": activity_id}) + + +async def tool_wait_for_message(timeout_secs: float = 60.0) -> str: + """Block until a new message arrives or ``timeout_secs`` elapses. + + Returns the head message non-destructively; the agent decides + whether to ``inbox_pop`` it after acting. + """ + import inbox + + state = inbox.get_state() + if state is None: + return _INBOX_NOT_ENABLED_MSG + + try: + timeout = float(timeout_secs) + except (TypeError, ValueError): + timeout = 60.0 + # Cap at 300s — Claude Code's default tool timeout is ~10min, and + # blocking longer than 5min wastes the prompt cache window for + # nothing useful. Operators who want longer can call repeatedly. + timeout = max(0.0, min(timeout, 300.0)) + + # The threading.Event-based wait would block the asyncio loop. + # Run it on the default executor so the MCP server can keep + # processing other JSON-RPC requests while we sleep. + loop = asyncio.get_running_loop() + message = await loop.run_in_executor(None, state.wait, timeout) + if message is None: + return json.dumps({"timeout": True, "timeout_secs": timeout}) + return json.dumps(_enrich_inbound_for_agent(message.to_dict())) diff --git a/workspace/tests/test_a2a_tools_inbox_split.py b/workspace/tests/test_a2a_tools_inbox_split.py new file mode 100644 index 00000000..bf6df29c --- /dev/null +++ b/workspace/tests/test_a2a_tools_inbox_split.py @@ -0,0 +1,181 @@ +"""Drift gate + import-contract tests for ``a2a_tools_inbox`` (RFC #2873 iter 4e). + +The full behavior matrix for the three inbox tool wrappers lives in +``test_a2a_tools_inbox_wrappers.py`` (kept on the public ``a2a_tools`` +module so the same tests pin both the alias and the underlying impl). + +This file pins: + + 1. **Drift gate** — every previously-public symbol on ``a2a_tools`` + (``tool_inbox_peek``, ``tool_inbox_pop``, ``tool_wait_for_message``, + ``_enrich_inbound_for_agent``, ``_INBOX_NOT_ENABLED_MSG``) is the + EXACT same object as ``a2a_tools_inbox.foo``. Refactor wrapping + silently loses existing test coverage; this gate makes that drift + fail fast. + 2. **Import contract** — ``a2a_tools_inbox`` does NOT pull in + ``a2a_tools`` at module-load time (the layered architecture: it + depends only on stdlib + a lazy import of ``inbox`` + a lazy + import of ``a2a_client``, never the kitchen-sink module that + re-exports it). + 3. **_enrich_inbound_for_agent** branches that the wrapper tests + can't easily reach: peer_id-empty (canvas_user) returns the + dict unchanged; a2a_client unavailable degrades gracefully. +""" +from __future__ import annotations + +import sys + +import pytest + + +@pytest.fixture(autouse=True) +def _require_workspace_id(monkeypatch): + monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000") + monkeypatch.setenv("PLATFORM_URL", "http://test.invalid") + yield + + +# ============== Drift gate ============== + +class TestBackCompatAliases: + def test_tool_inbox_peek_alias(self): + import a2a_tools + import a2a_tools_inbox + assert a2a_tools.tool_inbox_peek is a2a_tools_inbox.tool_inbox_peek + + def test_tool_inbox_pop_alias(self): + import a2a_tools + import a2a_tools_inbox + assert a2a_tools.tool_inbox_pop is a2a_tools_inbox.tool_inbox_pop + + def test_tool_wait_for_message_alias(self): + import a2a_tools + import a2a_tools_inbox + assert ( + a2a_tools.tool_wait_for_message is a2a_tools_inbox.tool_wait_for_message + ) + + def test_enrich_helper_alias(self): + import a2a_tools + import a2a_tools_inbox + assert ( + a2a_tools._enrich_inbound_for_agent + is a2a_tools_inbox._enrich_inbound_for_agent + ) + + def test_inbox_not_enabled_msg_alias(self): + import a2a_tools + import a2a_tools_inbox + assert ( + a2a_tools._INBOX_NOT_ENABLED_MSG is a2a_tools_inbox._INBOX_NOT_ENABLED_MSG + ) + + +# ============== Import contract ============== + +class TestImportContract: + def test_inbox_module_does_not_import_a2a_tools_eagerly(self): + # Force a fresh load of a2a_tools_inbox without a2a_tools in sight. + for k in [k for k in list(sys.modules) if k in ( + "a2a_tools_inbox", "a2a_tools", + )]: + sys.modules.pop(k, None) + import a2a_tools_inbox # noqa: F401 — load only + + # a2a_tools_inbox MUST NOT have caused a2a_tools to load. The + # extracted module sits BELOW the kitchen-sink in the layering; + # the dependency arrow points the other direction. + assert "a2a_tools" not in sys.modules, ( + "a2a_tools_inbox eagerly imported a2a_tools — the kitchen-sink " + "module must not be a load-time dependency of its slices." + ) + + +# ============== _enrich_inbound_for_agent branches ============== + +class TestEnrichInboundForAgent: + def test_canvas_user_returns_dict_unchanged(self): + # peer_id empty → canvas_user → no enrichment, no a2a_client touch. + from a2a_tools_inbox import _enrich_inbound_for_agent + + msg = {"activity_id": "a-1", "kind": "canvas_user", "peer_id": ""} + result = _enrich_inbound_for_agent(msg) + assert result is msg # same dict, mutated in place if at all + assert "peer_name" not in result + assert "peer_role" not in result + assert "agent_card_url" not in result + + def test_missing_peer_id_key_returns_unchanged(self): + from a2a_tools_inbox import _enrich_inbound_for_agent + + msg = {"activity_id": "a-2", "kind": "canvas_user"} # no peer_id key + result = _enrich_inbound_for_agent(msg) + assert result is msg + assert "agent_card_url" not in result + + def test_a2a_client_unavailable_degrades_gracefully(self, monkeypatch): + # Simulate a2a_client import failing (test harness, partial + # install). The helper must return the bare envelope, not raise. + from a2a_tools_inbox import _enrich_inbound_for_agent + + # Force an ImportError by poisoning sys.modules. + import builtins + real_import = builtins.__import__ + + def fake_import(name, *args, **kwargs): + if name == "a2a_client": + raise ImportError("simulated a2a_client unavailable") + return real_import(name, *args, **kwargs) + + monkeypatch.setattr(builtins, "__import__", fake_import) + + msg = {"activity_id": "a-3", "kind": "peer_agent", "peer_id": "ws-x"} + result = _enrich_inbound_for_agent(msg) + # Bare envelope back — no peer_name, no agent_card_url. Crucially + # the helper did NOT raise, so the inbox tool surfaces the message + # to the agent even when the registry is unreachable. + assert result is msg + assert "peer_name" not in result + assert "agent_card_url" not in result + + def test_registry_record_populates_peer_name_and_role(self, monkeypatch): + from a2a_tools_inbox import _enrich_inbound_for_agent + + # Stub out the lazy-imported a2a_client functions. + import sys + import types + fake_a2a_client = types.SimpleNamespace( + _agent_card_url_for=lambda pid: f"http://test/agent/{pid}", + enrich_peer_metadata_nonblocking=lambda pid: { + "name": "PeerOne", + "role": "worker", + }, + ) + monkeypatch.setitem(sys.modules, "a2a_client", fake_a2a_client) + + msg = {"activity_id": "a-4", "kind": "peer_agent", "peer_id": "ws-1"} + result = _enrich_inbound_for_agent(msg) + assert result["peer_name"] == "PeerOne" + assert result["peer_role"] == "worker" + assert result["agent_card_url"] == "http://test/agent/ws-1" + + def test_registry_miss_keeps_agent_card_url(self, monkeypatch): + # On registry cache miss the helper still surfaces agent_card_url + # because it's constructable from peer_id alone — preserves the + # contract that the receiving agent always has somewhere to + # fetch the peer's full capability list. + from a2a_tools_inbox import _enrich_inbound_for_agent + + import sys + import types + fake_a2a_client = types.SimpleNamespace( + _agent_card_url_for=lambda pid: f"http://test/agent/{pid}", + enrich_peer_metadata_nonblocking=lambda pid: None, # cache miss + ) + monkeypatch.setitem(sys.modules, "a2a_client", fake_a2a_client) + + msg = {"activity_id": "a-5", "kind": "peer_agent", "peer_id": "ws-2"} + result = _enrich_inbound_for_agent(msg) + assert "peer_name" not in result + assert "peer_role" not in result + assert result["agent_card_url"] == "http://test/agent/ws-2"