molecule-ai-workspace-runtime/molecule_runtime/coordinator.py
Molecule AI Infra-SRE be9c9997c0 fix(builtin_tools/validation): cover remaining WORKSPACE_ID URL usages
Extend get_validated_workspace_id() to all remaining unguarded URL positions:

- consolidation.py: _consolidate() — validates before GET/POST/DELETE to
  /workspaces/{id}/memories endpoints. Graceful skip on failure (log + return).
- coordinator.py: get_children() — validates before /registry/{id}/peers.
  Graceful skip (empty list) on failure.
- molecule_ai_status.py: set_status() — validates before /registry/heartbeat
  and /workspaces/{id}/activity. Exits with descriptive error on failure.

With these three, every runtime use of WORKSPACE_ID in a URL path is now
validated. Remaining WORKSPACE_ID uses are:
- JSON body fields (not injection-risky): heartbeat, memory POST bodies
- Header values (X-Workspace-ID): lower risk, non-URL-injection
2026-04-21 00:55:08 +00:00

143 lines
5.2 KiB
Python

"""Coordinator pattern for team workspaces.
When a workspace is expanded into a team, the parent agent becomes a
coordinator that routes incoming tasks to the appropriate child workspace
based on the task content and children's capabilities.
The coordinator:
1. Fetches its children's Agent Cards (skills, capabilities)
2. Analyzes each incoming task to determine which child is best suited
3. Delegates to the chosen child via the delegation tool
4. Aggregates responses if a task requires multiple children
5. Falls back to handling the task itself if no child is appropriate
"""
import logging
import os
import httpx
from langchain_core.tools import tool
from builtin_tools.validation import WorkspaceIdValidationError, get_validated_workspace_id
from molecule_runtime.adapters.shared_runtime import build_peer_section
from policies.routing import build_team_routing_payload
logger = logging.getLogger(__name__)
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://platform:8080")
WORKSPACE_ID = os.environ.get("WORKSPACE_ID", "")
async def get_parent_context() -> list[dict]:
"""Fetch shared context files from this workspace's parent.
Returns a list of {"path": str, "content": str} dicts.
Returns empty list if no parent, parent unreachable, or no shared context.
"""
parent_id = os.environ.get("PARENT_ID", "")
if not parent_id:
return []
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
f"{PLATFORM_URL}/workspaces/{parent_id}/shared-context",
headers={"X-Workspace-ID": WORKSPACE_ID},
)
if resp.status_code == 200:
return resp.json()
except Exception as e:
logger.warning("Failed to fetch parent context: %s", e)
return []
async def get_children() -> list[dict]:
"""Fetch this workspace's children from the platform."""
try:
ws_id = get_validated_workspace_id(caller="coordinator.get_children")
except WorkspaceIdValidationError:
logger.warning("get_children skipped: invalid WORKSPACE_ID")
return []
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
f"{PLATFORM_URL}/registry/{ws_id}/peers",
headers={"X-Workspace-ID": ws_id},
)
if resp.status_code == 200:
peers = resp.json()
# Filter to only children (parent_id == our ID)
return [p for p in peers if p.get("parent_id") == ws_id]
except Exception as e:
logger.warning("Failed to fetch children: %s", e)
return []
def build_children_description(children: list[dict]) -> str:
"""Build a description of children's capabilities for the coordinator prompt."""
if not children:
return ""
team_section = build_peer_section(
children,
heading="## Your Team (sub-workspaces you coordinate)",
instruction=(
"Use the `delegate_to_workspace` tool to send tasks to the chosen member. "
"Only delegate to members listed above."
),
)
return "\n".join(
[
team_section,
"",
"### Coordination Rules — MANDATORY",
"1. You are a COORDINATOR. Your ONLY job is to delegate and synthesize. NEVER do the work yourself.",
"2. For EVERY task, use `delegate_to_workspace` to send it to the appropriate team member(s). "
"Do this BEFORE writing any analysis, code, or research yourself.",
"3. If a task spans multiple members, delegate to ALL of them in parallel and aggregate results.",
"4. If ALL members are offline/paused, tell the caller which members are unavailable. "
"Do NOT attempt the work yourself — you lack the specialist context.",
"5. If a delegation FAILS (error, timeout): try another member first. "
"Only provide your own brief summary if NO member can respond. Never forward raw errors.",
"6. Your response should be a SYNTHESIS of your team's work, not your own analysis.",
"7. Always respond in the same language the caller uses.",
]
)
@tool
async def route_task_to_team(
task: str,
preferred_member_id: str = "",
) -> dict:
"""Route a task to the most appropriate team member.
As the team coordinator, analyze the task and delegate to the best-suited
child workspace. If preferred_member_id is provided, delegate directly to
that member.
Args:
task: The task description to route.
preferred_member_id: Optional — directly delegate to this member.
"""
from builtin_tools.delegation import delegate_to_workspace as delegate
children = await get_children()
decision = build_team_routing_payload(
children,
task=task,
preferred_member_id=preferred_member_id,
)
if decision.get("action") == "delegate_to_preferred_member":
# Async delegation — returns immediately with task_id
result = await delegate.ainvoke(
{
"workspace_id": decision["preferred_member_id"],
"task": task,
}
)
return result
return decision