diff --git a/.gitignore b/.gitignore index 2af45b5..8a0fcf1 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,12 @@ # Workspace auth tokens .auth-token .auth_token + +# Python build artifacts — never commit; regenerated by interpreter / pip. +__pycache__/ +*.py[cod] +*.egg-info/ +.pytest_cache/ +.ruff_cache/ +build/ +dist/ diff --git a/CLAUDE.md b/CLAUDE.md index 7736e31..a6d2665 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -139,6 +139,8 @@ unless noted): | `POST` | `/workspaces/:id/delegate` | 30.6 | bearer + X-Workspace-ID, 300s timeout | | `GET` | `/workspaces/:id/plugins/:name/download` | 30.3 | bearer | | `POST` | `/workspaces/:id/plugins` | 30.3 | bearer | +| `GET` | `/workspaces/:id/activity?type=a2a_receive&since_id=…` | 30.8c | bearer (poll-mode inbound) | +| `POST` | `/workspaces/:id/notify` | 30.8c | bearer (canvas-user reply) | **Token** is cached at `~/.molecule//.auth_token` with `0600` permissions. On restart the client reuses the cached token — the platform @@ -201,9 +203,20 @@ python -m molecule_agent verify-sha256 ./my-plugin-dir first**. Do not patch silently — the SDK is consumed across multiple runtime environments and silent patches can cause subtle breakage elsewhere. -- `molecule_agent` does not yet bundle an inbound A2A server helper. - Platform-initiated calls to a remote agent without a publicly reachable - endpoint will not succeed. See Phase 30.8b in the platform's `PLAN.md`. +- `molecule_agent` ships two inbound delivery paths: **push** (Phase 30.8b, + `A2AServer` — for agents with a publicly reachable URL) and **poll** (Phase + 30.8c, `PollDelivery` — for agents behind NAT or without a public endpoint, + the typical case for hermes-self-hosted, codex, and similar OSS runtimes). + Both feed the same `MessageHandler` callback through + `RemoteAgentClient.run_agent_loop(handler)`. The reply transport + (`/notify` for canvas users vs `/a2a` for peer agents) is hidden behind + `client.reply(msg, text)`. 
+ +- One-line bootstrap for poll-mode agents: + `python -m molecule_agent connect --platform-url … --workspace-id … --token … --handler my_module:fn`. + Picks `PollDelivery` automatically when `--reported-url` is empty; SIGTERM/SIGINT + shut the loop down cleanly. Cursor optionally persisted to `--cursor-file` so + restarts resume from the last-seen activity row. --- diff --git a/molecule_agent/README.md b/molecule_agent/README.md index 34c89f0..8a38367 100644 --- a/molecule_agent/README.md +++ b/molecule_agent/README.md @@ -57,13 +57,62 @@ A runnable demo with full setup walkthrough lives at | `heartbeat(...)` | 30.1 | Single bearer-authed heartbeat | | `get_peers()` / `discover_peer()` | 30.6 | Sibling URL discovery with TTL cache | | `call_peer(target, message)` | 30.6 | Direct A2A with proxy fallback | +| `fetch_inbound(since_id=…)` | 30.8c | One-shot poll of `/workspaces/:id/activity` for inbound A2A | +| `reply(msg, text)` | 30.8c | Smart-routes reply to `/notify` (canvas user) or `/a2a` (peer) | | `run_heartbeat_loop()` | combo | Drives heartbeat + state-poll on a timer; exits on pause/delete | +| `run_agent_loop(handler)` | combo | Heartbeat + state + **inbound dispatch**; exits on pause/delete | + +## Inbound delivery — push vs poll + +Two ways an external agent can receive A2A messages: + +| Path | When to use | Class | +|---|---|---| +| **Push** | Your agent has a publicly reachable URL (cloud VM, ngrok tunnel) | `A2AServer` (Phase 30.8b) | +| **Poll** | Your agent is behind NAT, on a laptop, or in a CI runner with no public URL | `PollDelivery` (Phase 30.8c) | + +Both dispatch to the same `MessageHandler` callback through `run_agent_loop`: + +```python +from molecule_agent import RemoteAgentClient, InboundMessage + +def my_handler(msg: InboundMessage, client: RemoteAgentClient) -> str | None: + print(f"← {msg.source}: {msg.text}") + return f"echo: {msg.text}" # auto-routed via /notify or /a2a + +client = RemoteAgentClient(workspace_id="…", 
platform_url="…") +client.register() +client.run_agent_loop(my_handler) # default: PollDelivery +``` + +The reply transport (`/notify` for canvas users, `/a2a` for peer agents) is hidden — `client.reply(msg, text)` picks based on `msg.source`. Async handlers work too; `PollDelivery` detects awaitable returns and `asyncio.run`s them. + +## CLI: `molecule_agent connect` + +One command bootstraps the full poll-mode loop. No code beyond your handler: + +```bash +python -m molecule_agent connect \ + --platform-url https://your-tenant.moleculesai.app \ + --workspace-id 550e8400-… \ + --token your-workspace-token \ + --handler my_handlers:echo \ + --poll-interval 5 \ + --cursor-file ~/.molecule/cursor +``` + +Where `my_handlers.py` is anywhere on `PYTHONPATH`: + +```python +def echo(msg, client): + return f"echo: {msg.text}" +``` + +All flags also read from environment variables (`MOLECULE_PLATFORM_URL`, `MOLECULE_WORKSPACE_ID`, `MOLECULE_WORKSPACE_TOKEN`, `MOLECULE_POLL_INTERVAL`, `MOLECULE_CURSOR_FILE`). SIGTERM/SIGINT shut the loop down cleanly. ## What it doesn't do (yet) -- **No inbound A2A server.** Other agents can't initiate calls to your remote - agent unless you host an HTTP endpoint yourself. Future `start_a2a_server()` - helper will close this gap. +- **No long-poll.** Activity polling is fixed-cadence (default 5s). Server-side long-poll support would cut p50 inbound latency to ~0; tracked separately. 
- **No automatic reconnect after token loss.** If `~/.molecule//.auth_token` is deleted, you'll need to re-issue the token via the platform admin (since `POST /registry/register` is idempotent — it won't mint a second token for diff --git a/molecule_agent/__init__.py b/molecule_agent/__init__.py index 372edf4..3c92ddd 100644 --- a/molecule_agent/__init__.py +++ b/molecule_agent/__init__.py @@ -41,6 +41,16 @@ from .client import ( WorkspaceState, verify_plugin_sha256, ) +from .inbound import ( + CursorLostError, + DEFAULT_POLL_INTERVAL, + InboundDelivery, + InboundMessage, + InboundSource, + MessageHandler, + PollDelivery, + PushDelivery, +) # compute_plugin_sha256 lives in __main__ (the CLI entry point). # Import it here so `from molecule_agent import compute_plugin_sha256` works. @@ -51,6 +61,14 @@ __all__ = [ "RemoteAgentClient", "WorkspaceState", "PeerInfo", + "InboundMessage", + "InboundSource", + "InboundDelivery", + "PollDelivery", + "PushDelivery", + "MessageHandler", + "CursorLostError", + "DEFAULT_POLL_INTERVAL", "compute_plugin_sha256", "verify_plugin_sha256", "__version__", diff --git a/molecule_agent/__main__.py b/molecule_agent/__main__.py index 38c5b97..6534662 100644 --- a/molecule_agent/__main__.py +++ b/molecule_agent/__main__.py @@ -6,12 +6,24 @@ Commands: plugin.yaml (self-referential). Output the hash so you can paste it into plugin.yaml under the sha256 field. + + connect Register and run a remote agent against a + Molecule platform — heartbeat + state-poll + + inbound message poll, all in one process. + Loads a user-supplied handler module:func + and dispatches every inbound A2A message. + Designed for hermes / codex / any third-party + runtime that can't expose a reachable URL. 
""" from __future__ import annotations import argparse import hashlib +import importlib import json +import logging +import os +import signal import sys from pathlib import Path @@ -52,6 +64,119 @@ def compute_plugin_sha256(plugin_dir: Path) -> str: return hashlib.sha256(manifest_bytes).hexdigest() +def _resolve_handler(spec: str): + """Resolve a ``module.path:function`` spec into the callable. + + Mirrors the convention used by gunicorn / uvicorn / celery for app + references — a single string the user can put in a config file or env + var. Raises ``SystemExit`` with a readable message on any failure + (import, attribute lookup, non-callable result) so the CLI's exit + surface is clean. + """ + if ":" not in spec: + raise SystemExit( + f"error: handler spec {spec!r} must be of the form 'module.path:function'" + ) + mod_path, func_name = spec.split(":", 1) + if not mod_path or not func_name: + raise SystemExit(f"error: handler spec {spec!r} is malformed") + try: + # Importing the user's module pulls in their code — we run it from + # the current working directory by default so 'my_handler:fn' works + # without setting PYTHONPATH first. + if "" not in sys.path: + sys.path.insert(0, "") + module = importlib.import_module(mod_path) + except Exception as exc: + raise SystemExit(f"error: could not import {mod_path}: {exc}") + try: + func = getattr(module, func_name) + except AttributeError: + raise SystemExit(f"error: {mod_path} has no attribute {func_name!r}") + if not callable(func): + raise SystemExit(f"error: {spec} is not callable") + return func + + +def _connect_command(args: argparse.Namespace) -> int: + """Run the register + heartbeat + inbound-poll loop. + + Returns the process exit code. 0 on graceful exit (paused/removed/SIGTERM), + non-zero on registration / handler-import failures. + """ + # Lazy import — the connect path pulls in requests + the full client, + # while verify-sha256 should stay light. 
def _connect_command(args: argparse.Namespace) -> int:
    """Register (when needed) and run the heartbeat + inbound-poll loop.

    Args:
        args: Parsed ``connect`` arguments. Reads ``handler``,
            ``workspace_id``, ``platform_url``, ``agent_name``,
            ``reported_url``, ``token``, ``poll_interval``, ``cursor_file``
            and ``verbose``.

    Returns:
        The process exit code: ``0`` when the loop ends (terminal status
        reported by the platform, or SIGINT/SIGTERM), ``2`` when
        registration fails.

    Raises:
        SystemExit: propagated from ``_resolve_handler`` when the handler
            spec cannot be resolved.
    """
    # Lazy import — the connect path pulls in requests + the full client,
    # while verify-sha256 should stay light.
    from .client import RemoteAgentClient
    from .inbound import PollDelivery

    logging.basicConfig(
        level=logging.INFO if args.verbose else logging.WARNING,
        format="[molecule] %(message)s",
    )

    handler = _resolve_handler(args.handler)

    client = RemoteAgentClient(
        workspace_id=args.workspace_id,
        platform_url=args.platform_url,
        agent_card={"name": args.agent_name or f"remote-{args.workspace_id[:8]}"},
        reported_url=args.reported_url or "",
    )

    if args.token:
        # An explicitly supplied token is persisted first so the register
        # call below is skipped for a workspace that already has one.
        client.save_token(args.token)

    # Register only when no token is cached (and none was supplied), so the
    # platform can mint one.
    if client.load_token() is None:
        try:
            client.register()
        except Exception as exc:
            print(f"[molecule] register failed: {exc}", file=sys.stderr)
            return 2

    print(
        f"[molecule] connected as {args.workspace_id} "
        f"(platform={args.platform_url}, delivery=poll, interval={args.poll_interval}s)"
    )

    cursor_file = None
    if args.cursor_file:
        cursor_file = Path(args.cursor_file).expanduser()

    delivery = PollDelivery(
        client,
        interval=args.poll_interval,
        cursor_file=cursor_file,
    )

    # Graceful shutdown on SIGINT / SIGTERM. The loop's own stop condition
    # is platform-driven (paused / deleted), so both signals are converted
    # into KeyboardInterrupt, which unwinds run_agent_loop and is caught
    # below.
    def _on_signal(_sig, _frame):
        raise KeyboardInterrupt

    signal.signal(signal.SIGINT, _on_signal)
    signal.signal(signal.SIGTERM, _on_signal)

    try:
        terminal = client.run_agent_loop(handler, delivery=delivery)
        print(f"[molecule] platform reports workspace {terminal} — exiting")
        return 0
    except KeyboardInterrupt:
        print("[molecule] received signal — shutting down cleanly")
        try:
            delivery.stop()
        except Exception as exc:
            # Best-effort cleanup: never turn a clean shutdown into a crash,
            # but do not hide the failure either.
            logging.getLogger(__name__).warning(
                "delivery.stop raised during shutdown: %s", exc
            )
        return 0
