molecule-core/workspace/tests/test_a2a_cli.py
Hongming Wang 479a027e4b chore: open-source restructure — rename dirs, remove internal files, scrub secrets
Renames:
- platform/ → workspace-server/ (Go module path stays as "platform" for
  external dep compat — will update after plugin module republish)
- workspace-template/ → workspace/

Removed (moved to separate repos or deleted):
- PLAN.md — internal roadmap (move to private project board)
- HANDOFF.md, AGENTS.md — one-time internal session docs
- .claude/ — gitignored entirely (local agent config)
- infra/cloudflare-worker/ → Molecule-AI/molecule-tenant-proxy
- org-templates/molecule-dev/ → standalone template repo
- .mcp-eval/ → molecule-mcp-server repo
- test-results/ — ephemeral, gitignored

Security scrubbing:
- Cloudflare account/zone/KV IDs → placeholders
- Real EC2 IPs → <EC2_IP> in all docs
- CF token prefix, Neon project ID, Fly app names → redacted
- Langfuse dev credentials → parameterized
- Personal runner username/machine name → generic

Community files:
- CONTRIBUTING.md — build, test, branch conventions
- CODE_OF_CONDUCT.md — Contributor Covenant 2.1

All Dockerfiles, CI workflows, docker-compose, railway.toml, render.yaml,
README, CLAUDE.md updated for new directory names.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-18 00:24:44 -07:00

673 lines
24 KiB
Python

