molecule-core/workspace/tests/test_routing_policy.py
Hongming Wang d8026347e5 chore: open-source restructure — rename dirs, remove internal files, scrub secrets
Renames:
- platform/ → workspace-server/ (Go module path stays as "platform" for
  external dep compat — will update after plugin module republish)
- workspace-template/ → workspace/

Removed (moved to separate repos or deleted):
- PLAN.md — internal roadmap (move to private project board)
- HANDOFF.md, AGENTS.md — one-time internal session docs
- .claude/ — gitignored entirely (local agent config)
- infra/cloudflare-worker/ → Molecule-AI/molecule-tenant-proxy
- org-templates/molecule-dev/ → standalone template repo
- .mcp-eval/ → molecule-mcp-server repo
- test-results/ — ephemeral, gitignored

Security scrubbing:
- Cloudflare account/zone/KV IDs → placeholders
- Real EC2 IPs → <EC2_IP> in all docs
- CF token prefix, Neon project ID, Fly app names → redacted
- Langfuse dev credentials → parameterized
- Personal runner username/machine name → generic

Community files:
- CONTRIBUTING.md — build, test, branch conventions
- CODE_OF_CONDUCT.md — Contributor Covenant 2.1

All Dockerfiles, CI workflows, docker-compose, railway.toml, render.yaml,
README, CLAUDE.md updated for new directory names.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-18 00:24:44 -07:00

119 lines
3.8 KiB
Python

"""Tests for coordinator routing policy."""
import json
from policies.routing import (
build_team_routing_payload,
build_team_route_decision,
decide_team_route,
summarize_children,
_load_agent_card,
)
def test_summarize_children_extracts_skills():
    """summarize_children reduces a child's agent-card skills to a flat list.

    The fixture has one skill entry keyed by ``name`` and one keyed by ``id``;
    the expected summary lists "research" and "write" in that order.
    """
    alpha = {
        "id": "child-1",
        "name": "Alpha",
        "status": "online",
        "agent_card": {"skills": [{"name": "research"}, {"id": "write"}]},
    }
    expected_summary = {
        "id": "child-1",
        "name": "Alpha",
        "status": "online",
        "skills": ["research", "write"],
    }

    assert summarize_children([alpha]) == [expected_summary]
def test_build_team_routing_payload_handles_empty_children():
    """An empty child list produces a failed payload with an explanatory error."""
    outcome = build_team_routing_payload([], "Investigate the issue")

    assert outcome["success"] is False
    message = outcome["error"]
    assert "No team members available" in message
def test_decide_team_route_prefers_direct_member():
    """A supplied preferred member id results in a direct-delegation decision.

    The requested id ("child-2") is not among the children passed in; the
    decision is still expected to echo that id back.
    """
    requested_id = "child-2"
    team = [{"id": "child-1"}]

    decision = decide_team_route(
        team,
        task="Investigate the issue",
        preferred_member_id=requested_id,
    )

    assert decision["action"] == "delegate_to_preferred_member"
    assert decision["preferred_member_id"] == requested_id
# ---------------------------------------------------------------------------
# _load_agent_card() tests
# ---------------------------------------------------------------------------
def test_load_agent_card_valid_json_string():
    """A JSON string encoding an object is decoded and returned as a dict."""
    expected_card = {"name": "Alpha", "skills": [{"name": "search"}]}
    encoded_card = json.dumps(expected_card)

    assert _load_agent_card(encoded_card) == expected_card
def test_load_agent_card_invalid_json_string():
    """Text that cannot be parsed as JSON yields an empty dict."""
    malformed = "{not valid json}"

    assert _load_agent_card(malformed) == {}
def test_load_agent_card_json_string_not_dict():
    """Well-formed JSON whose top-level value is a list yields an empty dict."""
    encoded_list = json.dumps(["item1", "item2"])
    loaded = _load_agent_card(encoded_list)

    assert loaded == {}
# ---------------------------------------------------------------------------
# build_team_routing_payload() with no members
# ---------------------------------------------------------------------------
def test_build_team_routing_payload_no_children_returns_error():
    """With no children, the payload is an error dict that still echoes the task.

    Checks the failure flag, the presence and wording of the error, the empty
    member list, and that the original task text is carried through.
    """
    task_text = "Do something"
    outcome = build_team_routing_payload([], task=task_text)

    assert outcome["success"] is False
    assert "error" in outcome
    assert "No team members available" in outcome["error"]
    assert outcome["members"] == []
    assert outcome["task"] == task_text
# ---------------------------------------------------------------------------
# build_team_route_decision() compatibility wrapper
# ---------------------------------------------------------------------------
def test_build_team_route_decision_delegates_correctly():
    """build_team_route_decision returns a choose-member payload for one child.

    With a single online worker and no preferred member, the result is
    expected to succeed, ask the caller to choose a member, echo the task,
    and list exactly one member.
    """
    worker = {
        "id": "child-1",
        "name": "Worker",
        "status": "online",
        "agent_card": {"skills": [{"name": "coding"}]},
    }
    task_text = "Write code"

    decision = build_team_route_decision([worker], task=task_text)

    assert decision["success"] is True
    assert decision["action"] == "choose_member"
    assert decision["task"] == task_text
    assert len(decision["members"]) == 1
def test_build_team_route_decision_with_preferred_member():
    """build_team_route_decision forwards preferred_member_id into the decision."""
    preferred = "child-1"
    team = [{"id": "child-1"}]

    decision = build_team_route_decision(
        team,
        task="Analyze data",
        preferred_member_id=preferred,
    )

    assert decision["action"] == "delegate_to_preferred_member"
    assert decision["preferred_member_id"] == preferred