diff --git a/workspace/main.py b/workspace/main.py index 550d734f..0402a779 100644 --- a/workspace/main.py +++ b/workspace/main.py @@ -148,62 +148,15 @@ async def main(): # pragma: no cover heartbeat=heartbeat, ) - # 5. Setup adapter and create executor - # If setup fails, ensure heartbeat is stopped to prevent resource leak - try: - await adapter.setup(adapter_config) - executor = await adapter.create_executor(adapter_config) - - # 5a. Boot-smoke short-circuit (issue #2275): if MOLECULE_SMOKE_MODE - # is set, exercise the executor's full import tree by calling - # execute() once with stub deps + a short timeout. Skips platform - # registration + uvicorn entirely. Returns process exit code. - from smoke_mode import is_smoke_mode, run_executor_smoke - if is_smoke_mode(): - exit_code = await run_executor_smoke(executor) - if hasattr(heartbeat, "stop"): - try: - await heartbeat.stop() - except Exception: # noqa: BLE001 - pass - raise SystemExit(exit_code) - - # 5b. Restore from pre-stop snapshot if one exists (GH#1391). - # The snapshot is scrubbed before being written, so secrets are - # already redacted — restore_state must not re-expose them. - from lib.pre_stop import read_snapshot - snapshot = read_snapshot() - if snapshot: - try: - adapter.restore_state(snapshot) - print( - f"Pre-stop snapshot restored: task={snapshot.get('current_task', '')!r}, " - f"uptime={snapshot.get('uptime_seconds', 0)}s" - ) - except Exception as restore_err: - print(f"Warning: snapshot restore failed (continuing): {restore_err}") - except Exception: - # heartbeat hasn't started yet but may have async tasks pending - if hasattr(heartbeat, "stop"): - try: - await heartbeat.stop() - except Exception: - pass - raise - - # 5.5. Initialise Temporal durable execution wrapper (optional) - # Connects to TEMPORAL_HOST (default: localhost:7233) and starts a - # co-located Temporal worker as a background asyncio task. 
- # No-op with a warning log if Temporal is unreachable or temporalio - # is not installed — all tasks fall back to direct execution transparently. - from builtin_tools.temporal_workflow import create_wrapper as _create_temporal_wrapper - temporal_wrapper = _create_temporal_wrapper() - await temporal_wrapper.start() - - # Get loaded skills for agent card (adapter may have populated them) - loaded_skills = getattr(adapter, "loaded_skills", []) - - # 6. Build Agent Card + # 5. Build the AgentCard *before* adapter.setup() so /.well-known/agent-card.json + # is reachable as soon as uvicorn binds, regardless of whether the adapter + # has working LLM credentials. Decoupling readiness ("is the workspace up?") + # from configuration ("can it actually answer?") means a workspace with a + # missing/rotated key stays REACHABLE — canvas can render a clear + # "agent not configured" error instead of "stuck booting forever," and + # operators can deprovision/redeploy normally. Skills built from + # config.skills (static names from config.yaml) up front; richer metadata + # from the adapter's loaded_skills swaps in below if setup() succeeds. machine_ip = os.environ.get("HOSTNAME", get_machine_ip()) workspace_url = f"http://{machine_ip}:{port}" @@ -237,20 +190,96 @@ async def main(): # pragma: no cover # always available and tasks/get accepts historyLength via # apply_history_length(). Don't add this kwarg back. ), + # Static skill stubs from config.yaml; replaced with rich metadata + # below if adapter.setup() loads skills successfully. skills=[ - AgentSkill( - id=skill.metadata.id, - name=skill.metadata.name, - description=skill.metadata.description, - tags=skill.metadata.tags, - examples=skill.metadata.examples, - ) - for skill in loaded_skills + AgentSkill(id=name, name=name, description=name, tags=[], examples=[]) + for name in (config.skills or []) ], default_input_modes=["text/plain", "application/json"], default_output_modes=["text/plain", "application/json"], ) + # 6. 
Setup adapter and create executor + # On failure: log + continue. The card route stays mounted (above); + # the JSON-RPC route below returns -32603 "agent not configured" until + # the operator fixes credentials and redeploys. Heartbeat keeps running + # so the platform sees the workspace as reachable-but-misconfigured + # rather than crash-looping. + adapter_ready = False + adapter_error: str | None = None + executor = None + try: + await adapter.setup(adapter_config) + executor = await adapter.create_executor(adapter_config) + + # 6a. Boot-smoke short-circuit (issue #2275): if MOLECULE_SMOKE_MODE + # is set, exercise the executor's full import tree by calling + # execute() once with stub deps + a short timeout. Skips platform + # registration + uvicorn entirely. Returns process exit code. + from smoke_mode import is_smoke_mode, run_executor_smoke + if is_smoke_mode(): + exit_code = await run_executor_smoke(executor) + if hasattr(heartbeat, "stop"): + try: + await heartbeat.stop() + except Exception: # noqa: BLE001 + pass + raise SystemExit(exit_code) + + # 6b. Restore from pre-stop snapshot if one exists (GH#1391). + # The snapshot is scrubbed before being written, so secrets are + # already redacted — restore_state must not re-expose them. + from lib.pre_stop import read_snapshot + snapshot = read_snapshot() + if snapshot: + try: + adapter.restore_state(snapshot) + print( + f"Pre-stop snapshot restored: task={snapshot.get('current_task', '')!r}, " + f"uptime={snapshot.get('uptime_seconds', 0)}s" + ) + except Exception as restore_err: + print(f"Warning: snapshot restore failed (continuing): {restore_err}") + + # 6c. Swap rich skill metadata into the card now that setup() loaded + # them. In-place mutation: a2a-sdk's create_agent_card_routes serialises + # the card on each request, so the route mounted below sees the update. 
+ loaded_skills = getattr(adapter, "loaded_skills", None) + if loaded_skills: + agent_card.skills = [ + AgentSkill( + id=skill.metadata.id, + name=skill.metadata.name, + description=skill.metadata.description, + tags=skill.metadata.tags, + examples=skill.metadata.examples, + ) + for skill in loaded_skills + ] + adapter_ready = True + except SystemExit: + # Smoke-mode exit signal — propagate untouched. + raise + except Exception as setup_err: # noqa: BLE001 + adapter_error = f"{type(setup_err).__name__}: {setup_err}" + print( + f"WARNING: adapter.setup() failed — workspace will serve agent-card " + f"but JSON-RPC will return -32603 until configuration is fixed. " + f"Reason: {adapter_error}", + flush=True, + ) + # Heartbeat keeps running so the platform marks the workspace as + # reachable-but-misconfigured. Operators can then redeploy with the + # correct env vars without having to chase a crash-loop. + + # 6.5. Initialise Temporal durable execution wrapper (optional). Only + # meaningful when an executor exists; skipped on misconfigured boots. + if adapter_ready: + from builtin_tools.temporal_workflow import create_wrapper as _create_temporal_wrapper + temporal_wrapper = _create_temporal_wrapper() + await temporal_wrapper.start() + # 7. Wrap in A2A. # # Regression fix (#204): PR #198 tried to wire push_config_store + @@ -262,42 +291,51 @@ async def main(): # pragma: no cover # in the AgentCard below is still advertised via AgentCapabilities so # clients know we COULD do pushes; actually implementing them requires # a concrete sender subclass, tracked as a Phase-H follow-up to #175. - handler = DefaultRequestHandler( - agent_executor=executor, - task_store=InMemoryTaskStore(), - # a2a-sdk 1.x added agent_card as a required positional/keyword - # argument — it's used internally for capability dispatch (e.g. - # routing tasks/get historyLength based on the card's protocol - # version). 
Pass the same agent_card we registered with the - # platform so the handler's capability surface matches what the - # AgentCard advertises. - agent_card=agent_card, - ) - - # v1: replace A2AStarletteApplication with Starlette route factory. - # rpc_url is required in a2a-sdk 1.x (was implicit at root in 0.x). - # Use '/' to match a2a.utils.constants.DEFAULT_RPC_URL — that's also - # what the platform's a2a_proxy.go POSTs to (it forwards to the - # workspace's URL without appending a path). Card endpoint stays at - # the well-known path /.well-known/agent-card.json (handled by - # create_agent_card_routes default). routes = [] routes.extend(create_agent_card_routes(agent_card)) - # enable_v0_3_compat=True is the JSON-RPC wire-compat path: clients - # using v0.3-shaped payloads (`"role": "user"` lowercase + camelCase - # Pydantic field names) can talk to us without re-deploying. Outbound - # JSON-RPC wire payloads MUST also use v0.3 shape — the v0.3 compat - # adapter at /usr/local/lib/python3.11/site-packages/a2a/compat/v0_3/ - # validates against Pydantic Role enum (`agent`|`user`) and rejects - # the protobuf-style `ROLE_USER` enum names with JSON-RPC -32600 - # (Invalid Request). Native v1.x types (a2a.types.Role.ROLE_AGENT) - # are only for code that constructs Message objects in-process and - # hands them to the SDK, which serialises them correctly for the - # outbound wire format. - routes.extend(create_jsonrpc_routes(request_handler=handler, rpc_url="/", enable_v0_3_compat=True)) + + if adapter_ready: + handler = DefaultRequestHandler( + agent_executor=executor, + task_store=InMemoryTaskStore(), + # a2a-sdk 1.x added agent_card as a required positional/keyword + # argument — it's used internally for capability dispatch (e.g. + # routing tasks/get historyLength based on the card's protocol + # version). Pass the same agent_card we registered with the + # platform so the handler's capability surface matches what the + # AgentCard advertises. 
+ agent_card=agent_card, + ) + # v1: replace A2AStarletteApplication with Starlette route factory. + # rpc_url is required in a2a-sdk 1.x (was implicit at root in 0.x). + # Use '/' to match a2a.utils.constants.DEFAULT_RPC_URL — that's also + # what the platform's a2a_proxy.go POSTs to (it forwards to the + # workspace's URL without appending a path). Card endpoint stays at + # the well-known path /.well-known/agent-card.json (handled by + # create_agent_card_routes default). + # enable_v0_3_compat=True is the JSON-RPC wire-compat path: clients + # using v0.3-shaped payloads (`"role": "user"` lowercase + camelCase + # Pydantic field names) can talk to us without re-deploying. + routes.extend(create_jsonrpc_routes(request_handler=handler, rpc_url="/", enable_v0_3_compat=True)) + else: + # Misconfigured: serve the card but reject JSON-RPC with -32603 so + # canvas surfaces a useful "agent not configured: <reason>" instead + # of letting requests time out. Handler factory is in its own module + # so the behavior is unit-testable (workspace/tests/test_not_configured_handler.py). + from starlette.routing import Route + from not_configured_handler import make_not_configured_handler + + routes.append( + Route("/", make_not_configured_handler(adapter_error), methods=["POST"]) + ) + app = Starlette(routes=routes) # 8. Register with platform + # When adapter.setup() failed, advertise via configuration_status so + # the platform/canvas can render "configured: false, reason: …" instead + # of a confused "ready but silent" state. 
+ loaded_skills = getattr(adapter, "loaded_skills", None) or [] agent_card_dict = { "name": config.name, "description": config.description, @@ -311,11 +349,16 @@ async def main(): # pragma: no cover "tags": s.metadata.tags, } for s in loaded_skills + ] if adapter_ready else [ + {"id": n, "name": n, "description": n, "tags": []} + for n in (config.skills or []) ], "capabilities": { "streaming": config.a2a.streaming, "pushNotifications": config.a2a.push_notifications, }, + "configuration_status": "ready" if adapter_ready else "not_configured", + **({"configuration_error": adapter_error} if adapter_error else {}), } async with httpx.AsyncClient(timeout=10.0) as client: @@ -364,7 +407,9 @@ async def main(): # pragma: no cover # 9b. Start skills hot-reload watcher (background task) # When a skill file changes the watcher reloads the skill module and calls # back into the adapter so the next A2A request uses the updated tools. - if config.skills: + # Skipped on misconfigured boots — adapter has no executor / tool registry + # to swap into, so reloading skills would NPE on the agent rebuild path. + if adapter_ready and config.skills: try: from skill_loader.watcher import SkillsWatcher @@ -495,9 +540,13 @@ async def main(): # pragma: no cover # 10b. Schedule initial_prompt self-message after server is ready. # Only runs on first boot — creates a marker file to prevent re-execution on restart. + # Skipped on misconfigured boots: the self-message would route through the + # platform back to /, hit the -32603 not-configured handler, and consume + # the marker for a fire that can't actually run. Wait until the operator + # fixes credentials and the workspace redeploys with adapter_ready=True. 
initial_prompt_task = None initial_prompt_marker = resolve_initial_prompt_marker(config_path) - if config.initial_prompt and not os.path.exists(initial_prompt_marker): + if adapter_ready and config.initial_prompt and not os.path.exists(initial_prompt_marker): # Write the marker UP FRONT (#71): if the prompt later crashes or # times out, we do NOT replay on next boot — that created a # ProcessError cascade where every message kept crashing. Operators @@ -615,7 +664,9 @@ async def main(): # pragma: no cover # workspaces upgrade opt-in — set idle_prompt in org.yaml defaults or # per-workspace to enable. idle_loop_task = None - if config.idle_prompt: + # Skipped on misconfigured boots — the self-fire would route to the + # -32603 handler in a tight loop and consume cycles for no useful work. + if adapter_ready and config.idle_prompt: # Idle-fire HTTP timeout. Kept tight relative to the fire cadence so a # hung platform doesn't accumulate dangling requests — a fire that # takes longer than the idle interval itself is almost certainly stuck. diff --git a/workspace/not_configured_handler.py b/workspace/not_configured_handler.py new file mode 100644 index 00000000..da559b62 --- /dev/null +++ b/workspace/not_configured_handler.py @@ -0,0 +1,55 @@ +"""Build a JSON-RPC handler that returns ``-32603 "agent not configured"``. + +Used by the workspace runtime when ``adapter.setup()`` fails (most often +because an LLM credential is missing or rotated). Lets ``/.well-known/agent-card.json`` +keep serving 200 — the workspace stays REACHABLE for canvas/operator +introspection — while message-send requests get a clear, immediate +error instead of silently timing out. + +Kept as its own module so the behavior is unit-testable without booting +the whole runtime (main.py is ``# pragma: no cover``). 
""" +from __future__ import annotations + +from typing import Awaitable, Callable + +from starlette.requests import Request +from starlette.responses import JSONResponse + + +def make_not_configured_handler( + reason: str | None, +) -> Callable[[Request], Awaitable[JSONResponse]]: + """Return a Starlette POST handler that always 503s with JSON-RPC -32603. + + ``reason`` is surfaced in the JSON-RPC ``error.data`` field so canvas + can render "agent not configured: <reason>" to the user. Pass the + stringified ``adapter.setup()`` exception. ``None`` falls back to a + generic "adapter.setup() failed". + + The handler echoes the request's JSON-RPC ``id`` when present so a + well-behaved JSON-RPC client can correlate the error to its request. + Malformed bodies (non-JSON, missing id) get ``id: null`` per spec. + """ + + fallback = reason or "adapter.setup() failed" + + async def _handler(request: Request) -> JSONResponse: + try: + body = await request.json() + except Exception: # noqa: BLE001 + body = {} + return JSONResponse( + { + "jsonrpc": "2.0", + "id": body.get("id") if isinstance(body, dict) else None, + "error": { + "code": -32603, + "message": "Internal error: agent not configured", + "data": fallback, + }, + }, + status_code=503, + ) + + return _handler diff --git a/workspace/tests/test_not_configured_handler.py b/workspace/tests/test_not_configured_handler.py new file mode 100644 index 00000000..39483ffc --- /dev/null +++ b/workspace/tests/test_not_configured_handler.py @@ -0,0 +1,87 @@ +"""Tests for ``not_configured_handler`` — the JSON-RPC -32603 fallback the +runtime mounts when ``adapter.setup()`` fails. + +Tests the behavior end-to-end via Starlette's TestClient so the JSON-RPC +wire shape (status 503, code -32603, id-echo) is exercised the same way +canvas would see it. 
+""" +from __future__ import annotations + +import sys +from pathlib import Path + +# Make workspace/ importable in test isolation — same pattern as the +# adjacent tests (test_smoke_mode.py, test_heartbeat.py). +WORKSPACE_DIR = Path(__file__).resolve().parents[1] +if str(WORKSPACE_DIR) not in sys.path: + sys.path.insert(0, str(WORKSPACE_DIR)) + +from starlette.applications import Starlette +from starlette.routing import Route +from starlette.testclient import TestClient + +from not_configured_handler import make_not_configured_handler + + +def _build_app(reason: str | None) -> TestClient: + handler = make_not_configured_handler(reason) + app = Starlette(routes=[Route("/", handler, methods=["POST"])]) + return TestClient(app) + + +def test_returns_503_with_jsonrpc_error_envelope(): + """Status 503; body is a valid JSON-RPC 2.0 error envelope.""" + client = _build_app("MINIMAX_API_KEY not set") + resp = client.post("/", json={"jsonrpc": "2.0", "id": 7, "method": "message/send"}) + assert resp.status_code == 503 + body = resp.json() + assert body["jsonrpc"] == "2.0" + assert body["error"]["code"] == -32603 + assert body["error"]["message"] == "Internal error: agent not configured" + + +def test_echoes_request_id_when_present(): + """JSON-RPC clients correlate replies via id; the handler must echo it.""" + client = _build_app("reason") + resp = client.post("/", json={"jsonrpc": "2.0", "id": "abc-123", "method": "x"}) + assert resp.json()["id"] == "abc-123" + + +def test_id_is_null_when_body_malformed(): + """Per JSON-RPC 2.0: id MUST be null when it can't be determined from + the request. Malformed bodies (non-JSON, empty, non-object) all map + to id=null.""" + client = _build_app("reason") + resp = client.post("/", content=b"not json at all", headers={"content-type": "application/json"}) + assert resp.status_code == 503 + assert resp.json()["id"] is None + + +def test_reason_surfaces_in_error_data(): + """Operators read ``error.data`` to figure out what to fix. 
The + setup() exception string lands there verbatim.""" + client = _build_app("RuntimeError: Neither OPENAI_API_KEY nor MINIMAX_API_KEY is set") + resp = client.post("/", json={"jsonrpc": "2.0", "id": 1, "method": "x"}) + assert resp.json()["error"]["data"] == ( + "RuntimeError: Neither OPENAI_API_KEY nor MINIMAX_API_KEY is set" + ) + + +def test_none_reason_falls_back_to_generic_message(): + """If the adapter raised but we couldn't capture a reason, give the + operator a hint where to look (still better than a stuck-booting + workspace with no log line).""" + client = _build_app(None) + resp = client.post("/", json={"jsonrpc": "2.0", "id": 1, "method": "x"}) + assert resp.json()["error"]["data"] == "adapter.setup() failed" + + +def test_array_body_does_not_crash_id_extraction(): + """JSON-RPC supports batch (array) requests. We don't currently + support batch in the runtime, but the handler shouldn't crash on a + batch body — it should just respond with id=null and the same -32603 + so the client sees a clear error instead of a 500.""" + client = _build_app("reason") + resp = client.post("/", json=[{"jsonrpc": "2.0", "id": 1, "method": "x"}]) + assert resp.status_code == 503 + assert resp.json()["id"] is None