Empty = poll-only mode " + "(env: MOLECULE_REPORTED_URL)." + ), + ) + cn.add_argument( + "--poll-interval", + type=float, + default=float(os.environ.get("MOLECULE_POLL_INTERVAL", "5.0")), + help="Seconds between activity polls (env: MOLECULE_POLL_INTERVAL).", + ) + cn.add_argument( + "--cursor-file", + default=os.environ.get("MOLECULE_CURSOR_FILE"), + help=( + "Path to persist the activity cursor across restarts (env: " + "MOLECULE_CURSOR_FILE). Default: in-process only." + ), + ) + cn.add_argument( + "--verbose", + action="store_true", + help="Enable INFO-level logging.", + ) + args = parser.parse_args() if args.command == "verify-sha256": @@ -80,6 +273,8 @@ def main() -> None: print(f"Computed SHA256: {h}") except Exception as exc: sys.exit(f"error: {exc}") + elif args.command == "connect": + sys.exit(_connect_command(args)) else: parser.print_help() diff --git a/molecule_agent/client.py b/molecule_agent/client.py index f72d51d..05b007f 100644 --- a/molecule_agent/client.py +++ b/molecule_agent/client.py @@ -31,10 +31,13 @@ import time import uuid from dataclasses import dataclass, field from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any import requests +if TYPE_CHECKING: + from .inbound import InboundDelivery, InboundMessage, MessageHandler + logger = logging.getLogger(__name__) # Polling cadence defaults. Chosen to align with the platform's 60-second @@ -687,6 +690,266 @@ class RemoteAgentClient: resp.raise_for_status() return resp.json() + # ------------------------------------------------------------------ + # Inbound delivery (poll mode) — Phase 30.8c + # ------------------------------------------------------------------ + + def fetch_inbound( + self, + since_id: str | None = None, + limit: int = 100, + type: str = "a2a_receive", + ) -> list["InboundMessage"]: + """Fetch one batch of inbound A2A activity rows. + + Hits ``GET /workspaces/:id/activity?type=…&since_id=…&limit=…``. 
+ Returns the rows newer than ``since_id`` in oldest-first order, + parsed into :class:`~molecule_agent.inbound.InboundMessage`. + + Used by :class:`~molecule_agent.inbound.PollDelivery`; most callers + should drive this through :py:meth:`run_agent_loop` rather than + polling manually. + + Args: + since_id: Activity-id cursor — only rows newer than this are + returned. Pass ``None`` for the initial fetch. + limit: Max rows per batch. Default 100. Server-side cap may + lower this. + type: Activity-row type filter. Default ``"a2a_receive"``; + pass another type to consume different streams (e.g. + ``"workspace_state_changed"``). + + Returns: + List of :class:`InboundMessage`, oldest first. May be empty. + + Raises: + :class:`~molecule_agent.inbound.CursorLostError`: if the server + returns 410 Gone (cursor's row has been rotated out of the + activity window). Caller should reset the cursor and retry. + ``requests.HTTPError``: on other non-2xx responses (401, 5xx, …). + """ + # Local import to avoid a circular dependency at module load — the + # inbound module references RemoteAgentClient via TYPE_CHECKING. + from .inbound import CursorLostError, _parse_activity_row + + params: dict[str, str] = {"type": type, "limit": str(int(limit))} + if since_id: + params["since_id"] = since_id + url = f"{self.platform_url}/workspaces/{self.workspace_id}/activity" + resp = self._session.get( + url, + headers=self._auth_headers(), + params=params, + timeout=15.0, + ) + if resp.status_code == 410: + raise CursorLostError( + f"cursor {since_id!r} no longer valid (410 Gone); reset and re-poll" + ) + # 429 retry: rebuild the URL with encoded query string and route + # through _get_with_retry, which honours Retry-After + jittered + # backoff. We only retry on 429 — every other status falls through + # to raise_for_status below. + if resp.status_code == 429: + from urllib.parse import urlencode + resp = self._get_with_retry( + url + "?" 
+ urlencode(params), + headers=self._auth_headers(), + ) + resp.raise_for_status() + + rows = resp.json() or [] + if not isinstance(rows, list): + # Defensive: if the server ever wraps in {"items": […]} we + # accept that shape too rather than silently dropping data. + rows = rows.get("items", []) if isinstance(rows, dict) else [] + + out: list["InboundMessage"] = [] + for row in rows: + if not isinstance(row, dict): + continue + msg = _parse_activity_row(row) + if msg is not None: + out.append(msg) + return out + + def reply(self, message: "InboundMessage", text: str) -> None: + """Reply to an inbound message. + + The reply transport is picked from ``message.source``: + + * ``canvas_user`` → ``POST /workspaces/:id/notify`` with + ``{"message": text}``. The canvas surfaces the text to the user. + * ``peer_agent`` → ``POST /workspaces/:peer_id/a2a`` with a JSON-RPC + ``message/send`` envelope and ``X-Source-Workspace-Id`` header. + * ``unknown`` → raises ``ValueError``. The SDK refuses to guess the + transport; the caller should inspect ``message.raw`` and use + :py:meth:`call_peer` or a direct HTTP call as appropriate. + + Args: + message: The :class:`InboundMessage` being replied to. Determines + the transport. + text: Reply text. Empty / whitespace-only strings raise + ``ValueError`` to prevent accidental silent acks. + + Raises: + ``ValueError``: on empty text or unknown source. + ``requests.HTTPError``: on non-2xx server response. 
+ """ + if not text or not text.strip(): + raise ValueError("reply text must be non-empty") + + if message.source == "canvas_user": + resp = self._session.post( + f"{self.platform_url}/workspaces/{self.workspace_id}/notify", + headers={ + **self._auth_headers(), + "Content-Type": "application/json", + }, + json={"message": text}, + timeout=15.0, + ) + resp.raise_for_status() + return + + if message.source == "peer_agent": + target = message.source_id or "" + if not target: + raise ValueError( + "peer_agent inbound message has no source_id — cannot route reply" + ) + body = { + "jsonrpc": "2.0", + "id": str(uuid.uuid4()), + "method": "message/send", + "params": { + "message": { + "role": "agent", + "messageId": str(uuid.uuid4()), + "parts": [{"kind": "text", "text": text}], + } + }, + } + resp = self._session.post( + f"{self.platform_url}/workspaces/{target}/a2a", + headers={ + **self._auth_headers(), + "X-Source-Workspace-Id": self.workspace_id, + "X-Workspace-ID": self.workspace_id, + "Content-Type": "application/json", + }, + json=body, + timeout=15.0, + ) + resp.raise_for_status() + return + + raise ValueError( + f"cannot auto-route reply for source={message.source!r}; " + "inspect message.raw and call /notify or /a2a directly" + ) + + def run_agent_loop( + self, + handler: "MessageHandler", + delivery: "InboundDelivery | None" = None, + max_iterations: int | None = None, + task_supplier: "callable | None" = None, + ) -> str: + """Combined heartbeat + state-poll + inbound-delivery loop. + + Generalization of :py:meth:`run_heartbeat_loop` that also drains + inbound messages on every tick. This is the recommended entry + point for an external agent author — registers, heartbeats, + state-polls, and dispatches inbound, all in one sync call. + + Args: + handler: ``Callable[[InboundMessage, RemoteAgentClient], + str | None | Awaitable[str | None]]``. Invoked once per + inbound message. Returning a non-empty string sends an + automatic reply via :py:meth:`reply`. 
``None`` skips the + reply (useful for fire-and-forget consumers). + delivery: An :class:`InboundDelivery` implementation. Defaults + to :class:`PollDelivery` (the right choice when the agent + can't expose an inbound URL — i.e. ``reported_url`` is + empty or starts with ``remote://``). Pass an explicit + :class:`PushDelivery` (constructed around an + :class:`A2AServer`) for push-mode agents. + max_iterations: Stop after N iterations. ``None`` = run until + the platform reports the workspace paused or deleted. + task_supplier: Optional zero-arg callable returning a dict + ``{"current_task": str, "active_tasks": int}`` reported on + each heartbeat (same contract as :py:meth:`run_heartbeat_loop`). + + Returns: + The terminal status: ``"paused"``, ``"removed"``, or + ``"max_iterations"``. + + Errors from the activity poll, heartbeat, or state poll are + logged and the loop continues — a transient platform hiccup + should not take a remote agent offline. Handler exceptions are + caught at the delivery layer (see :class:`PollDelivery`). 
+ """ + from .inbound import PollDelivery + + if delivery is None: + delivery = PollDelivery(self) + + i = 0 + try: + while True: + if max_iterations is not None and i >= max_iterations: + return "max_iterations" + i += 1 + + report: dict[str, Any] = {} + if task_supplier is not None: + try: + report = task_supplier() or {} + except Exception as exc: + logger.warning("task_supplier raised: %s", exc) + + try: + self.heartbeat( + current_task=str(report.get("current_task", "")), + active_tasks=int(report.get("active_tasks", 0)), + ) + except Exception as exc: + logger.warning("heartbeat failed: %s — continuing", exc) + + try: + delivery.run_once(handler) + except Exception as exc: + logger.warning("inbound delivery.run_once raised: %s — continuing", exc) + + try: + state = self.poll_state() + except Exception as exc: + logger.warning("state poll failed: %s — continuing", exc) + state = None + + if state is not None and state.should_stop: + logger.info( + "platform reports workspace %s (paused=%s deleted=%s) — exiting", + state.status, state.paused, state.deleted, + ) + return state.status + + # Sleep cadence: take the smaller of heartbeat_interval and + # the delivery's poll interval (when present) so inbound + # latency is bounded by the delivery's setting, not by the + # heartbeat cadence. 
+ interval = self.heartbeat_interval + poll_interval = getattr(delivery, "interval", None) + if isinstance(poll_interval, (int, float)) and poll_interval > 0: + interval = min(interval, float(poll_interval)) + time.sleep(interval) + finally: + try: + delivery.stop() + except Exception as exc: + logger.warning("delivery.stop raised on loop exit: %s", exc) + # ------------------------------------------------------------------ # Delegation — KI-002 idempotency guard # ------------------------------------------------------------------ diff --git a/molecule_agent/inbound.py b/molecule_agent/inbound.py new file mode 100644 index 0000000..b56a4e0 --- /dev/null +++ b/molecule_agent/inbound.py @@ -0,0 +1,371 @@ +"""Poll-mode inbound delivery for remote agents that can't expose an HTTP endpoint. + +The :class:`A2AServer` companion (Phase 30.8b) covers the case where an agent +can host a publicly reachable HTTP endpoint and the platform pushes work to it. +Many real adopters can't — laptops behind NAT, ephemeral CI runners, hermes +self-hosted on a developer machine. For those, the platform queues inbound +A2A messages on the workspace's ``activity_logs`` and the agent polls. + +This module provides: + +* :class:`InboundMessage` — typed view over an ``activity_logs`` row that + carries an ``a2a_receive`` event. Source is normalized to ``canvas_user`` + vs ``peer_agent`` so the SDK can route replies without the caller having + to know which envelope to use. +* :class:`CursorLostError` — raised when the activity endpoint returns + 410 Gone (the cursor's row was rotated out). Caller resets and re-polls. +* :class:`InboundDelivery` — protocol that ``run_agent_loop`` accepts; both + :class:`PollDelivery` and :class:`PushDelivery` satisfy it. +* :class:`PollDelivery` — the new poll-mode implementation. +* :class:`PushDelivery` — thin wrapper over :class:`A2AServer` so the same + ``run_agent_loop`` works for push-mode agents that expose an inbound URL. 
+ +Big-tech prior art: Slack Socket Mode, Telegram getUpdates, AWS SQS long +polling, Stripe ``stripe listen``. Same shape — cursor-based poll, SDK-owned +loop, single handler callback, smart-reply hidden behind the SDK. +""" +from __future__ import annotations + +import asyncio +import inspect +import logging +from dataclasses import dataclass, field +from pathlib import Path +from typing import ( + Any, + Awaitable, + Callable, + Literal, + Protocol, + TYPE_CHECKING, + runtime_checkable, +) + +if TYPE_CHECKING: + from .client import RemoteAgentClient + +logger = logging.getLogger(__name__) + + +InboundSource = Literal["canvas_user", "peer_agent", "unknown"] + + +@dataclass +class InboundMessage: + """One inbound A2A event the agent must handle. + + The ``activity_id`` is the cursor — pass it as ``since_id`` on the next + fetch to avoid re-receiving this message. + + ``source`` is normalized so the SDK can pick the reply transport: + + * ``canvas_user`` — a user typing in the canvas chat. Reply via + ``POST /workspaces/:id/notify``. + * ``peer_agent`` — another workspace's agent. Reply via + ``POST /workspaces/:peer_id/a2a`` with a JSON-RPC envelope and + ``X-Source-Workspace-Id`` header. + * ``unknown`` — the activity row didn't carry a recognizable source. + :py:meth:`RemoteAgentClient.reply` raises ``ValueError`` rather than + guess. + """ + + activity_id: str + source: InboundSource + source_id: str + text: str + raw: dict[str, Any] = field(default_factory=dict) + + +class CursorLostError(Exception): + """Raised when ``GET /workspaces/:id/activity`` returns 410 Gone. + + The platform retires old activity rows on a fixed window (see + workspace-server's activity_logs retention policy). If the agent's + cursor points at a row that has been rotated out, the server replies + 410. Callers should reset the cursor (``since_id=None``) and re-poll; + they will catch up on whatever's still in the window. 
+ """ + + +# --------------------------------------------------------------------------- +# Activity row → InboundMessage parsing +# --------------------------------------------------------------------------- + + +def _parse_activity_row(row: dict[str, Any]) -> InboundMessage | None: + """Convert one ``activity_logs`` row into an :class:`InboundMessage`. + + Returns ``None`` if the row is malformed or doesn't carry text we can + deliver — preferable to raising and aborting the whole poll batch. + + Activity row shape (per workspace-server's handlers/activity.go): + ``{"id": ..., "type": "a2a_receive", "source_id": ..., "data": {...}, ...}`` + """ + aid = str(row.get("id") or "") + if not aid: + return None + + data = row.get("data") if isinstance(row.get("data"), dict) else {} + source_kind = str(data.get("source") or row.get("source") or "") + source_id = str(row.get("source_id") or data.get("source_id") or "") + + # Normalize source. The platform uses "canvas_user" / "peer_agent" / + # sometimes "user" (legacy). Anything else falls into "unknown" so we + # don't accidentally route a reply down the wrong transport. + source: InboundSource + if source_kind in ("canvas_user", "user"): + source = "canvas_user" + elif source_kind == "peer_agent": + source = "peer_agent" + elif source_id and source_id != "user": + # Heuristic: a non-empty source_id that isn't the "user" sentinel + # is almost certainly a peer workspace. 
+ source = "peer_agent" + elif source_id == "user": + source = "canvas_user" + else: + source = "unknown" + + text = str(data.get("text") or data.get("message") or "") + + return InboundMessage( + activity_id=aid, + source=source, + source_id=source_id, + text=text, + raw=row, + ) + + +# --------------------------------------------------------------------------- +# Handler + delivery protocol +# --------------------------------------------------------------------------- + + +# A handler receives the inbound message + the client (so it can reply, fetch +# secrets, call peers, etc.) and returns either a reply string or None. +# Sync OR async — :class:`PollDelivery` detects ``Awaitable`` results and +# awaits them, mirroring the pattern in :class:`A2AServer`. +MessageHandler = Callable[ + ["InboundMessage", "RemoteAgentClient"], + "str | None | Awaitable[str | None]", +] + + +@runtime_checkable +class InboundDelivery(Protocol): + """The contract :py:meth:`RemoteAgentClient.run_agent_loop` calls into. + + Two implementations ship with the SDK: + + * :class:`PollDelivery` — for agents without a reachable URL. + * :class:`PushDelivery` — for agents that host an A2AServer. + + Third parties can supply their own (e.g. WebSocket, gRPC streaming) + by satisfying this protocol. + """ + + def run_once(self, handler: MessageHandler) -> int: + """Drain one batch of inbound messages and dispatch to handler. + + Returns the count of messages dispatched. The caller's outer loop + decides cadence / sleep. + """ + ... + + def stop(self) -> None: + """Release any resources (close sockets, stop background threads).""" + ... + + +# --------------------------------------------------------------------------- +# PollDelivery — the new path +# --------------------------------------------------------------------------- + + +# Default poll cadence. 5s gives <5s p50 latency for canvas-user messages +# while keeping load on workspace-server modest (one GET per agent per 5s). 
# Default poll cadence, in seconds: one GET per agent per interval. There is
# no server-side long-poll yet, so a fixed cadence is used. Tunable through
# the ``interval`` constructor argument.
DEFAULT_POLL_INTERVAL = 5.0


