Compare commits
1 Commits
main
...
fix/clarif
| Author | SHA1 | Date | |
|---|---|---|---|
| b4de9c97f4 |
@ -1 +0,0 @@
|
||||
verify-fix-1778444420
|
||||
@ -28,43 +28,27 @@ Format per entry:
|
||||
|
||||
## KI-001 — RemoteAgentClient does not implement inbound A2A server
|
||||
|
||||
**File:** `molecule_agent/client.py`, `molecule_agent/a2a_server.py`, `molecule_agent/inbound.py`
|
||||
**Status:** ✅ Resolved
|
||||
**File:** `molecule_agent/client.py`
|
||||
**Status:** Known limitation; not yet implemented
|
||||
**Severity:** Medium
|
||||
**Platform phase:** Phase 30.8b
|
||||
|
||||
### Resolution
|
||||
The SDK now ships two inbound delivery paths:
|
||||
### Symptom
|
||||
`RemoteAgentClient` can call other workspaces via A2A (outbound), but cannot
|
||||
receive inbound A2A calls. Any workspace that tries to delegate to or message
|
||||
this agent will get a connection refused or timeout.
|
||||
|
||||
**Push mode (`A2AServer`)** — `molecule_agent.a2a_server.A2AServer` exposes an HTTP
|
||||
server with a `POST /a2a/inbound` endpoint. It runs in a background daemon thread
|
||||
alongside the client's heartbeat loop. Use with `PushDelivery` from `inbound.py`:
|
||||
### Impact
|
||||
Agents running outside the platform's Docker network via `molecule_agent` are
|
||||
one-directional. Platform agents cannot push work to them — the remote agent
|
||||
must poll or be provisioned with a publicly reachable webhook endpoint.
|
||||
|
||||
```python
|
||||
from molecule_agent import RemoteAgentClient, A2AServer
|
||||
from molecule_agent.inbound import PushDelivery
|
||||
|
||||
server = A2AServer(agent_id=workspace_id, inbound_url="https://...", message_handler=my_handler)
|
||||
server.start_in_background()
|
||||
client = RemoteAgentClient(workspace_id=workspace_id, platform_url=...)
|
||||
client.reported_url = server.inbound_url # register with this URL
|
||||
client.register()
|
||||
# Pass PushDelivery so run_agent_loop doesn't also poll
|
||||
client.run_agent_loop(handler=my_handler, delivery=PushDelivery(client, server))
|
||||
```
|
||||
|
||||
**Poll mode (`PollDelivery`)** — for agents behind NAT or without a public endpoint,
|
||||
the SDK's `PollDelivery` polls `GET /workspaces/:id/activity` on a configurable
|
||||
interval (default 5s). Both paths feed the same `MessageHandler` callback.
|
||||
`run_agent_loop` picks `PollDelivery` automatically when no explicit delivery is passed.
|
||||
|
||||
### Files added
|
||||
- `molecule_agent/a2a_server.py` — `A2AServer` class; `HTTPServer` + `_A2AHandler`
|
||||
running in a daemon thread; handles `POST /a2a/inbound`, async/sync handlers,
|
||||
graceful stop.
|
||||
- `molecule_agent/inbound.py` — `InboundDelivery` protocol, `PollDelivery`,
|
||||
`PushDelivery` (wraps `A2AServer`), `InboundMessage`, `MessageHandler`.
|
||||
- `RemoteAgentClient.run_agent_loop` updated to accept any `InboundDelivery`.
|
||||
### Suggested fix
|
||||
Add an `A2AServerMixin` class that exposes a `FastAPI` or `flask` route
|
||||
(`POST /a2a/inbound`) and runs in a background thread alongside the client's
|
||||
heartbeat loop. Register the inbound URL with the platform via the
|
||||
`/registry/discover` update endpoint when the server starts. See Phase 30.8b
|
||||
in the platform `PLAN.md`.
|
||||
|
||||
---
|
||||
|
||||
@ -253,41 +237,3 @@ def _is_hex(value: str) -> bool:
|
||||
`tests/conftest.py` exists with the `_CaptureHandler` stub definition.
|
||||
`pytest tests/test_call_peer_errors.py` runs all 12 tests cleanly.
|
||||
`pytest tests/` collects all test files with no collection errors.
|
||||
|
||||
---
|
||||
|
||||
## KI-009 — `run_heartbeat_loop()` does not honour external stop signals
|
||||
|
||||
**File:** `molecule_agent/client.py` (`RemoteAgentClient.run_heartbeat_loop`,
|
||||
`RemoteAgentClient.run_agent_loop`)
|
||||
**Status:** ✅ Resolved (PR: `feat/ki-009-stop-event`)
|
||||
**Severity:** Low
|
||||
|
||||
### Resolution
|
||||
Added `stop_event: threading.Event | None = None` parameter to both
|
||||
`run_heartbeat_loop()` and `run_agent_loop()`. When set, the event is checked
|
||||
at the start of each loop iteration (before `max_iterations`). When the event
|
||||
is set, the loop exits immediately with return value `"stopped"`. The check
|
||||
is ordered before `max_iterations` so a signal always wins.
|
||||
|
||||
Callers achieve graceful shutdown by setting the event from a SIGTERM handler:
|
||||
|
||||
```python
|
||||
import signal, threading
|
||||
from molecule_agent import RemoteAgentClient
|
||||
|
||||
stop = threading.Event()
|
||||
client = RemoteAgentClient(...)
|
||||
|
||||
def sigterm_handler(signum, frame):
|
||||
stop.set()
|
||||
|
||||
signal.signal(signal.SIGTERM, sigterm_handler)
|
||||
terminal = client.run_heartbeat_loop(stop_event=stop)
|
||||
# terminal == "stopped" when killed cleanly
|
||||
```
|
||||
|
||||
Tests added: `test_run_loop_exits_on_stop_event`,
|
||||
`test_run_loop_respects_stop_event_between_iterations` in
|
||||
`tests/test_remote_agent.py`; `test_run_agent_loop_exits_on_stop_event`
|
||||
in `tests/test_inbound.py`.
|
||||
|
||||
@ -10,22 +10,6 @@ This is the client side of [Phase 30](../../../PLAN.md). The platform side
|
||||
ships in the same release; this package is just the SDK an agent author
|
||||
imports.
|
||||
|
||||
## What this is / what this isn't
|
||||
|
||||
| | `molecule_agent` (this package) | `molecule-ai-workspace-runtime` (separate PyPI wheel) |
|
||||
|---|---|---|
|
||||
| **Where it runs** | OUTSIDE Molecule workspaces — your laptop, CI runner, external cloud VM, sidecar service | INSIDE the workspace container, started by the platform |
|
||||
| **What it talks to** | The platform's HTTP API (`/registry/*`, `/workspaces/:id/*`) | The platform's MCP server (`molecule_*` tools) plus the platform-managed A2A bus |
|
||||
| **What it exposes** | `RemoteAgentClient`, `A2AServer`, `PollDelivery`, `MessageHandler` | `BaseAdapter`, `a2a_tools`, runtime capabilities, smoke-contract hooks |
|
||||
| **Who installs it** | You, the external-agent author, via `pip install molecule-sdk` | The platform, baked into the workspace template image at provision time |
|
||||
| **Auth model** | Bearer token minted by `POST /registry/register`, cached at `~/.molecule/<id>/.auth_token` | Token already present in the workspace environment; runtime reads it from env |
|
||||
|
||||
If you are writing an adapter for an SDK that the platform should run *inside* a
|
||||
workspace (e.g. langchain, crewai, hermes), you want
|
||||
[`molecule-ai-workspace-runtime`](https://pypi.org/project/molecule-ai-workspace-runtime/),
|
||||
not this package. See <https://doc.moleculesai.app/docs/runtime-mcp> for the
|
||||
in-workspace-runtime authoring guide.
|
||||
|
||||
## Install
|
||||
|
||||
```bash
|
||||
@ -72,7 +56,7 @@ A runnable demo with full setup walkthrough lives at
|
||||
| `poll_state()` | 30.4 | Lightweight `{status, paused, deleted}` poll |
|
||||
| `heartbeat(...)` | 30.1 | Single bearer-authed heartbeat |
|
||||
| `get_peers()` / `discover_peer()` | 30.6 | Sibling URL discovery with TTL cache |
|
||||
| `call_peer(target, message)` | 30.6 | Direct A2A with proxy fallback; response may be wrapped in OFFSEC-003 boundary markers — use ``strip_a2a_boundary()`` to remove them |
|
||||
| `call_peer(target, message)` | 30.6 | Direct A2A with proxy fallback |
|
||||
| `fetch_inbound(since_id=…)` | 30.8c | One-shot poll of `/workspaces/:id/activity` for inbound A2A |
|
||||
| `reply(msg, text)` | 30.8c | Smart-routes reply to `/notify` (canvas user) or `/a2a` (peer) |
|
||||
| `run_heartbeat_loop()` | combo | Drives heartbeat + state-poll on a timer; exits on pause/delete |
|
||||
@ -103,89 +87,6 @@ client.run_agent_loop(my_handler) # default: PollDelivery
|
||||
|
||||
The reply transport (`/notify` for canvas users, `/a2a` for peer agents) is hidden — `client.reply(msg, text)` picks based on `msg.source`. Async handlers work too; `PollDelivery` detects awaitable returns and `asyncio.run`s them.
|
||||
|
||||
### `InboundMessage` shape
|
||||
|
||||
`InboundMessage` is what `MessageHandler` receives. The typed fields the SDK
|
||||
parses today:
|
||||
|
||||
| Field | Type | What it is |
|
||||
|---|---|---|
|
||||
| `activity_id` | `str` | Cursor — the `activity_logs.id` row this event came from. Pass to `fetch_inbound(since_id=…)` to skip past it on the next poll. |
|
||||
| `source` | `Literal["canvas_user", "peer_agent", "unknown"]` | Normalized sender kind. `"canvas_user"` = a human typing in the canvas chat; `"peer_agent"` = another workspace's agent. `"unknown"` if the row's source is unrecognized — `reply()` will refuse to guess. |
|
||||
| `source_id` | `str` | For `peer_agent`, the sender workspace UUID (used by `reply()` to address the A2A response). Empty for `canvas_user`. |
|
||||
| `text` | `str` | The message body. Pulled from `data.text` then `data.message` in the underlying activity row. **Treat as untrusted user content** — same threat model as any chat input. |
|
||||
| `raw` | `dict` | The full raw activity-log row. Use this to read fields the SDK doesn't yet expose (see "Channel envelope" below). |
|
||||
|
||||
### Channel envelope (wire format)
|
||||
|
||||
The platform delivers each inbound A2A event as an `activity_logs` row. As of
|
||||
**2026-05-02** (CP push envelope, see <https://doc.moleculesai.app/docs/runtime-mcp>),
|
||||
the envelope's `data` block carries:
|
||||
|
||||
```jsonc
|
||||
{
|
||||
"id": "<activity-uuid>", // == InboundMessage.activity_id
|
||||
"type": "a2a_receive",
|
||||
"source_id": "<sender-workspace-uuid>", // peer_agent only; empty for canvas_user
|
||||
"ts": "2026-05-02T10:15:30Z", // RFC3339 — when the platform queued the event
|
||||
"data": {
|
||||
"source": "peer_agent", // "canvas_user" | "peer_agent"
|
||||
"kind": "peer_agent", // mirrors the channel-tag attr
|
||||
"text": "<message body>", // (or "message")
|
||||
"peer_id": "<sender-workspace-uuid>", // duplicate of source_id, peer_agent only
|
||||
"activity_id": "<activity-uuid>", // duplicate of top-level id
|
||||
|
||||
// === enrichment fields added 2026-05-02 (CP PRs #2472, #2476) ===
|
||||
"peer_name": "ops-agent", // peer's display name (registry-resolved); may be absent if the registry lookup failed
|
||||
"peer_role": "sre", // peer's declared role; same registry source
|
||||
"agent_card_url": "https://<platform>/registry/discover/<peer_id>" // deterministic URL for the platform's discover endpoint for this peer
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**SDK status of the enrichment fields:** `InboundMessage` does not yet
|
||||
surface `peer_name`, `peer_role`, or `agent_card_url` as typed attributes.
|
||||
Read them from `msg.raw["data"]` until a typed wrapper lands (see
|
||||
"Limitations & roadmap" below). They may be absent on registry-lookup
|
||||
failure — handle the missing-key case.
|
||||
|
||||
### A2A reply transport — what `reply()` actually does
|
||||
|
||||
`client.reply(msg, text)` dispatches based on `msg.source`. The transport is
|
||||
chosen for you so handler code doesn't need to branch:
|
||||
|
||||
| `msg.source` | HTTP call `reply()` makes | Server-side effect |
|
||||
|---|---|---|
|
||||
| `canvas_user` | `POST /workspaces/<self>/notify` with `{"message": text}` | Canvas WebSocket pushes the text to the user's chat |
|
||||
| `peer_agent` | `POST /workspaces/<msg.source_id>/a2a` with a JSON-RPC `message/send` envelope; sets `X-Source-Workspace-Id: <self>` and `X-Workspace-ID: <self>` | Platform routes the JSON-RPC message to the peer workspace's inbound A2A endpoint |
|
||||
| `unknown` | Raises `ValueError` | The SDK refuses to guess. Inspect `msg.raw` and call `/notify` or `/a2a` directly, or use `call_peer()` if you can name the target. |
|
||||
|
||||
`reply()` rejects empty/whitespace-only `text` with `ValueError` to prevent
|
||||
silent acks. On non-2xx the underlying `requests.HTTPError` propagates so the
|
||||
handler can decide whether to retry, surface to its observability, or fail
|
||||
loudly.
|
||||
|
||||
### OFFSEC-003 — A2A peer response trust boundary
|
||||
|
||||
As of the OFFSEC-003 platform rollout, peer A2A responses are wrapped in
|
||||
trust-boundary markers before being returned to callers::
|
||||
|
||||
[A2A_RESULT_FROM_PEER]<peer response text>[/A2A_RESULT_FROM_PEER]
|
||||
|
||||
The markers signal that the enclosed content is untrusted third-party output.
|
||||
Use ``strip_a2a_boundary()`` to remove them before passing the response to
|
||||
your agent context::
|
||||
|
||||
from molecule_agent import RemoteAgentClient, strip_a2a_boundary
|
||||
|
||||
result = client.call_peer(target_id, "do the thing")
|
||||
raw_text = result.get("result", {}).get("text", "")
|
||||
trusted_text = strip_a2a_boundary(raw_text)
|
||||
|
||||
The function returns the input unchanged if the markers are absent (platform
|
||||
versions older than the OFFSEC-003 rollout), so it is safe to call on any
|
||||
response.
|
||||
|
||||
## CLI: `molecule_agent connect`
|
||||
|
||||
One command bootstraps the full poll-mode loop. No code beyond your handler:
|
||||
@ -209,55 +110,14 @@ def echo(msg, client):
|
||||
|
||||
All flags also read from environment variables (`MOLECULE_PLATFORM_URL`, `MOLECULE_WORKSPACE_ID`, `MOLECULE_WORKSPACE_TOKEN`, `MOLECULE_POLL_INTERVAL`, `MOLECULE_CURSOR_FILE`). SIGTERM/SIGINT shut the loop down cleanly.
|
||||
|
||||
## What it doesn't do (yet) — Limitations & roadmap
|
||||
|
||||
These are server-supported features that the SDK has not yet wrapped, plus
|
||||
known protocol gaps. Each entry is named so a follow-up issue / PR can
|
||||
reference it directly.
|
||||
## What it doesn't do (yet)
|
||||
|
||||
- **No long-poll.** Activity polling is fixed-cadence (default 5s). Server-side long-poll support would cut p50 inbound latency to ~0; tracked separately.
|
||||
|
||||
- **No automatic reconnect after token loss.** If `~/.molecule/<id>/.auth_token`
|
||||
is deleted, you'll need to re-issue the token via the platform admin (since
|
||||
`POST /registry/register` is idempotent — it won't mint a second token for
|
||||
a workspace that already has one).
|
||||
|
||||
- **`fetch_inbound()` `peer_id` and `before_ts` filters (resolved).**
|
||||
`RemoteAgentClient.fetch_inbound()` now accepts `peer_id` (narrow to events
|
||||
from a specific peer workspace) and `before_ts` (RFC3339 cutoff for backlog
|
||||
replay) as optional parameters. Both are forwarded to the activity endpoint.
|
||||
|
||||
- **`InboundMessage` peer enrichment fields (resolved).**
|
||||
`InboundMessage` now exposes `peer_name`, `peer_role`, and `agent_card_url`
|
||||
as typed string attributes (default ``""`` when absent). The `raw` dict
|
||||
remains available for any future envelope fields not yet wrapped.
|
||||
|
||||
- **Tenant + Origin headers are not auto-injected.** When the platform is
|
||||
deployed multi-tenant on the SaaS edge (`*.staging.moleculesai.app`,
|
||||
`*.moleculesai.app`), the WAF requires:
|
||||
- `X-Molecule-Org-Id: <org-uuid>` — TenantGuard middleware uses this to
|
||||
pin the request to the right tenant; missing-header requests 404
|
||||
- `Origin: <PLATFORM_URL>` — `/workspaces/*` and `/registry/*/peers`
|
||||
silently rewrite to Next.js without it (returns an empty 404, easy
|
||||
to misdiagnose as auth)
|
||||
A follow-up PR will accept `org_id` and `origin` constructor kwargs and
|
||||
inject the headers automatically.
|
||||
|
||||
- **Tenant + Origin headers (resolved).**
|
||||
`RemoteAgentClient` now accepts `org_id` and `origin` constructor kwargs and
|
||||
injects them automatically on every request:
|
||||
|
||||
```python
|
||||
from molecule_agent import RemoteAgentClient
|
||||
|
||||
client = RemoteAgentClient(
|
||||
workspace_id="…",
|
||||
platform_url="https://<your-tenant>.moleculesai.app",
|
||||
org_id="<your-org-uuid>", # sets X-Molecule-Org-Id
|
||||
origin="https://<your-tenant>.moleculesai.app", # sets Origin
|
||||
)
|
||||
```
|
||||
|
||||
## Design choices
|
||||
|
||||
- **Blocking (`requests`), not async.** Drops into any runtime — script,
|
||||
|
||||
@ -39,7 +39,6 @@ from .client import (
|
||||
PeerInfo,
|
||||
RemoteAgentClient,
|
||||
WorkspaceState,
|
||||
strip_a2a_boundary,
|
||||
verify_plugin_sha256,
|
||||
)
|
||||
from .inbound import (
|
||||
@ -72,7 +71,6 @@ __all__ = [
|
||||
"DEFAULT_POLL_INTERVAL",
|
||||
"compute_plugin_sha256",
|
||||
"verify_plugin_sha256",
|
||||
"strip_a2a_boundary",
|
||||
"__version__",
|
||||
]
|
||||
__version__ = "0.1.0"
|
||||
|
||||
@ -27,7 +27,6 @@ import os
|
||||
import random
|
||||
import subprocess
|
||||
import tarfile
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
from dataclasses import dataclass, field
|
||||
@ -91,43 +90,6 @@ def make_idempotency_key(task_text: str) -> str:
|
||||
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
# ── A2A boundary marker stripping (OFFSEC-003) ───────────────────────────────
|
||||
|
||||
_A2A_BOUNDARY_START = "[A2A_RESULT_FROM_PEER]"
|
||||
_A2A_BOUNDARY_END = "[/A2A_RESULT_FROM_PEER]"
|
||||
|
||||
|
||||
def strip_a2a_boundary(text: str) -> str:
|
||||
"""Strip OFFSEC-003 trust-boundary markers from a peer A2A response.
|
||||
|
||||
The platform wraps peer A2A responses in::
|
||||
|
||||
[A2A_RESULT_FROM_PEER]<content>[/A2A_RESULT_FROM_PEER]
|
||||
|
||||
to mark them as untrusted third-party content. Call this helper to
|
||||
remove the wrapper before passing the content to your agent context:
|
||||
|
||||
Usage::
|
||||
|
||||
result = client.call_peer(target_id, "do the thing")
|
||||
text = result.get("result", {}).get("text", "")
|
||||
content = strip_a2a_boundary(text)
|
||||
|
||||
Returns the interior content (everything between the two markers).
|
||||
Returns the input unchanged if the boundary markers are absent (the caller
|
||||
may be talking to a platform version older than the OFFSEC-003 rollout).
|
||||
Returns ``""`` for ``None`` or empty input.
|
||||
"""
|
||||
if not text:
|
||||
return ""
|
||||
start = text.find(_A2A_BOUNDARY_START)
|
||||
end = text.find(_A2A_BOUNDARY_END)
|
||||
if start != -1 and end != -1 and end > start:
|
||||
return text[start + len(_A2A_BOUNDARY_START):end].strip()
|
||||
return text
|
||||
|
||||
|
||||
|
||||
def _safe_extract_tar(tf: tarfile.TarFile, dest: Path) -> None:
|
||||
"""Extract a tarfile, refusing entries that would escape `dest`
|
||||
and logging skipped symlinks/hardlinks.
|
||||
@ -306,13 +268,6 @@ class RemoteAgentClient:
|
||||
0700 permissions if missing.
|
||||
heartbeat_interval: Seconds between heartbeats in the run loop.
|
||||
state_poll_interval: Seconds between state polls in the run loop.
|
||||
org_id: Optional tenant UUID for multi-tenant SaaS deployments
|
||||
(``*.moleculesai.app``). When set the client sends
|
||||
``X-Molecule-Org-Id: <org_id>`` on every request so the WAF
|
||||
can route to the correct tenant.
|
||||
origin: Optional origin string for multi-tenant SaaS deployments.
|
||||
When set the client sends ``Origin: <origin>`` on every
|
||||
request, preventing silent Next.js rewrites on SaaS edges.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@ -326,8 +281,6 @@ class RemoteAgentClient:
|
||||
state_poll_interval: float = DEFAULT_STATE_POLL_INTERVAL,
|
||||
url_cache_ttl: float = DEFAULT_URL_CACHE_TTL,
|
||||
session: requests.Session | None = None,
|
||||
org_id: str = "",
|
||||
origin: str = "",
|
||||
) -> None:
|
||||
self.workspace_id = workspace_id
|
||||
self.platform_url = platform_url.rstrip("/")
|
||||
@ -336,8 +289,6 @@ class RemoteAgentClient:
|
||||
self.heartbeat_interval = heartbeat_interval
|
||||
self.state_poll_interval = state_poll_interval
|
||||
self.url_cache_ttl = url_cache_ttl
|
||||
self._org_id = org_id
|
||||
self._origin = origin
|
||||
# Phase 30.6 — sibling URL cache keyed by workspace id. Values are
|
||||
# (url, expires_at_unix_seconds). Process-memory only; we re-fetch
|
||||
# on restart because agent lifetimes are short enough that
|
||||
@ -430,14 +381,9 @@ class RemoteAgentClient:
|
||||
|
||||
def _auth_headers(self) -> dict[str, str]:
|
||||
tok = self.load_token()
|
||||
headers: dict[str, str] = {}
|
||||
if tok:
|
||||
headers["Authorization"] = f"Bearer {tok}"
|
||||
if self._org_id:
|
||||
headers["X-Molecule-Org-Id"] = self._org_id
|
||||
if self._origin:
|
||||
headers["Origin"] = self._origin
|
||||
return headers
|
||||
if not tok:
|
||||
return {}
|
||||
return {"Authorization": f"Bearer {tok}"}
|
||||
|
||||
def _get_with_retry(
|
||||
self,
|
||||
@ -753,12 +699,10 @@ class RemoteAgentClient:
|
||||
since_id: str | None = None,
|
||||
limit: int = 100,
|
||||
type: str = "a2a_receive",
|
||||
peer_id: str | None = None,
|
||||
before_ts: str | None = None,
|
||||
) -> list["InboundMessage"]:
|
||||
"""Fetch one batch of inbound A2A activity rows.
|
||||
|
||||
Hits ``GET /workspaces/:id/activity?type=…&since_id=…&peer_id=…&before_ts=…&limit=…``.
|
||||
Hits ``GET /workspaces/:id/activity?type=…&since_id=…&limit=…``.
|
||||
Returns the rows newer than ``since_id`` in oldest-first order,
|
||||
parsed into :class:`~molecule_agent.inbound.InboundMessage`.
|
||||
|
||||
@ -774,13 +718,6 @@ class RemoteAgentClient:
|
||||
type: Activity-row type filter. Default ``"a2a_receive"``;
|
||||
pass another type to consume different streams (e.g.
|
||||
``"workspace_state_changed"``).
|
||||
peer_id: Narrow to events sourced from a specific peer workspace.
|
||||
Pass a workspace UUID to receive only A2A messages from that
|
||||
peer. ``None`` (default) accepts messages from any source.
|
||||
before_ts: RFC3339 timestamp — only rows older than this cutoff
|
||||
are returned. Use to fetch a historical backlog window (e.g.
|
||||
replaying missed messages after a restart). ``None`` means
|
||||
no upper-bound cutoff; the server returns the most recent rows.
|
||||
|
||||
Returns:
|
||||
List of :class:`InboundMessage`, oldest first. May be empty.
|
||||
@ -798,10 +735,6 @@ class RemoteAgentClient:
|
||||
params: dict[str, str] = {"type": type, "limit": str(int(limit))}
|
||||
if since_id:
|
||||
params["since_id"] = since_id
|
||||
if peer_id:
|
||||
params["peer_id"] = peer_id
|
||||
if before_ts:
|
||||
params["before_ts"] = before_ts
|
||||
url = f"{self.platform_url}/workspaces/{self.workspace_id}/activity"
|
||||
resp = self._session.get(
|
||||
url,
|
||||
@ -922,7 +855,6 @@ class RemoteAgentClient:
|
||||
delivery: "InboundDelivery | None" = None,
|
||||
max_iterations: int | None = None,
|
||||
task_supplier: "callable | None" = None,
|
||||
stop_event: threading.Event | None = None,
|
||||
) -> str:
|
||||
"""Combined heartbeat + state-poll + inbound-delivery loop.
|
||||
|
||||
@ -948,14 +880,10 @@ class RemoteAgentClient:
|
||||
task_supplier: Optional zero-arg callable returning a dict
|
||||
``{"current_task": str, "active_tasks": int}`` reported on
|
||||
each heartbeat (same contract as :py:meth:`run_heartbeat_loop`).
|
||||
stop_event: Optional :py:class:`threading.Event` that, when set,
|
||||
causes the loop to exit cleanly with return value ``"stopped"``.
|
||||
Call ``stop_event.set()`` from a SIGTERM handler to achieve
|
||||
graceful shutdown. Ignored when ``None``.
|
||||
|
||||
Returns:
|
||||
The terminal status: ``"paused"``, ``"removed"``,
|
||||
``"max_iterations"``, or ``"stopped"``.
|
||||
The terminal status: ``"paused"``, ``"removed"``, or
|
||||
``"max_iterations"``.
|
||||
|
||||
Errors from the activity poll, heartbeat, or state poll are
|
||||
logged and the loop continues — a transient platform hiccup
|
||||
@ -970,8 +898,6 @@ class RemoteAgentClient:
|
||||
i = 0
|
||||
try:
|
||||
while True:
|
||||
if stop_event is not None and stop_event.is_set():
|
||||
return "stopped"
|
||||
if max_iterations is not None and i >= max_iterations:
|
||||
return "max_iterations"
|
||||
i += 1
|
||||
@ -1232,11 +1158,10 @@ class RemoteAgentClient:
|
||||
self,
|
||||
max_iterations: int | None = None,
|
||||
task_supplier: "callable | None" = None,
|
||||
stop_event: threading.Event | None = None,
|
||||
) -> str:
|
||||
"""Drive heartbeat + state-poll on a timer. Returns the terminal
|
||||
status when the loop exits (``"paused"``, ``"removed"``,
|
||||
``"max_iterations"``, or ``"stopped"``).
|
||||
status when the loop exits (``"paused"``, ``"removed"``, or
|
||||
``"max_iterations"``).
|
||||
|
||||
Args:
|
||||
max_iterations: Stop after N loop iterations. None = run until
|
||||
@ -1245,10 +1170,6 @@ class RemoteAgentClient:
|
||||
task_supplier: Optional zero-arg callable returning a dict
|
||||
``{"current_task": str, "active_tasks": int}`` fetched
|
||||
each iteration. Lets the agent report what it's doing.
|
||||
stop_event: Optional :py:class:`threading.Event` that, when set,
|
||||
causes the loop to exit cleanly with return value ``"stopped"``.
|
||||
Call ``stop_event.set()`` from a SIGTERM handler to achieve
|
||||
graceful shutdown. Ignored when ``None``.
|
||||
|
||||
The loop sends one heartbeat + one state poll per iteration; the
|
||||
next iteration sleeps for ``heartbeat_interval`` seconds. Errors
|
||||
@ -1258,8 +1179,6 @@ class RemoteAgentClient:
|
||||
"""
|
||||
i = 0
|
||||
while True:
|
||||
if stop_event is not None and stop_event.is_set():
|
||||
return "stopped"
|
||||
if max_iterations is not None and i >= max_iterations:
|
||||
return "max_iterations"
|
||||
i += 1
|
||||
|
||||
@ -67,12 +67,6 @@ class InboundMessage:
|
||||
* ``unknown`` — the activity row didn't carry a recognizable source.
|
||||
:py:meth:`RemoteAgentClient.reply` raises ``ValueError`` rather than
|
||||
guess.
|
||||
|
||||
``peer_name``, ``peer_role``, and ``agent_card_url`` are enrichment fields
|
||||
added to the channel envelope on 2026-05-02 (CP PRs #2472, #2476). They
|
||||
are resolved from the platform's registry at delivery time and may be absent
|
||||
if the registry lookup failed or the sender hasn't registered yet. Always
|
||||
check for absence rather than assuming they are populated.
|
||||
"""
|
||||
|
||||
activity_id: str
|
||||
@ -80,12 +74,6 @@ class InboundMessage:
|
||||
source_id: str
|
||||
text: str
|
||||
raw: dict[str, Any] = field(default_factory=dict)
|
||||
# Enrichment fields — populated from row["data"]["peer_name"],
|
||||
# row["data"]["peer_role"], row["data"]["agent_card_url"].
|
||||
# May be empty strings if the registry lookup failed.
|
||||
peer_name: str = ""
|
||||
peer_role: str = ""
|
||||
agent_card_url: str = ""
|
||||
|
||||
|
||||
class CursorLostError(Exception):
|
||||
@ -146,9 +134,6 @@ def _parse_activity_row(row: dict[str, Any]) -> InboundMessage | None:
|
||||
source_id=source_id,
|
||||
text=text,
|
||||
raw=row,
|
||||
peer_name=str(data.get("peer_name") or ""),
|
||||
peer_role=str(data.get("peer_role") or ""),
|
||||
agent_card_url=str(data.get("agent_card_url") or ""),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@ -153,83 +153,6 @@ def test_parse_activity_row_text_alt_key():
|
||||
assert msg.text == "alt"
|
||||
|
||||
|
||||
def test_parse_activity_row_enrichment_fields():
|
||||
"""peer_name, peer_role, agent_card_url are extracted from row["data"]."""
|
||||
row = {
|
||||
"id": "act-enriched",
|
||||
"source_id": "peer-ops-01",
|
||||
"data": {
|
||||
"source": "peer_agent",
|
||||
"text": "status report",
|
||||
"peer_name": "ops-agent",
|
||||
"peer_role": "sre",
|
||||
"agent_card_url": "https://platform.example/registry/discover/peer-ops-01",
|
||||
},
|
||||
}
|
||||
msg = _parse_activity_row(row)
|
||||
assert msg is not None
|
||||
assert msg.peer_name == "ops-agent"
|
||||
assert msg.peer_role == "sre"
|
||||
assert msg.agent_card_url == "https://platform.example/registry/discover/peer-ops-01"
|
||||
|
||||
|
||||
def test_parse_activity_row_enrichment_fields_absent():
|
||||
"""When enrichment fields are absent, InboundMessage fields default to ""."""
|
||||
row = {
|
||||
"id": "act-no-enrich",
|
||||
"source_id": "peer-x",
|
||||
"data": {"source": "peer_agent", "text": "hello"},
|
||||
# no peer_name, peer_role, agent_card_url
|
||||
}
|
||||
msg = _parse_activity_row(row)
|
||||
assert msg is not None
|
||||
assert msg.peer_name == ""
|
||||
assert msg.peer_role == ""
|
||||
assert msg.agent_card_url == ""
|
||||
|
||||
|
||||
def test_parse_activity_row_enrichment_fields_null_becomes_empty():
|
||||
"""null values in enrichment fields become "" (not the string "None")."""
|
||||
row = {
|
||||
"id": "act-null-enrich",
|
||||
"source_id": "peer-y",
|
||||
"data": {
|
||||
"source": "peer_agent",
|
||||
"text": "ping",
|
||||
"peer_name": None,
|
||||
"peer_role": None,
|
||||
"agent_card_url": None,
|
||||
},
|
||||
}
|
||||
msg = _parse_activity_row(row)
|
||||
assert msg is not None
|
||||
assert msg.peer_name == ""
|
||||
assert msg.peer_role == ""
|
||||
assert msg.agent_card_url == ""
|
||||
|
||||
|
||||
def test_parse_activity_row_enrichment_in_canvas_user_row():
|
||||
"""Enrichment fields are parsed even when source is canvas_user (edge case
|
||||
where the platform enriches the row even for user-sourced messages)."""
|
||||
row = {
|
||||
"id": "act-user-enrich",
|
||||
"source_id": "user",
|
||||
"data": {
|
||||
"source": "canvas_user",
|
||||
"text": "hi",
|
||||
"peer_name": "someone",
|
||||
"peer_role": "human",
|
||||
"agent_card_url": "https://platform.example/registry/discover/user-uuid",
|
||||
},
|
||||
}
|
||||
msg = _parse_activity_row(row)
|
||||
assert msg is not None
|
||||
assert msg.source == "canvas_user"
|
||||
assert msg.peer_name == "someone"
|
||||
assert msg.peer_role == "human"
|
||||
assert msg.agent_card_url == "https://platform.example/registry/discover/user-uuid"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# fetch_inbound
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -327,61 +250,6 @@ def test_fetch_inbound_429_retries_via_get_with_retry(
|
||||
assert client._session.get.call_count == 2
|
||||
|
||||
|
||||
def test_fetch_inbound_peer_id_filter():
|
||||
"""peer_id param is forwarded to the GET as a query parameter."""
|
||||
session = MagicMock()
|
||||
rows = [{"id": "act-peer", "source_id": "peer-ops", "data": {"source": "peer_agent", "text": "hi"}}]
|
||||
session.get.return_value = FakeResponse(200, rows)
|
||||
from molecule_agent import RemoteAgentClient
|
||||
client = RemoteAgentClient(
|
||||
workspace_id="ws-abc",
|
||||
platform_url="http://platform.test",
|
||||
agent_card={"name": "t"},
|
||||
session=session,
|
||||
)
|
||||
client.save_token("tok")
|
||||
client.fetch_inbound(peer_id="peer-ops")
|
||||
params = session.get.call_args.kwargs["params"]
|
||||
assert params["peer_id"] == "peer-ops"
|
||||
|
||||
|
||||
def test_fetch_inbound_before_ts_filter():
|
||||
"""before_ts param is forwarded to the GET as a query parameter."""
|
||||
session = MagicMock()
|
||||
rows = [{"id": "act-old", "data": {"source": "canvas_user", "text": "hi"}}]
|
||||
session.get.return_value = FakeResponse(200, rows)
|
||||
from molecule_agent import RemoteAgentClient
|
||||
client = RemoteAgentClient(
|
||||
workspace_id="ws-abc",
|
||||
platform_url="http://platform.test",
|
||||
agent_card={"name": "t"},
|
||||
session=session,
|
||||
)
|
||||
client.save_token("tok")
|
||||
client.fetch_inbound(before_ts="2026-05-01T00:00:00Z")
|
||||
params = session.get.call_args.kwargs["params"]
|
||||
assert params["before_ts"] == "2026-05-01T00:00:00Z"
|
||||
|
||||
|
||||
def test_fetch_inbound_combined_filters():
|
||||
"""peer_id and before_ts can be used together."""
|
||||
session = MagicMock()
|
||||
rows = [{"id": "act-combo", "source_id": "peer-x", "data": {"source": "peer_agent", "text": "combo"}}]
|
||||
session.get.return_value = FakeResponse(200, rows)
|
||||
from molecule_agent import RemoteAgentClient
|
||||
client = RemoteAgentClient(
|
||||
workspace_id="ws-abc",
|
||||
platform_url="http://platform.test",
|
||||
agent_card={"name": "t"},
|
||||
session=session,
|
||||
)
|
||||
client.save_token("tok")
|
||||
client.fetch_inbound(peer_id="peer-x", before_ts="2026-05-09T12:00:00Z")
|
||||
params = session.get.call_args.kwargs["params"]
|
||||
assert params["peer_id"] == "peer-x"
|
||||
assert params["before_ts"] == "2026-05-09T12:00:00Z"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# reply()
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -770,29 +638,3 @@ def test_run_agent_loop_swallows_task_supplier_exception(
|
||||
hb_kwargs = client.heartbeat.call_args.kwargs
|
||||
assert hb_kwargs["current_task"] == ""
|
||||
assert hb_kwargs["active_tasks"] == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# run_agent_loop — stop_event (KI-009)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_run_agent_loop_exits_on_stop_event(client: RemoteAgentClient, monkeypatch):
|
||||
"""stop_event.set() before calling the loop causes immediate 'stopped' exit."""
|
||||
import threading
|
||||
import molecule_agent.client as mod
|
||||
monkeypatch.setattr(mod.time, "sleep", lambda s: None)
|
||||
|
||||
client.save_token("t")
|
||||
client.heartbeat = MagicMock() # avoid actual HTTP calls
|
||||
client.poll_state = MagicMock(return_value=None)
|
||||
|
||||
stop = threading.Event()
|
||||
stop.set() # signal stop BEFORE entering the loop
|
||||
terminal = client.run_agent_loop(
|
||||
lambda *_: None, max_iterations=999, stop_event=stop
|
||||
)
|
||||
|
||||
assert terminal == "stopped"
|
||||
# No heartbeat attempted — stop_event fired before the first iteration
|
||||
assert client.heartbeat.call_count == 0
|
||||
|
||||
@ -334,66 +334,6 @@ def test_run_loop_task_supplier_reported(client: RemoteAgentClient, monkeypatch)
|
||||
assert body["active_tasks"] == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# run_heartbeat_loop — stop_event (KI-009)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_run_loop_exits_on_stop_event(client: RemoteAgentClient, monkeypatch):
|
||||
"""stop_event.set() before calling the loop causes immediate 'stopped' exit,
|
||||
before the first heartbeat is sent."""
|
||||
import threading
|
||||
import molecule_agent.client as mod
|
||||
monkeypatch.setattr(mod.time, "sleep", lambda s: None)
|
||||
|
||||
client.save_token("t")
|
||||
client._session.post.return_value = FakeResponse(200, {"status": "ok"})
|
||||
client._session.get.return_value = FakeResponse(
|
||||
200, {"status": "online", "paused": False, "deleted": False}
|
||||
)
|
||||
|
||||
stop = threading.Event()
|
||||
stop.set() # signal stop BEFORE entering the loop
|
||||
terminal = client.run_heartbeat_loop(max_iterations=999, stop_event=stop)
|
||||
|
||||
assert terminal == "stopped"
|
||||
# Zero heartbeats sent — stop_event fired before the first iteration body
|
||||
assert client._session.post.call_count == 0
|
||||
|
||||
|
||||
def test_run_loop_respects_stop_event_between_iterations(
|
||||
client: RemoteAgentClient, monkeypatch
|
||||
):
|
||||
"""stop_event.set() mid-run causes exit after the current iteration finishes."""
|
||||
import threading
|
||||
import molecule_agent.client as mod
|
||||
|
||||
# Don't stub sleep — we need the event to fire *between* iterations
|
||||
call_count = [0]
|
||||
|
||||
def fake_sleep(s):
|
||||
call_count[0] += 1
|
||||
if call_count[0] == 2:
|
||||
stop.set() # signal stop after the second iteration
|
||||
# otherwise no-op so the test doesn't wait
|
||||
|
||||
monkeypatch.setattr(mod.time, "sleep", fake_sleep)
|
||||
|
||||
client.save_token("t")
|
||||
client._session.post.return_value = FakeResponse(200, {"status": "ok"})
|
||||
client._session.get.return_value = FakeResponse(
|
||||
200, {"status": "online", "paused": False, "deleted": False}
|
||||
)
|
||||
|
||||
stop = threading.Event()
|
||||
terminal = client.run_heartbeat_loop(max_iterations=999, stop_event=stop)
|
||||
|
||||
assert terminal == "stopped"
|
||||
# Two full iterations completed before stop was honoured
|
||||
assert client._session.post.call_count == 2
|
||||
assert client._session.get.call_count == 2
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# WorkspaceState dataclass
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -769,7 +709,7 @@ def test_install_plugin_404_raises_with_useful_url(client: RemoteAgentClient):
|
||||
|
||||
import hashlib
|
||||
|
||||
from molecule_agent.client import make_idempotency_key, strip_a2a_boundary
|
||||
from molecule_agent.client import make_idempotency_key
|
||||
|
||||
|
||||
def test_delegate_posts_task_and_idempotency_key(client: RemoteAgentClient):
|
||||
@ -810,78 +750,6 @@ def test_delegate_sends_bearer_and_workspace_headers(client: RemoteAgentClient):
|
||||
assert kwargs["headers"]["X-Workspace-ID"] == "ws-abc-123"
|
||||
|
||||
|
||||
def test_auth_headers_injects_org_id_and_origin():
|
||||
"""org_id and origin kwargs are injected into every request headers."""
|
||||
session = MagicMock()
|
||||
session.post.return_value = FakeResponse(200, {})
|
||||
client = RemoteAgentClient(
|
||||
workspace_id="ws-test",
|
||||
platform_url="https://platform.example.com",
|
||||
org_id="org-uuid-123",
|
||||
origin="https://tenant.moleculesai.app",
|
||||
session=session,
|
||||
)
|
||||
client.save_token("tok")
|
||||
client.delegate(task="x", target_id="peer")
|
||||
hdrs = session.post.call_args[1]["headers"]
|
||||
assert hdrs["Authorization"] == "Bearer tok"
|
||||
assert hdrs["X-Molecule-Org-Id"] == "org-uuid-123"
|
||||
assert hdrs["Origin"] == "https://tenant.moleculesai.app"
|
||||
|
||||
|
||||
def test_auth_headers_org_id_only():
|
||||
"""origin can be omitted when only org_id is needed."""
|
||||
session = MagicMock()
|
||||
session.post.return_value = FakeResponse(200, {})
|
||||
client = RemoteAgentClient(
|
||||
workspace_id="ws-test",
|
||||
platform_url="https://platform.example.com",
|
||||
org_id="org-uuid-456",
|
||||
session=session,
|
||||
)
|
||||
client.save_token("tok")
|
||||
client.poll_state()
|
||||
hdrs = session.get.call_args[1]["headers"]
|
||||
assert hdrs["Authorization"] == "Bearer tok"
|
||||
assert hdrs["X-Molecule-Org-Id"] == "org-uuid-456"
|
||||
assert "Origin" not in hdrs
|
||||
|
||||
|
||||
def test_auth_headers_origin_only():
|
||||
"""org_id can be omitted when only origin is needed."""
|
||||
session = MagicMock()
|
||||
session.post.return_value = FakeResponse(200, {})
|
||||
client = RemoteAgentClient(
|
||||
workspace_id="ws-test",
|
||||
platform_url="https://platform.example.com",
|
||||
origin="https://other-tenant.moleculesai.app",
|
||||
session=session,
|
||||
)
|
||||
client.save_token("tok")
|
||||
client.poll_state()
|
||||
hdrs = session.get.call_args[1]["headers"]
|
||||
assert hdrs["Authorization"] == "Bearer tok"
|
||||
assert "X-Molecule-Org-Id" not in hdrs
|
||||
assert hdrs["Origin"] == "https://other-tenant.moleculesai.app"
|
||||
|
||||
|
||||
def test_auth_headers_no_extra_when_unset():
|
||||
"""When neither org_id nor origin is set, headers contain only auth."""
|
||||
session = MagicMock()
|
||||
session.post.return_value = FakeResponse(200, {})
|
||||
client = RemoteAgentClient(
|
||||
workspace_id="ws-test",
|
||||
platform_url="https://platform.example.com",
|
||||
session=session,
|
||||
)
|
||||
client.save_token("tok")
|
||||
client.poll_state()
|
||||
hdrs = session.get.call_args[1]["headers"]
|
||||
assert hdrs["Authorization"] == "Bearer tok"
|
||||
assert "X-Molecule-Org-Id" not in hdrs
|
||||
assert "Origin" not in hdrs
|
||||
|
||||
|
||||
def test_delegate_raises_on_http_error(client: RemoteAgentClient):
|
||||
client.save_token("tok")
|
||||
client._session.post.return_value = FakeResponse(500, {"error": "boom"})
|
||||
@ -943,55 +811,6 @@ def test_make_idempotency_key_deterministic():
|
||||
assert a == b
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# strip_a2a_boundary — OFFSEC-003 trust-boundary marker stripping
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_strip_a2a_boundary_basic():
|
||||
"""Interior text between the two markers is returned."""
|
||||
wrapped = "[A2A_RESULT_FROM_PEER]hello world[/A2A_RESULT_FROM_PEER]"
|
||||
assert strip_a2a_boundary(wrapped) == "hello world"
|
||||
|
||||
|
||||
def test_strip_a2a_boundary_strips_whitespace_edges():
|
||||
"""Trailing/leading whitespace inside the boundary is stripped."""
|
||||
wrapped = "[A2A_RESULT_FROM_PEER] peer reply [/A2A_RESULT_FROM_PEER]"
|
||||
assert strip_a2a_boundary(wrapped) == "peer reply"
|
||||
|
||||
|
||||
def test_strip_a2a_boundary_no_markers_returns_unchanged():
|
||||
"""Without both markers present the input passes through unchanged."""
|
||||
assert strip_a2a_boundary("plain text with no markers") == "plain text with no markers"
|
||||
|
||||
|
||||
def test_strip_a2a_boundary_only_start_returns_unchanged():
|
||||
"""Only a start marker — no-op to stay safe during mid-rollout."""
|
||||
assert strip_a2a_boundary("[A2A_RESULT_FROM_PEER]unclosed") == "[A2A_RESULT_FROM_PEER]unclosed"
|
||||
|
||||
|
||||
def test_strip_a2a_boundary_only_end_returns_unchanged():
|
||||
"""Only an end marker — no-op."""
|
||||
assert strip_a2a_boundary("[/A2A_RESULT_FROM_PEER]no start") == "[/A2A_RESULT_FROM_PEER]no start"
|
||||
|
||||
|
||||
def test_strip_a2a_boundary_empty_returns_empty():
|
||||
assert strip_a2a_boundary("") == ""
|
||||
assert strip_a2a_boundary(None) == "" # type: ignore[arg-type]
|
||||
|
||||
|
||||
def test_strip_a2a_boundary_end_before_start_returns_unchanged():
|
||||
"""If end marker appears before start, treat as no-op."""
|
||||
text = "[/A2A_RESULT_FROM_PEER]X[A2A_RESULT_FROM_PEER]"
|
||||
assert strip_a2a_boundary(text) == text
|
||||
|
||||
|
||||
def test_strip_a2a_boundary_multiline_content():
|
||||
"""Multiline interior content is preserved (stripped at edges only)."""
|
||||
wrapped = "[A2A_RESULT_FROM_PEER]\n step one\n step two\n[/A2A_RESULT_FROM_PEER]"
|
||||
assert strip_a2a_boundary(wrapped) == "step one\n step two"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _safe_extract_tar
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Loading…
Reference in New Issue
Block a user