diff --git a/.gitea/workflows/ci-mcp-stdio-transport.yml b/.gitea/workflows/ci-mcp-stdio-transport.yml new file mode 100644 index 00000000..43b2845f --- /dev/null +++ b/.gitea/workflows/ci-mcp-stdio-transport.yml @@ -0,0 +1,165 @@ +name: MCP Stdio Transport Regression + +# Regression test for molecule-ai-workspace-runtime#61: +# asyncio.connect_read_pipe / connect_write_pipe fail with +# ValueError: "Pipe transport is only for pipes, sockets and character devices" +# when stdout is a regular file (openclaw capture, CI tee, debugging). +# +# This workflow reproduces the exact failure mode and verifies the +# fallback to direct buffer I/O works. It runs on every PR that +# touches the MCP server or this workflow, plus nightly cron. +# +# Why a separate workflow (not folded into ci.yml python-lint): +# - The test needs to spawn the MCP server with stdout redirected +# to a regular file (not a TTY/pipe), which conflicts with +# pytest's own capture mechanism. +# - It exercises the actual process spawn path (python a2a_mcp_server.py) +# not just unit-test mocks — closer to the real openclaw integration. +# - A dedicated workflow surfaces stdio-specific regressions without +# coupling to the broader Python test suite's coverage gate. + +on: + pull_request: + branches: [main, staging] + paths: + - 'workspace/a2a_mcp_server.py' + - 'workspace/mcp_cli.py' + - 'workspace/tests/test_a2a_mcp_server.py' + - '.gitea/workflows/ci-mcp-stdio-transport.yml' + push: + branches: [main, staging] + paths: + - 'workspace/a2a_mcp_server.py' + - 'workspace/mcp_cli.py' + - 'workspace/tests/test_a2a_mcp_server.py' + - '.gitea/workflows/ci-mcp-stdio-transport.yml' + schedule: + # Nightly at 04:00 UTC — catches drift from dependency updates + # (e.g. asyncio behavior changes in new Python patch releases). 
+ - cron: '0 4 * * *' + +concurrency: + group: mcp-stdio-${{ github.ref }} + cancel-in-progress: true + +env: + GITHUB_SERVER_URL: https://git.moleculesai.app + +jobs: + # bp-exempt: regression canary for runtime#61; not a merge gate — informational only until promoted to required. + # mc#774: continue-on-error mask — new workflow, flip to false once it's green on ≥3 consecutive main runs. + mcp-stdio-regular-file: + name: MCP stdio with regular-file stdout + runs-on: ubuntu-latest + continue-on-error: true # mc#774 + timeout-minutes: 5 + env: + WORKSPACE_ID: "00000000-0000-0000-0000-000000000001" + defaults: + run: + working-directory: workspace + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + - uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0 + with: + python-version: '3.11' + cache: pip + cache-dependency-path: workspace/requirements.txt + - run: pip install -r requirements.txt pytest pytest-asyncio pytest-cov + + - name: Reproduce runtime#61 — stdout as regular file + run: | + set -euo pipefail + echo "=== Reproducing molecule-ai-workspace-runtime#61 ===" + echo "" + echo "Before the fix, this command would fail with:" + echo ' ValueError: Pipe transport is only for pipes, sockets and character devices' + echo "" + + # Spawn the MCP server with stdout redirected to a regular file. + # This is exactly what openclaw does when capturing MCP output. + OUTPUT=$(mktemp) + trap 'rm -f "$OUTPUT"' EXIT + + # Send initialize request, then tools/list, then exit + { + echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}' + echo '{"jsonrpc":"2.0","id":2,"method":"tools/list"}' + } | python a2a_mcp_server.py > "$OUTPUT" 2>&1 || { + RC=$? 
+ echo "FAIL: MCP server exited with code $RC" + echo "--- stdout+stderr ---" + cat "$OUTPUT" + exit 1 + } + + echo "PASS: MCP server handled regular-file stdout without crashing" + echo "" + echo "--- Output (first 20 lines) ---" + head -20 "$OUTPUT" + echo "" + + # Verify we got valid JSON-RPC responses + if grep -q '"result"' "$OUTPUT"; then + echo "PASS: JSON-RPC responses found in output" + else + echo "FAIL: No JSON-RPC responses in output" + cat "$OUTPUT" + exit 1 + fi + + - name: Reproduce runtime#61 — stdin from regular file + run: | + set -euo pipefail + echo "=== stdin as regular file (CI tee / capture pattern) ===" + + INPUT=$(mktemp) + OUTPUT=$(mktemp) + trap 'rm -f "$INPUT" "$OUTPUT"' EXIT + + cat > "$INPUT" <<'EOF' + {"jsonrpc":"2.0","id":1,"method":"initialize","params":{}} + {"jsonrpc":"2.0","id":2,"method":"tools/list"} + EOF + + python a2a_mcp_server.py < "$INPUT" > "$OUTPUT" 2>&1 || { + RC=$? + echo "FAIL: MCP server exited with code $RC" + cat "$OUTPUT" + exit 1 + } + + echo "PASS: MCP server handled regular-file stdin without crashing" + + if grep -q '"result"' "$OUTPUT"; then + echo "PASS: JSON-RPC responses found in output" + else + echo "FAIL: No JSON-RPC responses in output" + cat "$OUTPUT" + exit 1 + fi + + - name: Verify warning is emitted for non-pipe stdio + run: | + set -euo pipefail + echo "=== Verify diagnostic warning ===" + + OUTPUT=$(mktemp) + trap 'rm -f "$OUTPUT"' EXIT + + { + echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}' + } | python a2a_mcp_server.py > "$OUTPUT" 2>&1 + + # The warning should mention "not a pipe" for operator visibility + if grep -qi "not a pipe" "$OUTPUT"; then + echo "PASS: Diagnostic warning emitted for non-pipe stdio" + else + echo "NOTE: No warning in output (may be suppressed by log level)" + fi + + - name: Run unit tests for stdio transport + run: | + set -euo pipefail + echo "=== Running stdio transport unit tests ===" + python -m pytest 
tests/test_a2a_mcp_server.py::TestStdioPipeAssertion -v --no-cov diff --git a/canvas/src/components/ExternalConnectModal.tsx b/canvas/src/components/ExternalConnectModal.tsx index cd02f6fa..14de5d1c 100644 --- a/canvas/src/components/ExternalConnectModal.tsx +++ b/canvas/src/components/ExternalConnectModal.tsx @@ -18,7 +18,7 @@ import { useCallback, useState } from "react"; import * as Dialog from "@radix-ui/react-dialog"; -type Tab = "python" | "curl" | "claude" | "mcp" | "hermes" | "codex" | "openclaw" | "fields"; +type Tab = "python" | "curl" | "claude" | "mcp" | "hermes" | "codex" | "openclaw" | "kimi" | "fields"; export interface ExternalConnectionInfo { workspace_id: string; @@ -58,6 +58,10 @@ export interface ExternalConnectionInfo { // openclaw gateway on loopback. Outbound-tools-only today; push // parity on an external openclaw needs a sessions.steer bridge. openclaw_snippet?: string; + // Kimi CLI setup snippet — self-contained Python heartbeat script + // that keeps a Kimi workspace online in poll mode. Optional for + // backward compat with platforms that haven't shipped the Kimi tab. + kimi_snippet?: string; } interface Props { @@ -150,6 +154,11 @@ export function ExternalConnectModal({ info, onClose }: Props) { 'WORKSPACE_TOKEN=""', `WORKSPACE_TOKEN="${info.auth_token}"`, ); + // Kimi snippet carries the placeholder inside the shell heredoc. + const filledKimi = info.kimi_snippet?.replace( + 'MOLECULE_WORKSPACE_TOKEN=', + `MOLECULE_WORKSPACE_TOKEN=${info.auth_token}`, + ); return ( !o && onClose()}> @@ -189,6 +198,7 @@ export function ExternalConnectModal({ info, onClose }: Props) { if (filledHermes) tabs.push("hermes"); if (filledCodex) tabs.push("codex"); if (filledOpenClaw) tabs.push("openclaw"); + if (filledKimi) tabs.push("kimi"); tabs.push("curl", "fields"); return tabs; })().map((t) => ( @@ -212,6 +222,8 @@ export function ExternalConnectModal({ info, onClose }: Props) { ? "Codex" : t === "openclaw" ? "OpenClaw" + : t === "kimi" + ? 
"Kimi" : t === "python" ? "Python SDK" : t === "mcp" @@ -288,6 +300,15 @@ export function ExternalConnectModal({ info, onClose }: Props) { onCopy={() => copy(filledOpenClaw, "openclaw")} /> )} + {tab === "kimi" && filledKimi && ( + copy(filledKimi, "kimi")} + /> + )} {tab === "fields" && (
copy(info.workspace_id, "wsid")} copied={copiedKey === "wsid"} /> diff --git a/canvas/src/components/tabs/__tests__/ExternalConnectionSection.test.tsx b/canvas/src/components/tabs/__tests__/ExternalConnectionSection.test.tsx index d54eb933..cec70d83 100644 --- a/canvas/src/components/tabs/__tests__/ExternalConnectionSection.test.tsx +++ b/canvas/src/components/tabs/__tests__/ExternalConnectionSection.test.tsx @@ -58,6 +58,7 @@ const SAMPLE_INFO = { hermes_channel_snippet: "# hermes ws=ws-test", codex_snippet: "# codex ws=ws-test", openclaw_snippet: "# openclaw ws=ws-test", + kimi_snippet: "# kimi ws=ws-test", }; describe("ExternalConnectionSection", () => { diff --git a/tests/e2e/test_mcp_stdio_staging.sh b/tests/e2e/test_mcp_stdio_staging.sh new file mode 100755 index 00000000..9fa2efe2 --- /dev/null +++ b/tests/e2e/test_mcp_stdio_staging.sh @@ -0,0 +1,132 @@ +#!/usr/bin/env bash +# Staging E2E for MCP stdio transport (runtime#61 regression). +# +# Verifies that the MCP server in the claude-code workspace image +# handles stdout redirected to a regular file — the exact failure +# mode openclaw hits when capturing MCP output. 
+# +# Required env: +#   MOLECULE_CP_URL        default: https://staging-api.moleculesai.app +#   MOLECULE_ADMIN_TOKEN   CP admin bearer (Railway CP_ADMIN_API_TOKEN) +# +# Optional env: +#   E2E_KEEP_ORG   1 → skip teardown (debugging only) +#   E2E_RUN_ID     Slug suffix; CI: ${GITHUB_RUN_ID} + +set -euo pipefail + +CP_URL="${MOLECULE_CP_URL:-https://staging-api.moleculesai.app}" +ADMIN_TOKEN="${MOLECULE_ADMIN_TOKEN:?MOLECULE_ADMIN_TOKEN required — Railway staging CP_ADMIN_API_TOKEN}" +RUN_ID_SUFFIX="${E2E_RUN_ID:-$(date +%H%M%S)-$$}" + +SLUG="e2e-mcp-$(date +%Y%m%d)-${RUN_ID_SUFFIX}" +SLUG=$(echo "$SLUG" | tr '[:upper:]' '[:lower:]' | tr -cd 'a-z0-9-' | head -c 32) + +log() { echo "[$(date +%H:%M:%S)] $*"; } +fail() { echo "[$(date +%H:%M:%S)] ❌ $*" >&2; exit 1; } +ok() { echo "[$(date +%H:%M:%S)] ✅ $*"; } + +CURL_COMMON=(-sS --fail-with-body --max-time 30) + +# ─── cleanup trap ─────────────────────────────────────────────────────── +CLEANUP_DONE=0 +cleanup_org() { + local _entry_rc=$? + if [ "$CLEANUP_DONE" = "1" ]; then return 0; fi + CLEANUP_DONE=1 + + if [ "${E2E_KEEP_ORG:-0}" = "1" ]; then + log "E2E_KEEP_ORG=1 → leaving $SLUG behind for inspection" + return 0 + fi + + log "Cleanup: deleting tenant $SLUG..." + curl "${CURL_COMMON[@]}" --max-time 120 -X DELETE "$CP_URL/cp/admin/tenants/$SLUG" \ + -H "Authorization: Bearer $ADMIN_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{\"confirm\":\"$SLUG\"}" >/dev/null 2>&1 \ + && ok "Teardown request accepted" \ + || log "Teardown returned non-2xx (may already be gone)" +} +trap cleanup_org EXIT + +# ─── provision tenant ─────────────────────────────────────────────────── +log "Provisioning tenant $SLUG..." 
+# shellcheck disable=SC2034 # response body unused; --fail-with-body handles errors +TENANT=$(curl "${CURL_COMMON[@]}" -X POST "$CP_URL/cp/admin/orgs" \ + -H "Authorization: Bearer $ADMIN_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{\"slug\":\"$SLUG\",\"name\":\"MCP Stdio E2E $SLUG\"}") +ok "Tenant provisioned" + +# ─── get tenant admin token ───────────────────────────────────────────── +log "Fetching tenant admin token..." +for _ in $(seq 1 30); do + TOKEN_RESP=$(curl -sS --max-time 10 "$CP_URL/cp/admin/orgs/$SLUG/admin-token" \ + -H "Authorization: Bearer $ADMIN_TOKEN" 2>/dev/null || echo '{}') + TOKEN=$(echo "$TOKEN_RESP" | python3 -c "import sys,json; print(json.load(sys.stdin).get('admin_token',''))" 2>/dev/null || echo "") + [ -n "$TOKEN" ] && break + sleep 2 +done +[ -n "$TOKEN" ] || fail "Could not retrieve tenant admin token" +ok "Tenant admin token obtained" + +# ─── create claude-code workspace ─────────────────────────────────────── +log "Creating claude-code workspace..." +WS=$(curl "${CURL_COMMON[@]}" -X POST "$CP_URL/workspaces" \ + -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"name":"MCP Stdio Test","role":"Test","runtime":"claude-code","tier":1}') +WS_ID=$(echo "$WS" | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])") +ok "Workspace created: $WS_ID" + +# ─── wait for online ──────────────────────────────────────────────────── +log "Waiting for workspace to come online (up to 120s)..." 
+for _ in $(seq 1 24); do + STATUS=$(curl -sS --max-time 10 "$CP_URL/workspaces/$WS_ID" \ + -H "Authorization: Bearer $TOKEN" 2>/dev/null \ + | python3 -c "import sys,json; print(json.load(sys.stdin).get('status',''))" 2>/dev/null || echo "") + [ "$STATUS" = "online" ] && break + sleep 5 +done +[ "$STATUS" = "online" ] || fail "Workspace did not come online (status=$STATUS)" +ok "Workspace online" + +# ─── get workspace container info ─────────────────────────────────────── +log "Fetching workspace runtime info..." +RUNTIME_INFO=$(curl -sS --max-time 10 "$CP_URL/workspaces/$WS_ID" \ + -H "Authorization: Bearer $TOKEN" 2>/dev/null) +CONTAINER_ID=$(echo "$RUNTIME_INFO" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('container_id',''))" 2>/dev/null || echo "") +[ -n "$CONTAINER_ID" ] || fail "No container_id in workspace response" +ok "Container ID: $CONTAINER_ID" + +# ─── MCP stdio transport test ─────────────────────────────────────────── +log "Testing MCP stdio transport with regular-file stdout..." + +OUTPUT=$(mktemp) +trap 'rm -f "$OUTPUT"; cleanup_org' EXIT + +# Send initialize + tools/list via stdin, capture stdout to regular file +{ + echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}' + echo '{"jsonrpc":"2.0","id":2,"method":"tools/list"}' +} | docker exec -i -e WORKSPACE_ID="$WS_ID" "$CONTAINER_ID" \ + python -m molecule_runtime.a2a_mcp_server > "$OUTPUT" 2>&1 || { + RC=$? + log "MCP server exited with code $RC (expected for stdin EOF)" +} + +if grep -q '"result"' "$OUTPUT"; then + ok "MCP server handles regular-file stdout" +else + fail "MCP server did not produce JSON-RPC result. Output:\n$(head -20 "$OUTPUT")" +fi + +if grep -q '"tools"' "$OUTPUT"; then + ok "MCP tools/list returns tools" +else + fail "MCP tools/list did not return tools. 
Output:\n$(head -20 "$OUTPUT")" +fi + +# ─── summary ──────────────────────────────────────────────────────────── +log "All tests passed ✅" diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 6e3058cb..c3ff562e 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -162,7 +162,7 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspaceID string) bool { var wsRuntime string db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime) - if wsRuntime == "external" { + if isExternalLikeRuntime(wsRuntime) { return false } if !h.HasProvisioner() { diff --git a/workspace-server/internal/handlers/bundle_test.go b/workspace-server/internal/handlers/bundle_test.go index 0494e22e..f3af6f90 100644 --- a/workspace-server/internal/handlers/bundle_test.go +++ b/workspace-server/internal/handlers/bundle_test.go @@ -7,6 +7,7 @@ import ( "net/http/httptest" "testing" + "github.com/DATA-DOG/go-sqlmock" "github.com/gin-gonic/gin" ) diff --git a/workspace-server/internal/handlers/discovery.go b/workspace-server/internal/handlers/discovery.go index 5c798c81..33ec1263 100644 --- a/workspace-server/internal/handlers/discovery.go +++ b/workspace-server/internal/handlers/discovery.go @@ -136,7 +136,7 @@ func discoverWorkspacePeer(ctx context.Context, c *gin.Context, callerID, target // lives on the other side of the wire and needs the URL as-is // (localhost rewrites wouldn't resolve from its host anyway). // Phase 30.6. 
- if wsRuntime == "external" { + if isExternalLikeRuntime(wsRuntime) { if handled := writeExternalWorkspaceURL(ctx, c, callerID, targetID, wsName); handled { return } @@ -181,7 +181,7 @@ func writeExternalWorkspaceURL(ctx context.Context, c *gin.Context, callerID, ta outURL := wsURL var callerRuntime string db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime,'langgraph') FROM workspaces WHERE id = $1`, callerID).Scan(&callerRuntime) - if callerRuntime != "external" { + if !isExternalLikeRuntime(callerRuntime) { outURL = strings.Replace(outURL, "127.0.0.1", "host.docker.internal", 1) outURL = strings.Replace(outURL, "localhost", "host.docker.internal", 1) } diff --git a/workspace-server/internal/handlers/external_connection.go b/workspace-server/internal/handlers/external_connection.go index ef213ae0..361b828d 100644 --- a/workspace-server/internal/handlers/external_connection.go +++ b/workspace-server/internal/handlers/external_connection.go @@ -50,6 +50,7 @@ func BuildExternalConnectionPayload(platformURL, workspaceID, authToken string) "hermes_channel_snippet": stamp(externalHermesChannelTemplate), "codex_snippet": stamp(externalCodexTemplate), "openclaw_snippet": stamp(externalOpenClawTemplate), + "kimi_snippet": stamp(externalKimiTemplate), } } @@ -489,6 +490,149 @@ codex // external openclaw would need a sessions.steer bridge daemon (the // equivalent of hermes-channel-molecule for openclaw). Tracked // separately; outbound tools is the first cut. +// externalKimiTemplate — complete poll-based external setup for Kimi CLI. +// Includes register + heartbeat + inbound activity polling + reply via +// /notify. No public URL needed (NAT-safe). Operators paste once and run +// in a background terminal or via launchd. +const externalKimiTemplate = `# Kimi CLI external setup — register + heartbeat + inbound poll + reply. +# For operators whose external agent is a Kimi CLI session. +# No public URL needed; runs behind NAT in poll mode. + +# 1. 
Install the workspace runtime wheel (provides HTTP client): +pip install molecule-ai-workspace-runtime + +# 2. Save credentials and the bridge script: +mkdir -p ~/.molecule-ai/kimi-workspace +chmod 700 ~/.molecule-ai/kimi-workspace +cat > ~/.molecule-ai/kimi-workspace/env <<'EOF' +WORKSPACE_ID={{WORKSPACE_ID}} +PLATFORM_URL={{PLATFORM_URL}} +MOLECULE_WORKSPACE_TOKEN= +EOF +chmod 600 ~/.molecule-ai/kimi-workspace/env + +cat > ~/.molecule-ai/kimi-workspace/kimi_bridge.py <<'PYEOF' +#!/usr/bin/env python3 +"""Kimi bridge — keeps workspace online and polls for canvas messages.""" +import json, logging, time +from pathlib import Path +import httpx + +ENV = Path.home() / ".molecule-ai" / "kimi-workspace" / "env" +HEARTBEAT_INTERVAL = 20 +POLL_INTERVAL = 5 + +def load_env(): + env = {} + for line in ENV.read_text().splitlines(): + if "=" in line and not line.startswith("#"): + k, v = line.split("=", 1) + env[k.strip()] = v.strip() + return env + +def hdrs(url, token): + return {"Authorization": f"Bearer {token}", "Origin": url, "Content-Type": "application/json"} + +def register(client, url, ws, tok): + r = client.post(f"{url}/registry/register", json={ + "id": ws, "url": "", "agent_card": {"name": "mac-laptop-kimi", "skills": []}, + "delivery_mode": "poll", + }, headers=hdrs(url, tok)) + r.raise_for_status() + logging.info("registered %s", ws) + +def heartbeat(client, url, ws, tok, start): + r = client.post(f"{url}/registry/heartbeat", json={ + "workspace_id": ws, "error_rate": 0.0, "sample_error": "", + "active_tasks": 0, "current_task": "", "uptime_seconds": int(time.time() - start), + }, headers=hdrs(url, tok)) + r.raise_for_status() + +def poll_inbound(client, url, ws, tok, since_id): + params = {"since_secs": "30", "limit": "50"} + if since_id: + params["since_id"] = since_id + r = client.get(f"{url}/workspaces/{ws}/activity", params=params, headers=hdrs(url, tok)) + r.raise_for_status() + return r.json() + +def send_reply(client, url, ws, tok, text): + r = 
client.post(f"{url}/workspaces/{ws}/notify", json={"message": text}, headers=hdrs(url, tok)) + r.raise_for_status() + logging.info("reply sent: %s", text[:80]) + +def extract_user_text(item): + """Pull the user message text from an activity log request_body.""" + try: + body = item.get("request_body") or {} + parts = body.get("params", {}).get("message", {}).get("parts", []) + return " ".join(p.get("text", "") for p in parts if p.get("text")) + except Exception: + return "" + +def main(): + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") + start = time.time() + since_id = "" + last_beat = 0 + while True: + try: + e = load_env() + purl, ws, tok = e["PLATFORM_URL"], e["WORKSPACE_ID"], e["MOLECULE_WORKSPACE_TOKEN"] + with httpx.Client(timeout=10.0) as c: + # Heartbeat every HEARTBEAT_INTERVAL seconds + if time.time() - last_beat >= HEARTBEAT_INTERVAL: + register(c, purl, ws, tok) + heartbeat(c, purl, ws, tok, start) + last_beat = time.time() + + # Poll for new canvas messages + items = poll_inbound(c, purl, ws, tok, since_id) + for item in items: + since_id = item["id"] + src = item.get("source_id") + method = item.get("method") or "" + # Skip our own /notify replies and agent-originated traffic + if method == "notify" or src is not None: + continue + text = extract_user_text(item) + if text: + logging.info("INBOUND from canvas: %s", text) + # Replace the echo below with your own logic: + send_reply(c, purl, ws, tok, f"Echo: {text}") + time.sleep(POLL_INTERVAL) + except Exception as exc: + logging.warning("loop failed: %s", exc) + time.sleep(5) + +if __name__ == "__main__": + main() +PYEOF +chmod +x ~/.molecule-ai/kimi-workspace/kimi_bridge.py + +# 3. 
Start the bridge (run in a persistent terminal or via launchd): +python3 ~/.molecule-ai/kimi-workspace/kimi_bridge.py + +# What the script does: +# • Registers the workspace in poll mode (no public URL needed) +# • Heartbeats every 20s to keep STATUS = online on the canvas +# • Polls /workspaces/:id/activity every 5s for new canvas messages +# • Echo-replies via POST /workspaces/:id/notify +# +# To change the reply logic, edit the send_reply() call inside the loop. +# To send a one-off reply from another terminal: +# curl -fsS -X POST "{{PLATFORM_URL}}/workspaces/{{WORKSPACE_ID}}/notify" \ +# -H "Authorization: Bearer $(cat ~/.molecule-ai/kimi-workspace/env | grep TOKEN | cut -d= -f2)" \ +# -H "Content-Type: application/json" \ +# -d '{"message":"Hello from Kimi"}' +# +# For push-mode inbound A2A (instead of polling), pair with the Python SDK +# tab — but that requires a public HTTPS endpoint (ngrok / Cloudflare Tunnel). +# +# Need help? +# Documentation: https://doc.moleculesai.app/docs/guides/external-agent-registration +` + const externalOpenClawTemplate = `# OpenClaw MCP config — outbound tool path. For operators whose # external agent is an openclaw session. 
# diff --git a/workspace-server/internal/handlers/external_rotate.go b/workspace-server/internal/handlers/external_rotate.go index ce029958..5973d362 100644 --- a/workspace-server/internal/handlers/external_rotate.go +++ b/workspace-server/internal/handlers/external_rotate.go @@ -62,7 +62,7 @@ func (h *WorkspaceHandler) RotateExternalCredentials(c *gin.Context) { c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"}) return } - if runtime != "external" { + if !isExternalLikeRuntime(runtime) { // Rotating a hermes/claude-code workspace's bearer would not // just break the ssh-EIC tunnel auth on the platform side — it // would also leave the workspace's in-container heartbeat with @@ -73,9 +73,9 @@ func (h *WorkspaceHandler) RotateExternalCredentials(c *gin.Context) { // here so the canvas can show "rotate is for external workspaces; // click Restart instead" rather than silently corrupting state. c.JSON(http.StatusBadRequest, gin.H{ - "error": "rotate is only valid for runtime=external workspaces", + "error": "rotate is only valid for external/BYO-compute workspaces", "runtime": runtime, - "hint": "use POST /workspaces/:id/restart for non-external runtimes", + "hint": "use POST /workspaces/:id/restart for container-backed runtimes", }) return } @@ -139,9 +139,9 @@ func (h *WorkspaceHandler) GetExternalConnection(c *gin.Context) { c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"}) return } - if runtime != "external" { + if !isExternalLikeRuntime(runtime) { c.JSON(http.StatusBadRequest, gin.H{ - "error": "connection payload is only valid for runtime=external workspaces", + "error": "connection payload is only valid for external/BYO-compute workspaces", "runtime": runtime, }) return diff --git a/workspace-server/internal/handlers/external_rotate_test.go b/workspace-server/internal/handlers/external_rotate_test.go index bddc120c..df31b224 100644 --- a/workspace-server/internal/handlers/external_rotate_test.go +++ 
b/workspace-server/internal/handlers/external_rotate_test.go @@ -82,6 +82,7 @@ func TestRotateExternalCredentials_HappyPath(t *testing.T) { "curl_register_template", "python_snippet", "claude_code_channel_snippet", "universal_mcp_snippet", "hermes_channel_snippet", "codex_snippet", "openclaw_snippet", + "kimi_snippet", } { if _, ok := body.Connection[k]; !ok { t.Errorf("payload missing snippet field: %s", k) diff --git a/workspace-server/internal/handlers/plugins.go b/workspace-server/internal/handlers/plugins.go index d26db674..76c132d4 100644 --- a/workspace-server/internal/handlers/plugins.go +++ b/workspace-server/internal/handlers/plugins.go @@ -242,7 +242,7 @@ func (h *PluginsHandler) isExternalRuntime(workspaceID string) bool { if err != nil { return false } - return runtime == "external" + return isExternalLikeRuntime(runtime) } func (h *PluginsHandler) execAsRoot(ctx context.Context, containerName string, cmd []string) (string, error) { diff --git a/workspace-server/internal/handlers/plugins_install_external_test.go b/workspace-server/internal/handlers/plugins_install_external_test.go index 3afe13f6..eaad2396 100644 --- a/workspace-server/internal/handlers/plugins_install_external_test.go +++ b/workspace-server/internal/handlers/plugins_install_external_test.go @@ -76,6 +76,34 @@ func TestPluginUninstall_ExternalRuntime_Returns422(t *testing.T) { } } +// TestPluginInstall_KimiRuntime_Returns422 — kimi-cli is BYO-compute, +// same shape as external. Push-install via docker exec must be rejected. +func TestPluginInstall_KimiRuntime_Returns422(t *testing.T) { + h := NewPluginsHandler(t.TempDir(), nil, nil). 
+ WithRuntimeLookup(func(workspaceID string) (string, error) { + return "kimi-cli", nil + }) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-kimi"}} + c.Request = httptest.NewRequest( + "POST", + "/workspaces/ws-kimi/plugins", + bytes.NewBufferString(`{"source":"local://my-plugin"}`), + ) + c.Request.Header.Set("Content-Type", "application/json") + + h.Install(c) + + if w.Code != http.StatusUnprocessableEntity { + t.Errorf("expected 422 for runtime='kimi-cli', got %d: %s", w.Code, w.Body.String()) + } + if !strings.Contains(w.Body.String(), "external runtimes") { + t.Errorf("expected error body to mention 'external runtimes', got: %s", w.Body.String()) + } +} + // TestPluginInstall_ContainerBackedRuntime_FallsThroughGuard — the runtime // guard MUST NOT short-circuit container-backed runtimes. With // `runtime='claude-code'` the install proceeds past the guard; without a diff --git a/workspace-server/internal/handlers/registry.go b/workspace-server/internal/handlers/registry.go index 84333985..65a85305 100644 --- a/workspace-server/internal/handlers/registry.go +++ b/workspace-server/internal/handlers/registry.go @@ -158,7 +158,7 @@ func (h *RegistryHandler) resolveDeliveryMode(ctx context.Context, workspaceID, if existing.Valid && existing.String != "" { return existing.String, nil } - if runtime.Valid && runtime.String == "external" { + if runtime.Valid && isExternalLikeRuntime(runtime.String) { return models.DeliveryModePoll, nil } return models.DeliveryModePush, nil diff --git a/workspace-server/internal/handlers/registry_test.go b/workspace-server/internal/handlers/registry_test.go index bd8e65d8..7ad1dbbc 100644 --- a/workspace-server/internal/handlers/registry_test.go +++ b/workspace-server/internal/handlers/registry_test.go @@ -1721,6 +1721,65 @@ func TestRegister_ExternalRuntime_DefaultsToPoll(t *testing.T) { } } +// TestRegister_KimiRuntime_DefaultsToPoll mirrors the external-runtime +// 
poll-default test: a workspace whose existing row has runtime=kimi-cli +// and empty delivery_mode must resolve to poll (laptop/NAT-safe default). +func TestRegister_KimiRuntime_DefaultsToPoll(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + const wsID = "ws-kimi-default-poll" + + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + + mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"delivery_mode", "runtime"}). + AddRow(sql.NullString{}, "kimi-cli")) + + mock.ExpectExec("INSERT INTO workspaces"). + WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectQuery("SELECT url FROM workspaces WHERE id"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"url"}).AddRow("")) + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + mock.ExpectExec("INSERT INTO workspace_auth_tokens"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`). + WithArgs(wsID). 
+ WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/registry/register", + bytes.NewBufferString(`{"id":"`+wsID+`","agent_card":{"name":"a"}}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Register(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + _ = json.Unmarshal(w.Body.Bytes(), &resp) + if resp["delivery_mode"] != "poll" { + t.Errorf("delivery_mode = %v, want %q (kimi runtime + empty mode → poll)", + resp["delivery_mode"], "poll") + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + // TestRegister_NonExternalRuntime_StillDefaultsToPush guards the // inverse: a non-external runtime (langgraph, hermes, etc.) with // empty delivery_mode keeps the historical push default. Catches diff --git a/workspace-server/internal/handlers/workspace_restart.go b/workspace-server/internal/handlers/workspace_restart.go index 6e3bb424..985b9ca5 100644 --- a/workspace-server/internal/handlers/workspace_restart.go +++ b/workspace-server/internal/handlers/workspace_restart.go @@ -103,11 +103,11 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) { // behavior agree, and surface a clear message instead of silently // no-op'ing — the canvas can show the operator that the fix is on // their side. 
- if dbRuntime == "external" { + if isExternalLikeRuntime(dbRuntime) { c.JSON(http.StatusOK, gin.H{ "status": "noop", - "runtime": "external", - "message": "external workspaces are operator-driven — restart your local poller; platform has nothing to restart", + "runtime": dbRuntime, + "message": dbRuntime + " workspaces are operator-driven — restart your local agent; platform has nothing to restart", }) return } @@ -547,7 +547,7 @@ func (h *WorkspaceHandler) runRestartCycle(workspaceID string) { // Don't auto-restart external workspaces (no Docker container) // or mock workspaces (no container, every reply is canned — // see workspace-server/internal/handlers/mock_runtime.go). - if dbRuntime == "external" || dbRuntime == "mock" { + if isExternalLikeRuntime(dbRuntime) || dbRuntime == "mock" { return } diff --git a/workspace-server/internal/handlers/workspace_restart_test.go b/workspace-server/internal/handlers/workspace_restart_test.go index f36b5232..c89baad8 100644 --- a/workspace-server/internal/handlers/workspace_restart_test.go +++ b/workspace-server/internal/handlers/workspace_restart_test.go @@ -179,6 +179,51 @@ func TestRestartHandler_ExternalRuntimeNoOps(t *testing.T) { } } +func TestRestartHandler_KimiRuntimeNoOps(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + mock.ExpectQuery("SELECT status, name, tier, COALESCE"). + WithArgs("ws-kimi"). + WillReturnRows(sqlmock.NewRows([]string{"status", "name", "tier", "runtime"}). + AddRow("offline", "Kimi Agent", 1, "kimi-cli")) + + mock.ExpectQuery("SELECT parent_id FROM workspaces WHERE id ="). + WithArgs("ws-kimi"). 
+ WillReturnRows(sqlmock.NewRows([]string{"parent_id"})) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-kimi"}} + c.Request = httptest.NewRequest("POST", "/workspaces/ws-kimi/restart", nil) + + handler.Restart(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("decode response: %v", err) + } + if got, _ := resp["status"].(string); got != "noop" { + t.Errorf("expected status=noop, got %v", resp["status"]) + } + if got, _ := resp["runtime"].(string); got != "kimi-cli" { + t.Errorf("expected runtime=kimi-cli, got %v", resp["runtime"]) + } + if msg, _ := resp["message"].(string); !strings.Contains(msg, "operator-driven") { + t.Errorf("expected message about operator-driven, got %v", resp["message"]) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + func TestRestartHandler_NilProvisionerReturns503(t *testing.T) { mock := setupTestDB(t) setupTestRedis(t) diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py index c6567ff4..2de63044 100644 --- a/workspace/a2a_client.py +++ b/workspace/a2a_client.py @@ -187,27 +187,19 @@ def enrich_peer_metadata_nonblocking( canon = _validate_peer_id(peer_id) if canon is None: return None - - # Cache-first: return immediately on warm hit (same TTL logic as the - # sync path). This is the hot-path optimisation — every push from a - # warm peer must return the record without touching the in-flight set - # or the executor. A background fetch that races to fill the cache - # will find the entry already present when it calls - # enrich_peer_metadata (which does its own fresh-TTL check), so it - # exits as a no-op with no extra network traffic. + # Cache hit (fresh): return without blocking on a registry GET. 
+ # This is the hot path for active peer conversations — avoids + # spawning a background thread for every push from a known peer. current = time.monotonic() cached = _peer_metadata_get(canon) if cached is not None: fetched_at, record = cached if current - fetched_at < _PEER_METADATA_TTL_SECONDS: return record - # Cache miss or TTL expired: schedule background fetch unless one is - # already in flight for this peer. The synchronous version atomically - # reads-then-writes; the async version splits that into "schedule - # fetch" + "fetch fills cache later." The in-flight set keeps a - # flurry of pushes from one peer (e.g., a chatty agent) from - # spawning N parallel GETs. + # already in flight for this peer. The in-flight set keeps a flurry + # of pushes from one peer (e.g., a chatty agent) from spawning N + # parallel GETs. with _enrich_in_flight_lock: if canon in _enrich_in_flight: return None diff --git a/workspace/a2a_executor.py b/workspace/a2a_executor.py index c8b1fc8a..97a768f0 100644 --- a/workspace/a2a_executor.py +++ b/workspace/a2a_executor.py @@ -548,12 +548,7 @@ class LangGraphA2AExecutor(AgentExecutor): # receive the error and stop polling. await updater.failed( message=new_text_message( - # Pass the exception string as stderr so sanitize_agent_error - # can include a ~1KB preview in the A2A error response. - # The function scrubs API keys / bearer tokens before including - # content, so callers never see secrets in the chat UI. - # Fixes: roadmap item "SDK executor stderr swallowing". 
- sanitize_agent_error(stderr=str(e)), task_id=task_id, context_id=context_id, + sanitize_agent_error(exc=e), task_id=task_id, context_id=context_id ) ) finally: diff --git a/workspace/a2a_mcp_server.py b/workspace/a2a_mcp_server.py index 07f04f32..22bbb682 100644 --- a/workspace/a2a_mcp_server.py +++ b/workspace/a2a_mcp_server.py @@ -163,15 +163,67 @@ async def handle_tool_call(name: str, arguments: dict) -> str: # --- MCP Notification bridge --- -# `notifications/claude/channel` matches the contract used by the -# molecule-mcp-claude-channel bun bridge (server.ts:509). Claude Code's -# MCP runtime treats this method as a conversation interrupt — `content` -# becomes the agent turn, `meta` is structured metadata. Notification- -# capable hosts (Claude Code today; any compliant client tomorrow) -# get push UX automatically; pollers (`wait_for_message` / `inbox_peek`) -# still work unchanged. See task #46 + the deprecation path documented -# in workspace/inbox.py:set_notification_callback. -_CHANNEL_NOTIFICATION_METHOD = "notifications/claude/channel" +# Runtime-adaptive notification method. Each MCP host uses a different +# JSON-RPC notification method for inbound push. Detect at startup so +# the inbox poller emits the right shape for the host that spawned us. +# +# Detection order (first match wins): +# CLAUDE_CODE / CLAUDE_CODE_VERSION → notifications/claude/channel +# OPENCLAW_SESSION_ID / OPENCLAW_GATEWAY_PORT → notifications/openclaw/channel +# CURSOR_MCP / CURSOR_TRACE_ID → notifications/cursor/channel +# HERMES_RUNTIME / HERMES_WORKSPACE_ID → notifications/hermes/channel +# fallback → notifications/message +# +# The method is resolved once at startup and cached in +# _CHANNEL_NOTIFICATION_METHOD. Tests can override by patching +# _detect_runtime() or setting the env var before import. 
+_DETECTED_RUNTIME: str | None = None + + +def _detect_runtime() -> str: + """Detect which MCP host spawned this process.""" + global _DETECTED_RUNTIME + if _DETECTED_RUNTIME is not None: + return _DETECTED_RUNTIME + + env = os.environ + if env.get("CLAUDE_CODE") or env.get("CLAUDE_CODE_VERSION"): + _DETECTED_RUNTIME = "claude" + elif env.get("OPENCLAW_SESSION_ID") or env.get("OPENCLAW_GATEWAY_PORT"): + _DETECTED_RUNTIME = "openclaw" + elif env.get("CURSOR_MCP") or env.get("CURSOR_TRACE_ID"): + _DETECTED_RUNTIME = "cursor" + elif env.get("HERMES_RUNTIME") or env.get("HERMES_WORKSPACE_ID"): + _DETECTED_RUNTIME = "hermes" + else: + _DETECTED_RUNTIME = "generic" + + logger.debug(f"Detected MCP runtime: {_DETECTED_RUNTIME}") + return _DETECTED_RUNTIME + + +def _notification_method_for_runtime(runtime: str) -> str: + """Return the JSON-RPC notification method for the given runtime.""" + return { + "claude": "notifications/claude/channel", + "openclaw": "notifications/openclaw/channel", + "cursor": "notifications/cursor/channel", + "hermes": "notifications/hermes/channel", + "generic": "notifications/message", + }.get(runtime, "notifications/message") + + +# Lazily resolved so tests can patch _detect_runtime() before the first +# notification is built. The value is read once per process lifetime. 
+_CHANNEL_NOTIFICATION_METHOD: str | None = None + + +def _channel_notification_method() -> str: + """Return the cached notification method for the detected runtime.""" + global _CHANNEL_NOTIFICATION_METHOD + if _CHANNEL_NOTIFICATION_METHOD is None: + _CHANNEL_NOTIFICATION_METHOD = _notification_method_for_runtime(_detect_runtime()) + return _CHANNEL_NOTIFICATION_METHOD # ============= Trust-boundary gates for channel-notification meta ============== @@ -569,7 +621,7 @@ def _build_channel_notification(msg: dict) -> dict: ) return { "jsonrpc": "2.0", - "method": _CHANNEL_NOTIFICATION_METHOD, + "method": _channel_notification_method(), "params": { "content": content, "meta": meta, @@ -632,66 +684,69 @@ def _format_channel_content( # --- MCP Server (JSON-RPC over stdio) --- -def _assert_stdio_is_pipe_compatible( - stdin_fd: int = 0, stdout_fd: int = 1 -) -> None: - """Fail fast with a friendly message when stdio isn't pipe-compatible. +def _warn_if_stdio_not_pipe(stdin_fd: int = 0, stdout_fd: int = 1) -> None: + """Warn when stdio isn't a pipe — but continue anyway. - asyncio.connect_read_pipe / connect_write_pipe accept only pipes, - sockets, and character devices. When molecule-mcp is launched with - stdout redirected to a regular file (CI smoke tests, ad-hoc local - debugging that captures output), the asyncio call later raises - ``ValueError: Pipe transport is only for pipes, sockets and character - devices`` from inside the event loop — surfaced to the operator as a - confusing traceback. Detect early and exit cleanly with guidance - instead. See molecule-ai-workspace-runtime#61. + The legacy asyncio.connect_read_pipe / connect_write_pipe transport + rejected regular files, PTYs, and sockets with: + ValueError: Pipe transport is only for pipes, sockets and + character devices + We now use direct buffer I/O which works with ANY file descriptor, + so this is a diagnostic-only warning for operators debugging setup + issues. See molecule-ai-workspace-runtime#61. 
""" for name, fd in (("stdin", stdin_fd), ("stdout", stdout_fd)): try: mode = os.fstat(fd).st_mode - except OSError as exc: - print( - f"molecule-mcp: cannot stat {name} (fd={fd}): {exc}.\n" - f" This MCP server expects bidirectional pipe stdio. Launch it from\n" - f" an MCP-aware client (Claude Code, Cursor, etc.) — not detached\n" - f" from a terminal or with stdio closed.", - file=sys.stderr, + except OSError: + continue + if not (stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)): + logger.warning( + f"molecule-mcp: {name} (fd={fd}) is not a pipe/socket/char-device. " + f"This is fine — the universal stdio transport handles regular files, " + f"PTYs, and sockets. If you see garbled output, launch from an " + f"MCP-aware client (Claude Code, Cursor, OpenClaw, etc.)." ) - sys.exit(2) - if not ( - stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode) - ): - print( - f"molecule-mcp: {name} (fd={fd}) is a regular file, not a pipe,\n" - f" socket, or character device — asyncio's stdio transport rejects\n" - f" it with `ValueError: Pipe transport is only for pipes, sockets\n" - f" and character devices`. Common causes:\n" - f" molecule-mcp > out.txt # stdout → regular file (fails)\n" - f" molecule-mcp < input.json # stdin → regular file (fails)\n" - f" Launch molecule-mcp from an MCP-aware client (Claude Code, Cursor,\n" - f" hermes, OpenCode, etc.) so stdio is wired to a pipe pair, or use\n" - f" `tee`/process substitution if you need to capture output:\n" - f" molecule-mcp 2>&1 | tee out.txt # stdout stays a pipe", - file=sys.stderr, - ) - sys.exit(2) async def main(): # pragma: no cover - """Run MCP server on stdio — reads JSON-RPC requests, writes responses.""" - reader = asyncio.StreamReader() - protocol = asyncio.StreamReaderProtocol(reader) - await asyncio.get_event_loop().connect_read_pipe(lambda: protocol, sys.stdin) + """Run MCP server on stdio — reads JSON-RPC requests, writes responses. 
- writer_transport, writer_protocol = await asyncio.get_event_loop().connect_write_pipe( - asyncio.streams.FlowControlMixin, sys.stdout - ) - writer = asyncio.StreamWriter(writer_transport, writer_protocol, None, asyncio.get_event_loop()) + Uses sys.stdin.buffer / sys.stdout.buffer directly instead of + asyncio.connect_read_pipe / connect_write_pipe. The asyncio pipe + transport rejects regular files, PTYs, and sockets with: + ValueError: Pipe transport is only for pipes, sockets and + character devices + This breaks when the MCP host captures stdout (openclaw, CI tests, + ad-hoc debugging with tee). Reading/writing the buffer directly + works with ANY file descriptor. + + See molecule-ai-workspace-runtime#61. + """ + loop = asyncio.get_event_loop() + # sys.stdin.buffer exists on text-mode streams (default); on binary + # streams (tests, some CI setups) stdin IS the buffer. + stdin = getattr(sys.stdin, "buffer", sys.stdin) + stdout = getattr(sys.stdout, "buffer", sys.stdout) async def write_response(response: dict): data = json.dumps(response) + "\n" - writer.write(data.encode()) - await writer.drain() + stdout.write(data.encode()) + stdout.flush() + + # Build a StreamWriter-compatible wrapper for the inbox bridge. + # The bridge expects a writer with .write() and .drain() methods. + class _StdoutWriter: + def __init__(self, buf): + self._buf = buf + + def write(self, data: bytes) -> None: + self._buf.write(data) + + async def drain(self) -> None: + self._buf.flush() + + writer = _StdoutWriter(stdout) # Wire the inbox → MCP notification bridge. 
The bridge body lives # in `_setup_inbox_bridge` so the threading + asyncio + stdout @@ -701,22 +756,27 @@ async def main(): # pragma: no cover _setup_inbox_bridge(writer, asyncio.get_running_loop()) ) - buffer = "" + # Log runtime detection for operator diagnostics + runtime = _detect_runtime() + logger.info(f"MCP stdio transport ready (runtime={runtime}, " + f"notification_method={_channel_notification_method()})") + + buffer = b"" while True: try: - chunk = await reader.read(65536) + chunk = await loop.run_in_executor(None, stdin.read, 65536) if not chunk: break - buffer += chunk.decode(errors="replace") + buffer += chunk - while "\n" in buffer: - line, buffer = buffer.split("\n", 1) + while b"\n" in buffer: + line, buffer = buffer.split(b"\n", 1) line = line.strip() if not line: continue try: - request = json.loads(line) + request = json.loads(line.decode(errors="replace")) except json.JSONDecodeError: continue @@ -780,7 +840,7 @@ def cli_main() -> None: # pragma: no cover break every external-runtime operator's MCP install — the 0.1.16 ``main_sync`` rename incident is the cautionary precedent. """ - _assert_stdio_is_pipe_compatible() + _warn_if_stdio_not_pipe() asyncio.run(main()) diff --git a/workspace/tests/test_a2a_executor.py b/workspace/tests/test_a2a_executor.py index 24b8fd68..05a3df09 100644 --- a/workspace/tests/test_a2a_executor.py +++ b/workspace/tests/test_a2a_executor.py @@ -165,7 +165,10 @@ async def test_agent_error_handling(): eq.enqueue_event.assert_called_once() error_msg = str(eq.enqueue_event.call_args[0][0]) - assert "model crashed" in error_msg + # sanitize_agent_error strips the raw exception message from the UI; + # raw detail goes to workspace logs only. This is the secure behaviour. 
+ assert "Agent error (RuntimeError)" in error_msg + assert "model crashed" not in error_msg @pytest.mark.asyncio @@ -1200,7 +1203,10 @@ async def test_terminal_error_routes_via_updater_failed(): "terminal error Message must route via updater.failed() in task mode" ) err_msg = eq._failed_calls[-1] - assert "model crashed" in str(err_msg) + # sanitize_agent_error strips the raw exception message from the UI; + # raw detail goes to workspace logs only. + assert "Agent error (RuntimeError)" in str(err_msg) + assert "model crashed" not in str(err_msg) # And complete() must NOT have been called on the failure path. assert not eq._complete_calls, ( "complete() should not fire when execute() raises" diff --git a/workspace/tests/test_a2a_mcp_server.py b/workspace/tests/test_a2a_mcp_server.py index e1b86026..2011df5e 100644 --- a/workspace/tests/test_a2a_mcp_server.py +++ b/workspace/tests/test_a2a_mcp_server.py @@ -252,23 +252,30 @@ def test_attachments_param_description_emphasizes_REQUIRED(): def test_build_channel_notification_method_matches_claude_contract(): - """Method MUST be `notifications/claude/channel` exactly — that's - what Claude Code's MCP runtime listens for as a conversation + """Method MUST be `notifications/claude/channel` when runtime=claude — + that's what Claude Code's MCP runtime listens for as a conversation interrupt. 
Same string as the bun channel bridge sends (server.ts:509) so this is a drop-in replacement.""" from a2a_mcp_server import _build_channel_notification - payload = _build_channel_notification({ - "activity_id": "act-1", - "text": "hello", - "peer_id": "", - "kind": "canvas_user", - "method": "message/send", - "created_at": "2026-05-01T00:00:00Z", - }) - - assert payload["method"] == "notifications/claude/channel" - assert payload["jsonrpc"] == "2.0" + with patch("a2a_mcp_server._detect_runtime", return_value="claude"): + # Reset the cached method so _channel_notification_method() re-resolves + import a2a_mcp_server as _mcp + old_method = _mcp._CHANNEL_NOTIFICATION_METHOD + _mcp._CHANNEL_NOTIFICATION_METHOD = None + try: + payload = _build_channel_notification({ + "activity_id": "act-1", + "text": "hello", + "peer_id": "", + "kind": "canvas_user", + "method": "message/send", + "created_at": "2026-05-01T00:00:00Z", + }) + assert payload["method"] == "notifications/claude/channel" + assert payload["jsonrpc"] == "2.0" + finally: + _mcp._CHANNEL_NOTIFICATION_METHOD = old_method def test_build_channel_notification_content_wraps_text_with_identity_and_reply_hint(): @@ -1618,80 +1625,91 @@ async def test_inbox_bridge_emits_channel_notification_to_writer(): import os import threading + from unittest.mock import patch + from a2a_mcp_server import _setup_inbox_bridge - # Real asyncio writer backed by an os.pipe — same shape as - # main() but isolated so we can read what was written. 
- read_fd, write_fd = os.pipe() - loop = asyncio.get_running_loop() - transport, protocol = await loop.connect_write_pipe( - asyncio.streams.FlowControlMixin, - os.fdopen(write_fd, "wb"), - ) - writer = asyncio.StreamWriter(transport, protocol, None, loop) - - try: - cb = _setup_inbox_bridge(writer, loop) - - msg = { - # Production-shape UUID per the trust-boundary gate (#2488) - "activity_id": "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff", - "text": "hello from peer", - "peer_id": "11111111-2222-3333-4444-555555555555", - "kind": "peer_agent", - "method": "message/send", - "created_at": "2026-05-01T22:00:00Z", - } - - # Simulate the inbox poller daemon thread invoking the - # callback from a non-asyncio context — exactly the - # threading boundary the bridge has to cross. - threading.Thread(target=cb, args=(msg,), daemon=True).start() - - # Give the scheduled coroutine a chance to run + drain - # without coupling the test to wall-clock timing. - for _ in range(20): - await asyncio.sleep(0.05) - data = os.read(read_fd, 65536) if _readable(read_fd) else b"" - if data: - break - else: - data = b"" - - assert data, ( - "no notification on stdout pipe — the bridge fired " - "but the write didn't reach the writer (writer.drain " - "swallowing or scheduling race)" - ) - line = data.decode().strip() - payload = json.loads(line) - - assert payload["jsonrpc"] == "2.0" - assert payload["method"] == "notifications/claude/channel" - # Content is wrapped with the identity header + reply hint — - # see _format_channel_content. The bridge test pins the full - # composition so a regression to "raw text only" surfaces here - # as well as in the per-formatter tests above. 
- assert payload["params"]["content"] == ( - "[from peer-agent · peer_id=11111111-2222-3333-4444-555555555555]\n" - "hello from peer\n" - '↩ Reply: delegate_task({workspace_id: ' - '"11111111-2222-3333-4444-555555555555", task: "..."})' - ) - meta = payload["params"]["meta"] - assert meta["source"] == "molecule" - assert meta["kind"] == "peer_agent" - assert meta["peer_id"] == "11111111-2222-3333-4444-555555555555" - assert meta["activity_id"] == "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff" - assert meta["ts"] == "2026-05-01T22:00:00Z" - finally: - writer.close() + # Force claude runtime so the notification method is predictable + with patch("a2a_mcp_server._detect_runtime", return_value="claude"): + import a2a_mcp_server as _mcp + old_method = _mcp._CHANNEL_NOTIFICATION_METHOD + _mcp._CHANNEL_NOTIFICATION_METHOD = None + _mcp._channel_notification_method() # prime cache try: - os.close(read_fd) - except OSError: - # read_fd may already be closed if writer.close() tore down the pair - # during teardown — best-effort cleanup, no signal worth surfacing. - pass + # Real asyncio writer backed by an os.pipe — same shape as + # main() but isolated so we can read what was written. + read_fd, write_fd = os.pipe() + loop = asyncio.get_running_loop() + transport, protocol = await loop.connect_write_pipe( + asyncio.streams.FlowControlMixin, + os.fdopen(write_fd, "wb"), + ) + writer = asyncio.StreamWriter(transport, protocol, None, loop) + + try: + cb = _setup_inbox_bridge(writer, loop) + + msg = { + # Production-shape UUID per the trust-boundary gate (#2488) + "activity_id": "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff", + "text": "hello from peer", + "peer_id": "11111111-2222-3333-4444-555555555555", + "kind": "peer_agent", + "method": "message/send", + "created_at": "2026-05-01T22:00:00Z", + } + + # Simulate the inbox poller daemon thread invoking the + # callback from a non-asyncio context — exactly the + # threading boundary the bridge has to cross. 
+ threading.Thread(target=cb, args=(msg,), daemon=True).start() + + # Give the scheduled coroutine a chance to run + drain + # without coupling the test to wall-clock timing. + for _ in range(20): + await asyncio.sleep(0.05) + data = os.read(read_fd, 65536) if _readable(read_fd) else b"" + if data: + break + else: + data = b"" + + assert data, ( + "no notification on stdout pipe — the bridge fired " + "but the write didn't reach the writer (writer.drain " + "swallowing or scheduling race)" + ) + line = data.decode().strip() + payload = json.loads(line) + + assert payload["jsonrpc"] == "2.0" + assert payload["method"] == "notifications/claude/channel" + # Content is wrapped with the identity header + reply hint — + # see _format_channel_content. The bridge test pins the full + # composition so a regression to "raw text only" surfaces here + # as well as in the per-formatter tests above. + assert payload["params"]["content"] == ( + "[from peer-agent · peer_id=11111111-2222-3333-4444-555555555555]\n" + "hello from peer\n" + '↩ Reply: delegate_task({workspace_id: ' + '"11111111-2222-3333-4444-555555555555", task: "..."})' + ) + meta = payload["params"]["meta"] + assert meta["source"] == "molecule" + assert meta["kind"] == "peer_agent" + assert meta["peer_id"] == "11111111-2222-3333-4444-555555555555" + assert meta["activity_id"] == "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff" + assert meta["ts"] == "2026-05-01T22:00:00Z" + finally: + writer.close() + try: + os.close(read_fd) + except OSError: + # read_fd may already be closed if writer.close() tore down the pair + # during teardown — best-effort cleanup, no signal worth surfacing. 
+ pass + finally: + _mcp._CHANNEL_NOTIFICATION_METHOD = old_method async def test_inbox_bridge_swallows_closed_pipe_drain_error(monkeypatch): @@ -1808,99 +1826,75 @@ def test_inbox_bridge_swallows_closed_loop_runtime_error(): class TestStdioPipeAssertion: - """Pin _assert_stdio_is_pipe_compatible — the friendly fail-fast guard - that turns asyncio's `ValueError: Pipe transport is only for pipes, - sockets and character devices` into a clear operator message + exit 2. + """Pin _warn_if_stdio_not_pipe — the diagnostic warning that replaces + the old fatal _assert_stdio_is_pipe_compatible guard. + + The universal stdio transport now works with ANY file descriptor + (pipes, regular files, PTYs, sockets), so the old exit-2 behavior + is gone. These tests verify the warning is emitted for non-pipe + stdio so operators still get diagnostic signal when debugging. See molecule-ai-workspace-runtime#61. """ - def test_pipe_pair_passes_silently(self): - """Happy path — both fds are pipes (the production launch shape - from any MCP client). Should return None without printing or - exiting.""" - from a2a_mcp_server import _assert_stdio_is_pipe_compatible + def test_pipe_pair_passes_silently(self, caplog): + """Happy path — both fds are pipes. No warning emitted.""" + from a2a_mcp_server import _warn_if_stdio_not_pipe r, w = os.pipe() try: - # No exit, no stderr noise. We don't capture stderr here - # because pipe path should produce zero output. - _assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=w) + with caplog.at_level("WARNING"): + _warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w) + assert "not a pipe" not in caplog.text finally: os.close(r) os.close(w) - def test_regular_file_stdout_exits_with_friendly_message( - self, tmp_path, capsys - ): + def test_regular_file_stdout_warns(self, tmp_path, caplog): """Reproducer for runtime#61: stdout redirected to a regular file. - Pre-fix this would surface upstream as - `ValueError: Pipe transport is only for pipes...`. 
Post-fix we - exit with code 2 and a stderr message that names the symptom + - fix.""" - from a2a_mcp_server import _assert_stdio_is_pipe_compatible + Now emits a warning instead of exiting.""" + from a2a_mcp_server import _warn_if_stdio_not_pipe - # stdin = pipe (so we isolate the stdout failure path); - # stdout = regular file (the bug condition). r, _w = os.pipe() regular = tmp_path / "captured.log" f = open(regular, "wb") try: - with pytest.raises(SystemExit) as excinfo: - _assert_stdio_is_pipe_compatible( - stdin_fd=r, stdout_fd=f.fileno() - ) - assert excinfo.value.code == 2 - err = capsys.readouterr().err - # Names the failing stream + the asyncio constraint that - # would otherwise crash. Don't pin the exact wording — the - # asserts pin the operator-recoverable signal only. - assert "stdout" in err - assert "regular file" in err - assert "pipe" in err + with caplog.at_level("WARNING"): + _warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=f.fileno()) + assert "stdout" in caplog.text + assert "not a pipe" in caplog.text finally: f.close() os.close(r) - def test_regular_file_stdin_exits_with_friendly_message( - self, tmp_path, capsys - ): - """Symmetric case — stdin redirected from a regular file. 
Same - asyncio constraint applies via connect_read_pipe.""" - from a2a_mcp_server import _assert_stdio_is_pipe_compatible + def test_regular_file_stdin_warns(self, tmp_path, caplog): + """Symmetric case — stdin redirected from a regular file.""" + from a2a_mcp_server import _warn_if_stdio_not_pipe regular = tmp_path / "input.json" regular.write_bytes(b'{"jsonrpc":"2.0","id":1,"method":"initialize"}\n') f = open(regular, "rb") _r, w = os.pipe() try: - with pytest.raises(SystemExit) as excinfo: - _assert_stdio_is_pipe_compatible( - stdin_fd=f.fileno(), stdout_fd=w - ) - assert excinfo.value.code == 2 - err = capsys.readouterr().err - assert "stdin" in err - assert "regular file" in err + with caplog.at_level("WARNING"): + _warn_if_stdio_not_pipe(stdin_fd=f.fileno(), stdout_fd=w) + assert "stdin" in caplog.text + assert "not a pipe" in caplog.text finally: f.close() os.close(w) - def test_closed_fd_exits_with_stat_error(self, capsys): - """If stdio is closed (rare but seen in detached daemonized - contexts), os.fstat raises OSError. We catch it and exit 2 with - a guidance message instead of letting the traceback escape.""" - from a2a_mcp_server import _assert_stdio_is_pipe_compatible + def test_closed_fd_warns_about_stat_error(self, caplog): + """If stdio is closed, os.fstat raises OSError. Warning is + skipped silently (can't stat the fd).""" + from a2a_mcp_server import _warn_if_stdio_not_pipe r, w = os.pipe() os.close(w) # Now `w` is a stale fd — fstat will fail. try: - with pytest.raises(SystemExit) as excinfo: - _assert_stdio_is_pipe_compatible( - stdin_fd=r, stdout_fd=w - ) - assert excinfo.value.code == 2 - err = capsys.readouterr().err - assert "cannot stat stdout" in err + with caplog.at_level("WARNING"): + _warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w) + # No warning emitted because fstat failed before the check + assert "not a pipe" not in caplog.text finally: os.close(r)