class PollDelivery:
    """Poll ``GET /workspaces/:id/activity?type=a2a_receive&since_id=…``.

    The cursor lives in process memory by default, so a restart re-polls
    from scratch; handlers should therefore be idempotent.

    Pass ``cursor_file`` to persist the cursor across restarts. A leading
    ``~`` is expanded, and both ``str`` and ``Path`` are accepted:

        PollDelivery(client, cursor_file="~/.molecule/cursor")

    Cursor loss (:class:`CursorLostError`, i.e. HTTP 410) is handled inside
    :py:meth:`run_once`: the in-memory cursor is reset to ``None`` and the
    next poll starts fresh.

    Args:
        client: Object providing ``fetch_inbound(since_id=, limit=, type=)``
            and ``reply(msg, text)``.
        interval: Suggested seconds between polls. Stored as ``interval``
            for the outer loop to read; this class never sleeps itself.
        type: Activity-row type filter forwarded to ``fetch_inbound``.
        limit: Batch size forwarded to ``fetch_inbound``.
        cursor_file: Optional path where the cursor is persisted.
    """

    def __init__(
        self,
        client: "RemoteAgentClient",
        interval: float = DEFAULT_POLL_INTERVAL,
        type: str = "a2a_receive",
        limit: int = 100,
        cursor_file: str | Path | None = None,
    ) -> None:
        self._client = client
        self.interval = interval
        self.type = type
        self.limit = limit
        self._cursor_file = self._normalize_cursor_path(cursor_file)
        self._cursor: str | None = self._load_cursor()
        self._stopped = False

    @staticmethod
    def _normalize_cursor_path(cursor_file: str | Path | None) -> Path | None:
        """Coerce ``cursor_file`` to a ``Path`` with a leading ``~`` expanded."""
        if cursor_file is None:
            return None
        path = Path(cursor_file)
        try:
            return path.expanduser()
        except RuntimeError as exc:
            # expanduser() raises when the home directory cannot be
            # determined; fall back to the path as given.
            logger.warning("could not expand cursor file path %s: %s", path, exc)
            return path

    def _load_cursor(self) -> str | None:
        """Return the persisted cursor, or ``None`` if absent or unreadable."""
        if self._cursor_file is None:
            return None
        try:
            stored = self._cursor_file.read_text().strip()
        except FileNotFoundError:
            # First run: nothing persisted yet.
            return None
        except OSError as exc:
            logger.warning("could not read cursor file %s: %s", self._cursor_file, exc)
            return None
        return stored or None

    def _save_cursor(self) -> None:
        """Persist the current cursor; a write failure is logged, not raised."""
        if self._cursor_file is None or self._cursor is None:
            return
        try:
            self._cursor_file.parent.mkdir(parents=True, exist_ok=True)
            self._cursor_file.write_text(self._cursor)
        except OSError as exc:
            logger.warning("could not write cursor file %s: %s", self._cursor_file, exc)

    @property
    def cursor(self) -> str | None:
        """Current cursor (``activity_id`` of the most recently dispatched
        message). Useful for tests and observability."""
        return self._cursor

    def run_once(self, handler: MessageHandler) -> int:
        """Fetch one batch and dispatch each message to ``handler``.

        Returns the number of messages dispatched (``0`` once :py:meth:`stop`
        has been called, or when the cursor was lost and reset).

        The cursor advances past every dispatched row, including ones whose
        handler raised — a poison-pill input must not block the queue
        forever. Handler exceptions are logged here and not re-raised.
        Errors from ``fetch_inbound`` other than :class:`CursorLostError`
        propagate to the caller.
        """
        if self._stopped:
            return 0
        try:
            batch = self._client.fetch_inbound(
                since_id=self._cursor,
                limit=self.limit,
                type=self.type,
            )
        except CursorLostError:
            logger.info("cursor %s lost (410 Gone) — resetting", self._cursor)
            self._cursor = None
            return 0

        dispatched = 0
        for msg in batch:
            try:
                self._dispatch(handler, msg)
            except Exception as exc:
                # Log + continue; the cursor still advances (see docstring).
                logger.exception("handler raised on activity %s: %s", msg.activity_id, exc)
            self._cursor = msg.activity_id
            dispatched += 1

        if dispatched:
            # Persisted once per batch rather than once per message.
            self._save_cursor()
        return dispatched

    @staticmethod
    async def _await_result(awaitable: Awaitable[Any]) -> Any:
        """Await ``awaitable`` inside a coroutine.

        ``asyncio.run`` only accepts coroutines; this wrapper lets it drive
        any awaitable (e.g. objects implementing ``__await__``).
        """
        return await awaitable

    def _dispatch(self, handler: MessageHandler, msg: "InboundMessage") -> None:
        """Invoke ``handler``, resolve an awaitable result, send any reply.

        Raises:
            RuntimeError: if the handler returned an awaitable while an
                event loop is already running in this thread. Blocking on
                it would deadlock that loop, so the call is refused.
        """
        outcome = handler(msg, self._client)
        if inspect.isawaitable(outcome):
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                # No running loop — safe to run a fresh one to completion.
                outcome = asyncio.run(self._await_result(outcome))
            else:
                # Close the never-started coroutine so refusing it does not
                # also emit a "coroutine was never awaited" warning.
                close = getattr(outcome, "close", None)
                if callable(close):
                    close()
                raise RuntimeError(
                    "PollDelivery.run_once was called from inside a running "
                    "event loop with an async handler. Use a sync handler "
                    "here, or schedule run_once on a worker thread via "
                    "asyncio.to_thread()."
                )

        # Only a non-empty string counts as a reply; anything else is
        # treated as "no reply".
        reply_text = outcome if isinstance(outcome, str) else None
        if reply_text:
            try:
                self._client.reply(msg, reply_text)
            except Exception as exc:
                logger.warning("reply send failed for activity %s: %s", msg.activity_id, exc)

    def stop(self) -> None:
        """Mark the delivery stopped; later ``run_once`` calls return 0."""
        self._stopped = True
