Compare commits

..

1 Commits

Author SHA1 Message Date
b4de9c97f4 docs: clarify pytest-asyncio is an optional test dep in CLAUDE.md
All checks were successful
Test / test (3.12) (pull_request) Successful in 1m48s
Test / test (3.13) (pull_request) Successful in 1m50s
Test / test (3.11) (pull_request) Successful in 1m54s
Without `pip install -e '.[test]'`, pytest silently skips 4 async tests
in test_sdk.py (all marked @pytest.mark.asyncio). Clarify the two-step
install so new contributors don't waste time debugging silent test failures.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-10 09:01:17 +00:00
11 changed files with 27 additions and 659 deletions

View File

@ -1 +0,0 @@
verify-fix-1778444420

View File

@ -28,43 +28,27 @@ Format per entry:
## KI-001 — RemoteAgentClient does not implement inbound A2A server
**File:** `molecule_agent/client.py`, `molecule_agent/a2a_server.py`, `molecule_agent/inbound.py`
**Status:** ✅ Resolved
**File:** `molecule_agent/client.py`
**Status:** Known limitation; not yet implemented
**Severity:** Medium
**Platform phase:** Phase 30.8b
### Resolution
The SDK now ships two inbound delivery paths:
### Symptom
`RemoteAgentClient` can call other workspaces via A2A (outbound), but cannot
receive inbound A2A calls. Any workspace that tries to delegate to or message
this agent will get a connection refused or timeout.
**Push mode (`A2AServer`)** — `molecule_agent.a2a_server.A2AServer` exposes an HTTP
server with a `POST /a2a/inbound` endpoint. It runs in a background daemon thread
alongside the client's heartbeat loop. Use with `PushDelivery` from `inbound.py`:
### Impact
Agents running outside the platform's Docker network via `molecule_agent` are
one-directional. Platform agents cannot push work to them — the remote agent
must poll or be provisioned with a publicly reachable webhook endpoint.
```python
from molecule_agent import RemoteAgentClient, A2AServer
from molecule_agent.inbound import PushDelivery
server = A2AServer(agent_id=workspace_id, inbound_url="https://...", message_handler=my_handler)
server.start_in_background()
client = RemoteAgentClient(workspace_id=workspace_id, platform_url=...)
client.reported_url = server.inbound_url # register with this URL
client.register()
# Pass PushDelivery so run_agent_loop doesn't also poll
client.run_agent_loop(handler=my_handler, delivery=PushDelivery(client, server))
```
**Poll mode (`PollDelivery`)** — for agents behind NAT or without a public endpoint,
the SDK's `PollDelivery` polls `GET /workspaces/:id/activity` on a configurable
interval (default 5s). Both paths feed the same `MessageHandler` callback.
`run_agent_loop` picks `PollDelivery` automatically when no explicit delivery is passed.
### Files added
- `molecule_agent/a2a_server.py``A2AServer` class; `HTTPServer` + `_A2AHandler`
running in a daemon thread; handles `POST /a2a/inbound`, async/sync handlers,
graceful stop.
- `molecule_agent/inbound.py``InboundDelivery` protocol, `PollDelivery`,
`PushDelivery` (wraps `A2AServer`), `InboundMessage`, `MessageHandler`.
- `RemoteAgentClient.run_agent_loop` updated to accept any `InboundDelivery`.
### Suggested fix
Add an `A2AServerMixin` class that exposes a `FastAPI` or `flask` route
(`POST /a2a/inbound`) and runs in a background thread alongside the client's
heartbeat loop. Register the inbound URL with the platform via the
`/registry/discover` update endpoint when the server starts. See Phase 30.8b
in the platform `PLAN.md`.
---
@ -253,41 +237,3 @@ def _is_hex(value: str) -> bool:
`tests/conftest.py` exists with the `_CaptureHandler` stub definition.
`pytest tests/test_call_peer_errors.py` runs all 12 tests cleanly.
`pytest tests/` collects all test files with no collection errors.
---
## KI-009 — `run_heartbeat_loop()` does not honour external stop signals
**File:** `molecule_agent/client.py` (`RemoteAgentClient.run_heartbeat_loop`,
`RemoteAgentClient.run_agent_loop`)
**Status:** ✅ Resolved (PR: `feat/ki-009-stop-event`)
**Severity:** Low
### Resolution
Added `stop_event: threading.Event | None = None` parameter to both
`run_heartbeat_loop()` and `run_agent_loop()`. When set, the event is checked
at the start of each loop iteration (before `max_iterations`). When the event
is set, the loop exits immediately with return value `"stopped"`. The check
is ordered before `max_iterations` so a signal always wins.
Callers achieve graceful shutdown by setting the event from a SIGTERM handler:
```python
import signal, threading
from molecule_agent import RemoteAgentClient
stop = threading.Event()
client = RemoteAgentClient(...)
def sigterm_handler(signum, frame):
stop.set()
signal.signal(signal.SIGTERM, sigterm_handler)
terminal = client.run_heartbeat_loop(stop_event=stop)
# terminal == "stopped" when killed cleanly
```
Tests added: `test_run_loop_exits_on_stop_event`,
`test_run_loop_respects_stop_event_between_iterations` in
`tests/test_remote_agent.py`; `test_run_agent_loop_exits_on_stop_event`
in `tests/test_inbound.py`.