"""Tests for a2a_cli.py — CLI tool for inter-workspace communication.
Uses importlib.util.spec_from_file_location to load the real module, bypassing
conftest mocks. Tests call async functions directly rather than going through
main() to avoid sys.exit() complications.
"""
import importlib.util
import json as json_mod
import sys
from pathlib import Path
import pytest
ROOT = Path(__file__).resolve().parents[1]
def _load_cli(monkeypatch, *, platform_url="http://platform.test", workspace_id="ws-test"):
"""Load the real a2a_cli.py in isolation."""
monkeypatch.setenv("PLATFORM_URL", platform_url)
monkeypatch.setenv("WORKSPACE_ID", workspace_id)
spec = importlib.util.spec_from_file_location(
"_test_a2a_cli",
ROOT / "a2a_cli.py",
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
# Patch module-level constants to match env
mod.PLATFORM_URL = platform_url
mod.WORKSPACE_ID = workspace_id
return mod
class _FakeResponse:
def __init__(self, status_code, payload):
self.status_code = status_code
self._payload = payload
self.text = str(payload)
def json(self):
return self._payload
class _FakeBadJsonResponse:
def __init__(self, status_code):
self.status_code = status_code
self.text = "not json"
def json(self):
raise ValueError("invalid json")
# ---------------------------------------------------------------------------
# discover()
# ---------------------------------------------------------------------------
class TestDiscover:
async def test_discover_200(self, monkeypatch):
mod = _load_cli(monkeypatch)
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url, headers=None):
assert "ws-target" in url
assert headers.get("X-Workspace-ID") == "ws-test"
return _FakeResponse(200, {"id": "ws-target", "url": "http://target.test/a2a"})
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
result = await mod.discover("ws-target")
assert result == {"id": "ws-target", "url": "http://target.test/a2a"}
async def test_discover_non_200_returns_none(self, monkeypatch):
mod = _load_cli(monkeypatch)
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url, headers=None):
return _FakeResponse(403, {"error": "forbidden"})
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
result = await mod.discover("ws-target")
assert result is None
# ---------------------------------------------------------------------------
# delegate() — sync mode
# ---------------------------------------------------------------------------
class TestDelegate:
async def test_delegate_sync_success(self, monkeypatch, capsys):
mod = _load_cli(monkeypatch)
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url, headers=None):
return _FakeResponse(200, {"url": "http://target.test/a2a"})
async def post(self, url, json=None):
return _FakeResponse(200, {
"result": {
"parts": [{"kind": "text", "text": "Task result!"}]
}
})
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
await mod.delegate("ws-target", "do something")
captured = capsys.readouterr()
assert "Task result!" in captured.out
async def test_delegate_sync_no_peer(self, monkeypatch, capsys):
"""When discover returns None, prints error and sys.exit(1) is called."""
mod = _load_cli(monkeypatch)
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url, headers=None):
return _FakeResponse(404, {})
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
with pytest.raises(SystemExit) as exc_info:
await mod.delegate("ws-target", "do something")
assert exc_info.value.code == 1
captured = capsys.readouterr()
assert "cannot reach workspace" in captured.err
async def test_delegate_sync_no_url(self, monkeypatch, capsys):
"""When peer has no URL, prints error and sys.exit(1)."""
mod = _load_cli(monkeypatch)
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url, headers=None):
return _FakeResponse(200, {"url": ""})
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
with pytest.raises(SystemExit) as exc_info:
await mod.delegate("ws-target", "do something")
assert exc_info.value.code == 1
captured = capsys.readouterr()
assert "no URL" in captured.err
async def test_delegate_sync_invalid_json_response(self, monkeypatch, capsys):
"""When A2A response is not valid JSON, prints error and sys.exit(1)."""
mod = _load_cli(monkeypatch)
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url, headers=None):
return _FakeResponse(200, {"url": "http://target.test/a2a"})
async def post(self, url, json=None):
return _FakeBadJsonResponse(200)
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
with pytest.raises(SystemExit) as exc_info:
await mod.delegate("ws-target", "do something")
assert exc_info.value.code == 1
captured = capsys.readouterr()
assert "invalid JSON" in captured.err
async def test_delegate_sync_error_response_exits(self, monkeypatch, capsys):
"""When A2A responds with error (non-rate-limit), prints error and sys.exit(1)."""
mod = _load_cli(monkeypatch)
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url, headers=None):
return _FakeResponse(200, {"url": "http://target.test/a2a"})
async def post(self, url, json=None):
return _FakeResponse(200, {"error": {"message": "Permission denied"}})
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
with pytest.raises(SystemExit) as exc_info:
await mod.delegate("ws-target", "do something")
assert exc_info.value.code == 1
captured = capsys.readouterr()
assert "Permission denied" in captured.err
async def test_delegate_sync_empty_response_final_attempt(self, monkeypatch, capsys):
"""Empty result on all retries prints fallback message."""
mod = _load_cli(monkeypatch)
# Mock asyncio.sleep to be instant
monkeypatch.setattr(mod.asyncio, "sleep", lambda s: _instant_sleep())
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url, headers=None):
return _FakeResponse(200, {"url": "http://target.test/a2a"})
async def post(self, url, json=None):
return _FakeResponse(200, {"result": {"parts": [{"text": ""}]}})
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
await mod.delegate("ws-target", "do something")
captured = capsys.readouterr()
assert "no response after retries" in captured.out
async def test_delegate_sync_rate_limit_then_success(self, monkeypatch, capsys):
"""Rate-limited response retries and eventually succeeds."""
mod = _load_cli(monkeypatch)
monkeypatch.setattr(mod.asyncio, "sleep", lambda s: _instant_sleep())
call_count = {"n": 0}
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url, headers=None):
return _FakeResponse(200, {"url": "http://target.test/a2a"})
async def post(self, url, json=None):
call_count["n"] += 1
if call_count["n"] < 2:
return _FakeResponse(200, {"error": {"message": "rate limit exceeded"}})
return _FakeResponse(200, {"result": {"parts": [{"text": "Done"}]}})
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
await mod.delegate("ws-target", "do something")
captured = capsys.readouterr()
assert "Done" in captured.out
async def test_delegate_sync_timeout_retries_then_fails(self, monkeypatch, capsys):
"""TimeoutException on all retries exits with error."""
mod = _load_cli(monkeypatch)
monkeypatch.setattr(mod.asyncio, "sleep", lambda s: _instant_sleep())
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url, headers=None):
return _FakeResponse(200, {"url": "http://target.test/a2a"})
async def post(self, url, json=None):
raise mod.httpx.TimeoutException("timed out")
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
with pytest.raises(SystemExit) as exc_info:
await mod.delegate("ws-target", "do something")
assert exc_info.value.code == 1
captured = capsys.readouterr()
assert "timed out" in captured.err
async def test_delegate_sync_timeout_retry_then_success(self, monkeypatch, capsys):
"""TimeoutException on first attempt retries and eventually succeeds."""
mod = _load_cli(monkeypatch)
monkeypatch.setattr(mod.asyncio, "sleep", lambda s: _instant_sleep())
call_count = {"n": 0}
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url, headers=None):
return _FakeResponse(200, {"url": "http://target.test/a2a"})
async def post(self, url, json=None):
call_count["n"] += 1
if call_count["n"] == 1:
raise mod.httpx.TimeoutException("timed out")
return _FakeResponse(200, {"result": {"parts": [{"text": "Success after retry"}]}})
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
await mod.delegate("ws-target", "do something")
captured = capsys.readouterr()
assert "Success after retry" in captured.out
# ---------------------------------------------------------------------------
# delegate() — async mode
# ---------------------------------------------------------------------------
class TestDelegateAsync:
async def test_delegate_async_success(self, monkeypatch, capsys):
mod = _load_cli(monkeypatch)
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url, headers=None):
return _FakeResponse(200, {"url": "http://target.test/a2a"})
async def post(self, url, json=None):
return _FakeResponse(200, {"jsonrpc": "2.0"})
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
await mod.delegate("ws-target", "do something", async_mode=True)
captured = capsys.readouterr()
parsed = json_mod.loads(captured.out)
assert parsed["status"] == "submitted"
assert parsed["target"] == "ws-target"
async def test_delegate_async_timeout(self, monkeypatch, capsys):
"""TimeoutException in async mode prints uncertain status to stderr."""
mod = _load_cli(monkeypatch)
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url, headers=None):
return _FakeResponse(200, {"url": "http://target.test/a2a"})
async def post(self, url, json=None):
raise mod.httpx.TimeoutException("timed out")
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
await mod.delegate("ws-target", "do something", async_mode=True)
captured = capsys.readouterr()
parsed = json_mod.loads(captured.err)
assert parsed["status"] == "uncertain"
# ---------------------------------------------------------------------------
# peers()
# ---------------------------------------------------------------------------
class TestPeers:
async def test_peers_success(self, monkeypatch, capsys):
mod = _load_cli(monkeypatch)
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url):
return _FakeResponse(200, [
{"id": "ws-1", "name": "Alpha Worker", "role": "worker", "status": "online"},
{"id": "ws-2", "name": "Beta Analyst", "role": "analyst", "status": "idle"},
])
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
await mod.peers()
captured = capsys.readouterr()
assert "ws-1" in captured.out
assert "Alpha Worker" in captured.out
assert "ws-2" in captured.out
async def test_peers_failure_exits(self, monkeypatch, capsys):
mod = _load_cli(monkeypatch)
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url):
return _FakeResponse(500, {})
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
with pytest.raises(SystemExit) as exc_info:
await mod.peers()
assert exc_info.value.code == 1
captured = capsys.readouterr()
assert "could not fetch peers" in captured.err
# ---------------------------------------------------------------------------
# info()
# ---------------------------------------------------------------------------
class TestInfo:
async def test_info_success(self, monkeypatch, capsys):
mod = _load_cli(monkeypatch)
workspace_data = {
"id": "ws-test",
"name": "Test Workspace",
"role": "worker",
"tier": "standard",
"status": "active",
"parent_id": "ws-parent",
}
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url):
assert "ws-test" in url
return _FakeResponse(200, workspace_data)
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
await mod.info()
captured = capsys.readouterr()
assert "ws-test" in captured.out
assert "Test Workspace" in captured.out
assert "worker" in captured.out
assert "standard" in captured.out
assert "active" in captured.out
assert "ws-parent" in captured.out
async def test_info_non_200_no_output(self, monkeypatch, capsys):
"""When platform returns non-200, info() prints nothing (no crash)."""
mod = _load_cli(monkeypatch)
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url):
return _FakeResponse(404, {"error": "not found"})
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
# No exception — just no output
await mod.info()
captured = capsys.readouterr()
assert captured.out == ""
# ---------------------------------------------------------------------------
# check_status()
# ---------------------------------------------------------------------------
class TestCheckStatus:
async def test_check_status_completed(self, monkeypatch, capsys):
mod = _load_cli(monkeypatch)
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url, headers=None):
return _FakeResponse(200, {"url": "http://target.test/a2a"})
async def post(self, url, json=None):
return _FakeResponse(200, {
"result": {
"status": {"state": "completed"},
"artifacts": [
{"parts": [{"text": "Artifact result"}]}
],
}
})
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
await mod.check_status("ws-target", "task-123")
captured = capsys.readouterr()
assert "completed" in captured.out
assert "Artifact result" in captured.out
async def test_check_status_no_peer(self, monkeypatch, capsys):
mod = _load_cli(monkeypatch)
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url, headers=None):
return _FakeResponse(404, {})
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
with pytest.raises(SystemExit) as exc_info:
await mod.check_status("ws-target", "task-123")
assert exc_info.value.code == 1
captured = capsys.readouterr()
assert "cannot reach workspace" in captured.err
async def test_check_status_error_response(self, monkeypatch, capsys):
mod = _load_cli(monkeypatch)
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url, headers=None):
return _FakeResponse(200, {"url": "http://target.test/a2a"})
async def post(self, url, json=None):
return _FakeResponse(200, {"error": {"message": "task not found"}})
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
await mod.check_status("ws-target", "task-999")
captured = capsys.readouterr()
assert "task not found" in captured.out
async def test_check_status_running(self, monkeypatch, capsys):
"""Status in non-completed state — no artifacts printed."""
mod = _load_cli(monkeypatch)
class FakeClient:
def __init__(self, timeout): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): pass
async def get(self, url, headers=None):
return _FakeResponse(200, {"url": "http://target.test/a2a"})
async def post(self, url, json=None):
return _FakeResponse(200, {
"result": {
"status": {"state": "running"},
"artifacts": [],
}
})
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
await mod.check_status("ws-target", "task-456")
captured = capsys.readouterr()
assert "running" in captured.out
# ---------------------------------------------------------------------------
# main() — via command dispatch
# ---------------------------------------------------------------------------
class TestMain:
def test_main_no_args_exits(self, monkeypatch, capsys):
mod = _load_cli(monkeypatch)
monkeypatch.setattr(sys, "argv", ["a2a"])
with pytest.raises(SystemExit) as exc_info:
mod.main()
assert exc_info.value.code == 1
captured = capsys.readouterr()
assert "Usage" in captured.out
def test_main_unknown_command_exits(self, monkeypatch, capsys):
mod = _load_cli(monkeypatch)
monkeypatch.setattr(sys, "argv", ["a2a", "unknown-cmd"])
with pytest.raises(SystemExit) as exc_info:
mod.main()
assert exc_info.value.code == 1
captured = capsys.readouterr()
assert "Unknown command" in captured.err
def test_main_delegate_missing_args_exits(self, monkeypatch, capsys):
mod = _load_cli(monkeypatch)
monkeypatch.setattr(sys, "argv", ["a2a", "delegate"])
with pytest.raises(SystemExit) as exc_info:
mod.main()
assert exc_info.value.code == 1
captured = capsys.readouterr()
assert "Usage" in captured.err
def test_main_status_missing_args_exits(self, monkeypatch, capsys):
mod = _load_cli(monkeypatch)
monkeypatch.setattr(sys, "argv", ["a2a", "status", "only-one-arg"])
with pytest.raises(SystemExit) as exc_info:
mod.main()
assert exc_info.value.code == 1
captured = capsys.readouterr()
assert "Usage" in captured.err
def test_main_delegate_calls_asyncio_run(self, monkeypatch):
mod = _load_cli(monkeypatch)
monkeypatch.setattr(sys, "argv", ["a2a", "delegate", "ws-target", "do something"])
called_with = {}
def fake_asyncio_run(coro):
called_with["coro"] = coro
# Close the coroutine to avoid ResourceWarning
coro.close()
monkeypatch.setattr(mod.asyncio, "run", fake_asyncio_run)
mod.main()
assert "coro" in called_with
def test_main_delegate_async_flag(self, monkeypatch):
mod = _load_cli(monkeypatch)
monkeypatch.setattr(sys, "argv", ["a2a", "delegate", "--async", "ws-target", "do something"])
called_with = {}
def fake_asyncio_run(coro):
called_with["coro"] = coro
coro.close()
monkeypatch.setattr(mod.asyncio, "run", fake_asyncio_run)
mod.main()
assert "coro" in called_with
def test_main_status_calls_asyncio_run(self, monkeypatch):
mod = _load_cli(monkeypatch)
monkeypatch.setattr(sys, "argv", ["a2a", "status", "ws-target", "task-abc"])
called_with = {}
def fake_asyncio_run(coro):
called_with["coro"] = coro
coro.close()
monkeypatch.setattr(mod.asyncio, "run", fake_asyncio_run)
mod.main()
assert "coro" in called_with
def test_main_peers_calls_asyncio_run(self, monkeypatch):
mod = _load_cli(monkeypatch)
monkeypatch.setattr(sys, "argv", ["a2a", "peers"])
called_with = {}
def fake_asyncio_run(coro):
called_with["coro"] = coro
coro.close()
monkeypatch.setattr(mod.asyncio, "run", fake_asyncio_run)
mod.main()
assert "coro" in called_with
def test_main_info_calls_asyncio_run(self, monkeypatch):
mod = _load_cli(monkeypatch)
monkeypatch.setattr(sys, "argv", ["a2a", "info"])
called_with = {}
def fake_asyncio_run(coro):
called_with["coro"] = coro
coro.close()
monkeypatch.setattr(mod.asyncio, "run", fake_asyncio_run)
mod.main()
assert "coro" in called_with
# ---------------------------------------------------------------------------
# Helper coroutine for instant sleep mock
# ---------------------------------------------------------------------------
async def _instant_sleep():
"""No-op coroutine to replace asyncio.sleep in tests."""
pass