Extend get_validated_workspace_id() to all remaining unguarded URL positions:
- consolidation.py: _consolidate() — validates before GET/POST/DELETE to
/workspaces/{id}/memories endpoints. Graceful skip on failure (log + return).
- coordinator.py: get_children() — validates before /registry/{id}/peers.
Graceful skip (empty list) on failure.
- molecule_ai_status.py: set_status() — validates before /registry/heartbeat
and /workspaces/{id}/activity. Exits with descriptive error on failure.
With these three, every runtime use of WORKSPACE_ID in a URL path is now
validated. Remaining WORKSPACE_ID uses are:
- JSON body fields (not injection-risky): heartbeat, memory POST bodies
- Header values (X-Workspace-ID): lower risk, non-URL-injection
143 lines
5.2 KiB
Python
143 lines
5.2 KiB
Python
"""Coordinator pattern for team workspaces.
|
|
|
|
When a workspace is expanded into a team, the parent agent becomes a
|
|
coordinator that routes incoming tasks to the appropriate child workspace
|
|
based on the task content and children's capabilities.
|
|
|
|
The coordinator:
|
|
1. Fetches its children's Agent Cards (skills, capabilities)
|
|
2. Analyzes each incoming task to determine which child is best suited
|
|
3. Delegates to the chosen child via the delegation tool
|
|
4. Aggregates responses if a task requires multiple children
|
|
5. Falls back to handling the task itself if no child is appropriate
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
|
|
import httpx
|
|
from langchain_core.tools import tool
|
|
from builtin_tools.validation import WorkspaceIdValidationError, get_validated_workspace_id
|
|
from molecule_runtime.adapters.shared_runtime import build_peer_section
|
|
from policies.routing import build_team_routing_payload
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://platform:8080")
|
|
WORKSPACE_ID = os.environ.get("WORKSPACE_ID", "")
|
|
|
|
|
|
async def get_parent_context() -> list[dict]:
|
|
"""Fetch shared context files from this workspace's parent.
|
|
|
|
Returns a list of {"path": str, "content": str} dicts.
|
|
Returns empty list if no parent, parent unreachable, or no shared context.
|
|
"""
|
|
parent_id = os.environ.get("PARENT_ID", "")
|
|
if not parent_id:
|
|
return []
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
resp = await client.get(
|
|
f"{PLATFORM_URL}/workspaces/{parent_id}/shared-context",
|
|
headers={"X-Workspace-ID": WORKSPACE_ID},
|
|
)
|
|
if resp.status_code == 200:
|
|
return resp.json()
|
|
except Exception as e:
|
|
logger.warning("Failed to fetch parent context: %s", e)
|
|
return []
|
|
|
|
|
|
async def get_children() -> list[dict]:
|
|
"""Fetch this workspace's children from the platform."""
|
|
try:
|
|
ws_id = get_validated_workspace_id(caller="coordinator.get_children")
|
|
except WorkspaceIdValidationError:
|
|
logger.warning("get_children skipped: invalid WORKSPACE_ID")
|
|
return []
|
|
try:
|
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
resp = await client.get(
|
|
f"{PLATFORM_URL}/registry/{ws_id}/peers",
|
|
headers={"X-Workspace-ID": ws_id},
|
|
)
|
|
if resp.status_code == 200:
|
|
peers = resp.json()
|
|
# Filter to only children (parent_id == our ID)
|
|
return [p for p in peers if p.get("parent_id") == ws_id]
|
|
except Exception as e:
|
|
logger.warning("Failed to fetch children: %s", e)
|
|
return []
|
|
|
|
|
|
def build_children_description(children: list[dict]) -> str:
|
|
"""Build a description of children's capabilities for the coordinator prompt."""
|
|
if not children:
|
|
return ""
|
|
|
|
team_section = build_peer_section(
|
|
children,
|
|
heading="## Your Team (sub-workspaces you coordinate)",
|
|
instruction=(
|
|
"Use the `delegate_to_workspace` tool to send tasks to the chosen member. "
|
|
"Only delegate to members listed above."
|
|
),
|
|
)
|
|
|
|
return "\n".join(
|
|
[
|
|
team_section,
|
|
"",
|
|
"### Coordination Rules — MANDATORY",
|
|
"1. You are a COORDINATOR. Your ONLY job is to delegate and synthesize. NEVER do the work yourself.",
|
|
"2. For EVERY task, use `delegate_to_workspace` to send it to the appropriate team member(s). "
|
|
"Do this BEFORE writing any analysis, code, or research yourself.",
|
|
"3. If a task spans multiple members, delegate to ALL of them in parallel and aggregate results.",
|
|
"4. If ALL members are offline/paused, tell the caller which members are unavailable. "
|
|
"Do NOT attempt the work yourself — you lack the specialist context.",
|
|
"5. If a delegation FAILS (error, timeout): try another member first. "
|
|
"Only provide your own brief summary if NO member can respond. Never forward raw errors.",
|
|
"6. Your response should be a SYNTHESIS of your team's work, not your own analysis.",
|
|
"7. Always respond in the same language the caller uses.",
|
|
]
|
|
)
|
|
|
|
|
|
@tool
|
|
async def route_task_to_team(
|
|
task: str,
|
|
preferred_member_id: str = "",
|
|
) -> dict:
|
|
"""Route a task to the most appropriate team member.
|
|
|
|
As the team coordinator, analyze the task and delegate to the best-suited
|
|
child workspace. If preferred_member_id is provided, delegate directly to
|
|
that member.
|
|
|
|
Args:
|
|
task: The task description to route.
|
|
preferred_member_id: Optional — directly delegate to this member.
|
|
"""
|
|
from builtin_tools.delegation import delegate_to_workspace as delegate
|
|
|
|
children = await get_children()
|
|
decision = build_team_routing_payload(
|
|
children,
|
|
task=task,
|
|
preferred_member_id=preferred_member_id,
|
|
)
|
|
|
|
if decision.get("action") == "delegate_to_preferred_member":
|
|
# Async delegation — returns immediately with task_id
|
|
result = await delegate.ainvoke(
|
|
{
|
|
"workspace_id": decision["preferred_member_id"],
|
|
"task": task,
|
|
}
|
|
)
|
|
return result
|
|
|
|
return decision
|