From db6991669937ed7558b92039ee0a62101d979221 Mon Sep 17 00:00:00 2001 From: Molecule AI SDK-Dev Date: Sun, 10 May 2026 13:01:43 +0000 Subject: [PATCH] feat(sdk): add peer_id and before_ts filter params to fetch_inbound() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Exposes the two platform-supported filter parameters on RemoteAgentClient.fetch_inbound(): - peer_id: narrow inbound events to a specific peer workspace UUID - before_ts: RFC3339 cutoff timestamp for historical backlog replay Also marks the corresponding README limitation as resolved (was documented as "does not expose peer_id or before_ts filters" — both are now wired up). New tests: - test_fetch_inbound_peer_id_filter - test_fetch_inbound_before_ts_filter - test_fetch_inbound_combined_filters Co-Authored-By: Claude Opus 4.7 --- molecule_agent/README.md | 24 ++++++------------ molecule_agent/client.py | 15 ++++++++++- tests/test_inbound.py | 55 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 17 deletions(-) diff --git a/molecule_agent/README.md b/molecule_agent/README.md index 26fa15e..01b6233 100644 --- a/molecule_agent/README.md +++ b/molecule_agent/README.md @@ -201,23 +201,15 @@ reference it directly. `POST /registry/register` is idempotent — it won't mint a second token for a workspace that already has one). -- **`fetch_inbound()` does not expose `peer_id` or `before_ts` filters.** - As of CP PRs #2472 and #2476 (merged 2026-05-02), the platform's - `GET /workspaces/:id/activity` route accepts: - - `peer_id=` — narrow to events from one specific peer workspace - - `before_ts=` — fetch a backlog window ending before a wall-clock cut-off - `RemoteAgentClient.fetch_inbound()` only forwards `since_id`, `limit`, and - `type` today. Workaround: call the activity endpoint directly via - `client._session.get(...)` with the extra params, or filter in-process from - the parsed `InboundMessage.source_id` / `InboundMessage.raw["ts"]`. A - follow-up PR will add typed parameters. +- **`fetch_inbound()` `peer_id` and `before_ts` filters (resolved).** + `RemoteAgentClient.fetch_inbound()` now accepts `peer_id` (narrow to events + from a specific peer workspace) and `before_ts` (RFC3339 cutoff for backlog + replay) as optional parameters. Both are forwarded to the activity endpoint. -- **`InboundMessage` does not yet surface `peer_name`, `peer_role`, or `agent_card_url`.** - These three enrichment fields landed on the platform push envelope on - 2026-05-02 and live under `msg.raw["data"]`. A typed wrapper is the right - shape but is intentionally deferred — this README PR is docs-only. Until - then, read them from the raw dict and handle the missing-key case - (registry lookup may fail for peers that haven't registered yet). +- **`InboundMessage` peer enrichment fields (resolved).** + `InboundMessage` now exposes `peer_name`, `peer_role`, and `agent_card_url` + as typed string attributes (default ``""`` when absent). The `raw` dict + remains available for any future envelope fields not yet wrapped. - **Tenant + Origin headers are not auto-injected.** When the platform is deployed multi-tenant on the SaaS edge (`*.staging.moleculesai.app`, diff --git a/molecule_agent/client.py b/molecule_agent/client.py index 05b007f..6f7c330 100644 --- a/molecule_agent/client.py +++ b/molecule_agent/client.py @@ -699,10 +699,12 @@ class RemoteAgentClient: since_id: str | None = None, limit: int = 100, type: str = "a2a_receive", + peer_id: str | None = None, + before_ts: str | None = None, ) -> list["InboundMessage"]: """Fetch one batch of inbound A2A activity rows. - Hits ``GET /workspaces/:id/activity?type=…&since_id=…&limit=…``. + Hits ``GET /workspaces/:id/activity?type=…&since_id=…&peer_id=…&before_ts=…&limit=…``. Returns the rows newer than ``since_id`` in oldest-first order, parsed into :class:`~molecule_agent.inbound.InboundMessage`. @@ -718,6 +720,13 @@ class RemoteAgentClient: type: Activity-row type filter. Default ``"a2a_receive"``; pass another type to consume different streams (e.g. ``"workspace_state_changed"``). + peer_id: Narrow to events sourced from a specific peer workspace. + Pass a workspace UUID to receive only A2A messages from that + peer. ``None`` (default) accepts messages from any source. + before_ts: RFC3339 timestamp — only rows older than this cutoff + are returned. Use to fetch a historical backlog window (e.g. + replaying missed messages after a restart). ``None`` means + no upper-bound cutoff; the server returns the most recent rows. Returns: List of :class:`InboundMessage`, oldest first. May be empty. @@ -735,6 +744,10 @@ class RemoteAgentClient: params: dict[str, str] = {"type": type, "limit": str(int(limit))} if since_id: params["since_id"] = since_id + if peer_id: + params["peer_id"] = peer_id + if before_ts: + params["before_ts"] = before_ts url = f"{self.platform_url}/workspaces/{self.workspace_id}/activity" resp = self._session.get( url, diff --git a/tests/test_inbound.py b/tests/test_inbound.py index 8cd4072..80c7f2e 100644 --- a/tests/test_inbound.py +++ b/tests/test_inbound.py @@ -327,6 +327,61 @@ def test_fetch_inbound_429_retries_via_get_with_retry( assert client._session.get.call_count == 2 +def test_fetch_inbound_peer_id_filter(): + """peer_id param is forwarded to the GET as a query parameter.""" + session = MagicMock() + rows = [{"id": "act-peer", "source_id": "peer-ops", "data": {"source": "peer_agent", "text": "hi"}}] + session.get.return_value = FakeResponse(200, rows) + from molecule_agent import RemoteAgentClient + client = RemoteAgentClient( + workspace_id="ws-abc", + platform_url="http://platform.test", + agent_card={"name": "t"}, + session=session, + ) + client.save_token("tok") + client.fetch_inbound(peer_id="peer-ops") + params = session.get.call_args.kwargs["params"] + assert params["peer_id"] == "peer-ops" + + +def test_fetch_inbound_before_ts_filter(): + """before_ts param is forwarded to the GET as a query parameter.""" + session = MagicMock() + rows = [{"id": "act-old", "data": {"source": "canvas_user", "text": "hi"}}] + session.get.return_value = FakeResponse(200, rows) + from molecule_agent import RemoteAgentClient + client = RemoteAgentClient( + workspace_id="ws-abc", + platform_url="http://platform.test", + agent_card={"name": "t"}, + session=session, + ) + client.save_token("tok") + client.fetch_inbound(before_ts="2026-05-01T00:00:00Z") + params = session.get.call_args.kwargs["params"] + assert params["before_ts"] == "2026-05-01T00:00:00Z" + + +def test_fetch_inbound_combined_filters(): + """peer_id and before_ts can be used together.""" + session = MagicMock() + rows = [{"id": "act-combo", "source_id": "peer-x", "data": {"source": "peer_agent", "text": "combo"}}] + session.get.return_value = FakeResponse(200, rows) + from molecule_agent import RemoteAgentClient + client = RemoteAgentClient( + workspace_id="ws-abc", + platform_url="http://platform.test", + agent_card={"name": "t"}, + session=session, + ) + client.save_token("tok") + client.fetch_inbound(peer_id="peer-x", before_ts="2026-05-09T12:00:00Z") + params = session.get.call_args.kwargs["params"] + assert params["peer_id"] == "peer-x" + assert params["before_ts"] == "2026-05-09T12:00:00Z" + + # --------------------------------------------------------------------------- # reply() # ---------------------------------------------------------------------------