molecule-core/workspace-template/tests/test_shared_runtime.py
Hongming Wang 24fec62d7f initial commit — Molecule AI platform
Forked clean from public hackathon repo (Starfire-AgentTeam, BSL 1.1)
with full rebrand to Molecule AI under github.com/Molecule-AI/molecule-monorepo.

Brand: Starfire → Molecule AI.
Slug: starfire / agent-molecule → molecule.
Env vars: STARFIRE_* → MOLECULE_*.
Go module: github.com/agent-molecule/platform → github.com/Molecule-AI/molecule-monorepo/platform.
Python packages: starfire_plugin → molecule_plugin, starfire_agent → molecule_agent.
DB: agentmolecule → molecule.

History truncated; see public repo for prior commits and contributor
attribution. Verified green: go test -race ./... (platform), pytest
(workspace-template 1129 + sdk 132), vitest (canvas 352), build (mcp).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 11:55:37 -07:00

190 lines
6.2 KiB
Python

"""Tests for shared runtime helpers used by A2A-backed executors."""
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from adapters.shared_runtime import (
append_peer_guidance,
build_peer_section,
build_task_text,
brief_task,
extract_history,
extract_message_text,
format_conversation_history,
summarize_peer_cards,
set_current_task,
)
def _make_context(parts=None, metadata=None):
context = MagicMock()
context.message.parts = parts or []
context.metadata = metadata or {}
return context
def test_extract_message_text_prefers_text_then_root_text():
    """Parts exposing ``.text`` or only ``.root.text`` are joined with a space."""
    direct_part = MagicMock()
    direct_part.text = "Hello"

    # spec=[] stops MagicMock from auto-creating attributes, so this part has
    # no ``.text`` of its own; only the explicitly assigned ``.root`` exists.
    nested_part = MagicMock(spec=[])
    nested_part.root = SimpleNamespace(text="World")

    context = _make_context([direct_part, nested_part])
    assert extract_message_text(context) == "Hello World"
def test_extract_message_text_supports_dict_parts():
    """A bare list of dict parts (flat or nested under "root") is accepted."""
    dict_parts = [
        {"text": "Hello"},
        {"root": {"text": "World"}},
    ]
    extracted = extract_message_text(dict_parts)
    assert extracted == "Hello World"
def test_extract_history_and_formatting():
    """History in context metadata is extracted, rendered, and folded into the task text."""
    raw_history = [
        {"role": "user", "parts": [{"text": "First"}]},
        {"role": "agent", "parts": [{"text": "Second"}]},
    ]
    context = _make_context(metadata={"history": raw_history})

    # Roles come back normalised: "user" -> "human", "agent" -> "ai".
    turns = extract_history(context)
    assert turns == [("human", "First"), ("ai", "Second")]

    rendered = format_conversation_history(turns)
    assert rendered == "User: First\nAgent: Second"

    expected_task = (
        "Conversation so far:\nUser: First\nAgent: Second\n\nCurrent request: Current request"
    )
    assert build_task_text("Current request", turns) == expected_task
def test_append_peer_guidance_is_optional():
    """No peer info yields the default text; peer info appends a Peers section."""
    common = {"default_text": "Base", "tool_name": "delegate"}

    without_peers = append_peer_guidance(None, "", **common)
    assert without_peers == "Base"

    with_peers = append_peer_guidance("Base", "Peer A", **common)
    assert with_peers == "Base\n\n## Peers\nPeer A\nUse delegate to communicate with them."
def test_summarize_peer_cards_and_render_section():
    """Peers without an agent card are dropped; the rest are summarised and rendered."""
    carded_peer = {
        "id": "peer-1",
        "status": "online",
        "agent_card": {
            "name": "Alpha",
            # One skill is identified by "name", the other only by "id".
            "skills": [{"name": "research"}, {"id": "write"}],
        },
    }
    cardless_peer = {"id": "peer-2", "status": "offline", "agent_card": None}
    peers = [carded_peer, cardless_peer]

    expected_summary = {
        "id": "peer-1",
        "name": "Alpha",
        "status": "online",
        "skills": ["research", "write"],
    }
    assert summarize_peer_cards(peers) == [expected_summary]

    section = build_peer_section(peers)
    expected_fragments = (
        "## Your Peers",
        "**Alpha** (id: `peer-1`, status: online)",
        "Skills: research, write",
        "delegate_to_workspace",
    )
    for fragment in expected_fragments:
        assert fragment in section
