feat(sdk): add peer_id and before_ts filter params to fetch_inbound() #6
@ -201,23 +201,15 @@ reference it directly.
|
||||
`POST /registry/register` is idempotent — it won't mint a second token for
|
||||
a workspace that already has one).
|
||||
|
||||
- **`fetch_inbound()` does not expose `peer_id` or `before_ts` filters.**
|
||||
As of CP PRs #2472 and #2476 (merged 2026-05-02), the platform's
|
||||
`GET /workspaces/:id/activity` route accepts:
|
||||
- `peer_id=<uuid>` — narrow to events from one specific peer workspace
|
||||
- `before_ts=<RFC3339>` — fetch a backlog window ending before a wall-clock cut-off
|
||||
`RemoteAgentClient.fetch_inbound()` only forwards `since_id`, `limit`, and
|
||||
`type` today. Workaround: call the activity endpoint directly via
|
||||
`client._session.get(...)` with the extra params, or filter in-process from
|
||||
the parsed `InboundMessage.source_id` / `InboundMessage.raw["ts"]`. A
|
||||
follow-up PR will add typed parameters.
|
||||
- **`fetch_inbound()` `peer_id` and `before_ts` filters (resolved).**
|
||||
`RemoteAgentClient.fetch_inbound()` now accepts `peer_id` (narrow to events
|
||||
from a specific peer workspace) and `before_ts` (RFC3339 cutoff for backlog
|
||||
replay) as optional parameters. Both are forwarded to the activity endpoint.
|
||||
|
||||
- **`InboundMessage` does not yet surface `peer_name`, `peer_role`, or `agent_card_url`.**
|
||||
These three enrichment fields landed on the platform push envelope on
|
||||
2026-05-02 and live under `msg.raw["data"]`. A typed wrapper is the right
|
||||
shape but is intentionally deferred — this README PR is docs-only. Until
|
||||
then, read them from the raw dict and handle the missing-key case
|
||||
(registry lookup may fail for peers that haven't registered yet).
|
||||
- **`InboundMessage` peer enrichment fields (resolved).**
|
||||
`InboundMessage` now exposes `peer_name`, `peer_role`, and `agent_card_url`
|
||||
as typed string attributes (default ``""`` when absent). The `raw` dict
|
||||
remains available for any future envelope fields not yet wrapped.
|
||||
|
||||
- **Tenant + Origin headers are not auto-injected.** When the platform is
|
||||
deployed multi-tenant on the SaaS edge (`*.staging.moleculesai.app`,
|
||||
|
||||
@ -699,10 +699,12 @@ class RemoteAgentClient:
|
||||
since_id: str | None = None,
|
||||
limit: int = 100,
|
||||
type: str = "a2a_receive",
|
||||
peer_id: str | None = None,
|
||||
before_ts: str | None = None,
|
||||
) -> list["InboundMessage"]:
|
||||
"""Fetch one batch of inbound A2A activity rows.
|
||||
|
||||
Hits ``GET /workspaces/:id/activity?type=…&since_id=…&limit=…``.
|
||||
Hits ``GET /workspaces/:id/activity?type=…&since_id=…&peer_id=…&before_ts=…&limit=…``.
|
||||
Returns the rows newer than ``since_id`` in oldest-first order,
|
||||
parsed into :class:`~molecule_agent.inbound.InboundMessage`.
|
||||
|
||||
@ -718,6 +720,13 @@ class RemoteAgentClient:
|
||||
type: Activity-row type filter. Default ``"a2a_receive"``;
|
||||
pass another type to consume different streams (e.g.
|
||||
``"workspace_state_changed"``).
|
||||
peer_id: Narrow to events sourced from a specific peer workspace.
|
||||
Pass a workspace UUID to receive only A2A messages from that
|
||||
peer. ``None`` (default) accepts messages from any source.
|
||||
before_ts: RFC3339 timestamp — only rows older than this cutoff
|
||||
are returned. Use to fetch a historical backlog window (e.g.
|
||||
replaying missed messages after a restart). ``None`` means
|
||||
no upper-bound cutoff; the server returns the most recent rows.
|
||||
|
||||
Returns:
|
||||
List of :class:`InboundMessage`, oldest first. May be empty.
|
||||
@ -735,6 +744,10 @@ class RemoteAgentClient:
|
||||
params: dict[str, str] = {"type": type, "limit": str(int(limit))}
|
||||
if since_id:
|
||||
params["since_id"] = since_id
|
||||
if peer_id:
|
||||
params["peer_id"] = peer_id
|
||||
if before_ts:
|
||||
params["before_ts"] = before_ts
|
||||
url = f"{self.platform_url}/workspaces/{self.workspace_id}/activity"
|
||||
resp = self._session.get(
|
||||
url,
|
||||
|
||||
@ -327,6 +327,61 @@ def test_fetch_inbound_429_retries_via_get_with_retry(
|
||||
assert client._session.get.call_count == 2
|
||||
|
||||
|
||||
def test_fetch_inbound_peer_id_filter():
|
||||
"""peer_id param is forwarded to the GET as a query parameter."""
|
||||
session = MagicMock()
|
||||
rows = [{"id": "act-peer", "source_id": "peer-ops", "data": {"source": "peer_agent", "text": "hi"}}]
|
||||
session.get.return_value = FakeResponse(200, rows)
|
||||
from molecule_agent import RemoteAgentClient
|
||||
client = RemoteAgentClient(
|
||||
workspace_id="ws-abc",
|
||||
platform_url="http://platform.test",
|
||||
agent_card={"name": "t"},
|
||||
session=session,
|
||||
)
|
||||
client.save_token("tok")
|
||||
client.fetch_inbound(peer_id="peer-ops")
|
||||
params = session.get.call_args.kwargs["params"]
|
||||
assert params["peer_id"] == "peer-ops"
|
||||
|
||||
|
||||
def test_fetch_inbound_before_ts_filter():
|
||||
"""before_ts param is forwarded to the GET as a query parameter."""
|
||||
session = MagicMock()
|
||||
rows = [{"id": "act-old", "data": {"source": "canvas_user", "text": "hi"}}]
|
||||
session.get.return_value = FakeResponse(200, rows)
|
||||
from molecule_agent import RemoteAgentClient
|
||||
client = RemoteAgentClient(
|
||||
workspace_id="ws-abc",
|
||||
platform_url="http://platform.test",
|
||||
agent_card={"name": "t"},
|
||||
session=session,
|
||||
)
|
||||
client.save_token("tok")
|
||||
client.fetch_inbound(before_ts="2026-05-01T00:00:00Z")
|
||||
params = session.get.call_args.kwargs["params"]
|
||||
assert params["before_ts"] == "2026-05-01T00:00:00Z"
|
||||
|
||||
|
||||
def test_fetch_inbound_combined_filters():
|
||||
"""peer_id and before_ts can be used together."""
|
||||
session = MagicMock()
|
||||
rows = [{"id": "act-combo", "source_id": "peer-x", "data": {"source": "peer_agent", "text": "combo"}}]
|
||||
session.get.return_value = FakeResponse(200, rows)
|
||||
from molecule_agent import RemoteAgentClient
|
||||
client = RemoteAgentClient(
|
||||
workspace_id="ws-abc",
|
||||
platform_url="http://platform.test",
|
||||
agent_card={"name": "t"},
|
||||
session=session,
|
||||
)
|
||||
client.save_token("tok")
|
||||
client.fetch_inbound(peer_id="peer-x", before_ts="2026-05-09T12:00:00Z")
|
||||
params = session.get.call_args.kwargs["params"]
|
||||
assert params["peer_id"] == "peer-x"
|
||||
assert params["before_ts"] == "2026-05-09T12:00:00Z"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# reply()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Loading…
Reference in New Issue
Block a user