+ self._client = client + self._server = server + + def run_once(self, handler: MessageHandler) -> int: # noqa: ARG002 — handler unused + # A2AServer dispatches synchronously on its own thread; nothing + # for the outer loop to do per-tick. + return 0 + + def stop(self) -> None: + try: + self._server.stop() + except Exception as exc: + logger.warning("PushDelivery stop: A2AServer.stop raised: %s", exc) + + +__all__ = [ + "CursorLostError", + "DEFAULT_POLL_INTERVAL", + "InboundDelivery", + "InboundMessage", + "InboundSource", + "MessageHandler", + "PollDelivery", + "PushDelivery", +] diff --git a/tests/test_cli_connect.py b/tests/test_cli_connect.py new file mode 100644 index 0000000..59c9e72 --- /dev/null +++ b/tests/test_cli_connect.py @@ -0,0 +1,167 @@ +"""Tests for `python -m molecule_agent connect` CLI handler resolution. + +Run-loop integration is covered by tests/test_inbound.py — these tests only +exercise the CLI's argument parsing, handler resolution, and the +register-on-missing-token behavior. We do not start the full loop because +that's already covered, and starting it from a CLI test runs into signal ++ event-loop interactions that aren't worth reproducing here. 
def _write_handler_module(tmp_path: Path, name: str, body: str) -> None:
    """Drop a handler module into tmp_path and prepend tmp_path to sys.path.

    Args:
        tmp_path: Directory that receives ``<name>.py``.
        name: Module name (without the ``.py`` suffix).
        body: Module source; common leading indentation is removed so
            callers can pass an indented triple-quoted literal.

    Note: the ``sys.path`` entry is not removed afterwards.
    """
    # Function-scope import: only this helper needs importlib.
    import importlib

    module_file = tmp_path / f"{name}.py"
    # Explicit encoding so the written source does not depend on the
    # platform's default locale.
    module_file.write_text(textwrap.dedent(body), encoding="utf-8")

    search_dir = str(tmp_path)
    if search_dir not in sys.path:
        sys.path.insert(0, search_dir)

    # The module was created after the interpreter started; path finders
    # may hold a cached directory listing that predates it.
    importlib.invalidate_caches()