View File

@ -10,22 +10,6 @@ This is the client side of [Phase 30](../../../PLAN.md). The platform side
ships in the same release; this package is just the SDK an agent author
imports.
## What this is / what this isn't
| | `molecule_agent` (this package) | `molecule-ai-workspace-runtime` (separate PyPI wheel) |
|---|---|---|
| **Where it runs** | OUTSIDE Molecule workspaces — your laptop, CI runner, external cloud VM, sidecar service | INSIDE the workspace container, started by the platform |
| **What it talks to** | The platform's HTTP API (`/registry/*`, `/workspaces/:id/*`) | The platform's MCP server (`molecule_*` tools) plus the platform-managed A2A bus |
| **What it exposes** | `RemoteAgentClient`, `A2AServer`, `PollDelivery`, `MessageHandler` | `BaseAdapter`, `a2a_tools`, runtime capabilities, smoke-contract hooks |
| **Who installs it** | You, the external-agent author, via `pip install molecule-sdk` | The platform, baked into the workspace template image at provision time |
| **Auth model** | Bearer token minted by `POST /registry/register`, cached at `~/.molecule/<id>/.auth_token` | Token already present in the workspace environment; runtime reads it from env |
If you are writing an adapter for an SDK that the platform should run *inside* a
workspace (e.g. langchain, crewai, hermes), you want
[`molecule-ai-workspace-runtime`](https://pypi.org/project/molecule-ai-workspace-runtime/),
not this package. See <https://doc.moleculesai.app/docs/runtime-mcp> for the
in-workspace-runtime authoring guide.
## Install
```bash
@ -72,7 +56,7 @@ A runnable demo with full setup walkthrough lives at
| `poll_state()` | 30.4 | Lightweight `{status, paused, deleted}` poll |
| `heartbeat(...)` | 30.1 | Single bearer-authed heartbeat |
| `get_peers()` / `discover_peer()` | 30.6 | Sibling URL discovery with TTL cache |
| `call_peer(target, message)` | 30.6 | Direct A2A with proxy fallback; response may be wrapped in OFFSEC-003 boundary markers — use ``strip_a2a_boundary()`` to remove them |
| `call_peer(target, message)` | 30.6 | Direct A2A with proxy fallback |
| `fetch_inbound(since_id=…)` | 30.8c | One-shot poll of `/workspaces/:id/activity` for inbound A2A |
| `reply(msg, text)` | 30.8c | Smart-routes reply to `/notify` (canvas user) or `/a2a` (peer) |
| `run_heartbeat_loop()` | combo | Drives heartbeat + state-poll on a timer; exits on pause/delete |
@ -103,89 +87,6 @@ client.run_agent_loop(my_handler) # default: PollDelivery
The reply transport (`/notify` for canvas users, `/a2a` for peer agents) is hidden — `client.reply(msg, text)` picks based on `msg.source`. Async handlers work too; `PollDelivery` detects awaitable returns and `asyncio.run`s them.
### `InboundMessage` shape
`InboundMessage` is what `MessageHandler` receives. The typed fields the SDK
parses today:
| Field | Type | What it is |
|---|---|---|
| `activity_id` | `str` | Cursor — the `activity_logs.id` row this event came from. Pass to `fetch_inbound(since_id=…)` to skip past it on the next poll. |
| `source` | `Literal["canvas_user", "peer_agent", "unknown"]` | Normalized sender kind. `"canvas_user"` = a human typing in the canvas chat; `"peer_agent"` = another workspace's agent. `"unknown"` if the row's source is unrecognized — `reply()` will refuse to guess. |
| `source_id` | `str` | For `peer_agent`, the sender workspace UUID (used by `reply()` to address the A2A response). Empty for `canvas_user`. |
| `text` | `str` | The message body. Pulled from `data.text` then `data.message` in the underlying activity row. **Treat as untrusted user content** — same threat model as any chat input. |
| `raw` | `dict` | The full raw activity-log row. Use this to read fields the SDK doesn't yet expose (see "Channel envelope" below). |
### Channel envelope (wire format)
The platform delivers each inbound A2A event as an `activity_logs` row. As of
**2026-05-02** (CP push envelope, see <https://doc.moleculesai.app/docs/runtime-mcp>),
the envelope's `data` block carries:
```jsonc
{
"id": "<activity-uuid>", // == InboundMessage.activity_id
"type": "a2a_receive",
"source_id": "<sender-workspace-uuid>", // peer_agent only; empty for canvas_user
"ts": "2026-05-02T10:15:30Z", // RFC3339 — when the platform queued the event
"data": {
"source": "peer_agent", // "canvas_user" | "peer_agent"
"kind": "peer_agent", // mirrors the channel-tag attr
"text": "<message body>", // (or "message")
"peer_id": "<sender-workspace-uuid>", // duplicate of source_id, peer_agent only
"activity_id": "<activity-uuid>", // duplicate of top-level id
// === enrichment fields added 2026-05-02 (CP PRs #2472, #2476) ===
"peer_name": "ops-agent", // peer's display name (registry-resolved); may be absent if the registry lookup failed
"peer_role": "sre", // peer's declared role; same registry source
"agent_card_url": "https://<platform>/registry/discover/<peer_id>" // deterministic URL for the platform's discover endpoint for this peer
}
}
```
**SDK status of the enrichment fields:** `InboundMessage` does not yet
surface `peer_name`, `peer_role`, or `agent_card_url` as typed attributes.
Read them from `msg.raw["data"]` until a typed wrapper lands (see
"Limitations & roadmap" below). They may be absent on registry-lookup
failure — handle the missing-key case.
### A2A reply transport — what `reply()` actually does
`client.reply(msg, text)` dispatches based on `msg.source`. The transport is
chosen for you so handler code doesn't need to branch:
| `msg.source` | HTTP call `reply()` makes | Server-side effect |
|---|---|---|
| `canvas_user` | `POST /workspaces/<self>/notify` with `{"message": text}` | Canvas WebSocket pushes the text to the user's chat |
| `peer_agent` | `POST /workspaces/<msg.source_id>/a2a` with a JSON-RPC `message/send` envelope; sets `X-Source-Workspace-Id: <self>` and `X-Workspace-ID: <self>` | Platform routes the JSON-RPC message to the peer workspace's inbound A2A endpoint |
| `unknown` | Raises `ValueError` | The SDK refuses to guess. Inspect `msg.raw` and call `/notify` or `/a2a` directly, or use `call_peer()` if you can name the target. |
`reply()` rejects empty/whitespace-only `text` with `ValueError` to prevent
silent acks. On non-2xx the underlying `requests.HTTPError` propagates so the
handler can decide whether to retry, surface to its observability, or fail
loudly.
### OFFSEC-003 — A2A peer response trust boundary
As of the OFFSEC-003 platform rollout, peer A2A responses are wrapped in
trust-boundary markers before being returned to callers::
[A2A_RESULT_FROM_PEER]<peer response text>[/A2A_RESULT_FROM_PEER]
The markers signal that the enclosed content is untrusted third-party output.
Use ``strip_a2a_boundary()`` to remove them before passing the response to
your agent context::
from molecule_agent import RemoteAgentClient, strip_a2a_boundary
result = client.call_peer(target_id, "do the thing")
raw_text = result.get("result", {}).get("text", "")
trusted_text = strip_a2a_boundary(raw_text)
The function returns the input unchanged if the markers are absent (platform
versions older than the OFFSEC-003 rollout), so it is safe to call on any
response.
## CLI: `molecule_agent connect`
One command bootstraps the full poll-mode loop. No code beyond your handler:
@ -209,55 +110,14 @@ def echo(msg, client):
All flags also read from environment variables (`MOLECULE_PLATFORM_URL`, `MOLECULE_WORKSPACE_ID`, `MOLECULE_WORKSPACE_TOKEN`, `MOLECULE_POLL_INTERVAL`, `MOLECULE_CURSOR_FILE`). SIGTERM/SIGINT shut the loop down cleanly.
## What it doesn't do (yet) — Limitations & roadmap
These are server-supported features that the SDK has not yet wrapped, plus
known protocol gaps. Each entry is named so a follow-up issue / PR can
reference it directly.
## What it doesn't do (yet)
- **No long-poll.** Activity polling is fixed-cadence (default 5s). Server-side long-poll support would cut p50 inbound latency to ~0; tracked separately.
- **No automatic reconnect after token loss.** If `~/.molecule/<id>/.auth_token`
is deleted, you'll need to re-issue the token via the platform admin (since
`POST /registry/register` is idempotent — it won't mint a second token for
a workspace that already has one).
- **`fetch_inbound()` `peer_id` and `before_ts` filters (resolved).**
`RemoteAgentClient.fetch_inbound()` now accepts `peer_id` (narrow to events
from a specific peer workspace) and `before_ts` (RFC3339 cutoff for backlog
replay) as optional parameters. Both are forwarded to the activity endpoint.
- **`InboundMessage` peer enrichment fields (resolved).**
`InboundMessage` now exposes `peer_name`, `peer_role`, and `agent_card_url`
as typed string attributes (default ``""`` when absent). The `raw` dict
remains available for any future envelope fields not yet wrapped.
- **Tenant + Origin headers are not auto-injected.** When the platform is
deployed multi-tenant on the SaaS edge (`*.staging.moleculesai.app`,
`*.moleculesai.app`), the WAF requires:
- `X-Molecule-Org-Id: <org-uuid>` — TenantGuard middleware uses this to
pin the request to the right tenant; missing-header requests 404
- `Origin: <PLATFORM_URL>``/workspaces/*` and `/registry/*/peers`
silently rewrite to Next.js without it (returns an empty 404, easy
to misdiagnose as auth)
A follow-up PR will accept `org_id` and `origin` constructor kwargs and
inject the headers automatically.
- **Tenant + Origin headers (resolved).**
`RemoteAgentClient` now accepts `org_id` and `origin` constructor kwargs and
injects them automatically on every request:
```python
from molecule_agent import RemoteAgentClient
client = RemoteAgentClient(
workspace_id="…",
platform_url="https://<your-tenant>.moleculesai.app",
org_id="<your-org-uuid>", # sets X-Molecule-Org-Id
origin="https://<your-tenant>.moleculesai.app", # sets Origin
)
```
## Design choices
- **Blocking (`requests`), not async.** Drops into any runtime — script,

View File

@ -39,7 +39,6 @@ from .client import (
PeerInfo,
RemoteAgentClient,
WorkspaceState,
strip_a2a_boundary,
verify_plugin_sha256,
)
from .inbound import (
@ -72,7 +71,6 @@ __all__ = [
"DEFAULT_POLL_INTERVAL",
"compute_plugin_sha256",
"verify_plugin_sha256",
"strip_a2a_boundary",
"__version__",
]
__version__ = "0.1.0"

View File

@ -27,7 +27,6 @@ import os
import random
import subprocess
import tarfile
import threading
import time
import uuid
from dataclasses import dataclass, field
@ -91,43 +90,6 @@ def make_idempotency_key(task_text: str) -> str:
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
# ── A2A boundary marker stripping (OFFSEC-003) ───────────────────────────────
_A2A_BOUNDARY_START = "[A2A_RESULT_FROM_PEER]"
_A2A_BOUNDARY_END = "[/A2A_RESULT_FROM_PEER]"
def strip_a2a_boundary(text: str) -> str:
"""Strip OFFSEC-003 trust-boundary markers from a peer A2A response.
The platform wraps peer A2A responses in::
[A2A_RESULT_FROM_PEER]<content>[/A2A_RESULT_FROM_PEER]
to mark them as untrusted third-party content. Call this helper to
remove the wrapper before passing the content to your agent context:
Usage::
result = client.call_peer(target_id, "do the thing")
text = result.get("result", {}).get("text", "")
content = strip_a2a_boundary(text)
Returns the interior content (everything between the two markers).
Returns the input unchanged if the boundary markers are absent (the caller
may be talking to a platform version older than the OFFSEC-003 rollout).
Returns ``""`` for ``None`` or empty input.
"""
if not text:
return ""
start = text.find(_A2A_BOUNDARY_START)
end = text.find(_A2A_BOUNDARY_END)
if start != -1 and end != -1 and end > start:
return text[start + len(_A2A_BOUNDARY_START):end].strip()
return text
def _safe_extract_tar(tf: tarfile.TarFile, dest: Path) -> None:
"""Extract a tarfile, refusing entries that would escape `dest`
and logging skipped symlinks/hardlinks.
@ -306,13 +268,6 @@ class RemoteAgentClient:
0700 permissions if missing.
heartbeat_interval: Seconds between heartbeats in the run loop.
state_poll_interval: Seconds between state polls in the run loop.
org_id: Optional tenant UUID for multi-tenant SaaS deployments
(``*.moleculesai.app``). When set the client sends
``X-Molecule-Org-Id: <org_id>`` on every request so the WAF
can route to the correct tenant.
origin: Optional origin string for multi-tenant SaaS deployments.
When set the client sends ``Origin: <origin>`` on every
request, preventing silent Next.js rewrites on SaaS edges.
"""
def __init__(
@ -326,8 +281,6 @@ class RemoteAgentClient:
state_poll_interval: float = DEFAULT_STATE_POLL_INTERVAL,
url_cache_ttl: float = DEFAULT_URL_CACHE_TTL,
session: requests.Session | None = None,
org_id: str = "",
origin: str = "",
) -> None:
self.workspace_id = workspace_id
self.platform_url = platform_url.rstrip("/")
@ -336,8 +289,6 @@ class RemoteAgentClient:
self.heartbeat_interval = heartbeat_interval
self.state_poll_interval = state_poll_interval
self.url_cache_ttl = url_cache_ttl
self._org_id = org_id
self._origin = origin
# Phase 30.6 — sibling URL cache keyed by workspace id. Values are
# (url, expires_at_unix_seconds). Process-memory only; we re-fetch
# on restart because agent lifetimes are short enough that
@ -430,14 +381,9 @@ class RemoteAgentClient:
def _auth_headers(self) -> dict[str, str]:
tok = self.load_token()
headers: dict[str, str] = {}
if tok:
headers["Authorization"] = f"Bearer {tok}"
if self._org_id:
headers["X-Molecule-Org-Id"] = self._org_id
if self._origin:
headers["Origin"] = self._origin
return headers
if not tok:
return {}
return {"Authorization": f"Bearer {tok}"}
def _get_with_retry(
self,
@ -753,12 +699,10 @@ class RemoteAgentClient:
since_id: str | None = None,
limit: int = 100,
type: str = "a2a_receive",
peer_id: str | None = None,
before_ts: str | None = None,
) -> list["InboundMessage"]:
"""Fetch one batch of inbound A2A activity rows.
Hits ``GET /workspaces/:id/activity?type=&since_id=&peer_id=&before_ts=&limit=``.
Hits ``GET /workspaces/:id/activity?type=&since_id=&limit=``.
Returns the rows newer than ``since_id`` in oldest-first order,
parsed into :class:`~molecule_agent.inbound.InboundMessage`.
@ -774,13 +718,6 @@ class RemoteAgentClient:
type: Activity-row type filter. Default ``"a2a_receive"``;
pass another type to consume different streams (e.g.
``"workspace_state_changed"``).
peer_id: Narrow to events sourced from a specific peer workspace.
Pass a workspace UUID to receive only A2A messages from that
peer. ``None`` (default) accepts messages from any source.
before_ts: RFC3339 timestamp only rows older than this cutoff
are returned. Use to fetch a historical backlog window (e.g.
replaying missed messages after a restart). ``None`` means
no upper-bound cutoff; the server returns the most recent rows.
Returns:
List of :class:`InboundMessage`, oldest first. May be empty.
@ -798,10 +735,6 @@ class RemoteAgentClient:
params: dict[str, str] = {"type": type, "limit": str(int(limit))}
if since_id:
params["since_id"] = since_id
if peer_id:
params["peer_id"] = peer_id
if before_ts:
params["before_ts"] = before_ts
url = f"{self.platform_url}/workspaces/{self.workspace_id}/activity"
resp = self._session.get(
url,
@ -922,7 +855,6 @@ class RemoteAgentClient:
delivery: "InboundDelivery | None" = None,
max_iterations: int | None = None,
task_supplier: "callable | None" = None,
stop_event: threading.Event | None = None,
) -> str:
"""Combined heartbeat + state-poll + inbound-delivery loop.
@ -948,14 +880,10 @@ class RemoteAgentClient:
task_supplier: Optional zero-arg callable returning a dict
``{"current_task": str, "active_tasks": int}`` reported on
each heartbeat (same contract as :py:meth:`run_heartbeat_loop`).
stop_event: Optional :py:class:`threading.Event` that, when set,
causes the loop to exit cleanly with return value ``"stopped"``.
Call ``stop_event.set()`` from a SIGTERM handler to achieve
graceful shutdown. Ignored when ``None``.
Returns:
The terminal status: ``"paused"``, ``"removed"``,
``"max_iterations"``, or ``"stopped"``.
The terminal status: ``"paused"``, ``"removed"``, or
``"max_iterations"``.
Errors from the activity poll, heartbeat, or state poll are
logged and the loop continues a transient platform hiccup
@ -970,8 +898,6 @@ class RemoteAgentClient:
i = 0
try:
while True:
if stop_event is not None and stop_event.is_set():
return "stopped"
if max_iterations is not None and i >= max_iterations:
return "max_iterations"
i += 1
@ -1232,11 +1158,10 @@ class RemoteAgentClient:
self,
max_iterations: int | None = None,
task_supplier: "callable | None" = None,
stop_event: threading.Event | None = None,
) -> str:
"""Drive heartbeat + state-poll on a timer. Returns the terminal
status when the loop exits (``"paused"``, ``"removed"``,
``"max_iterations"``, or ``"stopped"``).
status when the loop exits (``"paused"``, ``"removed"``, or
``"max_iterations"``).
Args:
max_iterations: Stop after N loop iterations. None = run until
@ -1245,10 +1170,6 @@ class RemoteAgentClient:
task_supplier: Optional zero-arg callable returning a dict
``{"current_task": str, "active_tasks": int}`` fetched
each iteration. Lets the agent report what it's doing.
stop_event: Optional :py:class:`threading.Event` that, when set,
causes the loop to exit cleanly with return value ``"stopped"``.
Call ``stop_event.set()`` from a SIGTERM handler to achieve
graceful shutdown. Ignored when ``None``.
The loop sends one heartbeat + one state poll per iteration; the
next iteration sleeps for ``heartbeat_interval`` seconds. Errors
@ -1258,8 +1179,6 @@ class RemoteAgentClient:
"""
i = 0
while True:
if stop_event is not None and stop_event.is_set():
return "stopped"
if max_iterations is not None and i >= max_iterations:
return "max_iterations"
i += 1

View File

@ -67,12 +67,6 @@ class InboundMessage:
* ``unknown`` the activity row didn't carry a recognizable source.
:py:meth:`RemoteAgentClient.reply` raises ``ValueError`` rather than
guess.
``peer_name``, ``peer_role``, and ``agent_card_url`` are enrichment fields
added to the channel envelope on 2026-05-02 (CP PRs #2472, #2476). They
are resolved from the platform's registry at delivery time and may be absent
if the registry lookup failed or the sender hasn't registered yet. Always
check for absence rather than assuming they are populated.
"""
activity_id: str
@ -80,12 +74,6 @@ class InboundMessage:
source_id: str
text: str
raw: dict[str, Any] = field(default_factory=dict)
# Enrichment fields — populated from row["data"]["peer_name"],
# row["data"]["peer_role"], row["data"]["agent_card_url"].
# May be empty strings if the registry lookup failed.
peer_name: str = ""
peer_role: str = ""
agent_card_url: str = ""
class CursorLostError(Exception):
@ -146,9 +134,6 @@ def _parse_activity_row(row: dict[str, Any]) -> InboundMessage | None:
source_id=source_id,
text=text,
raw=row,
peer_name=str(data.get("peer_name") or ""),
peer_role=str(data.get("peer_role") or ""),
agent_card_url=str(data.get("agent_card_url") or ""),
)

View File

@ -153,83 +153,6 @@ def test_parse_activity_row_text_alt_key():
assert msg.text == "alt"
def test_parse_activity_row_enrichment_fields():
"""peer_name, peer_role, agent_card_url are extracted from row["data"]."""
row = {
"id": "act-enriched",
"source_id": "peer-ops-01",
"data": {
"source": "peer_agent",
"text": "status report",
"peer_name": "ops-agent",
"peer_role": "sre",
"agent_card_url": "https://platform.example/registry/discover/peer-ops-01",
},
}
msg = _parse_activity_row(row)
assert msg is not None
assert msg.peer_name == "ops-agent"
assert msg.peer_role == "sre"
assert msg.agent_card_url == "https://platform.example/registry/discover/peer-ops-01"
def test_parse_activity_row_enrichment_fields_absent():
"""When enrichment fields are absent, InboundMessage fields default to ""."""
row = {
"id": "act-no-enrich",
"source_id": "peer-x",
"data": {"source": "peer_agent", "text": "hello"},
# no peer_name, peer_role, agent_card_url
}
msg = _parse_activity_row(row)
assert msg is not None
assert msg.peer_name == ""
assert msg.peer_role == ""
assert msg.agent_card_url == ""
def test_parse_activity_row_enrichment_fields_null_becomes_empty():
"""null values in enrichment fields become "" (not the string "None")."""
row = {
"id": "act-null-enrich",
"source_id": "peer-y",
"data": {
"source": "peer_agent",
"text": "ping",
"peer_name": None,
"peer_role": None,
"agent_card_url": None,
},
}
msg = _parse_activity_row(row)
assert msg is not None
assert msg.peer_name == ""
assert msg.peer_role == ""
assert msg.agent_card_url == ""
def test_parse_activity_row_enrichment_in_canvas_user_row():
"""Enrichment fields are parsed even when source is canvas_user (edge case
where the platform enriches the row even for user-sourced messages)."""
row = {
"id": "act-user-enrich",
"source_id": "user",
"data": {
"source": "canvas_user",
"text": "hi",
"peer_name": "someone",
"peer_role": "human",
"agent_card_url": "https://platform.example/registry/discover/user-uuid",
},
}
msg = _parse_activity_row(row)
assert msg is not None
assert msg.source == "canvas_user"
assert msg.peer_name == "someone"
assert msg.peer_role == "human"
assert msg.agent_card_url == "https://platform.example/registry/discover/user-uuid"
# ---------------------------------------------------------------------------
# fetch_inbound
# ---------------------------------------------------------------------------
@ -327,61 +250,6 @@ def test_fetch_inbound_429_retries_via_get_with_retry(
assert client._session.get.call_count == 2
def test_fetch_inbound_peer_id_filter():
"""peer_id param is forwarded to the GET as a query parameter."""
session = MagicMock()
rows = [{"id": "act-peer", "source_id": "peer-ops", "data": {"source": "peer_agent", "text": "hi"}}]
session.get.return_value = FakeResponse(200, rows)
from molecule_agent import RemoteAgentClient
client = RemoteAgentClient(
workspace_id="ws-abc",
platform_url="http://platform.test",
agent_card={"name": "t"},
session=session,
)
client.save_token("tok")
client.fetch_inbound(peer_id="peer-ops")
params = session.get.call_args.kwargs["params"]
assert params["peer_id"] == "peer-ops"
def test_fetch_inbound_before_ts_filter():
"""before_ts param is forwarded to the GET as a query parameter."""
session = MagicMock()
rows = [{"id": "act-old", "data": {"source": "canvas_user", "text": "hi"}}]
session.get.return_value = FakeResponse(200, rows)
from molecule_agent import RemoteAgentClient
client = RemoteAgentClient(
workspace_id="ws-abc",
platform_url="http://platform.test",
agent_card={"name": "t"},
session=session,
)
client.save_token("tok")
client.fetch_inbound(before_ts="2026-05-01T00:00:00Z")
params = session.get.call_args.kwargs["params"]
assert params["before_ts"] == "2026-05-01T00:00:00Z"
def test_fetch_inbound_combined_filters():
"""peer_id and before_ts can be used together."""
session = MagicMock()
rows = [{"id": "act-combo", "source_id": "peer-x", "data": {"source": "peer_agent", "text": "combo"}}]
session.get.return_value = FakeResponse(200, rows)
from molecule_agent import RemoteAgentClient
client = RemoteAgentClient(
workspace_id="ws-abc",
platform_url="http://platform.test",
agent_card={"name": "t"},
session=session,
)
client.save_token("tok")
client.fetch_inbound(peer_id="peer-x", before_ts="2026-05-09T12:00:00Z")
params = session.get.call_args.kwargs["params"]
assert params["peer_id"] == "peer-x"
assert params["before_ts"] == "2026-05-09T12:00:00Z"
# ---------------------------------------------------------------------------
# reply()
# ---------------------------------------------------------------------------
@ -770,29 +638,3 @@ def test_run_agent_loop_swallows_task_supplier_exception(
hb_kwargs = client.heartbeat.call_args.kwargs
assert hb_kwargs["current_task"] == ""
assert hb_kwargs["active_tasks"] == 0
# ---------------------------------------------------------------------------
# run_agent_loop — stop_event (KI-009)
# ---------------------------------------------------------------------------
def test_run_agent_loop_exits_on_stop_event(client: RemoteAgentClient, monkeypatch):
"""stop_event.set() before calling the loop causes immediate 'stopped' exit."""
import threading
import molecule_agent.client as mod
monkeypatch.setattr(mod.time, "sleep", lambda s: None)
client.save_token("t")
client.heartbeat = MagicMock() # avoid actual HTTP calls
client.poll_state = MagicMock(return_value=None)
stop = threading.Event()
stop.set() # signal stop BEFORE entering the loop
terminal = client.run_agent_loop(
lambda *_: None, max_iterations=999, stop_event=stop
)
assert terminal == "stopped"
# No heartbeat attempted — stop_event fired before the first iteration
assert client.heartbeat.call_count == 0

View File

@ -334,66 +334,6 @@ def test_run_loop_task_supplier_reported(client: RemoteAgentClient, monkeypatch)
assert body["active_tasks"] == 1
# ---------------------------------------------------------------------------
# run_heartbeat_loop — stop_event (KI-009)
# ---------------------------------------------------------------------------
def test_run_loop_exits_on_stop_event(client: RemoteAgentClient, monkeypatch):
"""stop_event.set() before calling the loop causes immediate 'stopped' exit,
before the first heartbeat is sent."""
import threading
import molecule_agent.client as mod
monkeypatch.setattr(mod.time, "sleep", lambda s: None)
client.save_token("t")
client._session.post.return_value = FakeResponse(200, {"status": "ok"})
client._session.get.return_value = FakeResponse(
200, {"status": "online", "paused": False, "deleted": False}
)
stop = threading.Event()
stop.set() # signal stop BEFORE entering the loop
terminal = client.run_heartbeat_loop(max_iterations=999, stop_event=stop)
assert terminal == "stopped"
# Zero heartbeats sent — stop_event fired before the first iteration body
assert client._session.post.call_count == 0
def test_run_loop_respects_stop_event_between_iterations(
client: RemoteAgentClient, monkeypatch
):
"""stop_event.set() mid-run causes exit after the current iteration finishes."""
import threading
import molecule_agent.client as mod
# Don't stub sleep — we need the event to fire *between* iterations
call_count = [0]
def fake_sleep(s):
call_count[0] += 1
if call_count[0] == 2:
stop.set() # signal stop after the second iteration
# otherwise no-op so the test doesn't wait
monkeypatch.setattr(mod.time, "sleep", fake_sleep)
client.save_token("t")
client._session.post.return_value = FakeResponse(200, {"status": "ok"})
client._session.get.return_value = FakeResponse(
200, {"status": "online", "paused": False, "deleted": False}
)
stop = threading.Event()
terminal = client.run_heartbeat_loop(max_iterations=999, stop_event=stop)
assert terminal == "stopped"
# Two full iterations completed before stop was honoured
assert client._session.post.call_count == 2
assert client._session.get.call_count == 2
# ---------------------------------------------------------------------------
# WorkspaceState dataclass
# ---------------------------------------------------------------------------
@ -769,7 +709,7 @@ def test_install_plugin_404_raises_with_useful_url(client: RemoteAgentClient):
import hashlib
from molecule_agent.client import make_idempotency_key, strip_a2a_boundary
from molecule_agent.client import make_idempotency_key
def test_delegate_posts_task_and_idempotency_key(client: RemoteAgentClient):
@ -810,78 +750,6 @@ def test_delegate_sends_bearer_and_workspace_headers(client: RemoteAgentClient):
assert kwargs["headers"]["X-Workspace-ID"] == "ws-abc-123"
def test_auth_headers_injects_org_id_and_origin():
"""org_id and origin kwargs are injected into every request headers."""
session = MagicMock()
session.post.return_value = FakeResponse(200, {})
client = RemoteAgentClient(
workspace_id="ws-test",
platform_url="https://platform.example.com",
org_id="org-uuid-123",
origin="https://tenant.moleculesai.app",
session=session,
)
client.save_token("tok")
client.delegate(task="x", target_id="peer")
hdrs = session.post.call_args[1]["headers"]
assert hdrs["Authorization"] == "Bearer tok"
assert hdrs["X-Molecule-Org-Id"] == "org-uuid-123"
assert hdrs["Origin"] == "https://tenant.moleculesai.app"
def test_auth_headers_org_id_only():
"""origin can be omitted when only org_id is needed."""
session = MagicMock()
session.post.return_value = FakeResponse(200, {})
client = RemoteAgentClient(
workspace_id="ws-test",
platform_url="https://platform.example.com",
org_id="org-uuid-456",
session=session,
)
client.save_token("tok")
client.poll_state()
hdrs = session.get.call_args[1]["headers"]
assert hdrs["Authorization"] == "Bearer tok"
assert hdrs["X-Molecule-Org-Id"] == "org-uuid-456"
assert "Origin" not in hdrs
def test_auth_headers_origin_only():
"""org_id can be omitted when only origin is needed."""
session = MagicMock()
session.post.return_value = FakeResponse(200, {})
client = RemoteAgentClient(
workspace_id="ws-test",
platform_url="https://platform.example.com",
origin="https://other-tenant.moleculesai.app",
session=session,
)
client.save_token("tok")
client.poll_state()
hdrs = session.get.call_args[1]["headers"]
assert hdrs["Authorization"] == "Bearer tok"
assert "X-Molecule-Org-Id" not in hdrs
assert hdrs["Origin"] == "https://other-tenant.moleculesai.app"
def test_auth_headers_no_extra_when_unset():
"""When neither org_id nor origin is set, headers contain only auth."""
session = MagicMock()
session.post.return_value = FakeResponse(200, {})
client = RemoteAgentClient(
workspace_id="ws-test",
platform_url="https://platform.example.com",
session=session,
)
client.save_token("tok")
client.poll_state()
hdrs = session.get.call_args[1]["headers"]
assert hdrs["Authorization"] == "Bearer tok"
assert "X-Molecule-Org-Id" not in hdrs
assert "Origin" not in hdrs
def test_delegate_raises_on_http_error(client: RemoteAgentClient):
client.save_token("tok")
client._session.post.return_value = FakeResponse(500, {"error": "boom"})
@ -943,55 +811,6 @@ def test_make_idempotency_key_deterministic():
assert a == b
# ---------------------------------------------------------------------------
# strip_a2a_boundary — OFFSEC-003 trust-boundary marker stripping
# ---------------------------------------------------------------------------
def test_strip_a2a_boundary_basic():
"""Interior text between the two markers is returned."""
wrapped = "[A2A_RESULT_FROM_PEER]hello world[/A2A_RESULT_FROM_PEER]"
assert strip_a2a_boundary(wrapped) == "hello world"
def test_strip_a2a_boundary_strips_whitespace_edges():
"""Trailing/leading whitespace inside the boundary is stripped."""
wrapped = "[A2A_RESULT_FROM_PEER] peer reply [/A2A_RESULT_FROM_PEER]"
assert strip_a2a_boundary(wrapped) == "peer reply"
def test_strip_a2a_boundary_no_markers_returns_unchanged():
"""Without both markers present the input passes through unchanged."""
assert strip_a2a_boundary("plain text with no markers") == "plain text with no markers"
def test_strip_a2a_boundary_only_start_returns_unchanged():
"""Only a start marker — no-op to stay safe during mid-rollout."""
assert strip_a2a_boundary("[A2A_RESULT_FROM_PEER]unclosed") == "[A2A_RESULT_FROM_PEER]unclosed"
def test_strip_a2a_boundary_only_end_returns_unchanged():
"""Only an end marker — no-op."""
assert strip_a2a_boundary("[/A2A_RESULT_FROM_PEER]no start") == "[/A2A_RESULT_FROM_PEER]no start"
def test_strip_a2a_boundary_empty_returns_empty():
assert strip_a2a_boundary("") == ""
assert strip_a2a_boundary(None) == "" # type: ignore[arg-type]
def test_strip_a2a_boundary_end_before_start_returns_unchanged():
"""If end marker appears before start, treat as no-op."""
text = "[/A2A_RESULT_FROM_PEER]X[A2A_RESULT_FROM_PEER]"
assert strip_a2a_boundary(text) == text
def test_strip_a2a_boundary_multiline_content():
"""Multiline interior content is preserved (stripped at edges only)."""
wrapped = "[A2A_RESULT_FROM_PEER]\n step one\n step two\n[/A2A_RESULT_FROM_PEER]"
assert strip_a2a_boundary(wrapped) == "step one\n step two"
# ---------------------------------------------------------------------------
# _safe_extract_tar
# ---------------------------------------------------------------------------