molecule-ai-workspace-runtime/molecule_runtime/coordinator.py
Molecule AI Infra-Runtime-BE d21f8c2064
Some checks failed
ci / mirror-guard (pull_request) Failing after 5s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 7s
fix(runtime): align PLATFORM_URL default to host.docker.internal across all modules
Unified the fallback default for PLATFORM_URL from `http://platform:8080`
(Docker Compose service name) to `http://host.docker.internal:8080`
across all 13 modules that declare it. This matches:
- The provisioner's default (buildContainerEnv injects PLATFORM_URL
  from cfg.PlatformURL, which defaults to host.docker.internal on the
  platform side — main.go:platformURL)
- The molecule-git-token-helper.sh script (already uses host.docker.internal)
- The MCP client (MOLECULE_URL injected by provisioner)

The provisioner always sets PLATFORM_URL in production containers, so
this is a development/Docker-only improvement: without this change,
a workspace started outside the Docker Compose network (e.g. via
`docker run` with `--network host`) would fail platform API calls
with "Connection refused" because `platform:8080` resolves nowhere.

13 modules updated: a2a_cli, a2a_client, a2a_mcp_server, adapters/base,
builtin_tools/a2a_tools, builtin_tools/approval, builtin_tools/delegation,
builtin_tools/hitl, builtin_tools/memory, consolidation, coordinator,
main, molecule_ai_status. All docstrings updated to match.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-11 03:34:24 +00:00

143 lines
5.2 KiB
Python

"""Coordinator pattern for team workspaces.
When a workspace is expanded into a team, the parent agent becomes a
coordinator that routes incoming tasks to the appropriate child workspace
based on the task content and children's capabilities.
The coordinator:
1. Fetches its children's Agent Cards (skills, capabilities)
2. Analyzes each incoming task to determine which child is best suited
3. Delegates to the chosen child via the delegation tool
4. Aggregates responses if a task requires multiple children
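5. Does not take over the work itself when no child can respond: it reports
which members are unavailable, or at most gives a brief summary (see the
coordination rules in build_children_description)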
"""
import logging
import os
import httpx
from langchain_core.tools import tool
from builtin_tools.validation import WorkspaceIdValidationError, get_validated_workspace_id
from molecule_runtime.adapters.shared_runtime import build_peer_section
from policies.routing import build_team_routing_payload
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Base URL used for every platform API call in this module. Overridable via
# the PLATFORM_URL environment variable; the fallback targets the Docker host.
PLATFORM_URL = os.getenv("PLATFORM_URL", "http://host.docker.internal:8080")

# ID of this workspace as provided by the environment; empty string when unset.
WORKSPACE_ID = os.getenv("WORKSPACE_ID", "")
async def get_parent_context() -> list[dict]:
    """Fetch shared context files from this workspace's parent.

    The parent's ID is read from the ``PARENT_ID`` environment variable and the
    platform is queried at ``/workspaces/{parent_id}/shared-context``.

    Returns:
        A list of ``{"path": str, "content": str}`` dicts. An empty list is
        returned when there is no parent, the request fails, the platform
        answers with a non-200 status, or the response body is not a JSON list.
    """
    parent_id = os.environ.get("PARENT_ID", "")
    if not parent_id:
        return []

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.get(
                f"{PLATFORM_URL}/workspaces/{parent_id}/shared-context",
                headers={"X-Workspace-ID": WORKSPACE_ID},
            )
            if resp.status_code != 200:
                # Leave a trace so an unexpectedly empty context can be diagnosed.
                logger.debug(
                    "Parent context request returned HTTP %s", resp.status_code
                )
                return []
            payload = resp.json()
    except Exception as e:
        # Best-effort lookup: a missing parent context must never break the caller.
        logger.warning("Failed to fetch parent context: %s", e)
        return []

    # Honour the documented contract: callers iterate over a list of dicts, so
    # any other JSON shape (object, null, string, ...) is treated as "no context".
    if not isinstance(payload, list):
        logger.warning(
            "Ignoring parent context: expected a JSON list, got %s",
            type(payload).__name__,
        )
        return []
    return payload
async def get_children() -> list[dict]:
    """Return the peers of this workspace that are its direct children.

    The platform registry is asked for this workspace's peers, and only the
    entries whose ``parent_id`` equals this workspace's validated ID are kept.

    Returns:
        The matching peer dicts, or an empty list when the workspace ID fails
        validation, the platform does not answer with HTTP 200, or any error
        occurs while fetching/parsing (logged as a warning).
    """
    try:
        own_id = get_validated_workspace_id(caller="coordinator.get_children")
    except WorkspaceIdValidationError:
        logger.warning("get_children skipped: invalid WORKSPACE_ID")
        return []

    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            response = await http.get(
                f"{PLATFORM_URL}/registry/{own_id}/peers",
                headers={"X-Workspace-ID": own_id},
            )
            if response.status_code != 200:
                return []
            # A child is any peer that names this workspace as its parent.
            return [
                peer
                for peer in response.json()
                if peer.get("parent_id") == own_id
            ]
    except Exception as e:
        logger.warning("Failed to fetch children: %s", e)
    return []
def build_children_description(children: list[dict]) -> str:
    """Render the coordinator-prompt section describing the team.

    Args:
        children: Child workspace records to describe.

    Returns:
        An empty string when ``children`` is empty. Otherwise the team section
        produced by ``build_peer_section``, a blank line, and the mandatory
        coordination rules, all joined with newlines.
    """
    if not children:
        return ""

    delegation_hint = (
        "Use the `delegate_to_workspace` tool to send tasks to the chosen member. "
        "Only delegate to members listed above."
    )
    roster = build_peer_section(
        children,
        heading="## Your Team (sub-workspaces you coordinate)",
        instruction=delegation_hint,
    )

    # Prompt text consumed by the model: wording must stay exactly as is.
    coordination_rules = (
        "### Coordination Rules — MANDATORY",
        "1. You are a COORDINATOR. Your ONLY job is to delegate and synthesize. NEVER do the work yourself.",
        "2. For EVERY task, use `delegate_to_workspace` to send it to the appropriate team member(s). "
        "Do this BEFORE writing any analysis, code, or research yourself.",
        "3. If a task spans multiple members, delegate to ALL of them in parallel and aggregate results.",
        "4. If ALL members are offline/paused, tell the caller which members are unavailable. "
        "Do NOT attempt the work yourself — you lack the specialist context.",
        "5. If a delegation FAILS (error, timeout): try another member first. "
        "Only provide your own brief summary if NO member can respond. Never forward raw errors.",
        "6. Your response should be a SYNTHESIS of your team's work, not your own analysis.",
        "7. Always respond in the same language the caller uses.",
    )
    return "\n".join((roster, "", *coordination_rules))
# NOTE: the docstring below is exposed through the `@tool` decorator as the
# tool's description, so its wording is intentionally left unchanged.
@tool
async def route_task_to_team(
    task: str,
    preferred_member_id: str = "",
) -> dict:
    """Route a task to the most appropriate team member.

    As the team coordinator, analyze the task and delegate to the best-suited
    child workspace. If preferred_member_id is provided, delegate directly to
    that member.

    Args:
        task: The task description to route.
        preferred_member_id: Optional — directly delegate to this member.
    """
    # Function-scope import, as in the original; presumably avoids an import
    # cycle with the delegation module — TODO confirm.
    from builtin_tools.delegation import delegate_to_workspace as delegate

    team = await get_children()
    routing = build_team_routing_payload(
        team,
        task=task,
        preferred_member_id=preferred_member_id,
    )

    # Anything other than a direct hand-off is returned to the caller untouched.
    if routing.get("action") != "delegate_to_preferred_member":
        return routing

    # Async delegation — returns immediately with task_id
    delegation_args = {
        "workspace_id": routing["preferred_member_id"],
        "task": task,
    }
    return await delegate.ainvoke(delegation_args)