"not_callable_mod", + """ + IT_IS_AN_INT = 42 + """, + ) + with pytest.raises(SystemExit, match="not callable"): + _resolve_handler("not_callable_mod:IT_IS_AN_INT") + + +# --------------------------------------------------------------------------- +# _connect_command — registration / token-loading branches +# --------------------------------------------------------------------------- + + +def test_connect_command_register_failure_returns_2(tmp_path: Path, monkeypatch): + _write_handler_module( + tmp_path, + "rcfail_mod", + """ + def fn(msg, client): + return None + """, + ) + + from molecule_agent import __main__ as cli_mod + + args = MagicMock() + args.handler = "rcfail_mod:fn" + args.platform_url = "http://platform.test" + args.workspace_id = "ws-zzz" + args.token = None + args.agent_name = None + args.reported_url = "" + args.poll_interval = 1.0 + args.cursor_file = None + args.verbose = False + + fake_client = MagicMock() + fake_client.load_token.return_value = None # no cached token + fake_client.register.side_effect = RuntimeError("network sad") + + with patch("molecule_agent.client.RemoteAgentClient", return_value=fake_client): + rc = cli_mod._connect_command(args) + assert rc == 2 + + +def test_connect_command_uses_provided_token_skips_register(tmp_path: Path, monkeypatch): + _write_handler_module( + tmp_path, + "tokset_mod", + """ + def fn(msg, client): + return None + """, + ) + + from molecule_agent import __main__ as cli_mod + + args = MagicMock() + args.handler = "tokset_mod:fn" + args.platform_url = "http://platform.test" + args.workspace_id = "ws-zzz" + args.token = "explicit-token" + args.agent_name = None + args.reported_url = "" + args.poll_interval = 1.0 + args.cursor_file = None + args.verbose = False + + fake_client = MagicMock() + # Once save_token has been called, load_token should return the token, + # so register is NOT called. 
+ fake_client.load_token.return_value = "explicit-token" + # run_agent_loop returns a terminal status — paused — so the function + # exits 0 cleanly without us having to signal-break the loop. + fake_client.run_agent_loop.return_value = "paused" + + with patch("molecule_agent.client.RemoteAgentClient", return_value=fake_client): + rc = cli_mod._connect_command(args) + + assert rc == 0 + fake_client.save_token.assert_called_once_with("explicit-token") + fake_client.register.assert_not_called() + fake_client.run_agent_loop.assert_called_once() diff --git a/tests/test_inbound.py b/tests/test_inbound.py new file mode 100644 index 0000000..3fa1ebc --- /dev/null +++ b/tests/test_inbound.py @@ -0,0 +1,640 @@ +"""Tests for poll-mode inbound delivery (Phase 30.8c). + +Covers: + +* :func:`_parse_activity_row` source normalization and edge cases. +* :py:meth:`RemoteAgentClient.fetch_inbound` happy path, cursor, 410, shapes. +* :py:meth:`RemoteAgentClient.reply` smart-routing (canvas vs peer). +* :class:`PollDelivery` cursor advancement, async/sync handler dispatch, + error handling, 410 reset, cursor-file persistence, stop(). +* :py:meth:`RemoteAgentClient.run_agent_loop` heartbeat + state + delivery + composition, default-delivery selection, terminal-status handling, sleep + cadence selection. + +Mocking style matches ``tests/test_remote_agent.py``: a ``FakeResponse`` / +``MagicMock`` session, no third-party HTTP mock library. 
class FakeResponse:
    """Minimal stand-in for an HTTP response object used by the mocked session.

    Exposes ``status_code``, ``text``, ``headers``, ``json()`` and
    ``raise_for_status()``.
    """

    def __init__(self, status_code: int = 200, json_body: Any = None, text: str = ""):
        # Headers start empty; individual tests assign them when needed.
        self.headers: dict[str, str] = {}
        self.text = text
        self._json = json_body
        self.status_code = status_code

    def json(self) -> Any:
        """Return the canned JSON body exactly as supplied."""
        return self._json

    def raise_for_status(self) -> None:
        """Raise ``requests.HTTPError`` for status codes of 400 and above."""
        if self.status_code < 400:
            return
        raise requests.HTTPError(f"HTTP {self.status_code}")
