Merge pull request 'fix(sdk): surface peer_name/peer_role/agent_card_url in InboundMessage; resolve KI-001' (#5) from fix/inbound-peer-metadata-ki001-resolved into main
This commit is contained in:
commit
5db6ade4e4
@ -28,27 +28,43 @@ Format per entry:
|
||||
|
||||
## KI-001 — RemoteAgentClient does not implement inbound A2A server
|
||||
|
||||
**File:** `molecule_agent/client.py`
|
||||
**Status:** Known limitation; not yet implemented
|
||||
**File:** `molecule_agent/client.py`, `molecule_agent/a2a_server.py`, `molecule_agent/inbound.py`
|
||||
**Status:** ✅ Resolved
|
||||
**Severity:** Medium
|
||||
**Platform phase:** Phase 30.8b
|
||||
|
||||
### Symptom
|
||||
`RemoteAgentClient` can call other workspaces via A2A (outbound), but cannot
|
||||
receive inbound A2A calls. Any workspace that tries to delegate to or message
|
||||
this agent will get a connection refused or timeout.
|
||||
### Resolution
|
||||
The SDK now ships two inbound delivery paths:
|
||||
|
||||
### Impact
|
||||
Agents running outside the platform's Docker network via `molecule_agent` are
|
||||
one-directional. Platform agents cannot push work to them — the remote agent
|
||||
must poll or be provisioned with a publicly reachable webhook endpoint.
|
||||
**Push mode (`A2AServer`)** — `molecule_agent.a2a_server.A2AServer` exposes an HTTP
|
||||
server with a `POST /a2a/inbound` endpoint. It runs in a background daemon thread
|
||||
alongside the client's heartbeat loop. Use with `PushDelivery` from `inbound.py`:
|
||||
|
||||
### Suggested fix
|
||||
Add an `A2AServerMixin` class that exposes a `FastAPI` or `flask` route
|
||||
(`POST /a2a/inbound`) and runs in a background thread alongside the client's
|
||||
heartbeat loop. Register the inbound URL with the platform via the
|
||||
`/registry/discover` update endpoint when the server starts. See Phase 30.8b
|
||||
in the platform `PLAN.md`.
|
||||
```python
|
||||
from molecule_agent import RemoteAgentClient, A2AServer
|
||||
from molecule_agent.inbound import PushDelivery
|
||||
|
||||
server = A2AServer(agent_id=workspace_id, inbound_url="https://...", message_handler=my_handler)
|
||||
server.start_in_background()
|
||||
client = RemoteAgentClient(workspace_id=workspace_id, platform_url=...)
|
||||
client.reported_url = server.inbound_url # register with this URL
|
||||
client.register()
|
||||
# Pass PushDelivery so run_agent_loop doesn't also poll
|
||||
client.run_agent_loop(handler=my_handler, delivery=PushDelivery(client, server))
|
||||
```
|
||||
|
||||
**Poll mode (`PollDelivery`)** — for agents behind NAT or without a public endpoint,
|
||||
the SDK's `PollDelivery` polls `GET /workspaces/:id/activity` on a configurable
|
||||
interval (default 5s). Both paths feed the same `MessageHandler` callback.
|
||||
`run_agent_loop` picks `PollDelivery` automatically when no explicit delivery is passed.
|
||||
|
||||
### Files added
|
||||
- `molecule_agent/a2a_server.py` — `A2AServer` class; `HTTPServer` + `_A2AHandler`
|
||||
running in a daemon thread; handles `POST /a2a/inbound`, async/sync handlers,
|
||||
graceful stop.
|
||||
- `molecule_agent/inbound.py` — `InboundDelivery` protocol, `PollDelivery`,
|
||||
`PushDelivery` (wraps `A2AServer`), `InboundMessage`, `MessageHandler`.
|
||||
- `RemoteAgentClient.run_agent_loop` updated to accept any `InboundDelivery`.
|
||||
|
||||
---
|
||||
|
||||
|
||||
@ -67,6 +67,12 @@ class InboundMessage:
|
||||
* ``unknown`` — the activity row didn't carry a recognizable source.
|
||||
:py:meth:`RemoteAgentClient.reply` raises ``ValueError`` rather than
|
||||
guess.
|
||||
|
||||
``peer_name``, ``peer_role``, and ``agent_card_url`` are enrichment fields
|
||||
added to the channel envelope on 2026-05-02 (CP PRs #2472, #2476). They
|
||||
are resolved from the platform's registry at delivery time and may be absent
|
||||
if the registry lookup failed or the sender hasn't registered yet. Always
|
||||
check for absence rather than assuming they are populated.
|
||||
"""
|
||||
|
||||
activity_id: str
|
||||
@ -74,6 +80,12 @@ class InboundMessage:
|
||||
source_id: str
|
||||
text: str
|
||||
raw: dict[str, Any] = field(default_factory=dict)
|
||||
# Enrichment fields — populated from row["data"]["peer_name"],
|
||||
# row["data"]["peer_role"], row["data"]["agent_card_url"].
|
||||
# May be empty strings if the registry lookup failed.
|
||||
peer_name: str = ""
|
||||
peer_role: str = ""
|
||||
agent_card_url: str = ""
|
||||
|
||||
|
||||
class CursorLostError(Exception):
|
||||
@ -134,6 +146,9 @@ def _parse_activity_row(row: dict[str, Any]) -> InboundMessage | None:
|
||||
source_id=source_id,
|
||||
text=text,
|
||||
raw=row,
|
||||
peer_name=str(data.get("peer_name") or ""),
|
||||
peer_role=str(data.get("peer_role") or ""),
|
||||
agent_card_url=str(data.get("agent_card_url") or ""),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@ -153,6 +153,83 @@ def test_parse_activity_row_text_alt_key():
|
||||
assert msg.text == "alt"
|
||||
|
||||
|
||||
def test_parse_activity_row_enrichment_fields():
|
||||
"""peer_name, peer_role, agent_card_url are extracted from row["data"]."""
|
||||
row = {
|
||||
"id": "act-enriched",
|
||||
"source_id": "peer-ops-01",
|
||||
"data": {
|
||||
"source": "peer_agent",
|
||||
"text": "status report",
|
||||
"peer_name": "ops-agent",
|
||||
"peer_role": "sre",
|
||||
"agent_card_url": "https://platform.example/registry/discover/peer-ops-01",
|
||||
},
|
||||
}
|
||||
msg = _parse_activity_row(row)
|
||||
assert msg is not None
|
||||
assert msg.peer_name == "ops-agent"
|
||||
assert msg.peer_role == "sre"
|
||||
assert msg.agent_card_url == "https://platform.example/registry/discover/peer-ops-01"
|
||||
|
||||
|
||||
def test_parse_activity_row_enrichment_fields_absent():
|
||||
"""When enrichment fields are absent, InboundMessage fields default to ""."""
|
||||
row = {
|
||||
"id": "act-no-enrich",
|
||||
"source_id": "peer-x",
|
||||
"data": {"source": "peer_agent", "text": "hello"},
|
||||
# no peer_name, peer_role, agent_card_url
|
||||
}
|
||||
msg = _parse_activity_row(row)
|
||||
assert msg is not None
|
||||
assert msg.peer_name == ""
|
||||
assert msg.peer_role == ""
|
||||
assert msg.agent_card_url == ""
|
||||
|
||||
|
||||
def test_parse_activity_row_enrichment_fields_null_becomes_empty():
|
||||
"""null values in enrichment fields become "" (not the string "None")."""
|
||||
row = {
|
||||
"id": "act-null-enrich",
|
||||
"source_id": "peer-y",
|
||||
"data": {
|
||||
"source": "peer_agent",
|
||||
"text": "ping",
|
||||
"peer_name": None,
|
||||
"peer_role": None,
|
||||
"agent_card_url": None,
|
||||
},
|
||||
}
|
||||
msg = _parse_activity_row(row)
|
||||
assert msg is not None
|
||||
assert msg.peer_name == ""
|
||||
assert msg.peer_role == ""
|
||||
assert msg.agent_card_url == ""
|
||||
|
||||
|
||||
def test_parse_activity_row_enrichment_in_canvas_user_row():
|
||||
"""Enrichment fields are parsed even when source is canvas_user (edge case
|
||||
where the platform enriches the row even for user-sourced messages)."""
|
||||
row = {
|
||||
"id": "act-user-enrich",
|
||||
"source_id": "user",
|
||||
"data": {
|
||||
"source": "canvas_user",
|
||||
"text": "hi",
|
||||
"peer_name": "someone",
|
||||
"peer_role": "human",
|
||||
"agent_card_url": "https://platform.example/registry/discover/user-uuid",
|
||||
},
|
||||
}
|
||||
msg = _parse_activity_row(row)
|
||||
assert msg is not None
|
||||
assert msg.source == "canvas_user"
|
||||
assert msg.peer_name == "someone"
|
||||
assert msg.peer_role == "human"
|
||||
assert msg.agent_card_url == "https://platform.example/registry/discover/user-uuid"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# fetch_inbound
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Loading…
Reference in New Issue
Block a user