Merge pull request 'feat(sdk): add peer_id and before_ts filter params to fetch_inbound()' (#6) from feat/fetch-inbound-peer-ts-filters into main
Some checks are pending
Test / test (3.11) (push) Waiting to run
Test / test (3.12) (push) Waiting to run
Test / test (3.13) (push) Waiting to run

This commit is contained in:
Molecule AI · sdk-lead 2026-05-10 13:25:33 +00:00
commit e8b9d42fe6
3 changed files with 77 additions and 17 deletions

View File

@ -201,23 +201,15 @@ reference it directly.
`POST /registry/register` is idempotent — it won't mint a second token for
a workspace that already has one).
- **`fetch_inbound()` does not expose `peer_id` or `before_ts` filters.**
As of CP PRs #2472 and #2476 (merged 2026-05-02), the platform's
`GET /workspaces/:id/activity` route accepts:
- `peer_id=<uuid>` — narrow to events from one specific peer workspace
- `before_ts=<RFC3339>` — fetch a backlog window ending before a wall-clock cut-off
`RemoteAgentClient.fetch_inbound()` only forwards `since_id`, `limit`, and
`type` today. Workaround: call the activity endpoint directly via
`client._session.get(...)` with the extra params, or filter in-process from
the parsed `InboundMessage.source_id` / `InboundMessage.raw["ts"]`. A
follow-up PR will add typed parameters.
- **`fetch_inbound()` `peer_id` and `before_ts` filters (resolved).**
`RemoteAgentClient.fetch_inbound()` now accepts `peer_id` (narrow to events
from a specific peer workspace) and `before_ts` (RFC3339 cutoff for backlog
replay) as optional parameters. Both are forwarded to the activity endpoint.
- **`InboundMessage` does not yet surface `peer_name`, `peer_role`, or `agent_card_url`.**
These three enrichment fields landed on the platform push envelope on
2026-05-02 and live under `msg.raw["data"]`. A typed wrapper is the right
shape but is intentionally deferred — this README PR is docs-only. Until
then, read them from the raw dict and handle the missing-key case
(registry lookup may fail for peers that haven't registered yet).
- **`InboundMessage` peer enrichment fields (resolved).**
`InboundMessage` now exposes `peer_name`, `peer_role`, and `agent_card_url`
as typed string attributes (default ``""`` when absent). The `raw` dict
remains available for any future envelope fields not yet wrapped.
- **Tenant + Origin headers are not auto-injected.** When the platform is
deployed multi-tenant on the SaaS edge (`*.staging.moleculesai.app`,

View File

@ -699,10 +699,12 @@ class RemoteAgentClient:
since_id: str | None = None,
limit: int = 100,
type: str = "a2a_receive",
peer_id: str | None = None,
before_ts: str | None = None,
) -> list["InboundMessage"]:
"""Fetch one batch of inbound A2A activity rows.
Hits ``GET /workspaces/:id/activity?type=&since_id=&limit=``.
Hits ``GET /workspaces/:id/activity?type=&since_id=&peer_id=&before_ts=&limit=``.
Returns the rows newer than ``since_id`` in oldest-first order,
parsed into :class:`~molecule_agent.inbound.InboundMessage`.
@ -718,6 +720,13 @@ class RemoteAgentClient:
type: Activity-row type filter. Default ``"a2a_receive"``;
pass another type to consume different streams (e.g.
``"workspace_state_changed"``).
peer_id: Narrow to events sourced from a specific peer workspace.
Pass a workspace UUID to receive only A2A messages from that
peer. ``None`` (default) accepts messages from any source.
before_ts: RFC3339 timestamp only rows older than this cutoff
are returned. Use to fetch a historical backlog window (e.g.
replaying missed messages after a restart). ``None`` means
no upper-bound cutoff; the server returns the most recent rows.
Returns:
List of :class:`InboundMessage`, oldest first. May be empty.
@ -735,6 +744,10 @@ class RemoteAgentClient:
params: dict[str, str] = {"type": type, "limit": str(int(limit))}
if since_id:
params["since_id"] = since_id
if peer_id:
params["peer_id"] = peer_id
if before_ts:
params["before_ts"] = before_ts
url = f"{self.platform_url}/workspaces/{self.workspace_id}/activity"
resp = self._session.get(
url,

View File

@ -327,6 +327,61 @@ def test_fetch_inbound_429_retries_via_get_with_retry(
assert client._session.get.call_count == 2
def test_fetch_inbound_peer_id_filter():
"""peer_id param is forwarded to the GET as a query parameter."""
session = MagicMock()
rows = [{"id": "act-peer", "source_id": "peer-ops", "data": {"source": "peer_agent", "text": "hi"}}]
session.get.return_value = FakeResponse(200, rows)
from molecule_agent import RemoteAgentClient
client = RemoteAgentClient(
workspace_id="ws-abc",
platform_url="http://platform.test",
agent_card={"name": "t"},
session=session,
)
client.save_token("tok")
client.fetch_inbound(peer_id="peer-ops")
params = session.get.call_args.kwargs["params"]
assert params["peer_id"] == "peer-ops"
def test_fetch_inbound_before_ts_filter():
"""before_ts param is forwarded to the GET as a query parameter."""
session = MagicMock()
rows = [{"id": "act-old", "data": {"source": "canvas_user", "text": "hi"}}]
session.get.return_value = FakeResponse(200, rows)
from molecule_agent import RemoteAgentClient
client = RemoteAgentClient(
workspace_id="ws-abc",
platform_url="http://platform.test",
agent_card={"name": "t"},
session=session,
)
client.save_token("tok")
client.fetch_inbound(before_ts="2026-05-01T00:00:00Z")
params = session.get.call_args.kwargs["params"]
assert params["before_ts"] == "2026-05-01T00:00:00Z"
def test_fetch_inbound_combined_filters():
"""peer_id and before_ts can be used together."""
session = MagicMock()
rows = [{"id": "act-combo", "source_id": "peer-x", "data": {"source": "peer_agent", "text": "combo"}}]
session.get.return_value = FakeResponse(200, rows)
from molecule_agent import RemoteAgentClient
client = RemoteAgentClient(
workspace_id="ws-abc",
platform_url="http://platform.test",
agent_card={"name": "t"},
session=session,
)
client.save_token("tok")
client.fetch_inbound(peer_id="peer-x", before_ts="2026-05-09T12:00:00Z")
params = session.get.call_args.kwargs["params"]
assert params["peer_id"] == "peer-x"
assert params["before_ts"] == "2026-05-09T12:00:00Z"
# ---------------------------------------------------------------------------
# reply()
# ---------------------------------------------------------------------------