+ c.save_token("test-token-secret") + return c + + +# --------------------------------------------------------------------------- +# _parse_activity_row +# --------------------------------------------------------------------------- + + +def test_parse_activity_row_canvas_user_explicit(): + row = { + "id": "act-1", + "type": "a2a_receive", + "source_id": "user", + "data": {"source": "canvas_user", "text": "hi"}, + } + msg = _parse_activity_row(row) + assert msg is not None + assert msg.activity_id == "act-1" + assert msg.source == "canvas_user" + assert msg.source_id == "user" + assert msg.text == "hi" + + +def test_parse_activity_row_legacy_user_normalizes_to_canvas(): + # Older platform versions used 'user' instead of 'canvas_user'. + row = {"id": "act-2", "data": {"source": "user", "text": "hello"}} + msg = _parse_activity_row(row) + assert msg is not None + assert msg.source == "canvas_user" + + +def test_parse_activity_row_peer_agent_explicit(): + row = { + "id": "act-3", + "source_id": "peer-ws-77", + "data": {"source": "peer_agent", "text": "ping"}, + } + msg = _parse_activity_row(row) + assert msg is not None + assert msg.source == "peer_agent" + assert msg.source_id == "peer-ws-77" + + +def test_parse_activity_row_inferred_peer_from_source_id(): + # No explicit source field but a non-'user' source_id present → infer peer_agent. + # This protects us from server-side variants that omit 'source' in data. + row = {"id": "act-4", "source_id": "peer-ws-88", "data": {"text": "ping"}} + msg = _parse_activity_row(row) + assert msg is not None + assert msg.source == "peer_agent" + + +def test_parse_activity_row_inferred_canvas_from_user_source_id(): + row = {"id": "act-5", "source_id": "user", "data": {"text": "hi"}} + msg = _parse_activity_row(row) + assert msg is not None + assert msg.source == "canvas_user" + + +def test_parse_activity_row_unknown_source_falls_through(): + # No source_id, no source → unknown. Reply path will refuse to guess. 
+ row = {"id": "act-6", "data": {"text": "??"}} + msg = _parse_activity_row(row) + assert msg is not None + assert msg.source == "unknown" + + +def test_parse_activity_row_no_id_returns_none(): + row = {"data": {"source": "canvas_user", "text": "no id"}} + assert _parse_activity_row(row) is None + + +def test_parse_activity_row_text_alt_key(): + # Some server paths use 'message' instead of 'text'. Accept both. + row = {"id": "act-7", "data": {"source": "canvas_user", "message": "alt"}} + msg = _parse_activity_row(row) + assert msg is not None + assert msg.text == "alt" + + +# --------------------------------------------------------------------------- +# fetch_inbound +# --------------------------------------------------------------------------- + + +def test_fetch_inbound_happy_path(client: RemoteAgentClient): + rows = [ + {"id": "act-1", "data": {"source": "canvas_user", "text": "hi"}}, + {"id": "act-2", "source_id": "peer-77", "data": {"source": "peer_agent", "text": "ping"}}, + ] + client._session.get.return_value = FakeResponse(200, rows) + + out = client.fetch_inbound() + + assert len(out) == 2 + assert out[0].source == "canvas_user" + assert out[1].source == "peer_agent" + # Verify the GET shape. 
+ call_args = client._session.get.call_args + assert call_args.args[0] == "http://platform.test/workspaces/ws-abc-123/activity" + assert call_args.kwargs["params"]["type"] == "a2a_receive" + assert call_args.kwargs["params"]["limit"] == "100" + assert "since_id" not in call_args.kwargs["params"] + + +def test_fetch_inbound_with_since_id_passes_cursor(client: RemoteAgentClient): + client._session.get.return_value = FakeResponse(200, []) + client.fetch_inbound(since_id="act-prev") + params = client._session.get.call_args.kwargs["params"] + assert params["since_id"] == "act-prev" + + +def test_fetch_inbound_410_raises_cursor_lost(client: RemoteAgentClient): + client._session.get.return_value = FakeResponse(410, {"error": "cursor lost"}) + with pytest.raises(CursorLostError): + client.fetch_inbound(since_id="act-stale") + + +def test_fetch_inbound_accepts_dict_items_wrapper(client: RemoteAgentClient): + # If a future server version wraps in {"items": [...]}, we still parse. + body = {"items": [{"id": "act-1", "data": {"source": "canvas_user", "text": "hi"}}]} + client._session.get.return_value = FakeResponse(200, body) + out = client.fetch_inbound() + assert len(out) == 1 + assert out[0].activity_id == "act-1" + + +def test_fetch_inbound_skips_malformed_rows(client: RemoteAgentClient): + rows = [ + {"id": "act-1", "data": {"source": "canvas_user", "text": "ok"}}, + "not a dict", + {"data": {"text": "no id"}}, # missing id → skipped + ] + client._session.get.return_value = FakeResponse(200, rows) + out = client.fetch_inbound() + assert len(out) == 1 + assert out[0].activity_id == "act-1" + + +def test_fetch_inbound_401_raises_http_error(client: RemoteAgentClient): + client._session.get.return_value = FakeResponse(401) + with pytest.raises(requests.HTTPError): + client.fetch_inbound() + + +def test_fetch_inbound_empty_returns_empty(client: RemoteAgentClient): + client._session.get.return_value = FakeResponse(200, []) + assert client.fetch_inbound() == [] + + +def 
test_fetch_inbound_429_retries_via_get_with_retry( + client: RemoteAgentClient, monkeypatch +): + """A 429 on the first GET should route through _get_with_retry, which + honours Retry-After / jittered backoff and eventually returns a 2xx. + """ + # Don't actually sleep during the retry — keeps the test fast. + monkeypatch.setattr("time.sleep", lambda _s: None) + + rows = [{"id": "act-after-retry", "data": {"source": "canvas_user", "text": "ok"}}] + + # First call: 429. Second call (the retry): 200 + rows. _get_with_retry + # will see 429 and call session.get again with the rebuilt URL — both + # responses come from the same mocked session.get, so we use side_effect. + first_429 = FakeResponse(429) + first_429.headers = {"Retry-After": "0"} + second_200 = FakeResponse(200, rows) + client._session.get.side_effect = [first_429, second_200] + + out = client.fetch_inbound(since_id="act-prev") + + assert len(out) == 1 + assert out[0].activity_id == "act-after-retry" + # Two GETs total: one 429, one 200. 
+ assert client._session.get.call_count == 2 + + +# --------------------------------------------------------------------------- +# reply() +# --------------------------------------------------------------------------- + + +def test_reply_canvas_user_hits_notify(client: RemoteAgentClient): + msg = InboundMessage( + activity_id="act-1", source="canvas_user", source_id="user", text="hi" + ) + client._session.post.return_value = FakeResponse(200, {"status": "sent"}) + + client.reply(msg, "hello") + + call_args = client._session.post.call_args + assert call_args.args[0] == "http://platform.test/workspaces/ws-abc-123/notify" + assert call_args.kwargs["json"] == {"message": "hello"} + assert call_args.kwargs["headers"]["Authorization"] == "Bearer test-token-secret" + + +def test_reply_peer_agent_hits_a2a(client: RemoteAgentClient): + msg = InboundMessage( + activity_id="act-2", source="peer_agent", source_id="peer-ws-77", text="ping" + ) + client._session.post.return_value = FakeResponse(200, {"jsonrpc": "2.0", "result": {}}) + + client.reply(msg, "pong") + + call_args = client._session.post.call_args + assert call_args.args[0] == "http://platform.test/workspaces/peer-ws-77/a2a" + body = call_args.kwargs["json"] + assert body["jsonrpc"] == "2.0" + assert body["method"] == "message/send" + assert body["params"]["message"]["parts"][0]["text"] == "pong" + headers = call_args.kwargs["headers"] + assert headers["X-Source-Workspace-Id"] == "ws-abc-123" + assert headers["X-Workspace-ID"] == "ws-abc-123" + + +def test_reply_unknown_source_raises_value_error(client: RemoteAgentClient): + msg = InboundMessage(activity_id="act-3", source="unknown", source_id="", text="?") + with pytest.raises(ValueError, match="cannot auto-route"): + client.reply(msg, "won't send") + client._session.post.assert_not_called() + + +def test_reply_empty_text_raises_value_error(client: RemoteAgentClient): + msg = InboundMessage(activity_id="act-4", source="canvas_user", source_id="user", text="hi") + 
with pytest.raises(ValueError, match="non-empty"): + client.reply(msg, "") + with pytest.raises(ValueError, match="non-empty"): + client.reply(msg, " ") + client._session.post.assert_not_called() + + +def test_reply_peer_agent_missing_source_id_raises(client: RemoteAgentClient): + msg = InboundMessage(activity_id="act-5", source="peer_agent", source_id="", text="?") + with pytest.raises(ValueError, match="no source_id"): + client.reply(msg, "won't send") + + +def test_reply_propagates_http_error(client: RemoteAgentClient): + msg = InboundMessage(activity_id="act-6", source="canvas_user", source_id="user", text="hi") + client._session.post.return_value = FakeResponse(500) + with pytest.raises(requests.HTTPError): + client.reply(msg, "boom") + + +# --------------------------------------------------------------------------- +# PollDelivery +# --------------------------------------------------------------------------- + + +def test_poll_delivery_run_once_advances_cursor(client: RemoteAgentClient): + rows = [ + {"id": "act-1", "data": {"source": "canvas_user", "text": "a"}}, + {"id": "act-2", "data": {"source": "canvas_user", "text": "b"}}, + ] + client._session.get.return_value = FakeResponse(200, rows) + delivery = PollDelivery(client, interval=0.0) + + received: list[str] = [] + + def handler(msg: InboundMessage, _client: RemoteAgentClient): + received.append(msg.text) + return None # no reply + + n = delivery.run_once(handler) + assert n == 2 + assert received == ["a", "b"] + assert delivery.cursor == "act-2" + + +def test_poll_delivery_handler_exception_advances_and_continues( + client: RemoteAgentClient, caplog +): + rows = [ + {"id": "act-1", "data": {"source": "canvas_user", "text": "poison"}}, + {"id": "act-2", "data": {"source": "canvas_user", "text": "next"}}, + ] + client._session.get.return_value = FakeResponse(200, rows) + delivery = PollDelivery(client, interval=0.0) + + seen: list[str] = [] + + def handler(msg, _c): + seen.append(msg.text) + if msg.text 
== "poison": + raise RuntimeError("kaboom") + return None + + n = delivery.run_once(handler) + # Both messages should be dispatched even though the first raised. + assert n == 2 + assert seen == ["poison", "next"] + # Cursor advances past the failure so we don't get stuck on poison forever. + assert delivery.cursor == "act-2" + + +def test_poll_delivery_async_handler_awaited(client: RemoteAgentClient): + rows = [{"id": "act-1", "data": {"source": "canvas_user", "text": "ahoy"}}] + client._session.get.return_value = FakeResponse(200, rows) + delivery = PollDelivery(client, interval=0.0) + + seen: list[str] = [] + + async def async_handler(msg, _c): + await asyncio.sleep(0) + seen.append(msg.text) + return None + + n = delivery.run_once(async_handler) + assert n == 1 + assert seen == ["ahoy"] + + +def test_poll_delivery_handler_returns_text_triggers_reply(client: RemoteAgentClient): + rows = [{"id": "act-1", "data": {"source": "canvas_user", "text": "hi"}}] + # First mock the GET (fetch_inbound), then the POST (reply). + client._session.get.return_value = FakeResponse(200, rows) + client._session.post.return_value = FakeResponse(200, {"status": "sent"}) + + delivery = PollDelivery(client, interval=0.0) + + def handler(msg, _c): + return f"echo:{msg.text}" + + n = delivery.run_once(handler) + assert n == 1 + # /notify should have been called with the echo body. 
+ post_call = client._session.post.call_args + assert "/notify" in post_call.args[0] + assert post_call.kwargs["json"] == {"message": "echo:hi"} + + +def test_poll_delivery_handler_returns_none_no_reply(client: RemoteAgentClient): + rows = [{"id": "act-1", "data": {"source": "canvas_user", "text": "hi"}}] + client._session.get.return_value = FakeResponse(200, rows) + delivery = PollDelivery(client, interval=0.0) + + def handler(_msg, _c): + return None + + delivery.run_once(handler) + client._session.post.assert_not_called() + + +def test_poll_delivery_410_resets_cursor(client: RemoteAgentClient): + delivery = PollDelivery(client, interval=0.0) + delivery._cursor = "act-stale" + + client._session.get.return_value = FakeResponse(410, {"error": "gone"}) + n = delivery.run_once(lambda *_: None) + + # No messages dispatched, cursor reset to None. + assert n == 0 + assert delivery.cursor is None + + +def test_poll_delivery_cursor_file_persistence( + client: RemoteAgentClient, tmp_path: Path +): + cursor_file = tmp_path / "cursor" + rows = [{"id": "act-XYZ", "data": {"source": "canvas_user", "text": "hi"}}] + client._session.get.return_value = FakeResponse(200, rows) + + delivery = PollDelivery(client, interval=0.0, cursor_file=cursor_file) + assert delivery.cursor is None # nothing on disk yet + + delivery.run_once(lambda *_: None) + assert cursor_file.read_text() == "act-XYZ" + + # New delivery instance reads the cursor from disk. + fresh = PollDelivery(client, interval=0.0, cursor_file=cursor_file) + assert fresh.cursor == "act-XYZ" + + +def test_poll_delivery_stop_makes_run_once_noop(client: RemoteAgentClient): + delivery = PollDelivery(client, interval=0.0) + delivery.stop() + + n = delivery.run_once(lambda *_: None) + assert n == 0 + # GET should not have been issued. 
+ client._session.get.assert_not_called() + + +# --------------------------------------------------------------------------- +# PushDelivery +# --------------------------------------------------------------------------- + + +def test_push_delivery_run_once_is_noop(client: RemoteAgentClient): + fake_server = MagicMock() + delivery = PushDelivery(client, fake_server) + n = delivery.run_once(lambda *_: None) + assert n == 0 + + +def test_push_delivery_stop_calls_server_stop(client: RemoteAgentClient): + fake_server = MagicMock() + delivery = PushDelivery(client, fake_server) + delivery.stop() + fake_server.stop.assert_called_once() + + +def test_push_delivery_stop_swallows_server_exception( + client: RemoteAgentClient, caplog +): + fake_server = MagicMock() + fake_server.stop.side_effect = RuntimeError("server down hard") + delivery = PushDelivery(client, fake_server) + # Should not raise. + delivery.stop() + + +# --------------------------------------------------------------------------- +# run_agent_loop +# --------------------------------------------------------------------------- + + +def _stub_state(client: RemoteAgentClient, paused=False, deleted=False, status="online"): + """Make poll_state return a stub WorkspaceState.""" + client.poll_state = MagicMock( # type: ignore[method-assign] + return_value=WorkspaceState( + workspace_id=client.workspace_id, + status=status, + paused=paused, + deleted=deleted, + ) + ) + + +def test_run_agent_loop_exits_on_paused(client: RemoteAgentClient, monkeypatch): + monkeypatch.setattr("time.sleep", lambda _s: None) + client.heartbeat = MagicMock() # type: ignore[method-assign] + _stub_state(client, paused=True, status="paused") + delivery = MagicMock() + delivery.run_once.return_value = 0 + delivery.interval = 0.0 + + terminal = client.run_agent_loop(lambda *_: None, delivery=delivery) + assert terminal == "paused" + delivery.stop.assert_called_once() + + +def test_run_agent_loop_exits_on_deleted(client: RemoteAgentClient, 
monkeypatch): + monkeypatch.setattr("time.sleep", lambda _s: None) + client.heartbeat = MagicMock() # type: ignore[method-assign] + _stub_state(client, deleted=True, status="removed") + delivery = MagicMock() + delivery.run_once.return_value = 0 + delivery.interval = 0.0 + + terminal = client.run_agent_loop(lambda *_: None, delivery=delivery) + assert terminal == "removed" + + +def test_run_agent_loop_max_iterations(client: RemoteAgentClient, monkeypatch): + monkeypatch.setattr("time.sleep", lambda _s: None) + client.heartbeat = MagicMock() # type: ignore[method-assign] + _stub_state(client) # online forever + delivery = MagicMock() + delivery.run_once.return_value = 0 + delivery.interval = 0.0 + + terminal = client.run_agent_loop(lambda *_: None, delivery=delivery, max_iterations=3) + assert terminal == "max_iterations" + assert delivery.run_once.call_count == 3 + assert client.heartbeat.call_count == 3 + + +def test_run_agent_loop_default_delivery_is_poll(client: RemoteAgentClient, monkeypatch): + """When delivery=None, run_agent_loop should construct a PollDelivery.""" + monkeypatch.setattr("time.sleep", lambda _s: None) + client.heartbeat = MagicMock() # type: ignore[method-assign] + _stub_state(client, paused=True, status="paused") + # fetch_inbound returns an empty list once for the default-poll path. 
+ client.fetch_inbound = MagicMock(return_value=[]) # type: ignore[method-assign] + + terminal = client.run_agent_loop(lambda *_: None) + assert terminal == "paused" + client.fetch_inbound.assert_called() + + +def test_run_agent_loop_swallows_heartbeat_exception( + client: RemoteAgentClient, monkeypatch +): + monkeypatch.setattr("time.sleep", lambda _s: None) + client.heartbeat = MagicMock(side_effect=RuntimeError("hb down")) # type: ignore[method-assign] + _stub_state(client, paused=True, status="paused") + delivery = MagicMock() + delivery.run_once.return_value = 0 + delivery.interval = 0.0 + + terminal = client.run_agent_loop(lambda *_: None, delivery=delivery) + # Heartbeat failure does NOT stop the loop — we still detect 'paused'. + assert terminal == "paused" + + +def test_run_agent_loop_swallows_delivery_exception( + client: RemoteAgentClient, monkeypatch +): + monkeypatch.setattr("time.sleep", lambda _s: None) + client.heartbeat = MagicMock() # type: ignore[method-assign] + _stub_state(client, paused=True, status="paused") + delivery = MagicMock() + delivery.run_once.side_effect = RuntimeError("delivery exploded") + delivery.interval = 0.0 + + terminal = client.run_agent_loop(lambda *_: None, delivery=delivery) + # Delivery failure logged + continued; loop still exits cleanly on paused. 
+ assert terminal == "paused" + + +def test_run_agent_loop_uses_min_of_intervals(client: RemoteAgentClient, monkeypatch): + """The loop should sleep min(heartbeat_interval, delivery.interval).""" + sleeps: list[float] = [] + monkeypatch.setattr("time.sleep", lambda s: sleeps.append(s)) + client.heartbeat_interval = 30.0 + client.heartbeat = MagicMock() # type: ignore[method-assign] + _stub_state(client) # online; uses max_iterations to exit + delivery = MagicMock() + delivery.run_once.return_value = 0 + delivery.interval = 5.0 + + client.run_agent_loop(lambda *_: None, delivery=delivery, max_iterations=2) + assert sleeps == [5.0, 5.0] + + +def test_run_agent_loop_calls_task_supplier(client: RemoteAgentClient, monkeypatch): + monkeypatch.setattr("time.sleep", lambda _s: None) + client.heartbeat = MagicMock() # type: ignore[method-assign] + _stub_state(client, paused=True, status="paused") + delivery = MagicMock() + delivery.run_once.return_value = 0 + delivery.interval = 0.0 + + def supplier(): + return {"current_task": "doing-thing", "active_tasks": 2} + + client.run_agent_loop(lambda *_: None, delivery=delivery, task_supplier=supplier) + # Heartbeat receives the supplied report. + hb_kwargs = client.heartbeat.call_args.kwargs + assert hb_kwargs["current_task"] == "doing-thing" + assert hb_kwargs["active_tasks"] == 2 + + +def test_run_agent_loop_swallows_task_supplier_exception( + client: RemoteAgentClient, monkeypatch +): + monkeypatch.setattr("time.sleep", lambda _s: None) + client.heartbeat = MagicMock() # type: ignore[method-assign] + _stub_state(client, paused=True, status="paused") + delivery = MagicMock() + delivery.run_once.return_value = 0 + delivery.interval = 0.0 + + def supplier(): + raise RuntimeError("supplier broken") + + terminal = client.run_agent_loop( + lambda *_: None, delivery=delivery, task_supplier=supplier + ) + assert terminal == "paused" + # Heartbeat called with empty task fields (the default when supplier fails). 
+ hb_kwargs = client.heartbeat.call_args.kwargs + assert hb_kwargs["current_task"] == "" + assert hb_kwargs["active_tasks"] == 0