def test_brief_task_truncates_at_sixty_chars():
    """Text up to 60 characters is unchanged; 61 characters is cut to 60 plus "..."."""
    limit = 60

    # At or below the limit the text must come back untouched.
    for length in (limit - 1, limit):
        text = "x" * length
        assert brief_task(text) == text

    # One past the limit triggers truncation with an ellipsis suffix.
    assert brief_task("x" * (limit + 1)) == "x" * limit + "..."
@pytest.mark.asyncio
async def test_set_current_task_updates_heartbeat():
    """A non-empty task marks the heartbeat active; an empty task clears it."""
    hb = SimpleNamespace(current_task="", active_tasks=0)

    await set_current_task(hb, "Working")
    assert (hb.current_task, hb.active_tasks) == ("Working", 1)

    await set_current_task(hb, "")
    assert (hb.current_task, hb.active_tasks) == ("", 0)
@pytest.mark.asyncio
async def test_set_current_task_is_noop_for_none():
    """Passing ``None`` as the heartbeat must complete without raising."""
    missing_heartbeat = None
    await set_current_task(missing_heartbeat, "Working")
# ---------------------------------------------------------------------------
# build_task_text() with no history
# ---------------------------------------------------------------------------
def test_build_task_text_no_history_returns_user_message():
    """With an empty history, build_task_text() hands back the message unchanged."""
    user_message = "What is the weather?"
    assert build_task_text(user_message, []) == user_message
# ---------------------------------------------------------------------------
# summarize_peer_cards() edge cases
# ---------------------------------------------------------------------------
def test_summarize_peer_cards_invalid_json_string_skipped():
    """A peer whose agent_card is an unparseable JSON string is left out of the summary."""
    malformed_peer = {
        "id": "peer-bad",
        "status": "online",
        "agent_card": "{not valid json}",
    }
    valid_peer = {
        "id": "peer-good",
        "status": "online",
        "agent_card": {"name": "Good Peer", "skills": []},
    }

    summaries = summarize_peer_cards([malformed_peer, valid_peer])

    # Only the peer with a usable card survives.
    assert [entry["id"] for entry in summaries] == ["peer-good"]
def test_summarize_peer_cards_json_string_not_dict_skipped():
    """A peer whose agent_card is valid JSON but encodes a list, not a dict, is left out."""
    import json

    list_card_peer = {
        "id": "peer-list",
        "status": "online",
        "agent_card": json.dumps(["skill1"]),
    }
    dict_card_peer = {
        "id": "peer-dict",
        "status": "online",
        "agent_card": {"name": "Dict Peer", "skills": []},
    }

    summaries = summarize_peer_cards([list_card_peer, dict_card_peer])

    # Only the peer with a dict-shaped card survives.
    assert [entry["id"] for entry in summaries] == ["peer-dict"]
# ---------------------------------------------------------------------------
# set_current_task() httpx exception is swallowed
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_set_current_task_httpx_exception_is_silenced(monkeypatch):
    """A failing httpx heartbeat push must not propagate out of set_current_task()."""
    env = (("WORKSPACE_ID", "ws-test"), ("PLATFORM_URL", "http://platform:8080"))
    for name, value in env:
        monkeypatch.setenv(name, value)

    # Async-context-manager stand-in for httpx.AsyncClient whose post() always fails.
    failing_client = AsyncMock()
    failing_client.__aenter__ = AsyncMock(return_value=failing_client)
    failing_client.__aexit__ = AsyncMock(return_value=False)
    failing_client.post = AsyncMock(side_effect=Exception("Connection refused"))

    heartbeat = SimpleNamespace(current_task="", active_tasks=0)

    # httpx is imported lazily inside the function, so patch at the httpx module level
    with patch("httpx.AsyncClient", return_value=failing_client):
        # Should not raise — the exception is expected to be swallowed.
        await set_current_task(heartbeat, "Doing work")

    # The local heartbeat state is still updated despite the failed push.
    assert heartbeat.current_task == "Doing work"
    assert heartbeat.active_tasks == 1