molecule-core/workspace-template/a2a_client.py
Dev Lead Agent 6c78962a33 fix(security): Cycle 5 — auth middleware, injection hardening, skill sandbox
Fix A — platform/internal/middleware/wsauth_middleware.go (NEW):
  WorkspaceAuth() gin middleware enforces per-workspace bearer-token auth on
  ALL /workspaces/:id/* sub-routes. Same lazy-bootstrap contract as
  secrets.Values: workspaces with no live token are grandfathered through.
  Blocks C2, C3, C4, C5, C7, C8, C9, C12, C13 simultaneously.

Fix A — platform/internal/router/router.go:
  Reorganised route registration: bare CRUD (/workspaces, /workspaces/:id)
  and /a2a remain on root router; all other /workspaces/:id/* sub-routes
  moved into wsAuth = r.Group("/workspaces/:id", middleware.WorkspaceAuth(db.DB)).
  CORS AllowHeaders updated to include Authorization so browser/agent callers
  can send the bearer token cross-origin.

Fix B — workspace-template/heartbeat.py:
  _check_delegations(): validate source_id == self.workspace_id before
  accepting a delegation result. Attacker-crafted records with a foreign
  source_id are silently skipped with a WARNING log (injection attempt).
  trigger_msg no longer embeds raw response_preview text; references
  delegation_id + status only — removes the prompt-injection vector.

Fix C — workspace-template/skill_loader/loader.py:
  load_skill_tools(): before exec_module(), verify script is within
  scripts_dir (path traversal guard) and temporarily scrub sensitive env
  vars (CLAUDE_CODE_OAUTH_TOKEN, ANTHROPIC_API_KEY, OPENAI_API_KEY,
  WORKSPACE_AUTH_TOKEN, GITHUB_TOKEN, GH_TOKEN) from os.environ; restore
  in finally block. Defence-in-depth even if /plugins auth gate is bypassed.

Fix D — platform/internal/handlers/socket.go:
  HandleConnect(): agent connections (X-Workspace-ID present) validated via
  wsauth.HasAnyLiveToken + wsauth.ValidateToken before WebSocket upgrade.
  Canvas clients (no X-Workspace-ID) remain unauthenticated.

Fix D — workspace-template/events.py:
  PlatformEventSubscriber._connect(): include platform_auth bearer token in
  WebSocket upgrade headers alongside X-Workspace-ID.

Fix E — workspace-template/executor_helpers.py:
  recall_memories() and commit_memory() now pass platform_auth bearer token
  in Authorization header so WorkspaceAuth middleware allows access.

Fix F — workspace-template/a2a_client.py:
  send_a2a_message(): timeout=None → httpx.Timeout(connect=30, read=300,
  write=30, pool=30). Resolves H2 flagged across 5 consecutive audits.

Tests: 149/149 Python tests pass (test_heartbeat + test_events updated to
assert new source_id validation behaviour and allow Authorization header).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 04:44:42 +00:00

103 lines
3.8 KiB
Python

"""A2A protocol client — peer discovery, messaging, and workspace info.
Shared constants (WORKSPACE_ID, PLATFORM_URL) live here so that
a2a_tools and a2a_mcp_server can import them from a single place.
"""
import logging
import os
import uuid
import httpx
logger = logging.getLogger(__name__)
WORKSPACE_ID = os.environ.get("WORKSPACE_ID", "")
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://platform:8080")
# Cache workspace ID → name mappings (populated by list_peers calls)
_peer_names: dict[str, str] = {}
# Sentinel prefix for errors originating from send_a2a_message / child agents.
# Used by delegate_task to distinguish real errors from normal response text.
_A2A_ERROR_PREFIX = "[A2A_ERROR] "
async def discover_peer(target_id: str) -> dict | None:
"""Discover a peer workspace's URL via the platform registry."""
async with httpx.AsyncClient(timeout=10.0) as client:
try:
resp = await client.get(
f"{PLATFORM_URL}/registry/discover/{target_id}",
headers={"X-Workspace-ID": WORKSPACE_ID},
)
if resp.status_code == 200:
return resp.json()
return None
except Exception as e:
logger.error(f"Discovery failed for {target_id}: {e}")
return None
async def send_a2a_message(target_url: str, message: str) -> str:
"""Send an A2A message/send to a target workspace."""
# Fix F (Cycle 5 / H2 — flagged 5 consecutive audits): timeout=None allowed
# a hung upstream to block the agent indefinitely. Use a generous but bounded
# timeout: 30s connect + 300s read (long enough for slow LLM responses).
async with httpx.AsyncClient(
timeout=httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0)
) as client:
try:
resp = await client.post(
target_url,
json={
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": "message/send",
"params": {
"message": {
"role": "user",
"messageId": str(uuid.uuid4()),
"parts": [{"kind": "text", "text": message}],
}
},
},
)
data = resp.json()
if "result" in data:
parts = data["result"].get("parts", [])
text = parts[0].get("text", "") if parts else "(no response)"
# Tag child-reported errors so the caller can detect them reliably
if text.startswith("Agent error:"):
return f"{_A2A_ERROR_PREFIX}{text}"
return text
elif "error" in data:
return f"{_A2A_ERROR_PREFIX}{data['error'].get('message', 'unknown')}"
return str(data)
except Exception as e:
return f"{_A2A_ERROR_PREFIX}{e}"
async def get_peers() -> list[dict]:
"""Get this workspace's peers from the platform registry."""
async with httpx.AsyncClient(timeout=10.0) as client:
try:
resp = await client.get(f"{PLATFORM_URL}/registry/{WORKSPACE_ID}/peers")
if resp.status_code == 200:
return resp.json()
return []
except Exception:
return []
async def get_workspace_info() -> dict:
"""Get this workspace's info from the platform."""
async with httpx.AsyncClient(timeout=10.0) as client:
try:
resp = await client.get(f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}")
if resp.status_code == 200:
return resp.json()
return {"error": "not found"}
except Exception as e:
return {"error": str(e)}