From 08bd8fc3a230ec68e89d7087aeb42dd051e615b1 Mon Sep 17 00:00:00 2001 From: "hongming-kimi-laptop (Molecule AI agent)" Date: Tue, 12 May 2026 13:28:13 -0700 Subject: [PATCH 01/16] fix(runtime): accept kimi as external workspace runtime MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Treat runtime=kimi and runtime=kimi-cli as BYO-compute (external-like) meta-runtimes. This means: - registry/register defaults empty delivery_mode to poll (same as external) - plugin install/uninstall returns 422 pointing at pull-mode download - restart returns noop with operator-driven message - auto-restart skips kimi workspaces (no platform container) - discovery treats kimi like external for URL resolution - external credential rotation accepts kimi runtimes - runtime allowlist includes kimi and kimi-cli without manifest templates Tests: - TestRegister_KimiRuntime_DefaultsToPoll - TestPluginInstall_KimiRuntime_Returns422 - TestRestartHandler_KimiRuntimeNoOps - runtime_registry tests verify kimi/kimi-cli injection No manifest.json template entry added — kimi is injected the same way as external (no template repo, BYO-compute only). --- .../internal/handlers/a2a_proxy_helpers.go | 2 +- .../internal/handlers/discovery.go | 4 +- .../internal/handlers/external_rotate.go | 10 ++-- workspace-server/internal/handlers/plugins.go | 2 +- .../handlers/plugins_install_external_test.go | 28 +++++++++ .../internal/handlers/registry.go | 2 +- .../internal/handlers/registry_test.go | 59 +++++++++++++++++++ .../internal/handlers/runtime_registry.go | 18 ++++++ .../handlers/runtime_registry_test.go | 10 ++-- .../internal/handlers/workspace_restart.go | 8 +-- .../handlers/workspace_restart_test.go | 45 ++++++++++++++ 11 files changed, 170 insertions(+), 18 deletions(-) diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index ded26ec5..77118de6 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -162,7 +162,7 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspaceID string) bool { var wsRuntime string db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime) - if wsRuntime == "external" { + if isExternalLikeRuntime(wsRuntime) { return false } if !h.HasProvisioner() { diff --git a/workspace-server/internal/handlers/discovery.go b/workspace-server/internal/handlers/discovery.go index 79315016..318548ce 100644 --- a/workspace-server/internal/handlers/discovery.go +++ b/workspace-server/internal/handlers/discovery.go @@ -136,7 +136,7 @@ func discoverWorkspacePeer(ctx context.Context, c *gin.Context, callerID, target // lives on the other side of the wire and needs the URL as-is // (localhost rewrites wouldn't resolve from its host anyway). // Phase 30.6. - if wsRuntime == "external" { + if isExternalLikeRuntime(wsRuntime) { if handled := writeExternalWorkspaceURL(ctx, c, callerID, targetID, wsName); handled { return } @@ -181,7 +181,7 @@ func writeExternalWorkspaceURL(ctx context.Context, c *gin.Context, callerID, ta outURL := wsURL var callerRuntime string db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime,'langgraph') FROM workspaces WHERE id = $1`, callerID).Scan(&callerRuntime) - if callerRuntime != "external" { + if !isExternalLikeRuntime(callerRuntime) { outURL = strings.Replace(outURL, "127.0.0.1", "host.docker.internal", 1) outURL = strings.Replace(outURL, "localhost", "host.docker.internal", 1) } diff --git a/workspace-server/internal/handlers/external_rotate.go b/workspace-server/internal/handlers/external_rotate.go index ce029958..5973d362 100644 --- a/workspace-server/internal/handlers/external_rotate.go +++ b/workspace-server/internal/handlers/external_rotate.go @@ -62,7 +62,7 @@ func (h *WorkspaceHandler) RotateExternalCredentials(c *gin.Context) { c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"}) return } - if runtime != "external" { + if !isExternalLikeRuntime(runtime) { // Rotating a hermes/claude-code workspace's bearer would not // just break the ssh-EIC tunnel auth on the platform side — it // would also leave the workspace's in-container heartbeat with @@ -73,9 +73,9 @@ func (h *WorkspaceHandler) RotateExternalCredentials(c *gin.Context) { // here so the canvas can show "rotate is for external workspaces; // click Restart instead" rather than silently corrupting state. c.JSON(http.StatusBadRequest, gin.H{ - "error": "rotate is only valid for runtime=external workspaces", + "error": "rotate is only valid for external/BYO-compute workspaces", "runtime": runtime, - "hint": "use POST /workspaces/:id/restart for non-external runtimes", + "hint": "use POST /workspaces/:id/restart for container-backed runtimes", }) return } @@ -139,9 +139,9 @@ func (h *WorkspaceHandler) GetExternalConnection(c *gin.Context) { c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"}) return } - if runtime != "external" { + if !isExternalLikeRuntime(runtime) { c.JSON(http.StatusBadRequest, gin.H{ - "error": "connection payload is only valid for runtime=external workspaces", + "error": "connection payload is only valid for external/BYO-compute workspaces", "runtime": runtime, }) return diff --git a/workspace-server/internal/handlers/plugins.go b/workspace-server/internal/handlers/plugins.go index d26db674..76c132d4 100644 --- a/workspace-server/internal/handlers/plugins.go +++ b/workspace-server/internal/handlers/plugins.go @@ -242,7 +242,7 @@ func (h *PluginsHandler) isExternalRuntime(workspaceID string) bool { if err != nil { return false } - return runtime == "external" + return isExternalLikeRuntime(runtime) } func (h *PluginsHandler) execAsRoot(ctx context.Context, containerName string, cmd []string) (string, error) { diff --git a/workspace-server/internal/handlers/plugins_install_external_test.go b/workspace-server/internal/handlers/plugins_install_external_test.go index 3afe13f6..eaad2396 100644 --- a/workspace-server/internal/handlers/plugins_install_external_test.go +++ b/workspace-server/internal/handlers/plugins_install_external_test.go @@ -76,6 +76,34 @@ func TestPluginUninstall_ExternalRuntime_Returns422(t *testing.T) { } } +// TestPluginInstall_KimiRuntime_Returns422 — kimi-cli is BYO-compute, +// same shape as external. Push-install via docker exec must be rejected. +func TestPluginInstall_KimiRuntime_Returns422(t *testing.T) { + h := NewPluginsHandler(t.TempDir(), nil, nil). + WithRuntimeLookup(func(workspaceID string) (string, error) { + return "kimi-cli", nil + }) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-kimi"}} + c.Request = httptest.NewRequest( + "POST", + "/workspaces/ws-kimi/plugins", + bytes.NewBufferString(`{"source":"local://my-plugin"}`), + ) + c.Request.Header.Set("Content-Type", "application/json") + + h.Install(c) + + if w.Code != http.StatusUnprocessableEntity { + t.Errorf("expected 422 for runtime='kimi-cli', got %d: %s", w.Code, w.Body.String()) + } + if !strings.Contains(w.Body.String(), "external runtimes") { + t.Errorf("expected error body to mention 'external runtimes', got: %s", w.Body.String()) + } +} + // TestPluginInstall_ContainerBackedRuntime_FallsThroughGuard — the runtime // guard MUST NOT short-circuit container-backed runtimes. With // `runtime='claude-code'` the install proceeds past the guard; without a diff --git a/workspace-server/internal/handlers/registry.go b/workspace-server/internal/handlers/registry.go index 84333985..65a85305 100644 --- a/workspace-server/internal/handlers/registry.go +++ b/workspace-server/internal/handlers/registry.go @@ -158,7 +158,7 @@ func (h *RegistryHandler) resolveDeliveryMode(ctx context.Context, workspaceID, if existing.Valid && existing.String != "" { return existing.String, nil } - if runtime.Valid && runtime.String == "external" { + if runtime.Valid && isExternalLikeRuntime(runtime.String) { return models.DeliveryModePoll, nil } return models.DeliveryModePush, nil diff --git a/workspace-server/internal/handlers/registry_test.go b/workspace-server/internal/handlers/registry_test.go index bd8e65d8..7ad1dbbc 100644 --- a/workspace-server/internal/handlers/registry_test.go +++ b/workspace-server/internal/handlers/registry_test.go @@ -1721,6 +1721,65 @@ func TestRegister_ExternalRuntime_DefaultsToPoll(t *testing.T) { } } +// TestRegister_KimiRuntime_DefaultsToPoll mirrors the external-runtime +// poll-default test: a workspace whose existing row has runtime=kimi-cli +// and empty delivery_mode must resolve to poll (laptop/NAT-safe default). +func TestRegister_KimiRuntime_DefaultsToPoll(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + const wsID = "ws-kimi-default-poll" + + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + + mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"delivery_mode", "runtime"}). + AddRow(sql.NullString{}, "kimi-cli")) + + mock.ExpectExec("INSERT INTO workspaces"). + WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectQuery("SELECT url FROM workspaces WHERE id"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"url"}).AddRow("")) + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + mock.ExpectExec("INSERT INTO workspace_auth_tokens"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/registry/register", + bytes.NewBufferString(`{"id":"`+wsID+`","agent_card":{"name":"a"}}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Register(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + _ = json.Unmarshal(w.Body.Bytes(), &resp) + if resp["delivery_mode"] != "poll" { + t.Errorf("delivery_mode = %v, want %q (kimi runtime + empty mode → poll)", + resp["delivery_mode"], "poll") + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + // TestRegister_NonExternalRuntime_StillDefaultsToPush guards the // inverse: a non-external runtime (langgraph, hermes, etc.) with // empty delivery_mode keeps the historical push default. Catches diff --git a/workspace-server/internal/handlers/runtime_registry.go b/workspace-server/internal/handlers/runtime_registry.go index 4b735c85..6d057a66 100644 --- a/workspace-server/internal/handlers/runtime_registry.go +++ b/workspace-server/internal/handlers/runtime_registry.go @@ -78,6 +78,8 @@ var fallbackRuntimes = map[string]struct{}{ "openclaw": {}, "codex": {}, "external": {}, + "kimi": {}, + "kimi-cli": {}, // mock — virtual workspace with hardcoded canned A2A replies. // No container, no EC2, no template repo. See mock_runtime.go // for the full rationale (200-workspace funding-demo org). @@ -108,6 +110,10 @@ func loadRuntimesFromManifest(path string) (map[string]struct{}, error) { // the manifest doesn't know about it. Injected here so we // don't need a special-case in every caller. "external": {}, + // kimi and kimi-cli are BYO-compute meta-runtimes (same shape + // as external). No template repo; injected like external. + "kimi": {}, + "kimi-cli": {}, // mock is ALWAYS available for the same reason as external: // virtual workspace, no template repo, never spawns a // container. See mock_runtime.go. @@ -128,6 +134,18 @@ func loadRuntimesFromManifest(path string) (map[string]struct{}, error) { return out, nil } +// isExternalLikeRuntime returns true for runtimes that are BYO-compute +// (operator-managed, no platform-owned container or EC2). These runtimes +// share behavior around delivery_mode defaulting, plugin install, restart, +// and discovery. +func isExternalLikeRuntime(runtime string) bool { + switch runtime { + case "external", "kimi", "kimi-cli": + return true + } + return false +} + // initKnownRuntimes is called from the package init chain (see // workspace_provision.go var initialization) to replace the // fallback map with the manifest-derived one. Idempotent — diff --git a/workspace-server/internal/handlers/runtime_registry_test.go b/workspace-server/internal/handlers/runtime_registry_test.go index 63fa8bdc..78c2c687 100644 --- a/workspace-server/internal/handlers/runtime_registry_test.go +++ b/workspace-server/internal/handlers/runtime_registry_test.go @@ -33,7 +33,7 @@ func TestLoadRuntimesFromManifest_StripsDefaultSuffix(t *testing.T) { if err != nil { t.Fatalf("load: %v", err) } - want := []string{"claude-code", "langgraph", "hermes", "external"} + want := []string{"claude-code", "langgraph", "hermes", "external", "kimi", "kimi-cli"} for _, w := range want { if _, ok := got[w]; !ok { t.Errorf("want runtime %q in set, missing. got=%v", w, keys(got)) @@ -59,8 +59,10 @@ func TestLoadRuntimesFromManifest_ExternalAlwaysInjected(t *testing.T) { if err != nil { t.Fatalf("load: %v", err) } - if _, ok := got["external"]; !ok { - t.Errorf("external must be injected even when absent from manifest: %v", keys(got)) + for _, must := range []string{"external", "kimi", "kimi-cli"} { + if _, ok := got[must]; !ok { + t.Errorf("%s must be injected even when absent from manifest: %v", must, keys(got)) + } } } @@ -95,7 +97,7 @@ func TestRealManifestParses(t *testing.T) { t.Fatalf("real manifest load: %v", err) } // Core runtimes we always expect to ship. - for _, must := range []string{"langgraph", "hermes", "claude-code", "external"} { + for _, must := range []string{"langgraph", "hermes", "claude-code", "external", "kimi", "kimi-cli"} { if _, ok := got[must]; !ok { t.Errorf("real manifest missing runtime %q — got=%v", must, keys(got)) } diff --git a/workspace-server/internal/handlers/workspace_restart.go b/workspace-server/internal/handlers/workspace_restart.go index 6e3bb424..985b9ca5 100644 --- a/workspace-server/internal/handlers/workspace_restart.go +++ b/workspace-server/internal/handlers/workspace_restart.go @@ -103,11 +103,11 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) { // behavior agree, and surface a clear message instead of silently // no-op'ing — the canvas can show the operator that the fix is on // their side. - if dbRuntime == "external" { + if isExternalLikeRuntime(dbRuntime) { c.JSON(http.StatusOK, gin.H{ "status": "noop", - "runtime": "external", - "message": "external workspaces are operator-driven — restart your local poller; platform has nothing to restart", + "runtime": dbRuntime, + "message": dbRuntime + " workspaces are operator-driven — restart your local agent; platform has nothing to restart", }) return } @@ -547,7 +547,7 @@ func (h *WorkspaceHandler) runRestartCycle(workspaceID string) { // Don't auto-restart external workspaces (no Docker container) // or mock workspaces (no container, every reply is canned — // see workspace-server/internal/handlers/mock_runtime.go). - if dbRuntime == "external" || dbRuntime == "mock" { + if isExternalLikeRuntime(dbRuntime) || dbRuntime == "mock" { return } diff --git a/workspace-server/internal/handlers/workspace_restart_test.go b/workspace-server/internal/handlers/workspace_restart_test.go index f36b5232..c89baad8 100644 --- a/workspace-server/internal/handlers/workspace_restart_test.go +++ b/workspace-server/internal/handlers/workspace_restart_test.go @@ -179,6 +179,51 @@ func TestRestartHandler_ExternalRuntimeNoOps(t *testing.T) { } } +func TestRestartHandler_KimiRuntimeNoOps(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + mock.ExpectQuery("SELECT status, name, tier, COALESCE"). + WithArgs("ws-kimi"). + WillReturnRows(sqlmock.NewRows([]string{"status", "name", "tier", "runtime"}). + AddRow("offline", "Kimi Agent", 1, "kimi-cli")) + + mock.ExpectQuery("SELECT parent_id FROM workspaces WHERE id ="). + WithArgs("ws-kimi"). + WillReturnRows(sqlmock.NewRows([]string{"parent_id"})) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-kimi"}} + c.Request = httptest.NewRequest("POST", "/workspaces/ws-kimi/restart", nil) + + handler.Restart(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("decode response: %v", err) + } + if got, _ := resp["status"].(string); got != "noop" { + t.Errorf("expected status=noop, got %v", resp["status"]) + } + if got, _ := resp["runtime"].(string); got != "kimi-cli" { + t.Errorf("expected runtime=kimi-cli, got %v", resp["runtime"]) + } + if msg, _ := resp["message"].(string); !strings.Contains(msg, "operator-driven") { + t.Errorf("expected message about operator-driven, got %v", resp["message"]) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + func TestRestartHandler_NilProvisionerReturns503(t *testing.T) { mock := setupTestDB(t) setupTestRedis(t) -- 2.45.2 From 1ce51ff0cb02450b88117c9cbf0e2a7b511c0c27 Mon Sep 17 00:00:00 2001 From: "hongming-kimi-laptop (Molecule AI agent)" Date: Tue, 12 May 2026 13:41:33 -0700 Subject: [PATCH 02/16] feat(ui): add Kimi CLI tab to external workspace connect modal Adds a 'Kimi' tab to the 'Connect your external agent' dialog alongside Claude Code, Codex, Hermes, OpenClaw, etc. - Backend: new externalKimiTemplate in external_connection.go with a self-contained Python heartbeat script (register + 20s heartbeat loop). - Frontend: ExternalConnectModal renders the Kimi tab when the platform supplies kimi_snippet in the connection payload. - Token substitution stamps MOLECULE_WORKSPACE_TOKEN into the shell heredoc so the operator's copy-paste is ready-to-run. - Tests updated: BuildExternalConnectionPayload placeholder check now covers kimi_snippet; ExternalConnectionSection test fixture includes the new field. The Kimi tab appears after OpenClaw and before curl/Fields in the tab order. The snippet keeps the workspace online in poll mode (NAT-safe) without requiring a public HTTPS endpoint. --- .../src/components/ExternalConnectModal.tsx | 23 ++++- .../ExternalConnectionSection.test.tsx | 1 + .../internal/handlers/external_connection.go | 88 +++++++++++++++++++ .../internal/handlers/external_rotate_test.go | 1 + 4 files changed, 112 insertions(+), 1 deletion(-) diff --git a/canvas/src/components/ExternalConnectModal.tsx b/canvas/src/components/ExternalConnectModal.tsx index cd02f6fa..ce9738c4 100644 --- a/canvas/src/components/ExternalConnectModal.tsx +++ b/canvas/src/components/ExternalConnectModal.tsx @@ -18,7 +18,7 @@ import { useCallback, useState } from "react"; import * as Dialog from "@radix-ui/react-dialog"; -type Tab = "python" | "curl" | "claude" | "mcp" | "hermes" | "codex" | "openclaw" | "fields"; +type Tab = "python" | "curl" | "claude" | "mcp" | "hermes" | "codex" | "openclaw" | "kimi" | "fields"; export interface ExternalConnectionInfo { workspace_id: string; @@ -58,6 +58,10 @@ export interface ExternalConnectionInfo { // openclaw gateway on loopback. Outbound-tools-only today; push // parity on an external openclaw needs a sessions.steer bridge. openclaw_snippet?: string; + // Kimi CLI setup snippet — self-contained Python heartbeat script + // that keeps a Kimi workspace online in poll mode. Optional for + // backward compat with platforms that haven't shipped the Kimi tab. + kimi_snippet?: string; } interface Props { @@ -150,6 +154,11 @@ export function ExternalConnectModal({ info, onClose }: Props) { 'WORKSPACE_TOKEN=""', `WORKSPACE_TOKEN="${info.auth_token}"`, ); + // Kimi snippet carries the placeholder inside the shell heredoc. + const filledKimi = info.kimi_snippet?.replace( + 'MOLECULE_WORKSPACE_TOKEN=', + `MOLECULE_WORKSPACE_TOKEN=${info.auth_token}`, + ); return ( !o && onClose()}> @@ -189,6 +198,7 @@ export function ExternalConnectModal({ info, onClose }: Props) { if (filledHermes) tabs.push("hermes"); if (filledCodex) tabs.push("codex"); if (filledOpenClaw) tabs.push("openclaw"); + if (filledKimi) tabs.push("kimi"); tabs.push("curl", "fields"); return tabs; })().map((t) => ( @@ -212,6 +222,8 @@ export function ExternalConnectModal({ info, onClose }: Props) { ? "Codex" : t === "openclaw" ? "OpenClaw" + : t === "kimi" + ? "Kimi" : t === "python" ? "Python SDK" : t === "mcp" @@ -288,6 +300,15 @@ export function ExternalConnectModal({ info, onClose }: Props) { onCopy={() => copy(filledOpenClaw, "openclaw")} /> )} + {tab === "kimi" && filledKimi && ( + copy(filledKimi, "kimi")} + /> + )} {tab === "fields" && (
copy(info.workspace_id, "wsid")} copied={copiedKey === "wsid"} /> diff --git a/canvas/src/components/tabs/__tests__/ExternalConnectionSection.test.tsx b/canvas/src/components/tabs/__tests__/ExternalConnectionSection.test.tsx index d54eb933..cec70d83 100644 --- a/canvas/src/components/tabs/__tests__/ExternalConnectionSection.test.tsx +++ b/canvas/src/components/tabs/__tests__/ExternalConnectionSection.test.tsx @@ -58,6 +58,7 @@ const SAMPLE_INFO = { hermes_channel_snippet: "# hermes ws=ws-test", codex_snippet: "# codex ws=ws-test", openclaw_snippet: "# openclaw ws=ws-test", + kimi_snippet: "# kimi ws=ws-test", }; describe("ExternalConnectionSection", () => { diff --git a/workspace-server/internal/handlers/external_connection.go b/workspace-server/internal/handlers/external_connection.go index ef213ae0..74857cc6 100644 --- a/workspace-server/internal/handlers/external_connection.go +++ b/workspace-server/internal/handlers/external_connection.go @@ -50,6 +50,7 @@ func BuildExternalConnectionPayload(platformURL, workspaceID, authToken string) "hermes_channel_snippet": stamp(externalHermesChannelTemplate), "codex_snippet": stamp(externalCodexTemplate), "openclaw_snippet": stamp(externalOpenClawTemplate), + "kimi_snippet": stamp(externalKimiTemplate), } } @@ -489,6 +490,93 @@ codex // external openclaw would need a sessions.steer bridge daemon (the // equivalent of hermes-channel-molecule for openclaw). Tracked // separately; outbound tools is the first cut. +// externalKimiTemplate — lightweight register + heartbeat for Kimi CLI. +// Kimi does not yet have native MCP integration, so the snippet is a +// self-contained Python script that keeps the workspace online in poll +// mode. Operators paste this once and run it in a background terminal. +const externalKimiTemplate = `# Kimi CLI external setup — lightweight register + heartbeat. +# For operators whose external agent is a Kimi CLI session. + +# 1. Install the workspace runtime wheel (provides HTTP client): +pip install molecule-ai-workspace-runtime + +# 2. Save credentials and heartbeat script: +mkdir -p ~/.molecule-ai/kimi-workspace +chmod 700 ~/.molecule-ai/kimi-workspace +cat > ~/.molecule-ai/kimi-workspace/env <<'EOF' +WORKSPACE_ID={{WORKSPACE_ID}} +PLATFORM_URL={{PLATFORM_URL}} +MOLECULE_WORKSPACE_TOKEN= +EOF +chmod 600 ~/.molecule-ai/kimi-workspace/env + +cat > ~/.molecule-ai/kimi-workspace/kimi_heartbeat.py <<'PYEOF' +#!/usr/bin/env python3 +import logging, os, time +from pathlib import Path +import httpx + +ENV = Path.home() / ".molecule-ai" / "kimi-workspace" / "env" +INTERVAL = 20 + +def load_env(): + env = {} + for line in ENV.read_text().splitlines(): + if "=" in line and not line.startswith("#"): + k, v = line.split("=", 1) + env[k.strip()] = v.strip() + return env + +def headers(url, token): + return {"Authorization": f"Bearer {token}", "Origin": url, "Content-Type": "application/json"} + +def register(client, url, ws, tok): + resp = client.post(f"{url}/registry/register", json={ + "id": ws, "url": "", "agent_card": {"name": "mac-laptop-kimi", "skills": []}, + "delivery_mode": "poll", + }, headers=headers(url, tok)) + resp.raise_for_status() + logging.info("registered %s", ws) + +def heartbeat(client, url, ws, tok, start): + resp = client.post(f"{url}/registry/heartbeat", json={ + "workspace_id": ws, "error_rate": 0.0, "sample_error": "", + "active_tasks": 0, "current_task": "", "uptime_seconds": int(time.time() - start), + }, headers=headers(url, tok)) + resp.raise_for_status() + +def main(): + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") + start = time.time() + while True: + try: + e = load_env() + with httpx.Client(timeout=10.0) as c: + register(c, e["PLATFORM_URL"], e["WORKSPACE_ID"], e["MOLECULE_WORKSPACE_TOKEN"]) + heartbeat(c, e["PLATFORM_URL"], e["WORKSPACE_ID"], e["MOLECULE_WORKSPACE_TOKEN"], start) + time.sleep(INTERVAL) + except Exception as exc: + logging.warning("loop failed: %s", exc) + time.sleep(5) + +if __name__ == "__main__": + main() +PYEOF +chmod +x ~/.molecule-ai/kimi-workspace/kimi_heartbeat.py + +# 3. Start the heartbeat (run in a persistent terminal or via launchd): +python3 ~/.molecule-ai/kimi-workspace/kimi_heartbeat.py + +# The script registers the workspace and heartbeats every 20s, +# keeping the workspace status = 'online' on the canvas. +# +# For inbound A2A delivery (canvas messages → your Kimi session), +# pair with the Python SDK tab which sets up a push-mode A2A server. +# +# Need help? +# Documentation: https://doc.moleculesai.app/docs/guides/external-agent-registration +` + const externalOpenClawTemplate = `# OpenClaw MCP config — outbound tool path. For operators whose # external agent is an openclaw session. # diff --git a/workspace-server/internal/handlers/external_rotate_test.go b/workspace-server/internal/handlers/external_rotate_test.go index bddc120c..df31b224 100644 --- a/workspace-server/internal/handlers/external_rotate_test.go +++ b/workspace-server/internal/handlers/external_rotate_test.go @@ -82,6 +82,7 @@ func TestRotateExternalCredentials_HappyPath(t *testing.T) { "curl_register_template", "python_snippet", "claude_code_channel_snippet", "universal_mcp_snippet", "hermes_channel_snippet", "codex_snippet", "openclaw_snippet", + "kimi_snippet", } { if _, ok := body.Connection[k]; !ok { t.Errorf("payload missing snippet field: %s", k) -- 2.45.2 From ed41164a3ee4aefbe1df668034549e55179e4fe4 Mon Sep 17 00:00:00 2001 From: "hongming-kimi-laptop (Molecule AI agent)" Date: Tue, 12 May 2026 13:55:51 -0700 Subject: [PATCH 03/16] feat(ui): Kimi bridge script now includes inbound polling + notify reply Replace the heartbeat-only Kimi snippet with a complete bridge script: - Registers workspace in poll mode (NAT-safe, no public URL) - Heartbeats every 20s to stay online - Polls /workspaces/:id/activity every 5s for new canvas messages - Extracts user text from request_body (A2A JSON-RPC envelope) - Echo-replies via POST /workspaces/:id/notify - Includes a one-off curl example for manual replies The script is self-contained: operators paste it once, edit the reply logic if desired, and run it in a background terminal. This gives Kimi push parity with Claude Code / Hermes channel tabs for laptop/NAT setups without requiring ngrok or Cloudflare Tunnel. Modal label updated to reflect the new capabilities. --- .../src/components/ExternalConnectModal.tsx | 2 +- .../internal/handlers/external_connection.go | 108 +++++++++++++----- 2 files changed, 83 insertions(+), 27 deletions(-) diff --git a/canvas/src/components/ExternalConnectModal.tsx b/canvas/src/components/ExternalConnectModal.tsx index ce9738c4..14de5d1c 100644 --- a/canvas/src/components/ExternalConnectModal.tsx +++ b/canvas/src/components/ExternalConnectModal.tsx @@ -303,7 +303,7 @@ export function ExternalConnectModal({ info, onClose }: Props) { {tab === "kimi" && filledKimi && ( copy(filledKimi, "kimi")} diff --git a/workspace-server/internal/handlers/external_connection.go b/workspace-server/internal/handlers/external_connection.go index 74857cc6..361b828d 100644 --- a/workspace-server/internal/handlers/external_connection.go +++ b/workspace-server/internal/handlers/external_connection.go @@ -490,17 +490,18 @@ codex // external openclaw would need a sessions.steer bridge daemon (the // equivalent of hermes-channel-molecule for openclaw). Tracked // separately; outbound tools is the first cut. -// externalKimiTemplate — lightweight register + heartbeat for Kimi CLI. -// Kimi does not yet have native MCP integration, so the snippet is a -// self-contained Python script that keeps the workspace online in poll -// mode. Operators paste this once and run it in a background terminal. -const externalKimiTemplate = `# Kimi CLI external setup — lightweight register + heartbeat. +// externalKimiTemplate — complete poll-based external setup for Kimi CLI. +// Includes register + heartbeat + inbound activity polling + reply via +// /notify. No public URL needed (NAT-safe). Operators paste once and run +// in a background terminal or via launchd. +const externalKimiTemplate = `# Kimi CLI external setup — register + heartbeat + inbound poll + reply. # For operators whose external agent is a Kimi CLI session. +# No public URL needed; runs behind NAT in poll mode. # 1. Install the workspace runtime wheel (provides HTTP client): pip install molecule-ai-workspace-runtime -# 2. Save credentials and heartbeat script: +# 2. Save credentials and the bridge script: mkdir -p ~/.molecule-ai/kimi-workspace chmod 700 ~/.molecule-ai/kimi-workspace cat > ~/.molecule-ai/kimi-workspace/env <<'EOF' @@ -510,14 +511,16 @@ MOLECULE_WORKSPACE_TOKEN= EOF chmod 600 ~/.molecule-ai/kimi-workspace/env -cat > ~/.molecule-ai/kimi-workspace/kimi_heartbeat.py <<'PYEOF' +cat > ~/.molecule-ai/kimi-workspace/kimi_bridge.py <<'PYEOF' #!/usr/bin/env python3 -import logging, os, time +"""Kimi bridge — keeps workspace online and polls for canvas messages.""" +import json, logging, time from pathlib import Path import httpx ENV = Path.home() / ".molecule-ai" / "kimi-workspace" / "env" -INTERVAL = 20 +HEARTBEAT_INTERVAL = 20 +POLL_INTERVAL = 5 def load_env(): env = {} @@ -527,34 +530,77 @@ def load_env(): env[k.strip()] = v.strip() return env -def headers(url, token): +def hdrs(url, token): return {"Authorization": f"Bearer {token}", "Origin": url, "Content-Type": "application/json"} def register(client, url, ws, tok): - resp = client.post(f"{url}/registry/register", json={ + r = client.post(f"{url}/registry/register", json={ "id": ws, "url": "", "agent_card": {"name": "mac-laptop-kimi", "skills": []}, "delivery_mode": "poll", - }, headers=headers(url, tok)) - resp.raise_for_status() + }, headers=hdrs(url, tok)) + r.raise_for_status() logging.info("registered %s", ws) def heartbeat(client, url, ws, tok, start): - resp = client.post(f"{url}/registry/heartbeat", json={ + r = client.post(f"{url}/registry/heartbeat", json={ "workspace_id": ws, "error_rate": 0.0, "sample_error": "", "active_tasks": 0, "current_task": "", "uptime_seconds": int(time.time() - start), - }, headers=headers(url, tok)) - resp.raise_for_status() + }, headers=hdrs(url, tok)) + r.raise_for_status() + +def poll_inbound(client, url, ws, tok, since_id): + params = {"since_secs": "30", "limit": "50"} + if since_id: + params["since_id"] = since_id + r = client.get(f"{url}/workspaces/{ws}/activity", params=params, headers=hdrs(url, tok)) + r.raise_for_status() + return r.json() + +def send_reply(client, url, ws, tok, text): + r = client.post(f"{url}/workspaces/{ws}/notify", json={"message": text}, headers=hdrs(url, tok)) + r.raise_for_status() + logging.info("reply sent: %s", text[:80]) + +def extract_user_text(item): + """Pull the user message text from an activity log request_body.""" + try: + body = item.get("request_body") or {} + parts = body.get("params", {}).get("message", {}).get("parts", []) + return " ".join(p.get("text", "") for p in parts if p.get("text")) + except Exception: + return "" def main(): logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") start = time.time() + since_id = "" + last_beat = 0 while True: try: e = load_env() + purl, ws, tok = e["PLATFORM_URL"], e["WORKSPACE_ID"], e["MOLECULE_WORKSPACE_TOKEN"] with httpx.Client(timeout=10.0) as c: - register(c, e["PLATFORM_URL"], e["WORKSPACE_ID"], e["MOLECULE_WORKSPACE_TOKEN"]) - heartbeat(c, e["PLATFORM_URL"], e["WORKSPACE_ID"], e["MOLECULE_WORKSPACE_TOKEN"], start) - time.sleep(INTERVAL) + # Heartbeat every HEARTBEAT_INTERVAL seconds + if time.time() - last_beat >= HEARTBEAT_INTERVAL: + register(c, purl, ws, tok) + heartbeat(c, purl, ws, tok, start) + last_beat = time.time() + + # Poll for new canvas messages + items = poll_inbound(c, purl, ws, tok, since_id) + for item in items: + since_id = item["id"] + src = item.get("source_id") + method = item.get("method") or "" + # Skip our own /notify replies and agent-originated traffic + if method == "notify" or src is not None: + continue + text = extract_user_text(item) + if text: + logging.info("INBOUND from canvas: %s", text) + # Replace the echo below with your own logic: + send_reply(c, purl, ws, tok, f"Echo: {text}") + time.sleep(POLL_INTERVAL) except Exception as exc: logging.warning("loop failed: %s", exc) time.sleep(5) @@ -562,16 +608,26 @@ def main(): if __name__ == "__main__": main() PYEOF -chmod +x ~/.molecule-ai/kimi-workspace/kimi_heartbeat.py +chmod +x ~/.molecule-ai/kimi-workspace/kimi_bridge.py -# 3. Start the heartbeat (run in a persistent terminal or via launchd): -python3 ~/.molecule-ai/kimi-workspace/kimi_heartbeat.py +# 3. Start the bridge (run in a persistent terminal or via launchd): +python3 ~/.molecule-ai/kimi-workspace/kimi_bridge.py -# The script registers the workspace and heartbeats every 20s, -# keeping the workspace status = 'online' on the canvas. +# What the script does: +# • Registers the workspace in poll mode (no public URL needed) +# • Heartbeats every 20s to keep STATUS = online on the canvas +# • Polls /workspaces/:id/activity every 5s for new canvas messages +# • Echo-replies via POST /workspaces/:id/notify # -# For inbound A2A delivery (canvas messages → your Kimi session), -# pair with the Python SDK tab which sets up a push-mode A2A server. +# To change the reply logic, edit the send_reply() call inside the loop. +# To send a one-off reply from another terminal: +# curl -fsS -X POST "{{PLATFORM_URL}}/workspaces/{{WORKSPACE_ID}}/notify" \ +# -H "Authorization: Bearer $(cat ~/.molecule-ai/kimi-workspace/env | grep TOKEN | cut -d= -f2)" \ +# -H "Content-Type: application/json" \ +# -d '{"message":"Hello from Kimi"}' +# +# For push-mode inbound A2A (instead of polling), pair with the Python SDK +# tab — but that requires a public HTTPS endpoint (ngrok / Cloudflare Tunnel). # # Need help? # Documentation: https://doc.moleculesai.app/docs/guides/external-agent-registration -- 2.45.2 From 97dba0a95f386600382dd26cae3ed2c9dc8f0c61 Mon Sep 17 00:00:00 2001 From: "hongming-kimi-laptop (Molecule AI agent)" Date: Tue, 12 May 2026 15:14:36 -0700 Subject: [PATCH 04/16] fix(runtime): kimi as first-class BYO-compute runtime (SOP) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follows the same pattern as 'external' — no template repo, injected into the runtime allowlist as a meta-runtime. Changes: Backend: - workspace.go: use isExternalLikeRuntime() instead of hardcoded 'external' check so runtime=kimi/kimi-cli workspaces take the BYO-compute path - Preserve the caller's runtime label (kimi/kimi-cli/external) in DB so the canvas shows the correct runtime name Frontend: - Add canvas/src/lib/externalRuntimes.ts utility (mirrors backend isExternalLikeRuntime) — single source of truth for BYO-compute detection - Update all hardcoded 'runtime === external' checks to use the utility: FilesTab, TerminalTab, ConfigTab, WorkspaceNode, mobile/components - Add 'kimi' and 'kimi-cli' to RUNTIME_NAMES display map - CreateWorkspaceDialog: external-runtime selector dropdown so operators can pick Generic External / Kimi CLI / Kimi CLI (alt) Tests: - Go tests pass (registry, restart, plugin install, workspace create) --- .../src/components/CreateWorkspaceDialog.tsx | 21 +++++++++- canvas/src/components/WorkspaceNode.tsx | 3 +- canvas/src/components/mobile/components.tsx | 3 +- canvas/src/components/tabs/ConfigTab.tsx | 5 ++- canvas/src/components/tabs/FilesTab.tsx | 5 +-- canvas/src/components/tabs/TerminalTab.tsx | 5 +-- canvas/src/lib/externalRuntimes.ts | 21 ++++++++++ canvas/src/lib/runtime-names.ts | 2 + .../internal/handlers/runtime_registry.go | 10 +++++ .../internal/handlers/workspace.go | 10 +++-- .../internal/handlers/workspace_test.go | 42 +++++++++++++++++++ 11 files changed, 113 insertions(+), 14 deletions(-) create mode 100644 canvas/src/lib/externalRuntimes.ts diff --git a/canvas/src/components/CreateWorkspaceDialog.tsx b/canvas/src/components/CreateWorkspaceDialog.tsx index 3830124b..265c4882 100644 --- a/canvas/src/components/CreateWorkspaceDialog.tsx +++ b/canvas/src/components/CreateWorkspaceDialog.tsx @@ -80,6 +80,7 @@ export function CreateWorkspaceButton() { // isExternal is true the template / model / hermes-provider fields are // hidden (they're meaningless for BYO-compute agents). const [isExternal, setIsExternal] = useState(false); + const [externalRuntime, setExternalRuntime] = useState("external"); const [externalConnection, setExternalConnection] = useState(null); @@ -223,6 +224,7 @@ export function CreateWorkspaceButton() { setBudgetLimit(""); setError(null); setHermesProvider("anthropic"); + setExternalRuntime("external"); setHermesApiKey(""); setHermesModel(""); api @@ -282,7 +284,7 @@ export function CreateWorkspaceButton() { // Runtime=external flips the backend into awaiting-agent mode: // no container provisioning, token minted, connection payload // returned in the response for the modal below. - ...(isExternal ? { runtime: "external" } : {}), + ...(isExternal ? { runtime: externalRuntime } : {}), ...(!isExternal && isHermes && provider ? { secrets: { [provider.envVar]: hermesApiKey.trim() }, @@ -382,6 +384,23 @@ export function CreateWorkspaceButton() {
+ {isExternal && ( +
+ + +
+ )} + {!isExternal && ( >) if (!runtime) return null; return (
- {runtime === "external" ? ( + {isExternalLikeRuntime(runtime) ? ( . @@ -37,7 +38,7 @@ export interface MobileAgent { export function toMobileAgent(node: Node): MobileAgent { const cap = summarizeWorkspaceCapabilities(node.data); const runtime = cap.runtime ?? "unknown"; - const remote = runtime === "external"; + const remote = isExternalLikeRuntime(runtime); return { id: node.id, name: node.data.name || node.id, diff --git a/canvas/src/components/tabs/ConfigTab.tsx b/canvas/src/components/tabs/ConfigTab.tsx index 50ae227b..0c8b5bc3 100644 --- a/canvas/src/components/tabs/ConfigTab.tsx +++ b/canvas/src/components/tabs/ConfigTab.tsx @@ -13,6 +13,7 @@ import { findProviderForModel, type SelectorValue, } from "../ProviderModelSelector"; +import { isExternalLikeRuntime } from "@/lib/externalRuntimes"; interface Props { workspaceId: string; @@ -175,7 +176,7 @@ function deriveProvidersFromModels(models: ModelSpec[]): string[] { // exactly the point of the platform adaptor. The deep `~/.hermes/ // config.yaml` on the container is a separate runtime-internal file, // not this one. -const RUNTIMES_WITH_OWN_CONFIG = new Set(["external"]); +const RUNTIMES_WITH_OWN_CONFIG = new Set(["external", "kimi", "kimi-cli"]); const FALLBACK_RUNTIME_OPTIONS: RuntimeOption[] = [ { value: "", label: "LangGraph (default)", models: [], providers: [] }, @@ -1003,7 +1004,7 @@ export function ConfigTab({ workspaceId }: Props) { : "This runtime manages its own config outside the platform template."}
)} - {!error && config.runtime === "external" && ( + {!error && isExternalLikeRuntime(config.runtime) && ( )} {success && ( diff --git a/canvas/src/components/tabs/FilesTab.tsx b/canvas/src/components/tabs/FilesTab.tsx index 4bf24d32..f51d40d2 100644 --- a/canvas/src/components/tabs/FilesTab.tsx +++ b/canvas/src/components/tabs/FilesTab.tsx @@ -9,6 +9,7 @@ import { FileEditor } from "./FilesTab/FileEditor"; import { NotAvailablePanel } from "./FilesTab/NotAvailablePanel"; import { useFilesApi } from "./FilesTab/useFilesApi"; import { buildTree } from "./FilesTab/tree"; +import { isExternalLikeRuntime } from "@/lib/externalRuntimes"; // Re-exports preserved for external imports (e.g. tests importing from `../tabs/FilesTab`) export { buildTree } from "./FilesTab/tree"; @@ -32,8 +33,6 @@ interface Props { * has no platform-owned filesystem. Otherwise the user loses access to * a real surface (e.g. claude-code SaaS workspaces have files served * by ListFiles via EIC; they belong on the rendering path, not here). */ -const RUNTIMES_WITHOUT_FILES = new Set(["external"]); - export function FilesTab({ workspaceId, data }: Props) { // Early-return for runtimes whose filesystem is not platform-owned. // Skips the whole useFilesApi hook + tree render below — without this, @@ -43,7 +42,7 @@ export function FilesTab({ workspaceId, data }: Props) { // "0 files / No config files yet" reads as a bug. The placeholder // makes the absence intentional and points the user at the right // surface (Chat). - if (data && RUNTIMES_WITHOUT_FILES.has(data.runtime)) { + if (data && isExternalLikeRuntime(data.runtime)) { return ; } return ; diff --git a/canvas/src/components/tabs/TerminalTab.tsx b/canvas/src/components/tabs/TerminalTab.tsx index 0e5ddbe4..361c47f1 100644 --- a/canvas/src/components/tabs/TerminalTab.tsx +++ b/canvas/src/components/tabs/TerminalTab.tsx @@ -13,6 +13,7 @@ interface Props { } import { deriveWsBaseUrl } from "@/lib/ws-url"; +import { isExternalLikeRuntime } from "@/lib/externalRuntimes"; const WS_URL = deriveWsBaseUrl(); @@ -87,8 +88,6 @@ function NotAvailablePanel({ runtime }: { runtime: string }) { /** Runtimes that don't expose a TTY. Keep narrow — only add a runtime * here when its provisioner genuinely has no shell endpoint, otherwise * the user loses access to a real debugging surface. */ -const RUNTIMES_WITHOUT_TERMINAL = new Set(["external"]); - export function TerminalTab({ workspaceId, data }: Props) { // Early-return for runtimes that have no shell. Skips the entire // xterm + WebSocket dance below — without this, mounting the tab @@ -96,7 +95,7 @@ export function TerminalTab({ workspaceId, data }: Props) { // workspace-server (no /ws/terminal/ route registered for it), // and shows "Connection failed" with a Reconnect button — confusing // because the workspace IS healthy, just doesn't have a TTY. - if (data && RUNTIMES_WITHOUT_TERMINAL.has(data.runtime)) { + if (data && isExternalLikeRuntime(data.runtime)) { return ; } diff --git a/canvas/src/lib/externalRuntimes.ts b/canvas/src/lib/externalRuntimes.ts new file mode 100644 index 00000000..c84783c2 --- /dev/null +++ b/canvas/src/lib/externalRuntimes.ts @@ -0,0 +1,21 @@ +/** + * External-like (BYO-compute) runtime detection. + * + * Mirrors the backend's isExternalLikeRuntime() in + * workspace-server/internal/handlers/runtime_registry.go. + * + * These runtimes have no platform-owned container — the operator installs + * the agent CLI locally and calls /registry/register. They share UX + * behaviour: no Files tab, no Terminal tab, no Docker config, and the + * connection modal shows copy-paste snippets. + */ + +const EXTERNAL_LIKE_RUNTIMES = new Set([ + "external", + "kimi", + "kimi-cli", +]); + +export function isExternalLikeRuntime(runtime: string | undefined): boolean { + return !!runtime && EXTERNAL_LIKE_RUNTIMES.has(runtime); +} diff --git a/canvas/src/lib/runtime-names.ts b/canvas/src/lib/runtime-names.ts index fcc1ef47..f01e9b11 100644 --- a/canvas/src/lib/runtime-names.ts +++ b/canvas/src/lib/runtime-names.ts @@ -9,6 +9,8 @@ const RUNTIME_NAMES: Record = { openclaw: "OpenClaw", crewai: "CrewAI", autogen: "AutoGen", + kimi: "Kimi", + "kimi-cli": "Kimi CLI", }; export function runtimeDisplayName(runtime: string): string { diff --git a/workspace-server/internal/handlers/runtime_registry.go b/workspace-server/internal/handlers/runtime_registry.go index 6d057a66..0efa2ec0 100644 --- a/workspace-server/internal/handlers/runtime_registry.go +++ b/workspace-server/internal/handlers/runtime_registry.go @@ -146,6 +146,16 @@ func isExternalLikeRuntime(runtime string) bool { return false } +// normalizeExternalRuntime returns the given runtime label if non-empty, +// otherwise falls back to "external". Used when persisting BYO-compute +// workspaces so we don't store an empty runtime string. +func normalizeExternalRuntime(runtime string) string { + if runtime == "" { + return "external" + } + return runtime +} + // initKnownRuntimes is called from the package init chain (see // workspace_provision.go var initialization) to replace the // fallback map with the manifest-derived one. Idempotent — diff --git a/workspace-server/internal/handlers/workspace.go b/workspace-server/internal/handlers/workspace.go index bfccb092..b674836b 100644 --- a/workspace-server/internal/handlers/workspace.go +++ b/workspace-server/internal/handlers/workspace.go @@ -428,13 +428,16 @@ func (h *WorkspaceHandler) Create(c *gin.Context) { // implies docker work in flight) so the canvas can render // a "waiting for external agent to connect" state without // tripping the provisioning-timeout UX. - if payload.External || payload.Runtime == "external" { + if payload.External || isExternalLikeRuntime(payload.Runtime) { var connectionToken string if payload.URL != "" { // URL already validated by validateAgentURL above (before BeginTx). // Now persist it: the external URL is set after the workspace row // commits so that a failed URL UPDATE doesn't roll back the row. - db.DB.ExecContext(ctx, `UPDATE workspaces SET url = $1, status = $2, runtime = 'external', updated_at = now() WHERE id = $3`, payload.URL, models.StatusOnline, id) + // Preserve BYO-compute runtime label (kimi, kimi-cli, external) — + // don't coerce to generic "external" so the canvas can show the + // correct runtime name in the node card. + db.DB.ExecContext(ctx, `UPDATE workspaces SET url = $1, status = $2, runtime = $3, updated_at = now() WHERE id = $4`, payload.URL, models.StatusOnline, normalizeExternalRuntime(payload.Runtime), id) if err := db.CacheURL(ctx, id, payload.URL); err != nil { log.Printf("External workspace: failed to cache URL for %s: %v", id, err) } @@ -446,7 +449,8 @@ func (h *WorkspaceHandler) Create(c *gin.Context) { // in awaiting_agent. First POST /registry/register call // from the external agent (with this token + its URL) // flips the row to online. - db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, runtime = 'external', updated_at = now() WHERE id = $2`, models.StatusAwaitingAgent, id) + // Preserve BYO-compute runtime label (kimi, kimi-cli, external). + db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, runtime = $2, updated_at = now() WHERE id = $3`, models.StatusAwaitingAgent, normalizeExternalRuntime(payload.Runtime), id) tok, tokErr := wsauth.IssueToken(ctx, db.DB, id) if tokErr != nil { log.Printf("External workspace %s: token issuance failed: %v", id, tokErr) diff --git a/workspace-server/internal/handlers/workspace_test.go b/workspace-server/internal/handlers/workspace_test.go index 4e58a7bf..9d5b1a77 100644 --- a/workspace-server/internal/handlers/workspace_test.go +++ b/workspace-server/internal/handlers/workspace_test.go @@ -559,6 +559,48 @@ func TestWorkspaceCreate_ExternalURL_SSRFSafe(t *testing.T) { } } +// TestWorkspaceCreate_KimiRuntime_PreservesLabel asserts that a workspace +// created with runtime="kimi" takes the BYO-compute path (awaiting_agent, +// no Docker provisioning) and preserves the "kimi" label in the DB instead +// of coercing to "external". Regression guard for SOP runtime addition. +func TestWorkspaceCreate_KimiRuntime_PreservesLabel(t *testing.T) { + t.Setenv("MOLECULE_DEPLOY_MODE", "self-hosted") + t.Setenv("MOLECULE_ORG_ID", "") + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + mock.ExpectBegin() + mock.ExpectExec("INSERT INTO workspaces"). + WithArgs(sqlmock.AnyArg(), "Kimi Agent", nil, 3, "kimi", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectCommit() + // Pre-register flow: awaiting_agent + runtime preserved as "kimi" + mock.ExpectExec("UPDATE workspaces SET status"). + WithArgs(models.StatusAwaitingAgent, "kimi", sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + // Token issuance (workspace_auth_tokens, not workspace_tokens) + mock.ExpectExec("INSERT INTO workspace_auth_tokens"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + + body := `{"name":"Kimi Agent","runtime":"kimi","tier":3,"canvas":{"x":100,"y":100}}` + c.Request = httptest.NewRequest("POST", "/workspaces", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Create(c) + + if w.Code != http.StatusCreated { + t.Errorf("expected status 201, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + // TestWorkspaceCreate_ExternalURL_SSRFMetadataBlocked asserts that an external // workspace created with a cloud-metadata URL is rejected with 400 before any // DB write. 169.254.0.0/16 is always blocked regardless of mode (SaaS or -- 2.45.2 From e1aac92539cdc7a466abbb6cc8b9c6f320e88e0a Mon Sep 17 00:00:00 2001 From: "hongming-kimi-laptop (Molecule AI agent)" Date: Tue, 12 May 2026 19:55:45 -0700 Subject: [PATCH 05/16] fix(mcp): universal stdio transport + runtime-adaptive notifications MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root fix for molecule-ai-workspace-runtime#61: - Replace asyncio.connect_read_pipe/connect_write_pipe with direct sys.stdin.buffer/sys.stdout.buffer I/O. The asyncio pipe transport rejects regular files, PTYs, and sockets — breaking openclaw, CI tests, and tee-captured debugging. Direct buffer I/O works with ANY file descriptor. - Replace fatal _assert_stdio_is_pipe_compatible() with non-fatal _warn_if_stdio_not_pipe() — operators get diagnostic signal without the hard exit. Runtime detection for adaptive push notifications: - Detect MCP host from env vars: CLAUDE_CODE, OPENCLAW_SESSION_ID, CURSOR_MCP, HERMES_RUNTIME - Emit the correct JSON-RPC notification method per host: notifications/claude/channel, notifications/openclaw/channel, etc. - Unifies the molecule-mcp-claude-channel plugin behavior into the universal MCP server — one implementation for all runtimes. Tests: - Update TestStdioPipeAssertion for warning-based behavior - Patch runtime detection in channel-notification tests - 80 passed, 5 pre-existing failures (enrichment cache unrelated) --- workspace/a2a_mcp_server.py | 188 +++++++++++------ workspace/tests/test_a2a_mcp_server.py | 280 ++++++++++++------------- 2 files changed, 261 insertions(+), 207 deletions(-) diff --git a/workspace/a2a_mcp_server.py b/workspace/a2a_mcp_server.py index 07f04f32..22bbb682 100644 --- a/workspace/a2a_mcp_server.py +++ b/workspace/a2a_mcp_server.py @@ -163,15 +163,67 @@ async def handle_tool_call(name: str, arguments: dict) -> str: # --- MCP Notification bridge --- -# `notifications/claude/channel` matches the contract used by the -# molecule-mcp-claude-channel bun bridge (server.ts:509). Claude Code's -# MCP runtime treats this method as a conversation interrupt — `content` -# becomes the agent turn, `meta` is structured metadata. Notification- -# capable hosts (Claude Code today; any compliant client tomorrow) -# get push UX automatically; pollers (`wait_for_message` / `inbox_peek`) -# still work unchanged. See task #46 + the deprecation path documented -# in workspace/inbox.py:set_notification_callback. -_CHANNEL_NOTIFICATION_METHOD = "notifications/claude/channel" +# Runtime-adaptive notification method. Each MCP host uses a different +# JSON-RPC notification method for inbound push. Detect at startup so +# the inbox poller emits the right shape for the host that spawned us. +# +# Detection order (first match wins): +# CLAUDE_CODE / CLAUDE_CODE_VERSION → notifications/claude/channel +# OPENCLAW_SESSION_ID / OPENCLAW_GATEWAY_PORT → notifications/openclaw/channel +# CURSOR_MCP / CURSOR_TRACE_ID → notifications/cursor/channel +# HERMES_RUNTIME / HERMES_WORKSPACE_ID → notifications/hermes/channel +# fallback → notifications/message +# +# The method is resolved once at startup and cached in +# _CHANNEL_NOTIFICATION_METHOD. Tests can override by patching +# _detect_runtime() or setting the env var before import. +_DETECTED_RUNTIME: str | None = None + + +def _detect_runtime() -> str: + """Detect which MCP host spawned this process.""" + global _DETECTED_RUNTIME + if _DETECTED_RUNTIME is not None: + return _DETECTED_RUNTIME + + env = os.environ + if env.get("CLAUDE_CODE") or env.get("CLAUDE_CODE_VERSION"): + _DETECTED_RUNTIME = "claude" + elif env.get("OPENCLAW_SESSION_ID") or env.get("OPENCLAW_GATEWAY_PORT"): + _DETECTED_RUNTIME = "openclaw" + elif env.get("CURSOR_MCP") or env.get("CURSOR_TRACE_ID"): + _DETECTED_RUNTIME = "cursor" + elif env.get("HERMES_RUNTIME") or env.get("HERMES_WORKSPACE_ID"): + _DETECTED_RUNTIME = "hermes" + else: + _DETECTED_RUNTIME = "generic" + + logger.debug(f"Detected MCP runtime: {_DETECTED_RUNTIME}") + return _DETECTED_RUNTIME + + +def _notification_method_for_runtime(runtime: str) -> str: + """Return the JSON-RPC notification method for the given runtime.""" + return { + "claude": "notifications/claude/channel", + "openclaw": "notifications/openclaw/channel", + "cursor": "notifications/cursor/channel", + "hermes": "notifications/hermes/channel", + "generic": "notifications/message", + }.get(runtime, "notifications/message") + + +# Lazily resolved so tests can patch _detect_runtime() before the first +# notification is built. The value is read once per process lifetime. +_CHANNEL_NOTIFICATION_METHOD: str | None = None + + +def _channel_notification_method() -> str: + """Return the cached notification method for the detected runtime.""" + global _CHANNEL_NOTIFICATION_METHOD + if _CHANNEL_NOTIFICATION_METHOD is None: + _CHANNEL_NOTIFICATION_METHOD = _notification_method_for_runtime(_detect_runtime()) + return _CHANNEL_NOTIFICATION_METHOD # ============= Trust-boundary gates for channel-notification meta ============== @@ -569,7 +621,7 @@ def _build_channel_notification(msg: dict) -> dict: ) return { "jsonrpc": "2.0", - "method": _CHANNEL_NOTIFICATION_METHOD, + "method": _channel_notification_method(), "params": { "content": content, "meta": meta, @@ -632,66 +684,69 @@ def _format_channel_content( # --- MCP Server (JSON-RPC over stdio) --- -def _assert_stdio_is_pipe_compatible( - stdin_fd: int = 0, stdout_fd: int = 1 -) -> None: - """Fail fast with a friendly message when stdio isn't pipe-compatible. +def _warn_if_stdio_not_pipe(stdin_fd: int = 0, stdout_fd: int = 1) -> None: + """Warn when stdio isn't a pipe — but continue anyway. - asyncio.connect_read_pipe / connect_write_pipe accept only pipes, - sockets, and character devices. When molecule-mcp is launched with - stdout redirected to a regular file (CI smoke tests, ad-hoc local - debugging that captures output), the asyncio call later raises - ``ValueError: Pipe transport is only for pipes, sockets and character - devices`` from inside the event loop — surfaced to the operator as a - confusing traceback. Detect early and exit cleanly with guidance - instead. See molecule-ai-workspace-runtime#61. + The legacy asyncio.connect_read_pipe / connect_write_pipe transport + rejected regular files, PTYs, and sockets with: + ValueError: Pipe transport is only for pipes, sockets and + character devices + We now use direct buffer I/O which works with ANY file descriptor, + so this is a diagnostic-only warning for operators debugging setup + issues. See molecule-ai-workspace-runtime#61. """ for name, fd in (("stdin", stdin_fd), ("stdout", stdout_fd)): try: mode = os.fstat(fd).st_mode - except OSError as exc: - print( - f"molecule-mcp: cannot stat {name} (fd={fd}): {exc}.\n" - f" This MCP server expects bidirectional pipe stdio. Launch it from\n" - f" an MCP-aware client (Claude Code, Cursor, etc.) — not detached\n" - f" from a terminal or with stdio closed.", - file=sys.stderr, + except OSError: + continue + if not (stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)): + logger.warning( + f"molecule-mcp: {name} (fd={fd}) is not a pipe/socket/char-device. " + f"This is fine — the universal stdio transport handles regular files, " + f"PTYs, and sockets. If you see garbled output, launch from an " + f"MCP-aware client (Claude Code, Cursor, OpenClaw, etc.)." ) - sys.exit(2) - if not ( - stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode) - ): - print( - f"molecule-mcp: {name} (fd={fd}) is a regular file, not a pipe,\n" - f" socket, or character device — asyncio's stdio transport rejects\n" - f" it with `ValueError: Pipe transport is only for pipes, sockets\n" - f" and character devices`. Common causes:\n" - f" molecule-mcp > out.txt # stdout → regular file (fails)\n" - f" molecule-mcp < input.json # stdin → regular file (fails)\n" - f" Launch molecule-mcp from an MCP-aware client (Claude Code, Cursor,\n" - f" hermes, OpenCode, etc.) so stdio is wired to a pipe pair, or use\n" - f" `tee`/process substitution if you need to capture output:\n" - f" molecule-mcp 2>&1 | tee out.txt # stdout stays a pipe", - file=sys.stderr, - ) - sys.exit(2) async def main(): # pragma: no cover - """Run MCP server on stdio — reads JSON-RPC requests, writes responses.""" - reader = asyncio.StreamReader() - protocol = asyncio.StreamReaderProtocol(reader) - await asyncio.get_event_loop().connect_read_pipe(lambda: protocol, sys.stdin) + """Run MCP server on stdio — reads JSON-RPC requests, writes responses. - writer_transport, writer_protocol = await asyncio.get_event_loop().connect_write_pipe( - asyncio.streams.FlowControlMixin, sys.stdout - ) - writer = asyncio.StreamWriter(writer_transport, writer_protocol, None, asyncio.get_event_loop()) + Uses sys.stdin.buffer / sys.stdout.buffer directly instead of + asyncio.connect_read_pipe / connect_write_pipe. The asyncio pipe + transport rejects regular files, PTYs, and sockets with: + ValueError: Pipe transport is only for pipes, sockets and + character devices + This breaks when the MCP host captures stdout (openclaw, CI tests, + ad-hoc debugging with tee). Reading/writing the buffer directly + works with ANY file descriptor. + + See molecule-ai-workspace-runtime#61. + """ + loop = asyncio.get_event_loop() + # sys.stdin.buffer exists on text-mode streams (default); on binary + # streams (tests, some CI setups) stdin IS the buffer. + stdin = getattr(sys.stdin, "buffer", sys.stdin) + stdout = getattr(sys.stdout, "buffer", sys.stdout) async def write_response(response: dict): data = json.dumps(response) + "\n" - writer.write(data.encode()) - await writer.drain() + stdout.write(data.encode()) + stdout.flush() + + # Build a StreamWriter-compatible wrapper for the inbox bridge. + # The bridge expects a writer with .write() and .drain() methods. + class _StdoutWriter: + def __init__(self, buf): + self._buf = buf + + def write(self, data: bytes) -> None: + self._buf.write(data) + + async def drain(self) -> None: + self._buf.flush() + + writer = _StdoutWriter(stdout) # Wire the inbox → MCP notification bridge. The bridge body lives # in `_setup_inbox_bridge` so the threading + asyncio + stdout @@ -701,22 +756,27 @@ async def main(): # pragma: no cover _setup_inbox_bridge(writer, asyncio.get_running_loop()) ) - buffer = "" + # Log runtime detection for operator diagnostics + runtime = _detect_runtime() + logger.info(f"MCP stdio transport ready (runtime={runtime}, " + f"notification_method={_channel_notification_method()})") + + buffer = b"" while True: try: - chunk = await reader.read(65536) + chunk = await loop.run_in_executor(None, stdin.read, 65536) if not chunk: break - buffer += chunk.decode(errors="replace") + buffer += chunk - while "\n" in buffer: - line, buffer = buffer.split("\n", 1) + while b"\n" in buffer: + line, buffer = buffer.split(b"\n", 1) line = line.strip() if not line: continue try: - request = json.loads(line) + request = json.loads(line.decode(errors="replace")) except json.JSONDecodeError: continue @@ -780,7 +840,7 @@ def cli_main() -> None: # pragma: no cover break every external-runtime operator's MCP install — the 0.1.16 ``main_sync`` rename incident is the cautionary precedent. """ - _assert_stdio_is_pipe_compatible() + _warn_if_stdio_not_pipe() asyncio.run(main()) diff --git a/workspace/tests/test_a2a_mcp_server.py b/workspace/tests/test_a2a_mcp_server.py index e1b86026..2011df5e 100644 --- a/workspace/tests/test_a2a_mcp_server.py +++ b/workspace/tests/test_a2a_mcp_server.py @@ -252,23 +252,30 @@ def test_attachments_param_description_emphasizes_REQUIRED(): def test_build_channel_notification_method_matches_claude_contract(): - """Method MUST be `notifications/claude/channel` exactly — that's - what Claude Code's MCP runtime listens for as a conversation + """Method MUST be `notifications/claude/channel` when runtime=claude — + that's what Claude Code's MCP runtime listens for as a conversation interrupt. Same string as the bun channel bridge sends (server.ts:509) so this is a drop-in replacement.""" from a2a_mcp_server import _build_channel_notification - payload = _build_channel_notification({ - "activity_id": "act-1", - "text": "hello", - "peer_id": "", - "kind": "canvas_user", - "method": "message/send", - "created_at": "2026-05-01T00:00:00Z", - }) - - assert payload["method"] == "notifications/claude/channel" - assert payload["jsonrpc"] == "2.0" + with patch("a2a_mcp_server._detect_runtime", return_value="claude"): + # Reset the cached method so _channel_notification_method() re-resolves + import a2a_mcp_server as _mcp + old_method = _mcp._CHANNEL_NOTIFICATION_METHOD + _mcp._CHANNEL_NOTIFICATION_METHOD = None + try: + payload = _build_channel_notification({ + "activity_id": "act-1", + "text": "hello", + "peer_id": "", + "kind": "canvas_user", + "method": "message/send", + "created_at": "2026-05-01T00:00:00Z", + }) + assert payload["method"] == "notifications/claude/channel" + assert payload["jsonrpc"] == "2.0" + finally: + _mcp._CHANNEL_NOTIFICATION_METHOD = old_method def test_build_channel_notification_content_wraps_text_with_identity_and_reply_hint(): @@ -1618,80 +1625,91 @@ async def test_inbox_bridge_emits_channel_notification_to_writer(): import os import threading + from unittest.mock import patch + from a2a_mcp_server import _setup_inbox_bridge - # Real asyncio writer backed by an os.pipe — same shape as - # main() but isolated so we can read what was written. - read_fd, write_fd = os.pipe() - loop = asyncio.get_running_loop() - transport, protocol = await loop.connect_write_pipe( - asyncio.streams.FlowControlMixin, - os.fdopen(write_fd, "wb"), - ) - writer = asyncio.StreamWriter(transport, protocol, None, loop) - - try: - cb = _setup_inbox_bridge(writer, loop) - - msg = { - # Production-shape UUID per the trust-boundary gate (#2488) - "activity_id": "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff", - "text": "hello from peer", - "peer_id": "11111111-2222-3333-4444-555555555555", - "kind": "peer_agent", - "method": "message/send", - "created_at": "2026-05-01T22:00:00Z", - } - - # Simulate the inbox poller daemon thread invoking the - # callback from a non-asyncio context — exactly the - # threading boundary the bridge has to cross. - threading.Thread(target=cb, args=(msg,), daemon=True).start() - - # Give the scheduled coroutine a chance to run + drain - # without coupling the test to wall-clock timing. - for _ in range(20): - await asyncio.sleep(0.05) - data = os.read(read_fd, 65536) if _readable(read_fd) else b"" - if data: - break - else: - data = b"" - - assert data, ( - "no notification on stdout pipe — the bridge fired " - "but the write didn't reach the writer (writer.drain " - "swallowing or scheduling race)" - ) - line = data.decode().strip() - payload = json.loads(line) - - assert payload["jsonrpc"] == "2.0" - assert payload["method"] == "notifications/claude/channel" - # Content is wrapped with the identity header + reply hint — - # see _format_channel_content. The bridge test pins the full - # composition so a regression to "raw text only" surfaces here - # as well as in the per-formatter tests above. - assert payload["params"]["content"] == ( - "[from peer-agent · peer_id=11111111-2222-3333-4444-555555555555]\n" - "hello from peer\n" - '↩ Reply: delegate_task({workspace_id: ' - '"11111111-2222-3333-4444-555555555555", task: "..."})' - ) - meta = payload["params"]["meta"] - assert meta["source"] == "molecule" - assert meta["kind"] == "peer_agent" - assert meta["peer_id"] == "11111111-2222-3333-4444-555555555555" - assert meta["activity_id"] == "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff" - assert meta["ts"] == "2026-05-01T22:00:00Z" - finally: - writer.close() + # Force claude runtime so the notification method is predictable + with patch("a2a_mcp_server._detect_runtime", return_value="claude"): + import a2a_mcp_server as _mcp + old_method = _mcp._CHANNEL_NOTIFICATION_METHOD + _mcp._CHANNEL_NOTIFICATION_METHOD = None + _mcp._channel_notification_method() # prime cache try: - os.close(read_fd) - except OSError: - # read_fd may already be closed if writer.close() tore down the pair - # during teardown — best-effort cleanup, no signal worth surfacing. - pass + # Real asyncio writer backed by an os.pipe — same shape as + # main() but isolated so we can read what was written. + read_fd, write_fd = os.pipe() + loop = asyncio.get_running_loop() + transport, protocol = await loop.connect_write_pipe( + asyncio.streams.FlowControlMixin, + os.fdopen(write_fd, "wb"), + ) + writer = asyncio.StreamWriter(transport, protocol, None, loop) + + try: + cb = _setup_inbox_bridge(writer, loop) + + msg = { + # Production-shape UUID per the trust-boundary gate (#2488) + "activity_id": "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff", + "text": "hello from peer", + "peer_id": "11111111-2222-3333-4444-555555555555", + "kind": "peer_agent", + "method": "message/send", + "created_at": "2026-05-01T22:00:00Z", + } + + # Simulate the inbox poller daemon thread invoking the + # callback from a non-asyncio context — exactly the + # threading boundary the bridge has to cross. + threading.Thread(target=cb, args=(msg,), daemon=True).start() + + # Give the scheduled coroutine a chance to run + drain + # without coupling the test to wall-clock timing. + for _ in range(20): + await asyncio.sleep(0.05) + data = os.read(read_fd, 65536) if _readable(read_fd) else b"" + if data: + break + else: + data = b"" + + assert data, ( + "no notification on stdout pipe — the bridge fired " + "but the write didn't reach the writer (writer.drain " + "swallowing or scheduling race)" + ) + line = data.decode().strip() + payload = json.loads(line) + + assert payload["jsonrpc"] == "2.0" + assert payload["method"] == "notifications/claude/channel" + # Content is wrapped with the identity header + reply hint — + # see _format_channel_content. The bridge test pins the full + # composition so a regression to "raw text only" surfaces here + # as well as in the per-formatter tests above. + assert payload["params"]["content"] == ( + "[from peer-agent · peer_id=11111111-2222-3333-4444-555555555555]\n" + "hello from peer\n" + '↩ Reply: delegate_task({workspace_id: ' + '"11111111-2222-3333-4444-555555555555", task: "..."})' + ) + meta = payload["params"]["meta"] + assert meta["source"] == "molecule" + assert meta["kind"] == "peer_agent" + assert meta["peer_id"] == "11111111-2222-3333-4444-555555555555" + assert meta["activity_id"] == "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff" + assert meta["ts"] == "2026-05-01T22:00:00Z" + finally: + writer.close() + try: + os.close(read_fd) + except OSError: + # read_fd may already be closed if writer.close() tore down the pair + # during teardown — best-effort cleanup, no signal worth surfacing. + pass + finally: + _mcp._CHANNEL_NOTIFICATION_METHOD = old_method async def test_inbox_bridge_swallows_closed_pipe_drain_error(monkeypatch): @@ -1808,99 +1826,75 @@ def test_inbox_bridge_swallows_closed_loop_runtime_error(): class TestStdioPipeAssertion: - """Pin _assert_stdio_is_pipe_compatible — the friendly fail-fast guard - that turns asyncio's `ValueError: Pipe transport is only for pipes, - sockets and character devices` into a clear operator message + exit 2. + """Pin _warn_if_stdio_not_pipe — the diagnostic warning that replaces + the old fatal _assert_stdio_is_pipe_compatible guard. + + The universal stdio transport now works with ANY file descriptor + (pipes, regular files, PTYs, sockets), so the old exit-2 behavior + is gone. These tests verify the warning is emitted for non-pipe + stdio so operators still get diagnostic signal when debugging. See molecule-ai-workspace-runtime#61. """ - def test_pipe_pair_passes_silently(self): - """Happy path — both fds are pipes (the production launch shape - from any MCP client). Should return None without printing or - exiting.""" - from a2a_mcp_server import _assert_stdio_is_pipe_compatible + def test_pipe_pair_passes_silently(self, caplog): + """Happy path — both fds are pipes. No warning emitted.""" + from a2a_mcp_server import _warn_if_stdio_not_pipe r, w = os.pipe() try: - # No exit, no stderr noise. We don't capture stderr here - # because pipe path should produce zero output. - _assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=w) + with caplog.at_level("WARNING"): + _warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w) + assert "not a pipe" not in caplog.text finally: os.close(r) os.close(w) - def test_regular_file_stdout_exits_with_friendly_message( - self, tmp_path, capsys - ): + def test_regular_file_stdout_warns(self, tmp_path, caplog): """Reproducer for runtime#61: stdout redirected to a regular file. - Pre-fix this would surface upstream as - `ValueError: Pipe transport is only for pipes...`. Post-fix we - exit with code 2 and a stderr message that names the symptom + - fix.""" - from a2a_mcp_server import _assert_stdio_is_pipe_compatible + Now emits a warning instead of exiting.""" + from a2a_mcp_server import _warn_if_stdio_not_pipe - # stdin = pipe (so we isolate the stdout failure path); - # stdout = regular file (the bug condition). r, _w = os.pipe() regular = tmp_path / "captured.log" f = open(regular, "wb") try: - with pytest.raises(SystemExit) as excinfo: - _assert_stdio_is_pipe_compatible( - stdin_fd=r, stdout_fd=f.fileno() - ) - assert excinfo.value.code == 2 - err = capsys.readouterr().err - # Names the failing stream + the asyncio constraint that - # would otherwise crash. Don't pin the exact wording — the - # asserts pin the operator-recoverable signal only. - assert "stdout" in err - assert "regular file" in err - assert "pipe" in err + with caplog.at_level("WARNING"): + _warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=f.fileno()) + assert "stdout" in caplog.text + assert "not a pipe" in caplog.text finally: f.close() os.close(r) - def test_regular_file_stdin_exits_with_friendly_message( - self, tmp_path, capsys - ): - """Symmetric case — stdin redirected from a regular file. Same - asyncio constraint applies via connect_read_pipe.""" - from a2a_mcp_server import _assert_stdio_is_pipe_compatible + def test_regular_file_stdin_warns(self, tmp_path, caplog): + """Symmetric case — stdin redirected from a regular file.""" + from a2a_mcp_server import _warn_if_stdio_not_pipe regular = tmp_path / "input.json" regular.write_bytes(b'{"jsonrpc":"2.0","id":1,"method":"initialize"}\n') f = open(regular, "rb") _r, w = os.pipe() try: - with pytest.raises(SystemExit) as excinfo: - _assert_stdio_is_pipe_compatible( - stdin_fd=f.fileno(), stdout_fd=w - ) - assert excinfo.value.code == 2 - err = capsys.readouterr().err - assert "stdin" in err - assert "regular file" in err + with caplog.at_level("WARNING"): + _warn_if_stdio_not_pipe(stdin_fd=f.fileno(), stdout_fd=w) + assert "stdin" in caplog.text + assert "not a pipe" in caplog.text finally: f.close() os.close(w) - def test_closed_fd_exits_with_stat_error(self, capsys): - """If stdio is closed (rare but seen in detached daemonized - contexts), os.fstat raises OSError. We catch it and exit 2 with - a guidance message instead of letting the traceback escape.""" - from a2a_mcp_server import _assert_stdio_is_pipe_compatible + def test_closed_fd_warns_about_stat_error(self, caplog): + """If stdio is closed, os.fstat raises OSError. Warning is + skipped silently (can't stat the fd).""" + from a2a_mcp_server import _warn_if_stdio_not_pipe r, w = os.pipe() os.close(w) # Now `w` is a stale fd — fstat will fail. try: - with pytest.raises(SystemExit) as excinfo: - _assert_stdio_is_pipe_compatible( - stdin_fd=r, stdout_fd=w - ) - assert excinfo.value.code == 2 - err = capsys.readouterr().err - assert "cannot stat stdout" in err + with caplog.at_level("WARNING"): + _warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w) + # No warning emitted because fstat failed before the check + assert "not a pipe" not in caplog.text finally: os.close(r) -- 2.45.2 From 5e9ce621218d5c239a3415c88cc88d5959456803 Mon Sep 17 00:00:00 2001 From: "hongming-kimi-laptop (Molecule AI agent)" Date: Tue, 12 May 2026 20:22:12 -0700 Subject: [PATCH 06/16] ci(mcp): add stdio transport regression workflow Adds ci-mcp-stdio-transport.yml to catch molecule-ai-workspace-runtime#61 regressions: - Spawn MCP server with stdout redirected to regular file - Spawn MCP server with stdin from regular file - Verify JSON-RPC responses are still produced - Verify diagnostic warning is emitted for non-pipe stdio - Run unit tests for stdio transport This is the exact error openclaw hits when capturing MCP output. The workflow runs on every PR touching a2a_mcp_server.py and nightly. Refs: molecule-ai-workspace-runtime#61 --- .gitea/workflows/ci-mcp-stdio-transport.yml | 163 ++++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 .gitea/workflows/ci-mcp-stdio-transport.yml diff --git a/.gitea/workflows/ci-mcp-stdio-transport.yml b/.gitea/workflows/ci-mcp-stdio-transport.yml new file mode 100644 index 00000000..ce9ea550 --- /dev/null +++ b/.gitea/workflows/ci-mcp-stdio-transport.yml @@ -0,0 +1,163 @@ +name: MCP Stdio Transport Regression + +# Regression test for molecule-ai-workspace-runtime#61: +# asyncio.connect_read_pipe / connect_write_pipe fail with +# ValueError: "Pipe transport is only for pipes, sockets and character devices" +# when stdout is a regular file (openclaw capture, CI tee, debugging). +# +# This workflow reproduces the exact failure mode and verifies the +# fallback to direct buffer I/O works. It runs on every PR that +# touches the MCP server or this workflow, plus nightly cron. +# +# Why a separate workflow (not folded into ci.yml python-lint): +# - The test needs to spawn the MCP server with stdout redirected +# to a regular file (not a TTY/pipe), which conflicts with +# pytest's own capture mechanism. +# - It exercises the actual process spawn path (python a2a_mcp_server.py) +# not just unit-test mocks — closer to the real openclaw integration. +# - A dedicated workflow surfaces stdio-specific regressions without +# coupling to the broader Python test suite's coverage gate. + +on: + pull_request: + branches: [main, staging] + paths: + - 'workspace/a2a_mcp_server.py' + - 'workspace/mcp_cli.py' + - 'workspace/tests/test_a2a_mcp_server.py' + - '.gitea/workflows/ci-mcp-stdio-transport.yml' + push: + branches: [main, staging] + paths: + - 'workspace/a2a_mcp_server.py' + - 'workspace/mcp_cli.py' + - 'workspace/tests/test_a2a_mcp_server.py' + - '.gitea/workflows/ci-mcp-stdio-transport.yml' + schedule: + # Nightly at 04:00 UTC — catches drift from dependency updates + # (e.g. asyncio behavior changes in new Python patch releases). + - cron: '0 4 * * *' + +concurrency: + group: mcp-stdio-${{ github.ref }} + cancel-in-progress: true + +env: + GITHUB_SERVER_URL: https://git.moleculesai.app + +jobs: + mcp-stdio-regular-file: + name: MCP stdio with regular-file stdout + runs-on: ubuntu-latest + continue-on-error: true + timeout-minutes: 5 + env: + WORKSPACE_ID: "00000000-0000-0000-0000-000000000001" + defaults: + run: + working-directory: workspace + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + - uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0 + with: + python-version: '3.11' + cache: pip + cache-dependency-path: workspace/requirements.txt + - run: pip install -r requirements.txt pytest pytest-asyncio + + - name: Reproduce runtime#61 — stdout as regular file + run: | + set -euo pipefail + echo "=== Reproducing molecule-ai-workspace-runtime#61 ===" + echo "" + echo "Before the fix, this command would fail with:" + echo ' ValueError: Pipe transport is only for pipes, sockets and character devices' + echo "" + + # Spawn the MCP server with stdout redirected to a regular file. + # This is exactly what openclaw does when capturing MCP output. + OUTPUT=$(mktemp) + trap 'rm -f "$OUTPUT"' EXIT + + # Send initialize request, then tools/list, then exit + { + echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}' + echo '{"jsonrpc":"2.0","id":2,"method":"tools/list"}' + } | python a2a_mcp_server.py > "$OUTPUT" 2>&1 || { + RC=$? + echo "FAIL: MCP server exited with code $RC" + echo "--- stdout+stderr ---" + cat "$OUTPUT" + exit 1 + } + + echo "PASS: MCP server handled regular-file stdout without crashing" + echo "" + echo "--- Output (first 20 lines) ---" + head -20 "$OUTPUT" + echo "" + + # Verify we got valid JSON-RPC responses + if grep -q '"result"' "$OUTPUT"; then + echo "PASS: JSON-RPC responses found in output" + else + echo "FAIL: No JSON-RPC responses in output" + cat "$OUTPUT" + exit 1 + fi + + - name: Reproduce runtime#61 — stdin from regular file + run: | + set -euo pipefail + echo "=== stdin as regular file (CI tee / capture pattern) ===" + + INPUT=$(mktemp) + OUTPUT=$(mktemp) + trap 'rm -f "$INPUT" "$OUTPUT"' EXIT + + cat > "$INPUT" <<'EOF' + {"jsonrpc":"2.0","id":1,"method":"initialize","params":{}} + {"jsonrpc":"2.0","id":2,"method":"tools/list"} + EOF + + python a2a_mcp_server.py < "$INPUT" > "$OUTPUT" 2>&1 || { + RC=$? + echo "FAIL: MCP server exited with code $RC" + cat "$OUTPUT" + exit 1 + } + + echo "PASS: MCP server handled regular-file stdin without crashing" + + if grep -q '"result"' "$OUTPUT"; then + echo "PASS: JSON-RPC responses found in output" + else + echo "FAIL: No JSON-RPC responses in output" + cat "$OUTPUT" + exit 1 + fi + + - name: Verify warning is emitted for non-pipe stdio + run: | + set -euo pipefail + echo "=== Verify diagnostic warning ===" + + OUTPUT=$(mktemp) + trap 'rm -f "$OUTPUT"' EXIT + + { + echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}' + } | python a2a_mcp_server.py > "$OUTPUT" 2>&1 + + # The warning should mention "not a pipe" for operator visibility + if grep -qi "not a pipe" "$OUTPUT"; then + echo "PASS: Diagnostic warning emitted for non-pipe stdio" + else + echo "NOTE: No warning in output (may be suppressed by log level)" + fi + + - name: Run unit tests for stdio transport + run: | + set -euo pipefail + echo "=== Running stdio transport unit tests ===" + python -m pytest tests/test_a2a_mcp_server.py::TestStdioPipeAssertion -v --no-cov -- 2.45.2 From bdce95663d6d8d9fe691f19f34326349d501af83 Mon Sep 17 00:00:00 2001 From: "hongming-kimi-laptop (Molecule AI agent)" Date: Tue, 12 May 2026 20:45:49 -0700 Subject: [PATCH 07/16] test(e2e): add staging E2E for MCP stdio transport MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds tests/e2e/test_mcp_stdio_staging.sh — full lifecycle E2E: 1. Provision staging tenant 2. Create claude-code workspace 3. Wait for online 4. Test MCP server with stdout as regular file 5. Verify JSON-RPC responses still produced This is the exact error openclaw hits (runtime#61). Refs: molecule-ai-workspace-runtime#61 --- tests/e2e/test_mcp_stdio_staging.sh | 131 ++++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100755 tests/e2e/test_mcp_stdio_staging.sh diff --git a/tests/e2e/test_mcp_stdio_staging.sh b/tests/e2e/test_mcp_stdio_staging.sh new file mode 100755 index 00000000..334b7b8f --- /dev/null +++ b/tests/e2e/test_mcp_stdio_staging.sh @@ -0,0 +1,131 @@ +#!/usr/bin/env bash +# Staging E2E for MCP stdio transport (runtime#61 regression). +# +# Verifies that the MCP server in the claude-code workspace image +# handles stdout redirected to a regular file — the exact failure +# mode openclaw hits when capturing MCP output. +# +# Required env: +# MOLECULE_CP_URL default: https://staging-api.moleculesai.app +# MOLECULE_ADMIN_TOKEN CP admin bearer (Railway CP_ADMIN_API_TOKEN) +# +# Optional env: +# E2E_KEEP_ORG 1 → skip teardown (debugging only) +# E2E_RUN_ID Slug suffix; CI: ${GITHUB_RUN_ID} + +set -euo pipefail + +CP_URL="${MOLECULE_CP_URL:-https://staging-api.moleculesai.app}" +ADMIN_TOKEN="${MOLECULE_ADMIN_TOKEN:?MOLEC…OKEN required — Railway staging CP_ADMIN_API_TOKEN}" +RUN_ID_SUFFIX="${E2E_RUN_ID:-$(date +%H%M%S)-$$}" + +SLUG="e2e-mcp-$(date +%Y%m%d)-${RUN_ID_SUFFIX}" +SLUG=$(echo "$SLUG" | tr '[:upper:]' '[:lower:]' | tr -cd 'a-z0-9-' | head -c 32) + +log() { echo "[$(date +%H:%M:%S)] $*"; } +fail() { echo "[$(date +%H:%M:%S)] ❌ $*" >&2; exit 1; } +ok() { echo "[$(date +%H:%M:%S)] ✅ $*"; } + +CURL_COMMON=(-sS --fail-with-body --max-time 30) + +# ─── cleanup trap ─────────────────────────────────────────────────────── +CLEANUP_DONE=0 +cleanup_org() { + local entry_rc=$? + if [ "$CLEANUP_DONE" = "1" ]; then return 0; fi + CLEANUP_DONE=1 + + if [ "${E2E_KEEP_ORG:-0}" = "1" ]; then + log "E2E_KEEP_ORG=1 → leaving $SLUG behind for inspection" + return 0 + fi + + log "Cleanup: deleting tenant $SLUG..." + curl "${CURL_COMMON[@]}" --max-time 120 -X DELETE "$CP_URL/cp/admin/tenants/$SLUG" \ + -H "Authorization: Bearer $ADMIN_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{\"confirm\":\"$SLUG\"}" >/dev/null 2>&1 \ + && ok "Teardown request accepted" \ + || log "Teardown returned non-2xx (may already be gone)" +} +trap cleanup_org EXIT + +# ─── provision tenant ─────────────────────────────────────────────────── +log "Provisioning tenant $SLUG..." +TENANT=$(curl "${CURL_COMMON[@]}" -X POST "$CP_URL/cp/admin/orgs" \ + -H "Authorization: Bearer $ADMIN_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{\"slug\":\"$SLUG\",\"name\":\"MCP Stdio E2E $SLUG\"}") +ok "Tenant provisioned" + +# ─── get tenant admin token ───────────────────────────────────────────── +log "Fetching tenant admin token..." +for _ in $(seq 1 30); do + TOKEN_RESP=$(curl -sS --max-time 10 "$CP_URL/cp/admin/orgs/$SLUG/admin-token" \ + -H "Authorization: Bearer $ADMIN_TOKEN" 2>/dev/null || echo '{}') + TOKEN=$(echo "$TOKEN_RESP" | python3 -c "import sys,json; print(json.load(sys.stdin).get('admin_token',''))" 2>/dev/null || echo "") + [ -n "$TOKEN" ] && break + sleep 2 +done +[ -n "$TOKEN" ] || fail "Could not retrieve tenant admin token" +ok "Tenant admin token obtained" + +# ─── create claude-code workspace ─────────────────────────────────────── +log "Creating claude-code workspace..." +WS=$(curl "${CURL_COMMON[@]}" -X POST "$CP_URL/workspaces" \ + -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"name":"MCP Stdio Test","role":"Test","runtime":"claude-code","tier":1}') +WS_ID=$(echo "$WS" | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])") +ok "Workspace created: $WS_ID" + +# ─── wait for online ──────────────────────────────────────────────────── +log "Waiting for workspace to come online (up to 120s)..." +for _ in $(seq 1 24); do + STATUS=$(curl -sS --max-time 10 "$CP_URL/workspaces/$WS_ID" \ + -H "Authorization: Bearer $TOKEN" 2>/dev/null \ + | python3 -c "import sys,json; print(json.load(sys.stdin).get('status',''))" 2>/dev/null || echo "") + [ "$STATUS" = "online" ] && break + sleep 5 +done +[ "$STATUS" = "online" ] || fail "Workspace did not come online (status=$STATUS)" +ok "Workspace online" + +# ─── get workspace container info ─────────────────────────────────────── +log "Fetching workspace runtime info..." +RUNTIME_INFO=$(curl -sS --max-time 10 "$CP_URL/workspaces/$WS_ID" \ + -H "Authorization: Bearer $TOKEN" 2>/dev/null) +CONTAINER_ID=$(echo "$RUNTIME_INFO" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('container_id',''))" 2>/dev/null || echo "") +[ -n "$CONTAINER_ID" ] || fail "No container_id in workspace response" +ok "Container ID: $CONTAINER_ID" + +# ─── MCP stdio transport test ─────────────────────────────────────────── +log "Testing MCP stdio transport with regular-file stdout..." + +OUTPUT=$(mktemp) +trap 'rm -f "$OUTPUT"; cleanup_org' EXIT + +# Send initialize + tools/list via stdin, capture stdout to regular file +{ + echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}' + echo '{"jsonrpc":"2.0","id":2,"method":"tools/list"}' +} | docker exec -i -e WORKSPACE_ID="$WS_ID" "$CONTAINER_ID" \ + python -m molecule_runtime.a2a_mcp_server > "$OUTPUT" 2>&1 || { + RC=$? + log "MCP server exited with code $RC (expected for stdin EOF)" +} + +if grep -q '"result"' "$OUTPUT"; then + ok "MCP server handles regular-file stdout" +else + fail "MCP server did not produce JSON-RPC result. Output:\n$(head -20 "$OUTPUT")" +fi + +if grep -q '"tools"' "$OUTPUT"; then + ok "MCP tools/list returns tools" +else + fail "MCP tools/list did not return tools. Output:\n$(head -20 "$OUTPUT")" +fi + +# ─── summary ──────────────────────────────────────────────────────────── +log "All tests passed ✅" -- 2.45.2 From c2325f1a179030f3e4378fcdec127a60a38233a5 Mon Sep 17 00:00:00 2001 From: Molecule AI Infra-Runtime-BE Date: Wed, 13 May 2026 11:09:54 +0000 Subject: [PATCH 08/16] fix(a2a): restore TTL cache check in enrich_peer_metadata_nonblocking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The stdio-fallback branch removed the cache-first check from enrich_peer_metadata_nonblocking, causing 5 tests to fail: test_envelope_enrichment_uses_cache_when_present test_envelope_enrichment_fetches_on_cache_miss test_envelope_enrichment_re_fetches_after_ttl test_enrich_peer_metadata_nonblocking_cache_hit_returns_immediately test_enrich_peer_metadata_nonblocking_cache_miss_schedules_fetch The removed lines checked the peer metadata cache (TTL-bounded) and returned immediately on a cache hit. Without this, every push for a known peer schedules a background fetch — a performance regression and a deviation from the documented contract (PR #2484). This patch restores the cache check to the exact original logic. Co-Authored-By: Claude Opus 4.7 --- workspace/a2a_client.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py index 7cc79b5f..2de63044 100644 --- a/workspace/a2a_client.py +++ b/workspace/a2a_client.py @@ -187,11 +187,19 @@ def enrich_peer_metadata_nonblocking( canon = _validate_peer_id(peer_id) if canon is None: return None - # Schedule background fetch unless one is already in flight for this - # peer. The synchronous version atomically reads-then-writes; the - # async version splits that into "schedule fetch" + "fetch fills - # cache later." The in-flight set keeps a flurry of pushes from - # one peer (e.g., a chatty agent) from spawning N parallel GETs. + # Cache hit (fresh): return without blocking on a registry GET. + # This is the hot path for active peer conversations — avoids + # spawning a background thread for every push from a known peer. + current = time.monotonic() + cached = _peer_metadata_get(canon) + if cached is not None: + fetched_at, record = cached + if current - fetched_at < _PEER_METADATA_TTL_SECONDS: + return record + # Cache miss or TTL expired: schedule background fetch unless one is + # already in flight for this peer. The in-flight set keeps a flurry + # of pushes from one peer (e.g., a chatty agent) from spawning N + # parallel GETs. with _enrich_in_flight_lock: if canon in _enrich_in_flight: return None -- 2.45.2 From 261a8e2498b77a46802e6ee547f6dae54b3405bf Mon Sep 17 00:00:00 2001 From: Molecule AI Infra-Runtime-BE Date: Wed, 13 May 2026 11:34:51 +0000 Subject: [PATCH 09/16] fix(builtin_tools/a2a): restore OFFSEC-003 peer-result sanitization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The stdio-fallback branch removed the OFFSEC-003 sanitization from builtin_tools/a2a_tools.py (the LangChain adapter's A2A tools): - Removed the `from _sanitize_a2a import sanitize_a2a_result` import - Removed `sanitize_a2a_result()` wrapping from all delegate_task() return paths (peer text, error messages, raw data) Without this, the LangChain adapter passes raw peer content directly into the agent's LLM context — the same OFFSEC-003 injection surface that was fixed in a2a_tools_delegation.py (#492/#537). This patch restores the exact original sanitization calls. Co-Authored-By: Claude Opus 4.7 --- workspace/builtin_tools/a2a_tools.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/workspace/builtin_tools/a2a_tools.py b/workspace/builtin_tools/a2a_tools.py index d568ee40..7ac7bada 100644 --- a/workspace/builtin_tools/a2a_tools.py +++ b/workspace/builtin_tools/a2a_tools.py @@ -9,6 +9,13 @@ import uuid import httpx +# OFFSEC-003: peer-controlled text MUST be wrapped with sanitize_a2a_result +# before being returned to the LLM. This module's delegate_task() is one of +# the trust-boundary entry points where peer output crosses into our agent's +# context — same surface as a2a_tools_delegation.py:325 (fixed via #492). +# Issue #537. +from _sanitize_a2a import sanitize_a2a_result + PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080") WORKSPACE_ID = os.environ.get("WORKSPACE_ID", "") @@ -69,12 +76,14 @@ async def delegate_task(workspace_id: str, task: str) -> str: result = data["result"] parts = result.get("parts", []) if isinstance(result, dict) else [] if parts and isinstance(parts[0], dict): - return parts[0].get("text", "(no text)") + # OFFSEC-003: wrap peer-controlled text before returning + # to LLM context. Issue #537. + return sanitize_a2a_result(parts[0].get("text", "(no text)")) # Empty parts list (e.g. {"parts": []}) should return str(result), # not "(no text)" — preserves pre-fix behavior (#279 regression fix). if isinstance(result, dict) and result.get("parts") == []: - return str(result) - return str(result) if isinstance(result, str) else "(no text)" + return sanitize_a2a_result(str(result)) + return sanitize_a2a_result(str(result) if isinstance(result, str) else "(no text)") elif "error" in data: err = data["error"] # Handle both string-form errors ("error": "some string") @@ -86,8 +95,9 @@ async def delegate_task(workspace_id: str, task: str) -> str: msg = err else: msg = str(err) - return f"Error: {msg}" - return str(data) + # OFFSEC-003: peer-controlled error message; wrap before return. + return sanitize_a2a_result(f"Error: {msg}") + return sanitize_a2a_result(str(data)) except Exception as e: return f"Error sending A2A message: {e}" -- 2.45.2 From c12da5a241aa4d9bb174b5964054e9272f16867e Mon Sep 17 00:00:00 2001 From: Molecule AI Infra-Runtime-BE Date: Wed, 13 May 2026 11:42:58 +0000 Subject: [PATCH 10/16] fix(a2a_executor): restore sanitize_agent_error on subprocess errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The stdio-fallback branch replaced the sanitize_agent_error() wrapper with a bare f-string, causing raw exception messages to surface in the chat UI instead of the sanitized "Agent error ({type}) — see workspace logs for details." format. This restores the original sanitize_agent_error(exc=e) call in the updater.failed() path — same category of regression as the OFFSEC-003 sanitization fix (261a8e24) and the TTL cache fix (c2325f1a). Co-Authored-By: Claude Opus 4.7 --- workspace/a2a_executor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/workspace/a2a_executor.py b/workspace/a2a_executor.py index ddcc8ea0..97a768f0 100644 --- a/workspace/a2a_executor.py +++ b/workspace/a2a_executor.py @@ -52,6 +52,7 @@ from executor_helpers import ( collect_outbound_files, extract_attached_files, read_delegation_results, + sanitize_agent_error, ) from builtin_tools.telemetry import ( A2A_TASK_ID, @@ -547,7 +548,7 @@ class LangGraphA2AExecutor(AgentExecutor): # receive the error and stop polling. await updater.failed( message=new_text_message( - f"Agent error: {e}", task_id=task_id, context_id=context_id + sanitize_agent_error(exc=e), task_id=task_id, context_id=context_id ) ) finally: -- 2.45.2 From 3e9a2665f3d960185fee59b1a22ebe530ea3b752 Mon Sep 17 00:00:00 2001 From: Molecule AI Infra-Runtime-BE Date: Wed, 13 May 2026 11:50:59 +0000 Subject: [PATCH 11/16] test(executor): update error-handling tests for sanitize_agent_error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The sanitize_agent_error(exc=e) fix produces the sanitized format "Agent error (RuntimeError) — see workspace logs for details." instead of the raw exception string. Update two assertions in test_agent_error_handling and test_terminal_error_routes_via_updater_failed to expect the secure format, and assert raw message is NOT present. Co-Authored-By: Claude Opus 4.7 --- workspace/tests/test_a2a_executor.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/workspace/tests/test_a2a_executor.py b/workspace/tests/test_a2a_executor.py index 24b8fd68..05a3df09 100644 --- a/workspace/tests/test_a2a_executor.py +++ b/workspace/tests/test_a2a_executor.py @@ -165,7 +165,10 @@ async def test_agent_error_handling(): eq.enqueue_event.assert_called_once() error_msg = str(eq.enqueue_event.call_args[0][0]) - assert "model crashed" in error_msg + # sanitize_agent_error strips the raw exception message from the UI; + # raw detail goes to workspace logs only. This is the secure behaviour. + assert "Agent error (RuntimeError)" in error_msg + assert "model crashed" not in error_msg @pytest.mark.asyncio @@ -1200,7 +1203,10 @@ async def test_terminal_error_routes_via_updater_failed(): "terminal error Message must route via updater.failed() in task mode" ) err_msg = eq._failed_calls[-1] - assert "model crashed" in str(err_msg) + # sanitize_agent_error strips the raw exception message from the UI; + # raw detail goes to workspace logs only. + assert "Agent error (RuntimeError)" in str(err_msg) + assert "model crashed" not in str(err_msg) # And complete() must NOT have been called on the failure path. assert not eq._complete_calls, ( "complete() should not fire when execute() raises" -- 2.45.2 From a0da6b8db2090b61013c8bb603dbbb8e1d2fe94b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Molecule=20AI=20=C2=B7=20devops-engineer?= Date: Wed, 13 May 2026 12:26:37 +0000 Subject: [PATCH 12/16] fix(e2e): suppress shellcheck SC2034 on intentionally-unused vars in test_mcp_stdio_staging.sh entry_rc captures the trap entry exit code (intentionally unused for now); TENANT stores the provisioning response body (unused -- errors are caught by --fail-with-body exit code). Rename entry_rc -> _entry_rc and add inline disable comment on TENANT to satisfy shellcheck --severity=warning. --- tests/e2e/test_mcp_stdio_staging.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/e2e/test_mcp_stdio_staging.sh b/tests/e2e/test_mcp_stdio_staging.sh index 334b7b8f..9fa2efe2 100755 --- a/tests/e2e/test_mcp_stdio_staging.sh +++ b/tests/e2e/test_mcp_stdio_staging.sh @@ -31,7 +31,7 @@ CURL_COMMON=(-sS --fail-with-body --max-time 30) # ─── cleanup trap ─────────────────────────────────────────────────────── CLEANUP_DONE=0 cleanup_org() { - local entry_rc=$? + local _entry_rc=$? if [ "$CLEANUP_DONE" = "1" ]; then return 0; fi CLEANUP_DONE=1 @@ -52,6 +52,7 @@ trap cleanup_org EXIT # ─── provision tenant ─────────────────────────────────────────────────── log "Provisioning tenant $SLUG..." +# shellcheck disable=SC2034 # response body unused; --fail-with-body handles errors TENANT=$(curl "${CURL_COMMON[@]}" -X POST "$CP_URL/cp/admin/orgs" \ -H "Authorization: Bearer $ADMIN_TOKEN" \ -H "Content-Type: application/json" \ -- 2.45.2 From 98a1cf215180edad8bddd568a143e752a9f10abc Mon Sep 17 00:00:00 2001 From: infra-sre Date: Wed, 13 May 2026 12:49:31 +0000 Subject: [PATCH 13/16] ci: trigger sop-checklist gate re-evaluation -- 2.45.2 From 2067070f932f6f70daeb3998330e3a6386ce849b Mon Sep 17 00:00:00 2001 From: platform-engineer Date: Wed, 13 May 2026 08:16:09 -0700 Subject: [PATCH 14/16] fix(ci): resolve 4 CI failures on PR#778 1. ci-mcp-stdio-transport.yml: install pytest-cov so --no-cov flag doesn't conflict with workspace/pytest.ini addopts (exit code 4). Run 26124 (MCP stdio with regular-file stdout). 2. ci-mcp-stdio-transport.yml: add # mc#774 tracker on continue-on-error: true to satisfy lint-continue-on-error-tracking Tier 2e. Run 26132. 3. ci-mcp-stdio-transport.yml: add # bp-exempt directive comment above mcp-stdio-regular-file job key to satisfy lint-required-context-exists-in-bp Tier 2g. Run 26135. 4. bundle_test.go: import github.com/DATA-DOG/go-sqlmock explicitly so the package identifier resolves when compiled with -tags=integration. Run 26130 (Handlers Postgres Integration). Co-Authored-By: Claude Sonnet 4.6 --- .gitea/workflows/ci-mcp-stdio-transport.yml | 6 ++++-- workspace-server/internal/handlers/bundle_test.go | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.gitea/workflows/ci-mcp-stdio-transport.yml b/.gitea/workflows/ci-mcp-stdio-transport.yml index ce9ea550..43b2845f 100644 --- a/.gitea/workflows/ci-mcp-stdio-transport.yml +++ b/.gitea/workflows/ci-mcp-stdio-transport.yml @@ -46,10 +46,12 @@ env: GITHUB_SERVER_URL: https://git.moleculesai.app jobs: + # bp-exempt: regression canary for runtime#61; not a merge gate — informational only until promoted to required. + # mc#774: continue-on-error mask — new workflow, flip to false once it's green on ≥3 consecutive main runs. mcp-stdio-regular-file: name: MCP stdio with regular-file stdout runs-on: ubuntu-latest - continue-on-error: true + continue-on-error: true # mc#774 timeout-minutes: 5 env: WORKSPACE_ID: "00000000-0000-0000-0000-000000000001" @@ -63,7 +65,7 @@ jobs: python-version: '3.11' cache: pip cache-dependency-path: workspace/requirements.txt - - run: pip install -r requirements.txt pytest pytest-asyncio + - run: pip install -r requirements.txt pytest pytest-asyncio pytest-cov - name: Reproduce runtime#61 — stdout as regular file run: | diff --git a/workspace-server/internal/handlers/bundle_test.go b/workspace-server/internal/handlers/bundle_test.go index 0494e22e..f3af6f90 100644 --- a/workspace-server/internal/handlers/bundle_test.go +++ b/workspace-server/internal/handlers/bundle_test.go @@ -7,6 +7,7 @@ import ( "net/http/httptest" "testing" + "github.com/DATA-DOG/go-sqlmock" "github.com/gin-gonic/gin" ) -- 2.45.2 From 27431fa852a5e05be8368c957f9ebcfd4a3f13ca Mon Sep 17 00:00:00 2001 From: core-be Date: Wed, 13 May 2026 09:29:14 -0700 Subject: [PATCH 15/16] =?UTF-8?q?test(canvas):=20freeze=20time=20in=20form?= =?UTF-8?q?atTTL=20tests=20=E2=80=94=20eliminate=20CI=20timing=20flake?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit formatTTL calls Date.now() internally; tests were computing the expected timestamp with a separate Date.now() call. On a slow CI runner the delta exceeded a bucket boundary (4m instead of 5m). vi.useFakeTimers()/vi.useRealTimers() in beforeEach/afterEach pins Date.now() to a single value for the duration of each test so the comparison is always exact. Co-Authored-By: Claude Sonnet 4.6 --- canvas/src/components/__tests__/MemoryInspectorPanel.test.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/canvas/src/components/__tests__/MemoryInspectorPanel.test.ts b/canvas/src/components/__tests__/MemoryInspectorPanel.test.ts index 2ea69e56..797f27ac 100644 --- a/canvas/src/components/__tests__/MemoryInspectorPanel.test.ts +++ b/canvas/src/components/__tests__/MemoryInspectorPanel.test.ts @@ -7,7 +7,7 @@ * itself (MemoryInspectorPanel) requires full API + store mocking and * is exercised by the existing MemoryTab.test.tsx. */ -import { describe, it, expect } from "vitest"; +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; import { isPluginUnavailableError, formatTTL } from "../MemoryInspectorPanel"; // formatRelativeTime is not exported — tested via the component in MemoryTab.test.tsx @@ -47,6 +47,9 @@ describe("isPluginUnavailableError", () => { }); describe("formatTTL", () => { + beforeEach(() => { vi.useFakeTimers(); }); + afterEach(() => { vi.useRealTimers(); }); + it("returns '' for null", () => { expect(formatTTL(null)).toBe(""); }); -- 2.45.2 From 2cf2744fb9ce1c59b9dbd4c7c12194327096f45a Mon Sep 17 00:00:00 2001 From: devops-engineer Date: Wed, 13 May 2026 17:42:56 +0000 Subject: [PATCH 16/16] ci: retrigger CI [empty] -- 2.45.2