diff --git a/scripts/build_runtime_package.py b/scripts/build_runtime_package.py
index e8e793c0..a517baba 100755
--- a/scripts/build_runtime_package.py
+++ b/scripts/build_runtime_package.py
@@ -73,6 +73,7 @@ TOP_LEVEL_MODULES = {
     "main",
     "mcp_cli",
     "molecule_ai_status",
+    "not_configured_handler",
     "platform_auth",
     "platform_inbound_auth",
     "plugins",
diff --git a/workspace-server/cmd/server/main.go b/workspace-server/cmd/server/main.go
index 2021d631..e11f5a96 100644
--- a/workspace-server/cmd/server/main.go
+++ b/workspace-server/cmd/server/main.go
@@ -18,6 +18,7 @@ import (
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/imagewatch"
+	memwiring "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/wiring"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/router"
@@ -166,6 +167,16 @@ func main() {
 		wh.SetCPProvisioner(cpProv)
 	}
 
+	// Memory v2 plugin (RFC #2728): build the dependency bundle once
+	// here so all three handlers (MCPHandler, AdminMemoriesHandler,
+	// WorkspaceHandler) get the same plugin/resolver pair. memBundle
+	// is nil when MEMORY_PLUGIN_URL is unset — every consumer
+	// nil-checks before using.
+	memBundle := memwiring.Build(db.DB)
+	if memBundle != nil {
+		wh.WithNamespaceCleanup(memBundle.NamespaceCleanupFn())
+	}
+
 	// External-plugin env mutators — each plugin contributes 0+ mutators
 	// onto a shared registry. Order matters: gh-identity populates
 	// MOLECULE_AGENT_ROLE-derived attribution env vars that downstream
@@ -306,7 +317,7 @@ func main() {
 	cronSched.SetChannels(channelMgr)
 
 	// Router
-	r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr)
+	r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr, memBundle)
 
 	// HTTP server with graceful shutdown
 	srv := &http.Server{
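The main.go hunk above is the whole integration surface: one memwiring.Build call at boot, then a nil check at each consumer. A minimal, self-contained sketch of that build-once, nil-gated optional-dependency pattern (the names Dep, buildOptional, and FEATURE_URL are illustrative stand-ins, not the repo's API):

```go
// Sketch of the boot-time pattern main.go uses: one constructor call,
// nil means "feature off". All names here are hypothetical.
package main

import (
	"fmt"
	"os"
)

type Dep struct{ URL string }

// buildOptional returns nil when the feature's env var is unset, so
// callers need a single nil check rather than a plumbed feature flag.
func buildOptional() *Dep {
	u := os.Getenv("FEATURE_URL")
	if u == "" {
		return nil
	}
	return &Dep{URL: u}
}

func main() {
	dep := buildOptional() // built exactly once, shared by all consumers
	for _, name := range []string{"handlerA", "handlerB"} {
		if dep != nil {
			fmt.Println(name, "uses", dep.URL)
		} else {
			fmt.Println(name, "falls back to legacy path")
		}
	}
}
```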
diff --git a/workspace-server/internal/memory/wiring/wiring.go b/workspace-server/internal/memory/wiring/wiring.go
new file mode 100644
index 00000000..6fdad2ba
--- /dev/null
+++ b/workspace-server/internal/memory/wiring/wiring.go
@@ -0,0 +1,81 @@
+// Package wiring constructs the v2 memory plugin dependency bundle
+// at boot time so handlers can opt into the plugin path uniformly.
+//
+// The bundle is nil-safe: when MEMORY_PLUGIN_URL is unset, Build
+// returns nil so callers can detect "v2 not configured" with
+// a single nil check instead of plumbing a feature flag through
+// every handler.
+//
+// This package exists because the v2 plugin client + namespace
+// resolver are needed by THREE different handler types (MCPHandler,
+// AdminMemoriesHandler, WorkspaceHandler) constructed in two
+// different files (main.go for WorkspaceHandler, router.go for the
+// other two). A central Build() avoids each construction site
+// re-implementing the env-var read + plugin instantiation.
+package wiring
+
+import (
+	"context"
+	"database/sql"
+	"log"
+	"os"
+	"time"
+
+	mclient "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/client"
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/namespace"
+)
+
+// Bundle is the v2 dependency bundle. Pass it through Setup as a
+// single param; handlers extract what they need.
+//
+// nil receiver = "v2 not configured" — every method on Bundle
+// nil-checks itself, so callers can pass a nil Bundle through the
+// hot path without conditional spread.
+type Bundle struct {
+	Plugin   *mclient.Client
+	Resolver *namespace.Resolver
+}
+
+// Build returns a wired Bundle if MEMORY_PLUGIN_URL is set, else nil.
+//
+// It probes /v1/health at boot — when the plugin is unreachable, we
+// log a warning but STILL return the bundle. The MCP layer's
+// circuit breaker handles ongoing unavailability; we don't want to
+// block workspace-server boot just because the memory plugin is
+// briefly down.
+func Build(db *sql.DB) *Bundle {
+	if os.Getenv("MEMORY_PLUGIN_URL") == "" {
+		return nil
+	}
+	plugin := mclient.New(mclient.Config{})
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	if hr, err := plugin.Boot(ctx); err != nil {
+		log.Printf("memory-plugin: /v1/health probe failed (will retry per-request): %v", err)
+	} else {
+		log.Printf("memory-plugin: ok, capabilities=%v", hr.Capabilities)
+	}
+	return &Bundle{
+		Plugin:   plugin,
+		Resolver: namespace.New(db),
+	}
+}
+
+// NamespaceCleanupFn returns a closure suitable for
+// WorkspaceHandler.WithNamespaceCleanup. nil when bundle is nil so
+// callers can pass it through unconditionally.
+//
+// The closure runs best-effort: errors are logged, never propagated.
+// A misbehaving plugin must not block workspace purges.
+func (b *Bundle) NamespaceCleanupFn() func(context.Context, string) {
+	if b == nil || b.Plugin == nil {
+		return nil
+	}
+	return func(ctx context.Context, workspaceID string) {
+		ns := "workspace:" + workspaceID
+		if err := b.Plugin.DeleteNamespace(ctx, ns); err != nil {
+			log.Printf("memory-plugin: namespace cleanup failed (workspace=%s ns=%s): %v",
+				workspaceID, ns, err)
+		}
+	}
+}
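The "nil receiver = v2 not configured" convention above leans on a Go property worth spelling out: calling a method through a nil pointer is legal as long as the method body never dereferences the receiver. A standalone demo of why NamespaceCleanupFn is safe to call unconditionally (assumed names, not repo code):

```go
// Demo of the nil-receiver convention: the method itself answers
// "not configured", so callers never nil-check the bundle.
package main

import "fmt"

type bundle struct{ name string }

func (b *bundle) cleanupFn() func(string) {
	if b == nil { // nil receiver means the feature is not configured
		return nil
	}
	return func(id string) { fmt.Println("cleaning", id, "via", b.name) }
}

func main() {
	var off *bundle                     // analogue of MEMORY_PLUGIN_URL unset
	fmt.Println(off.cleanupFn() == nil) // true: safe call, no panic

	on := &bundle{name: "plugin"}
	if fn := on.cleanupFn(); fn != nil {
		fn("ws-1")
	}
}
```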
diff --git a/workspace-server/internal/memory/wiring/wiring_test.go b/workspace-server/internal/memory/wiring/wiring_test.go
new file mode 100644
index 00000000..ee7a6840
--- /dev/null
+++ b/workspace-server/internal/memory/wiring/wiring_test.go
@@ -0,0 +1,160 @@
+package wiring
+
+import (
+	"context"
+	"net/http"
+	"net/http/httptest"
+	"sync"
+	"testing"
+
+	"github.com/DATA-DOG/go-sqlmock"
+)
+
+// TestBuild_NilWhenURLUnset pins the operator-friendly default: no
+// MEMORY_PLUGIN_URL → nil bundle → all callers fall through to legacy
+// behavior with no surprises.
+func TestBuild_NilWhenURLUnset(t *testing.T) {
+	t.Setenv("MEMORY_PLUGIN_URL", "")
+	if got := Build(nil); got != nil {
+		t.Errorf("expected nil bundle when MEMORY_PLUGIN_URL unset, got %+v", got)
+	}
+}
+
+// TestBuild_NonNilWhenURLSet pins that the bundle is constructed even
+// when the plugin's /v1/health probe fails — we don't want workspace-
+// server boot to depend on a transiently unavailable plugin.
+func TestBuild_NonNilWhenURLSet(t *testing.T) {
+	t.Setenv("MEMORY_PLUGIN_URL", "http://127.0.0.1:1") // bogus port = probe will fail
+	db, _, _ := sqlmock.New()
+	defer db.Close()
+	bundle := Build(db)
+	if bundle == nil {
+		t.Fatal("expected non-nil bundle when MEMORY_PLUGIN_URL is set")
+	}
+	if bundle.Plugin == nil {
+		t.Error("Plugin must be wired")
+	}
+	if bundle.Resolver == nil {
+		t.Error("Resolver must be wired")
+	}
+}
+
+// TestNamespaceCleanupFn_NilBundle pins the nil-safe path: callers
+// that pass `bundle.NamespaceCleanupFn()` unconditionally don't need
+// to nil-check the bundle separately.
+func TestNamespaceCleanupFn_NilBundle(t *testing.T) {
+	var b *Bundle // nil receiver
+	if got := b.NamespaceCleanupFn(); got != nil {
+		t.Errorf("nil bundle must return nil cleanup fn, got non-nil")
+	}
+}
+
+// TestNamespaceCleanupFn_NilPlugin: bundle exists but plugin is nil —
+// also returns nil cleanup fn (defensive in case of partial wiring).
+func TestNamespaceCleanupFn_NilPlugin(t *testing.T) {
+	b := &Bundle{} // both fields nil
+	if got := b.NamespaceCleanupFn(); got != nil {
+		t.Errorf("bundle with nil plugin must return nil cleanup fn")
+	}
+}
+
+// TestNamespaceCleanupFn_HitsPluginAtCorrectNamespace is the real
+// integration gate for the closure: it spins up an httptest.Server
+// that records every DELETE request, points MEMORY_PLUGIN_URL at it,
+// runs Build(), then invokes the returned closure and asserts the
+// server saw `DELETE /v1/namespaces/workspace:<id>`.
+//
+// This replaces two earlier tests that exercised parallel
+// implementations rather than the production closure (caught in
+// self-review).
+func TestNamespaceCleanupFn_HitsPluginAtCorrectNamespace(t *testing.T) {
+	var (
+		mu         sync.Mutex
+		gotPaths   []string
+		gotMethods []string
+	)
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		mu.Lock()
+		gotPaths = append(gotPaths, r.URL.Path)
+		gotMethods = append(gotMethods, r.Method)
+		mu.Unlock()
+		switch r.URL.Path {
+		case "/v1/health":
+			w.Header().Set("Content-Type", "application/json")
+			_, _ = w.Write([]byte(`{"status":"ok","version":"1.0.0","capabilities":[]}`))
+		default:
+			w.WriteHeader(http.StatusNoContent)
+		}
+	}))
+	t.Cleanup(srv.Close)
+
+	t.Setenv("MEMORY_PLUGIN_URL", srv.URL)
+	db, _, _ := sqlmock.New()
+	defer db.Close()
+
+	bundle := Build(db)
+	if bundle == nil {
+		t.Fatal("Build returned nil with MEMORY_PLUGIN_URL set")
+	}
+	cleanup := bundle.NamespaceCleanupFn()
+	if cleanup == nil {
+		t.Fatal("NamespaceCleanupFn returned nil with non-nil Plugin")
+	}
+
+	cleanup(context.Background(), "abc-123")
+
+	mu.Lock()
+	defer mu.Unlock()
+	// Two requests expected: /v1/health probe at Boot + DELETE for cleanup.
+	foundDelete := false
+	for i, p := range gotPaths {
+		if gotMethods[i] == "DELETE" && p == "/v1/namespaces/workspace:abc-123" {
+			foundDelete = true
+		}
+	}
+	if !foundDelete {
+		t.Errorf("expected DELETE /v1/namespaces/workspace:abc-123, got %v",
+			pathsAndMethods(gotPaths, gotMethods))
+	}
+}
+
+// TestNamespaceCleanupFn_PluginErrorDoesNotPanic exercises the failure
+// path for real: server returns 500 on DELETE; the closure must log
+// and return without propagating. Replaces the parallel-implementation
+// version that didn't actually test the production code.
+func TestNamespaceCleanupFn_PluginErrorDoesNotPanic(t *testing.T) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.URL.Path == "/v1/health" {
+			w.Header().Set("Content-Type", "application/json")
+			_, _ = w.Write([]byte(`{"status":"ok","version":"1.0.0","capabilities":[]}`))
+			return
+		}
+		http.Error(w, "boom", http.StatusInternalServerError)
+	}))
+	t.Cleanup(srv.Close)
+
+	t.Setenv("MEMORY_PLUGIN_URL", srv.URL)
+	db, _, _ := sqlmock.New()
+	defer db.Close()
+
+	bundle := Build(db)
+	cleanup := bundle.NamespaceCleanupFn()
+
+	// Must not panic, must not propagate the 500. Recovering with
+	// defer is belt-and-suspenders — production calls this from a
+	// for-loop in workspace_crud.go that has no recover.
+	defer func() {
+		if r := recover(); r != nil {
+			t.Errorf("cleanup panicked on plugin 500: %v", r)
+		}
+	}()
+	cleanup(context.Background(), "ws-1")
+}
+
+func pathsAndMethods(paths, methods []string) []string {
+	out := make([]string, len(paths))
+	for i := range paths {
+		out[i] = methods[i] + " " + paths[i]
+	}
+	return out
+}
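Both integration tests above use the same recording-server technique: an httptest.Server appends method and path for every request under a mutex, and assertions run against the recorded slice afterwards. A generic distillation of that technique, with hypothetical values:

```go
// Generic distillation (not repo code) of the recording httptest.Server
// pattern wiring_test.go uses to assert on outbound requests.
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

func main() {
	var (
		mu  sync.Mutex
		got []string
	)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		mu.Lock()
		got = append(got, r.Method+" "+r.URL.Path) // record before responding
		mu.Unlock()
		w.WriteHeader(http.StatusNoContent)
	}))
	defer srv.Close()

	req, _ := http.NewRequest(http.MethodDelete, srv.URL+"/v1/namespaces/workspace:abc-123", nil)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()

	mu.Lock()
	fmt.Println(got) // [DELETE /v1/namespaces/workspace:abc-123]
	mu.Unlock()
}
```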
diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go
index 0a5459fc..0c0bc928 100644
--- a/workspace-server/internal/router/router.go
+++ b/workspace-server/internal/router/router.go
@@ -13,6 +13,7 @@ import (
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers"
+	memwiring "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/wiring"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/middleware"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
@@ -23,7 +24,7 @@ import (
 	"github.com/gin-gonic/gin"
 )
 
-func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provisioner, platformURL, configsDir string, wh *handlers.WorkspaceHandler, channelMgr *channels.Manager) *gin.Engine {
+func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provisioner, platformURL, configsDir string, wh *handlers.WorkspaceHandler, channelMgr *channels.Manager, memBundle *memwiring.Bundle) *gin.Engine {
 	r := gin.Default()
 
 	// Issue #179 — trust no reverse-proxy headers. Without this call Gin's
@@ -150,6 +151,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
 		// F1084/#1131: Export applies redactSecrets before returning content.
 		// F1085/#1132: Import applies redactSecrets before persisting content.)
 		adminMemH := handlers.NewAdminMemoriesHandler()
+		if memBundle != nil {
+			adminMemH.WithMemoryV2(memBundle.Plugin, memBundle.Resolver)
+		}
 		wsAdmin.GET("/admin/memories/export", adminMemH.Export)
 		wsAdmin.POST("/admin/memories/import", adminMemH.Import)
 	}
@@ -370,6 +374,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
 	// C3: commit_memory/recall_memory with scope=GLOBAL → permission error;
 	// send_message_to_user excluded unless MOLECULE_MCP_ALLOW_SEND_MESSAGE=true.
 	mcpH := handlers.NewMCPHandler(db.DB, broadcaster)
+	if memBundle != nil {
+		mcpH.WithMemoryV2(memBundle.Plugin, memBundle.Resolver)
+	}
 	mcpRl := middleware.NewMCPRateLimiter(120, time.Minute, context.Background())
 	wsAuth.GET("/mcp/stream", mcpRl.Middleware(), mcpH.Stream)
 	wsAuth.POST("/mcp", mcpRl.Middleware(), mcpH.Call)
diff --git a/workspace/main.py b/workspace/main.py
index 550d734f..0402a779 100644
--- a/workspace/main.py
+++ b/workspace/main.py
@@ -148,62 +148,15 @@ async def main():  # pragma: no cover
         heartbeat=heartbeat,
     )
 
-    # 5. Setup adapter and create executor
-    # If setup fails, ensure heartbeat is stopped to prevent resource leak
-    try:
-        await adapter.setup(adapter_config)
-        executor = await adapter.create_executor(adapter_config)
-
-        # 5a. Boot-smoke short-circuit (issue #2275): if MOLECULE_SMOKE_MODE
-        # is set, exercise the executor's full import tree by calling
-        # execute() once with stub deps + a short timeout. Skips platform
-        # registration + uvicorn entirely. Returns process exit code.
-        from smoke_mode import is_smoke_mode, run_executor_smoke
-        if is_smoke_mode():
-            exit_code = await run_executor_smoke(executor)
-            if hasattr(heartbeat, "stop"):
-                try:
-                    await heartbeat.stop()
-                except Exception:  # noqa: BLE001
-                    pass
-            raise SystemExit(exit_code)
-
-        # 5b. Restore from pre-stop snapshot if one exists (GH#1391).
-        # The snapshot is scrubbed before being written, so secrets are
-        # already redacted — restore_state must not re-expose them.
-        from lib.pre_stop import read_snapshot
-        snapshot = read_snapshot()
-        if snapshot:
-            try:
-                adapter.restore_state(snapshot)
-                print(
-                    f"Pre-stop snapshot restored: task={snapshot.get('current_task', '')!r}, "
-                    f"uptime={snapshot.get('uptime_seconds', 0)}s"
-                )
-            except Exception as restore_err:
-                print(f"Warning: snapshot restore failed (continuing): {restore_err}")
-    except Exception:
-        # heartbeat hasn't started yet but may have async tasks pending
-        if hasattr(heartbeat, "stop"):
-            try:
-                await heartbeat.stop()
-            except Exception:
-                pass
-        raise
-
-    # 5.5. Initialise Temporal durable execution wrapper (optional)
-    # Connects to TEMPORAL_HOST (default: localhost:7233) and starts a
-    # co-located Temporal worker as a background asyncio task.
-    # No-op with a warning log if Temporal is unreachable or temporalio
-    # is not installed — all tasks fall back to direct execution transparently.
-    from builtin_tools.temporal_workflow import create_wrapper as _create_temporal_wrapper
-    temporal_wrapper = _create_temporal_wrapper()
-    await temporal_wrapper.start()
-
-    # Get loaded skills for agent card (adapter may have populated them)
-    loaded_skills = getattr(adapter, "loaded_skills", [])
-
-    # 6. Build Agent Card
+    # 5. Build the AgentCard *before* adapter.setup() so /.well-known/agent-card.json
+    # is reachable as soon as uvicorn binds, regardless of whether the adapter
+    # has working LLM credentials. Decoupling readiness ("is the workspace up?")
+    # from configuration ("can it actually answer?") means a workspace with a
+    # missing/rotated key stays REACHABLE — canvas can render a clear
+    # "agent not configured" error instead of "stuck booting forever," and
+    # operators can deprovision/redeploy normally. Skills are built from
+    # config.skills (static names from config.yaml) up front; richer metadata
+    # from the adapter's loaded_skills swaps in below if setup() succeeds.
     machine_ip = os.environ.get("HOSTNAME", get_machine_ip())
     workspace_url = f"http://{machine_ip}:{port}"
 
@@ -237,20 +190,96 @@ async def main():  # pragma: no cover
         # always available and tasks/get accepts historyLength via
         # apply_history_length(). Don't add this kwarg back.
         ),
+        # Static skill stubs from config.yaml; replaced with rich metadata
+        # below if adapter.setup() loads skills successfully.
         skills=[
-            AgentSkill(
-                id=skill.metadata.id,
-                name=skill.metadata.name,
-                description=skill.metadata.description,
-                tags=skill.metadata.tags,
-                examples=skill.metadata.examples,
-            )
-            for skill in loaded_skills
+            AgentSkill(id=name, name=name, description=name, tags=[], examples=[])
+            for name in (config.skills or [])
         ],
         default_input_modes=["text/plain", "application/json"],
         default_output_modes=["text/plain", "application/json"],
     )
 
+    # 6. Setup adapter and create executor
+    # On failure: log + continue. The card route stays mounted (above);
+    # the JSON-RPC route below returns -32603 "agent not configured" until
+    # the operator fixes credentials and redeploys. Heartbeat keeps running
+    # so the platform sees the workspace as reachable-but-misconfigured
+    # rather than crash-looping.
+    adapter_ready = False
+    adapter_error: str | None = None
+    executor = None
+    try:
+        await adapter.setup(adapter_config)
+        executor = await adapter.create_executor(adapter_config)
+
+        # 6a. Boot-smoke short-circuit (issue #2275): if MOLECULE_SMOKE_MODE
+        # is set, exercise the executor's full import tree by calling
+        # execute() once with stub deps + a short timeout. Skips platform
+        # registration + uvicorn entirely. Returns process exit code.
+        from smoke_mode import is_smoke_mode, run_executor_smoke
+        if is_smoke_mode():
+            exit_code = await run_executor_smoke(executor)
+            if hasattr(heartbeat, "stop"):
+                try:
+                    await heartbeat.stop()
+                except Exception:  # noqa: BLE001
+                    pass
+            raise SystemExit(exit_code)
+
+        # 6b. Restore from pre-stop snapshot if one exists (GH#1391).
+        # The snapshot is scrubbed before being written, so secrets are
+        # already redacted — restore_state must not re-expose them.
+        from lib.pre_stop import read_snapshot
+        snapshot = read_snapshot()
+        if snapshot:
+            try:
+                adapter.restore_state(snapshot)
+                print(
+                    f"Pre-stop snapshot restored: task={snapshot.get('current_task', '')!r}, "
+                    f"uptime={snapshot.get('uptime_seconds', 0)}s"
+                )
+            except Exception as restore_err:
+                print(f"Warning: snapshot restore failed (continuing): {restore_err}")
+
+        # 6c. Swap rich skill metadata into the card now that setup() loaded
+        # them. In-place mutation: a2a-sdk's create_agent_card_routes serialises
+        # the card on each request, so the route mounted below sees the update.
+        loaded_skills = getattr(adapter, "loaded_skills", None)
+        if loaded_skills:
+            agent_card.skills = [
+                AgentSkill(
+                    id=skill.metadata.id,
+                    name=skill.metadata.name,
+                    description=skill.metadata.description,
+                    tags=skill.metadata.tags,
+                    examples=skill.metadata.examples,
+                )
+                for skill in loaded_skills
+            ]
+        adapter_ready = True
+    except SystemExit:
+        # Smoke-mode exit signal — propagate untouched.
+        raise
+    except Exception as setup_err:  # noqa: BLE001
+        adapter_error = f"{type(setup_err).__name__}: {setup_err}"
+        print(
+            f"WARNING: adapter.setup() failed — workspace will serve agent-card "
+            f"but JSON-RPC will return -32603 until configuration is fixed. "
+            f"Reason: {adapter_error}",
+            flush=True,
+        )
+        # Heartbeat keeps running so the platform marks the workspace as
+        # reachable-but-misconfigured. Operators can then redeploy with the
+        # correct env vars without having to chase a crash-loop.
+
+    # 6.5. Initialise Temporal durable execution wrapper (optional). Only
+    # meaningful when an executor exists; skipped on misconfigured boots.
+    if adapter_ready:
+        from builtin_tools.temporal_workflow import create_wrapper as _create_temporal_wrapper
+        temporal_wrapper = _create_temporal_wrapper()
+        await temporal_wrapper.start()
+
     # 7. Wrap in A2A.
     #
     # Regression fix (#204): PR #198 tried to wire push_config_store +
@@ -262,42 +291,51 @@ async def main():  # pragma: no cover
     # in the AgentCard below is still advertised via AgentCapabilities so
     # clients know we COULD do pushes; actually implementing them requires
     # a concrete sender subclass, tracked as a Phase-H follow-up to #175.
-    handler = DefaultRequestHandler(
-        agent_executor=executor,
-        task_store=InMemoryTaskStore(),
-        # a2a-sdk 1.x added agent_card as a required positional/keyword
-        # argument — it's used internally for capability dispatch (e.g.
-        # routing tasks/get historyLength based on the card's protocol
-        # version). Pass the same agent_card we registered with the
-        # platform so the handler's capability surface matches what the
-        # AgentCard advertises.
-        agent_card=agent_card,
-    )
-
-    # v1: replace A2AStarletteApplication with Starlette route factory.
-    # rpc_url is required in a2a-sdk 1.x (was implicit at root in 0.x).
-    # Use '/' to match a2a.utils.constants.DEFAULT_RPC_URL — that's also
-    # what the platform's a2a_proxy.go POSTs to (it forwards to the
-    # workspace's URL without appending a path). Card endpoint stays at
-    # the well-known path /.well-known/agent-card.json (handled by
-    # create_agent_card_routes default).
     routes = []
     routes.extend(create_agent_card_routes(agent_card))
-    # enable_v0_3_compat=True is the JSON-RPC wire-compat path: clients
-    # using v0.3-shaped payloads (`"role": "user"` lowercase + camelCase
-    # Pydantic field names) can talk to us without re-deploying. Outbound
-    # JSON-RPC wire payloads MUST also use v0.3 shape — the v0.3 compat
-    # adapter at /usr/local/lib/python3.11/site-packages/a2a/compat/v0_3/
-    # validates against Pydantic Role enum (`agent`|`user`) and rejects
-    # the protobuf-style `ROLE_USER` enum names with JSON-RPC -32600
-    # (Invalid Request). Native v1.x types (a2a.types.Role.ROLE_AGENT)
-    # are only for code that constructs Message objects in-process and
-    # hands them to the SDK, which serialises them correctly for the
-    # outbound wire format.
-    routes.extend(create_jsonrpc_routes(request_handler=handler, rpc_url="/", enable_v0_3_compat=True))
+
+    if adapter_ready:
+        handler = DefaultRequestHandler(
+            agent_executor=executor,
+            task_store=InMemoryTaskStore(),
+            # a2a-sdk 1.x added agent_card as a required positional/keyword
+            # argument — it's used internally for capability dispatch (e.g.
+            # routing tasks/get historyLength based on the card's protocol
+            # version). Pass the same agent_card we registered with the
+            # platform so the handler's capability surface matches what the
+            # AgentCard advertises.
+            agent_card=agent_card,
+        )
+        # v1: replace A2AStarletteApplication with Starlette route factory.
+        # rpc_url is required in a2a-sdk 1.x (was implicit at root in 0.x).
+        # Use '/' to match a2a.utils.constants.DEFAULT_RPC_URL — that's also
+        # what the platform's a2a_proxy.go POSTs to (it forwards to the
+        # workspace's URL without appending a path). Card endpoint stays at
+        # the well-known path /.well-known/agent-card.json (handled by
+        # create_agent_card_routes default).
+        # enable_v0_3_compat=True is the JSON-RPC wire-compat path: clients
+        # using v0.3-shaped payloads (`"role": "user"` lowercase + camelCase
+        # Pydantic field names) can talk to us without re-deploying.
+        routes.extend(create_jsonrpc_routes(request_handler=handler, rpc_url="/", enable_v0_3_compat=True))
+    else:
+        # Misconfigured: serve the card but reject JSON-RPC with -32603 so
+        # canvas surfaces a useful "agent not configured: <reason>" instead
+        # of letting requests time out. Handler factory is in its own module
+        # so the behavior is unit-testable (workspace/tests/test_not_configured_handler.py).
+        from starlette.routing import Route
+        from not_configured_handler import make_not_configured_handler
+
+        routes.append(
+            Route("/", make_not_configured_handler(adapter_error), methods=["POST"])
+        )
+
     app = Starlette(routes=routes)
 
     # 8. Register with platform
+    # When adapter.setup() failed, advertise via configuration_status so
+    # the platform/canvas can render "configured: false, reason: …" instead
+    # of a confused "ready but silent" state.
+    loaded_skills = getattr(adapter, "loaded_skills", None) or []
     agent_card_dict = {
         "name": config.name,
         "description": config.description,
@@ -311,11 +349,16 @@ async def main():  # pragma: no cover
                 "tags": s.metadata.tags,
             }
             for s in loaded_skills
+        ] if adapter_ready else [
+            {"id": n, "name": n, "description": n, "tags": []}
+            for n in (config.skills or [])
         ],
         "capabilities": {
             "streaming": config.a2a.streaming,
             "pushNotifications": config.a2a.push_notifications,
         },
+        "configuration_status": "ready" if adapter_ready else "not_configured",
+        **({"configuration_error": adapter_error} if adapter_error else {}),
     }
 
     async with httpx.AsyncClient(timeout=10.0) as client:
@@ -364,7 +407,9 @@ async def main():  # pragma: no cover
     # 9b. Start skills hot-reload watcher (background task)
     # When a skill file changes the watcher reloads the skill module and calls
     # back into the adapter so the next A2A request uses the updated tools.
-    if config.skills:
+    # Skipped on misconfigured boots — adapter has no executor / tool registry
+    # to swap into, so reloading skills would NPE on the agent rebuild path.
+    if adapter_ready and config.skills:
         try:
             from skill_loader.watcher import SkillsWatcher
 
@@ -495,9 +540,13 @@ async def main():  # pragma: no cover
 
     # 10b. Schedule initial_prompt self-message after server is ready.
     # Only runs on first boot — creates a marker file to prevent re-execution on restart.
+    # Skipped on misconfigured boots: the self-message would route through the
+    # platform back to /, hit the -32603 not-configured handler, and consume
+    # the marker for a fire that can't actually run. Wait until the operator
+    # fixes credentials and the workspace redeploys with adapter_ready=True.
    initial_prompt_task = None
    initial_prompt_marker = resolve_initial_prompt_marker(config_path)
-    if config.initial_prompt and not os.path.exists(initial_prompt_marker):
+    if adapter_ready and config.initial_prompt and not os.path.exists(initial_prompt_marker):
         # Write the marker UP FRONT (#71): if the prompt later crashes or
         # times out, we do NOT replay on next boot — that created a
         # ProcessError cascade where every message kept crashing. Operators
@@ -615,7 +664,9 @@ async def main():  # pragma: no cover
     # workspaces upgrade opt-in — set idle_prompt in org.yaml defaults or
     # per-workspace to enable.
     idle_loop_task = None
-    if config.idle_prompt:
+    # Skipped on misconfigured boots — the self-fire would route to the
+    # -32603 handler in a tight loop and consume cycles for no useful work.
+    if adapter_ready and config.idle_prompt:
         # Idle-fire HTTP timeout. Kept tight relative to the fire cadence so a
         # hung platform doesn't accumulate dangling requests — a fire that
         # takes longer than the idle interval itself is almost certainly stuck.
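For the consumer side of the -32603 contract, here is a hedged sketch of what a Go-side caller (something shaped like the platform's a2a_proxy.go, though that exact integration is an assumption here) would observe when POSTing to a misconfigured workspace: HTTP 503 carrying a JSON-RPC 2.0 error envelope. The envelope fields mirror what not_configured_handler.py below emits; the URL and client wiring are illustrative:

```go
// Hypothetical client-side view of the not-configured response.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

type rpcError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
	Data    string `json:"data"`
}

type rpcEnvelope struct {
	JSONRPC string          `json:"jsonrpc"`
	ID      json.RawMessage `json:"id"` // echoes the request id, or null
	Error   *rpcError       `json:"error"`
}

func main() {
	req := []byte(`{"jsonrpc":"2.0","id":7,"method":"message/send"}`)
	resp, err := http.Post("http://workspace:8000/", "application/json", bytes.NewReader(req))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var env rpcEnvelope
	_ = json.NewDecoder(resp.Body).Decode(&env)
	if resp.StatusCode == http.StatusServiceUnavailable && env.Error != nil && env.Error.Code == -32603 {
		// error.data carries the stringified adapter.setup() failure.
		fmt.Printf("agent not configured: %s\n", env.Error.Data)
	}
}
```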
+""" +from __future__ import annotations + +from typing import Awaitable, Callable + +from starlette.requests import Request +from starlette.responses import JSONResponse + + +def make_not_configured_handler( + reason: str | None, +) -> Callable[[Request], Awaitable[JSONResponse]]: + """Return a Starlette POST handler that always 503s with JSON-RPC -32603. + + ``reason`` is surfaced in the JSON-RPC ``error.data`` field so canvas + can render "agent not configured: " to the user. Pass the + stringified ``adapter.setup()`` exception. ``None`` falls back to a + generic "adapter.setup() failed". + + The handler echoes the request's JSON-RPC ``id`` when present so a + well-behaved JSON-RPC client can correlate the error to its request. + Malformed bodies (non-JSON, missing id) get ``id: null`` per spec. + """ + + fallback = reason or "adapter.setup() failed" + + async def _handler(request: Request) -> JSONResponse: + try: + body = await request.json() + except Exception: # noqa: BLE001 + body = {} + return JSONResponse( + { + "jsonrpc": "2.0", + "id": body.get("id") if isinstance(body, dict) else None, + "error": { + "code": -32603, + "message": "Internal error: agent not configured", + "data": fallback, + }, + }, + status_code=503, + ) + + return _handler diff --git a/workspace/tests/test_not_configured_handler.py b/workspace/tests/test_not_configured_handler.py new file mode 100644 index 00000000..39483ffc --- /dev/null +++ b/workspace/tests/test_not_configured_handler.py @@ -0,0 +1,87 @@ +"""Tests for ``not_configured_handler`` — the JSON-RPC -32603 fallback the +runtime mounts when ``adapter.setup()`` fails. + +Tests the behavior end-to-end via Starlette's TestClient so the JSON-RPC +wire shape (status 503, code -32603, id-echo) is exercised the same way +canvas would see it. +""" +from __future__ import annotations + +import sys +from pathlib import Path + +# Make workspace/ importable in test isolation — same pattern as the +# adjacent tests (test_smoke_mode.py, test_heartbeat.py). +WORKSPACE_DIR = Path(__file__).resolve().parents[1] +if str(WORKSPACE_DIR) not in sys.path: + sys.path.insert(0, str(WORKSPACE_DIR)) + +from starlette.applications import Starlette +from starlette.routing import Route +from starlette.testclient import TestClient + +from not_configured_handler import make_not_configured_handler + + +def _build_app(reason: str | None) -> TestClient: + handler = make_not_configured_handler(reason) + app = Starlette(routes=[Route("/", handler, methods=["POST"])]) + return TestClient(app) + + +def test_returns_503_with_jsonrpc_error_envelope(): + """Status 503; body is a valid JSON-RPC 2.0 error envelope.""" + client = _build_app("MINIMAX_API_KEY not set") + resp = client.post("/", json={"jsonrpc": "2.0", "id": 7, "method": "message/send"}) + assert resp.status_code == 503 + body = resp.json() + assert body["jsonrpc"] == "2.0" + assert body["error"]["code"] == -32603 + assert body["error"]["message"] == "Internal error: agent not configured" + + +def test_echoes_request_id_when_present(): + """JSON-RPC clients correlate replies via id; the handler must echo it.""" + client = _build_app("reason") + resp = client.post("/", json={"jsonrpc": "2.0", "id": "abc-123", "method": "x"}) + assert resp.json()["id"] == "abc-123" + + +def test_id_is_null_when_body_malformed(): + """Per JSON-RPC 2.0: id MUST be null when it can't be determined from + the request. 
diff --git a/workspace/tests/test_not_configured_handler.py b/workspace/tests/test_not_configured_handler.py
new file mode 100644
index 00000000..39483ffc
--- /dev/null
+++ b/workspace/tests/test_not_configured_handler.py
@@ -0,0 +1,87 @@
+"""Tests for ``not_configured_handler`` — the JSON-RPC -32603 fallback the
+runtime mounts when ``adapter.setup()`` fails.
+
+Tests the behavior end-to-end via Starlette's TestClient so the JSON-RPC
+wire shape (status 503, code -32603, id-echo) is exercised the same way
+canvas would see it.
+"""
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+
+# Make workspace/ importable in test isolation — same pattern as the
+# adjacent tests (test_smoke_mode.py, test_heartbeat.py).
+WORKSPACE_DIR = Path(__file__).resolve().parents[1]
+if str(WORKSPACE_DIR) not in sys.path:
+    sys.path.insert(0, str(WORKSPACE_DIR))
+
+from starlette.applications import Starlette
+from starlette.routing import Route
+from starlette.testclient import TestClient
+
+from not_configured_handler import make_not_configured_handler
+
+
+def _build_app(reason: str | None) -> TestClient:
+    handler = make_not_configured_handler(reason)
+    app = Starlette(routes=[Route("/", handler, methods=["POST"])])
+    return TestClient(app)
+
+
+def test_returns_503_with_jsonrpc_error_envelope():
+    """Status 503; body is a valid JSON-RPC 2.0 error envelope."""
+    client = _build_app("MINIMAX_API_KEY not set")
+    resp = client.post("/", json={"jsonrpc": "2.0", "id": 7, "method": "message/send"})
+    assert resp.status_code == 503
+    body = resp.json()
+    assert body["jsonrpc"] == "2.0"
+    assert body["error"]["code"] == -32603
+    assert body["error"]["message"] == "Internal error: agent not configured"
+
+
+def test_echoes_request_id_when_present():
+    """JSON-RPC clients correlate replies via id; the handler must echo it."""
+    client = _build_app("reason")
+    resp = client.post("/", json={"jsonrpc": "2.0", "id": "abc-123", "method": "x"})
+    assert resp.json()["id"] == "abc-123"
+
+
+def test_id_is_null_when_body_malformed():
+    """Per JSON-RPC 2.0: id MUST be null when it can't be determined from
+    the request. Malformed bodies (non-JSON, empty, non-object) all map
+    to id=null."""
+    client = _build_app("reason")
+    resp = client.post("/", content=b"not json at all", headers={"content-type": "application/json"})
+    assert resp.status_code == 503
+    assert resp.json()["id"] is None
+
+
+def test_reason_surfaces_in_error_data():
+    """Operators read ``error.data`` to figure out what to fix. The
+    setup() exception string lands there verbatim."""
+    client = _build_app("RuntimeError: Neither OPENAI_API_KEY nor MINIMAX_API_KEY is set")
+    resp = client.post("/", json={"jsonrpc": "2.0", "id": 1, "method": "x"})
+    assert resp.json()["error"]["data"] == (
+        "RuntimeError: Neither OPENAI_API_KEY nor MINIMAX_API_KEY is set"
+    )
+
+
+def test_none_reason_falls_back_to_generic_message():
+    """If the adapter raised but we couldn't capture a reason, give the
+    operator a hint where to look (still better than a stuck-booting
+    workspace with no log line)."""
+    client = _build_app(None)
+    resp = client.post("/", json={"jsonrpc": "2.0", "id": 1, "method": "x"})
+    assert resp.json()["error"]["data"] == "adapter.setup() failed"
+
+
+def test_array_body_does_not_crash_id_extraction():
+    """JSON-RPC supports batch (array) requests. We don't currently
+    support batch in the runtime, but the handler shouldn't crash on a
+    batch body — it should just respond with id=null and the same -32603
+    so the client sees a clear error instead of a 500."""
+    client = _build_app("reason")
+    resp = client.post("/", json=[{"jsonrpc": "2.0", "id": 1, "method": "x"}])
+    assert resp.status_code == 503
+    assert resp.json()["id"] is None
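Closing the loop on registration: a hedged sketch of how a platform-side consumer might read the two fields main.py now adds to agent_card_dict. Only the field names and values come from the diff ("configuration_status" of "ready" or "not_configured", plus the optional "configuration_error"); the surrounding struct and program are illustrative:

```go
// Hypothetical platform-side parse of the registration payload's new
// configuration fields. Struct tags match the diff; the rest is a sketch.
package main

import (
	"encoding/json"
	"fmt"
)

type agentCard struct {
	Name                string `json:"name"`
	ConfigurationStatus string `json:"configuration_status"`
	ConfigurationError  string `json:"configuration_error,omitempty"`
}

func main() {
	payload := []byte(`{
		"name": "demo",
		"configuration_status": "not_configured",
		"configuration_error": "RuntimeError: Neither OPENAI_API_KEY nor MINIMAX_API_KEY is set"
	}`)
	var card agentCard
	if err := json.Unmarshal(payload, &card); err != nil {
		panic(err)
	}
	if card.ConfigurationStatus != "ready" {
		// Render "configured: false, reason: ..." instead of "ready but silent".
		fmt.Printf("workspace %s not configured: %s\n", card.Name, card.ConfigurationError)
	}
}
```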