fix(mcp): universal stdio transport + runtime-adaptive notifications #778

Merged
devops-engineer merged 19 commits from fix/stdio-fallback-all-environments into main 2026-05-13 18:01:25 +00:00
21 changed files with 888 additions and 244 deletions

View File

@ -0,0 +1,165 @@
name: MCP Stdio Transport Regression
# Regression test for molecule-ai-workspace-runtime#61:
# asyncio.connect_read_pipe / connect_write_pipe fail with
# ValueError: "Pipe transport is only for pipes, sockets and character devices"
# when stdout is a regular file (openclaw capture, CI tee, debugging).
#
# This workflow reproduces the exact failure mode and verifies the
# fallback to direct buffer I/O works. It runs on every PR that
# touches the MCP server or this workflow, plus nightly cron.
#
# Why a separate workflow (not folded into ci.yml python-lint):
# - The test needs to spawn the MCP server with stdout redirected
# to a regular file (not a TTY/pipe), which conflicts with
# pytest's own capture mechanism.
# - It exercises the actual process spawn path (python a2a_mcp_server.py)
# not just unit-test mocks — closer to the real openclaw integration.
# - A dedicated workflow surfaces stdio-specific regressions without
# coupling to the broader Python test suite's coverage gate.
on:
pull_request:
branches: [main, staging]
paths:
- 'workspace/a2a_mcp_server.py'
- 'workspace/mcp_cli.py'
- 'workspace/tests/test_a2a_mcp_server.py'
- '.gitea/workflows/ci-mcp-stdio-transport.yml'
push:
branches: [main, staging]
paths:
- 'workspace/a2a_mcp_server.py'
- 'workspace/mcp_cli.py'
- 'workspace/tests/test_a2a_mcp_server.py'
- '.gitea/workflows/ci-mcp-stdio-transport.yml'
schedule:
# Nightly at 04:00 UTC — catches drift from dependency updates
# (e.g. asyncio behavior changes in new Python patch releases).
- cron: '0 4 * * *'
concurrency:
group: mcp-stdio-${{ github.ref }}
cancel-in-progress: true
env:
GITHUB_SERVER_URL: https://git.moleculesai.app
jobs:
# bp-exempt: regression canary for runtime#61; not a merge gate — informational only until promoted to required.
# mc#774: continue-on-error mask — new workflow, flip to false once it's green on ≥3 consecutive main runs.
mcp-stdio-regular-file:
name: MCP stdio with regular-file stdout
runs-on: ubuntu-latest
continue-on-error: true # mc#774
timeout-minutes: 5
env:
WORKSPACE_ID: "00000000-0000-0000-0000-000000000001"
defaults:
run:
working-directory: workspace
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
with:
python-version: '3.11'
cache: pip
cache-dependency-path: workspace/requirements.txt
- run: pip install -r requirements.txt pytest pytest-asyncio pytest-cov
- name: Reproduce runtime#61 — stdout as regular file
run: |
set -euo pipefail
echo "=== Reproducing molecule-ai-workspace-runtime#61 ==="
echo ""
echo "Before the fix, this command would fail with:"
echo ' ValueError: Pipe transport is only for pipes, sockets and character devices'
echo ""
# Spawn the MCP server with stdout redirected to a regular file.
# This is exactly what openclaw does when capturing MCP output.
OUTPUT=$(mktemp)
trap 'rm -f "$OUTPUT"' EXIT
# Send initialize request, then tools/list, then exit
{
echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}'
echo '{"jsonrpc":"2.0","id":2,"method":"tools/list"}'
} | python a2a_mcp_server.py > "$OUTPUT" 2>&1 || {
RC=$?
echo "FAIL: MCP server exited with code $RC"
echo "--- stdout+stderr ---"
cat "$OUTPUT"
exit 1
}
echo "PASS: MCP server handled regular-file stdout without crashing"
echo ""
echo "--- Output (first 20 lines) ---"
head -20 "$OUTPUT"
echo ""
# Verify we got valid JSON-RPC responses
if grep -q '"result"' "$OUTPUT"; then
echo "PASS: JSON-RPC responses found in output"
else
echo "FAIL: No JSON-RPC responses in output"
cat "$OUTPUT"
exit 1
fi
- name: Reproduce runtime#61 — stdin from regular file
run: |
set -euo pipefail
echo "=== stdin as regular file (CI tee / capture pattern) ==="
INPUT=$(mktemp)
OUTPUT=$(mktemp)
trap 'rm -f "$INPUT" "$OUTPUT"' EXIT
cat > "$INPUT" <<'EOF'
{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}
{"jsonrpc":"2.0","id":2,"method":"tools/list"}
EOF
python a2a_mcp_server.py < "$INPUT" > "$OUTPUT" 2>&1 || {
RC=$?
echo "FAIL: MCP server exited with code $RC"
cat "$OUTPUT"
exit 1
}
echo "PASS: MCP server handled regular-file stdin without crashing"
if grep -q '"result"' "$OUTPUT"; then
echo "PASS: JSON-RPC responses found in output"
else
echo "FAIL: No JSON-RPC responses in output"
cat "$OUTPUT"
exit 1
fi
- name: Verify warning is emitted for non-pipe stdio
run: |
set -euo pipefail
echo "=== Verify diagnostic warning ==="
OUTPUT=$(mktemp)
trap 'rm -f "$OUTPUT"' EXIT
{
echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}'
} | python a2a_mcp_server.py > "$OUTPUT" 2>&1
# The warning should mention "not a pipe" for operator visibility
if grep -qi "not a pipe" "$OUTPUT"; then
echo "PASS: Diagnostic warning emitted for non-pipe stdio"
else
echo "NOTE: No warning in output (may be suppressed by log level)"
fi
- name: Run unit tests for stdio transport
run: |
set -euo pipefail
echo "=== Running stdio transport unit tests ==="
python -m pytest tests/test_a2a_mcp_server.py::TestStdioPipeAssertion -v --no-cov

View File

@ -18,7 +18,7 @@
import { useCallback, useState } from "react";
import * as Dialog from "@radix-ui/react-dialog";
type Tab = "python" | "curl" | "claude" | "mcp" | "hermes" | "codex" | "openclaw" | "fields";
type Tab = "python" | "curl" | "claude" | "mcp" | "hermes" | "codex" | "openclaw" | "kimi" | "fields";
export interface ExternalConnectionInfo {
workspace_id: string;
@ -58,6 +58,10 @@ export interface ExternalConnectionInfo {
// openclaw gateway on loopback. Outbound-tools-only today; push
// parity on an external openclaw needs a sessions.steer bridge.
openclaw_snippet?: string;
// Kimi CLI setup snippet — self-contained Python heartbeat script
// that keeps a Kimi workspace online in poll mode. Optional for
// backward compat with platforms that haven't shipped the Kimi tab.
kimi_snippet?: string;
}
interface Props {
@ -150,6 +154,11 @@ export function ExternalConnectModal({ info, onClose }: Props) {
'WORKSPACE_TOKEN="<paste from create response>"',
`WORKSPACE_TOKEN="${info.auth_token}"`,
);
// Kimi snippet carries the placeholder inside the shell heredoc.
const filledKimi = info.kimi_snippet?.replace(
'MOLECULE_WORKSPACE_TOKEN=<paste from create response>',
`MOLECULE_WORKSPACE_TOKEN=${info.auth_token}`,
);
return (
<Dialog.Root open onOpenChange={(o) => !o && onClose()}>
@ -189,6 +198,7 @@ export function ExternalConnectModal({ info, onClose }: Props) {
if (filledHermes) tabs.push("hermes");
if (filledCodex) tabs.push("codex");
if (filledOpenClaw) tabs.push("openclaw");
if (filledKimi) tabs.push("kimi");
tabs.push("curl", "fields");
return tabs;
})().map((t) => (
@ -212,6 +222,8 @@ export function ExternalConnectModal({ info, onClose }: Props) {
? "Codex"
: t === "openclaw"
? "OpenClaw"
: t === "kimi"
? "Kimi"
: t === "python"
? "Python SDK"
: t === "mcp"
@ -288,6 +300,15 @@ export function ExternalConnectModal({ info, onClose }: Props) {
onCopy={() => copy(filledOpenClaw, "openclaw")}
/>
)}
{tab === "kimi" && filledKimi && (
<SnippetBlock
value={filledKimi}
label="Kimi CLI — self-contained Python bridge. Registers, heartbeats, polls for canvas messages, and echoes replies back. NAT-safe (no public URL). Run in a background terminal or via launchd."
copyKey="kimi"
copied={copiedKey === "kimi"}
onCopy={() => copy(filledKimi, "kimi")}
/>
)}
{tab === "fields" && (
<div className="space-y-2">
<Field label="workspace_id" value={info.workspace_id} onCopy={() => copy(info.workspace_id, "wsid")} copied={copiedKey === "wsid"} />

View File

@ -58,6 +58,7 @@ const SAMPLE_INFO = {
hermes_channel_snippet: "# hermes ws=ws-test",
codex_snippet: "# codex ws=ws-test",
openclaw_snippet: "# openclaw ws=ws-test",
kimi_snippet: "# kimi ws=ws-test",
};
describe("ExternalConnectionSection", () => {

View File

@ -0,0 +1,132 @@
#!/usr/bin/env bash
# Staging E2E for MCP stdio transport (runtime#61 regression).
#
# Verifies that the MCP server in the claude-code workspace image
# handles stdout redirected to a regular file — the exact failure
# mode openclaw hits when capturing MCP output.
#
# Required env:
# MOLECULE_CP_URL default: https://staging-api.moleculesai.app
# MOLECULE_ADMIN_TOKEN CP admin bearer (Railway CP_ADMIN_API_TOKEN)
#
# Optional env:
# E2E_KEEP_ORG 1 → skip teardown (debugging only)
# E2E_RUN_ID Slug suffix; CI: ${GITHUB_RUN_ID}
set -euo pipefail
CP_URL="${MOLECULE_CP_URL:-https://staging-api.moleculesai.app}"
ADMIN_TOKEN="${MOLECULE_ADMIN_TOKEN:?MOLEC…OKEN required — Railway staging CP_ADMIN_API_TOKEN}"
RUN_ID_SUFFIX="${E2E_RUN_ID:-$(date +%H%M%S)-$$}"
SLUG="e2e-mcp-$(date +%Y%m%d)-${RUN_ID_SUFFIX}"
SLUG=$(echo "$SLUG" | tr '[:upper:]' '[:lower:]' | tr -cd 'a-z0-9-' | head -c 32)
log() { echo "[$(date +%H:%M:%S)] $*"; }
fail() { echo "[$(date +%H:%M:%S)] ❌ $*" >&2; exit 1; }
ok() { echo "[$(date +%H:%M:%S)] ✅ $*"; }
CURL_COMMON=(-sS --fail-with-body --max-time 30)
# ─── cleanup trap ───────────────────────────────────────────────────────
CLEANUP_DONE=0
cleanup_org() {
local _entry_rc=$?
if [ "$CLEANUP_DONE" = "1" ]; then return 0; fi
CLEANUP_DONE=1
if [ "${E2E_KEEP_ORG:-0}" = "1" ]; then
log "E2E_KEEP_ORG=1 → leaving $SLUG behind for inspection"
return 0
fi
log "Cleanup: deleting tenant $SLUG..."
curl "${CURL_COMMON[@]}" --max-time 120 -X DELETE "$CP_URL/cp/admin/tenants/$SLUG" \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d "{\"confirm\":\"$SLUG\"}" >/dev/null 2>&1 \
&& ok "Teardown request accepted" \
|| log "Teardown returned non-2xx (may already be gone)"
}
trap cleanup_org EXIT
# ─── provision tenant ───────────────────────────────────────────────────
log "Provisioning tenant $SLUG..."
# shellcheck disable=SC2034 # response body unused; --fail-with-body handles errors
TENANT=$(curl "${CURL_COMMON[@]}" -X POST "$CP_URL/cp/admin/orgs" \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d "{\"slug\":\"$SLUG\",\"name\":\"MCP Stdio E2E $SLUG\"}")
ok "Tenant provisioned"
# ─── get tenant admin token ─────────────────────────────────────────────
log "Fetching tenant admin token..."
for _ in $(seq 1 30); do
TOKEN_RESP=$(curl -sS --max-time 10 "$CP_URL/cp/admin/orgs/$SLUG/admin-token" \
-H "Authorization: Bearer $ADMIN_TOKEN" 2>/dev/null || echo '{}')
TOKEN=$(echo "$TOKEN_RESP" | python3 -c "import sys,json; print(json.load(sys.stdin).get('admin_token',''))" 2>/dev/null || echo "")
[ -n "$TOKEN" ] && break
sleep 2
done
[ -n "$TOKEN" ] || fail "Could not retrieve tenant admin token"
ok "Tenant admin token obtained"
# ─── create claude-code workspace ───────────────────────────────────────
log "Creating claude-code workspace..."
WS=$(curl "${CURL_COMMON[@]}" -X POST "$CP_URL/workspaces" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{"name":"MCP Stdio Test","role":"Test","runtime":"claude-code","tier":1}')
WS_ID=$(echo "$WS" | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])")
ok "Workspace created: $WS_ID"
# ─── wait for online ────────────────────────────────────────────────────
log "Waiting for workspace to come online (up to 120s)..."
for _ in $(seq 1 24); do
STATUS=$(curl -sS --max-time 10 "$CP_URL/workspaces/$WS_ID" \
-H "Authorization: Bearer $TOKEN" 2>/dev/null \
| python3 -c "import sys,json; print(json.load(sys.stdin).get('status',''))" 2>/dev/null || echo "")
[ "$STATUS" = "online" ] && break
sleep 5
done
[ "$STATUS" = "online" ] || fail "Workspace did not come online (status=$STATUS)"
ok "Workspace online"
# ─── get workspace container info ───────────────────────────────────────
log "Fetching workspace runtime info..."
RUNTIME_INFO=$(curl -sS --max-time 10 "$CP_URL/workspaces/$WS_ID" \
-H "Authorization: Bearer $TOKEN" 2>/dev/null)
CONTAINER_ID=$(echo "$RUNTIME_INFO" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('container_id',''))" 2>/dev/null || echo "")
[ -n "$CONTAINER_ID" ] || fail "No container_id in workspace response"
ok "Container ID: $CONTAINER_ID"
# ─── MCP stdio transport test ───────────────────────────────────────────
log "Testing MCP stdio transport with regular-file stdout..."
OUTPUT=$(mktemp)
trap 'rm -f "$OUTPUT"; cleanup_org' EXIT
# Send initialize + tools/list via stdin, capture stdout to regular file
{
echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}'
echo '{"jsonrpc":"2.0","id":2,"method":"tools/list"}'
} | docker exec -i -e WORKSPACE_ID="$WS_ID" "$CONTAINER_ID" \
python -m molecule_runtime.a2a_mcp_server > "$OUTPUT" 2>&1 || {
RC=$?
log "MCP server exited with code $RC (expected for stdin EOF)"
}
if grep -q '"result"' "$OUTPUT"; then
ok "MCP server handles regular-file stdout"
else
fail "MCP server did not produce JSON-RPC result. Output:\n$(head -20 "$OUTPUT")"
fi
if grep -q '"tools"' "$OUTPUT"; then
ok "MCP tools/list returns tools"
else
fail "MCP tools/list did not return tools. Output:\n$(head -20 "$OUTPUT")"
fi
# ─── summary ────────────────────────────────────────────────────────────
log "All tests passed ✅"

View File

@ -162,7 +162,7 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspaceID string) bool {
var wsRuntime string
db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime)
if wsRuntime == "external" {
if isExternalLikeRuntime(wsRuntime) {
return false
}
if !h.HasProvisioner() {

View File

@ -7,6 +7,7 @@ import (
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)

View File

@ -136,7 +136,7 @@ func discoverWorkspacePeer(ctx context.Context, c *gin.Context, callerID, target
// lives on the other side of the wire and needs the URL as-is
// (localhost rewrites wouldn't resolve from its host anyway).
// Phase 30.6.
if wsRuntime == "external" {
if isExternalLikeRuntime(wsRuntime) {
if handled := writeExternalWorkspaceURL(ctx, c, callerID, targetID, wsName); handled {
return
}
@ -181,7 +181,7 @@ func writeExternalWorkspaceURL(ctx context.Context, c *gin.Context, callerID, ta
outURL := wsURL
var callerRuntime string
db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime,'langgraph') FROM workspaces WHERE id = $1`, callerID).Scan(&callerRuntime)
if callerRuntime != "external" {
if !isExternalLikeRuntime(callerRuntime) {
outURL = strings.Replace(outURL, "127.0.0.1", "host.docker.internal", 1)
outURL = strings.Replace(outURL, "localhost", "host.docker.internal", 1)
}

View File

@ -50,6 +50,7 @@ func BuildExternalConnectionPayload(platformURL, workspaceID, authToken string)
"hermes_channel_snippet": stamp(externalHermesChannelTemplate),
"codex_snippet": stamp(externalCodexTemplate),
"openclaw_snippet": stamp(externalOpenClawTemplate),
"kimi_snippet": stamp(externalKimiTemplate),
}
}
@ -489,6 +490,149 @@ codex
// external openclaw would need a sessions.steer bridge daemon (the
// equivalent of hermes-channel-molecule for openclaw). Tracked
// separately; outbound tools is the first cut.
// externalKimiTemplate — complete poll-based external setup for Kimi CLI.
// Includes register + heartbeat + inbound activity polling + reply via
// /notify. No public URL needed (NAT-safe). Operators paste once and run
// in a background terminal or via launchd.
const externalKimiTemplate = `# Kimi CLI external setup register + heartbeat + inbound poll + reply.
# For operators whose external agent is a Kimi CLI session.
# No public URL needed; runs behind NAT in poll mode.
# 1. Install the workspace runtime wheel (provides HTTP client):
pip install molecule-ai-workspace-runtime
# 2. Save credentials and the bridge script:
mkdir -p ~/.molecule-ai/kimi-workspace
chmod 700 ~/.molecule-ai/kimi-workspace
cat > ~/.molecule-ai/kimi-workspace/env <<'EOF'
WORKSPACE_ID={{WORKSPACE_ID}}
PLATFORM_URL={{PLATFORM_URL}}
MOLECULE_WORKSPACE_TOKEN=<paste from create response>
EOF
chmod 600 ~/.molecule-ai/kimi-workspace/env
cat > ~/.molecule-ai/kimi-workspace/kimi_bridge.py <<'PYEOF'
#!/usr/bin/env python3
"""Kimi bridge — keeps workspace online and polls for canvas messages."""
import json, logging, time
from pathlib import Path
import httpx
ENV = Path.home() / ".molecule-ai" / "kimi-workspace" / "env"
HEARTBEAT_INTERVAL = 20
POLL_INTERVAL = 5
def load_env():
env = {}
for line in ENV.read_text().splitlines():
if "=" in line and not line.startswith("#"):
k, v = line.split("=", 1)
env[k.strip()] = v.strip()
return env
def hdrs(url, token):
return {"Authorization": f"Bearer {token}", "Origin": url, "Content-Type": "application/json"}
def register(client, url, ws, tok):
r = client.post(f"{url}/registry/register", json={
"id": ws, "url": "", "agent_card": {"name": "mac-laptop-kimi", "skills": []},
"delivery_mode": "poll",
}, headers=hdrs(url, tok))
r.raise_for_status()
logging.info("registered %s", ws)
def heartbeat(client, url, ws, tok, start):
r = client.post(f"{url}/registry/heartbeat", json={
"workspace_id": ws, "error_rate": 0.0, "sample_error": "",
"active_tasks": 0, "current_task": "", "uptime_seconds": int(time.time() - start),
}, headers=hdrs(url, tok))
r.raise_for_status()
def poll_inbound(client, url, ws, tok, since_id):
params = {"since_secs": "30", "limit": "50"}
if since_id:
params["since_id"] = since_id
r = client.get(f"{url}/workspaces/{ws}/activity", params=params, headers=hdrs(url, tok))
r.raise_for_status()
return r.json()
def send_reply(client, url, ws, tok, text):
r = client.post(f"{url}/workspaces/{ws}/notify", json={"message": text}, headers=hdrs(url, tok))
r.raise_for_status()
logging.info("reply sent: %s", text[:80])
def extract_user_text(item):
"""Pull the user message text from an activity log request_body."""
try:
body = item.get("request_body") or {}
parts = body.get("params", {}).get("message", {}).get("parts", [])
return " ".join(p.get("text", "") for p in parts if p.get("text"))
except Exception:
return ""
def main():
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
start = time.time()
since_id = ""
last_beat = 0
while True:
try:
e = load_env()
purl, ws, tok = e["PLATFORM_URL"], e["WORKSPACE_ID"], e["MOLECULE_WORKSPACE_TOKEN"]
with httpx.Client(timeout=10.0) as c:
# Heartbeat every HEARTBEAT_INTERVAL seconds
if time.time() - last_beat >= HEARTBEAT_INTERVAL:
register(c, purl, ws, tok)
heartbeat(c, purl, ws, tok, start)
last_beat = time.time()
# Poll for new canvas messages
items = poll_inbound(c, purl, ws, tok, since_id)
for item in items:
since_id = item["id"]
src = item.get("source_id")
method = item.get("method") or ""
# Skip our own /notify replies and agent-originated traffic
if method == "notify" or src is not None:
continue
text = extract_user_text(item)
if text:
logging.info("INBOUND from canvas: %s", text)
# Replace the echo below with your own logic:
send_reply(c, purl, ws, tok, f"Echo: {text}")
time.sleep(POLL_INTERVAL)
except Exception as exc:
logging.warning("loop failed: %s", exc)
time.sleep(5)
if __name__ == "__main__":
main()
PYEOF
chmod +x ~/.molecule-ai/kimi-workspace/kimi_bridge.py
# 3. Start the bridge (run in a persistent terminal or via launchd):
python3 ~/.molecule-ai/kimi-workspace/kimi_bridge.py
# What the script does:
# Registers the workspace in poll mode (no public URL needed)
# Heartbeats every 20s to keep STATUS = online on the canvas
# Polls /workspaces/:id/activity every 5s for new canvas messages
# Echo-replies via POST /workspaces/:id/notify
#
# To change the reply logic, edit the send_reply() call inside the loop.
# To send a one-off reply from another terminal:
# curl -fsS -X POST "{{PLATFORM_URL}}/workspaces/{{WORKSPACE_ID}}/notify" \
# -H "Authorization: Bearer $(cat ~/.molecule-ai/kimi-workspace/env | grep TOKEN | cut -d= -f2)" \
# -H "Content-Type: application/json" \
# -d '{"message":"Hello from Kimi"}'
#
# For push-mode inbound A2A (instead of polling), pair with the Python SDK
# tab but that requires a public HTTPS endpoint (ngrok / Cloudflare Tunnel).
#
# Need help?
# Documentation: https://doc.moleculesai.app/docs/guides/external-agent-registration
`
const externalOpenClawTemplate = `# OpenClaw MCP config outbound tool path. For operators whose
# external agent is an openclaw session.
#

View File

@ -62,7 +62,7 @@ func (h *WorkspaceHandler) RotateExternalCredentials(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
return
}
if runtime != "external" {
if !isExternalLikeRuntime(runtime) {
// Rotating a hermes/claude-code workspace's bearer would not
// just break the ssh-EIC tunnel auth on the platform side — it
// would also leave the workspace's in-container heartbeat with
@ -73,9 +73,9 @@ func (h *WorkspaceHandler) RotateExternalCredentials(c *gin.Context) {
// here so the canvas can show "rotate is for external workspaces;
// click Restart instead" rather than silently corrupting state.
c.JSON(http.StatusBadRequest, gin.H{
"error": "rotate is only valid for runtime=external workspaces",
"error": "rotate is only valid for external/BYO-compute workspaces",
"runtime": runtime,
"hint": "use POST /workspaces/:id/restart for non-external runtimes",
"hint": "use POST /workspaces/:id/restart for container-backed runtimes",
})
return
}
@ -139,9 +139,9 @@ func (h *WorkspaceHandler) GetExternalConnection(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
return
}
if runtime != "external" {
if !isExternalLikeRuntime(runtime) {
c.JSON(http.StatusBadRequest, gin.H{
"error": "connection payload is only valid for runtime=external workspaces",
"error": "connection payload is only valid for external/BYO-compute workspaces",
"runtime": runtime,
})
return

View File

@ -82,6 +82,7 @@ func TestRotateExternalCredentials_HappyPath(t *testing.T) {
"curl_register_template", "python_snippet",
"claude_code_channel_snippet", "universal_mcp_snippet",
"hermes_channel_snippet", "codex_snippet", "openclaw_snippet",
"kimi_snippet",
} {
if _, ok := body.Connection[k]; !ok {
t.Errorf("payload missing snippet field: %s", k)

View File

@ -242,7 +242,7 @@ func (h *PluginsHandler) isExternalRuntime(workspaceID string) bool {
if err != nil {
return false
}
return runtime == "external"
return isExternalLikeRuntime(runtime)
}
func (h *PluginsHandler) execAsRoot(ctx context.Context, containerName string, cmd []string) (string, error) {

View File

@ -76,6 +76,34 @@ func TestPluginUninstall_ExternalRuntime_Returns422(t *testing.T) {
}
}
// TestPluginInstall_KimiRuntime_Returns422 — kimi-cli is BYO-compute,
// same shape as external. Push-install via docker exec must be rejected.
func TestPluginInstall_KimiRuntime_Returns422(t *testing.T) {
h := NewPluginsHandler(t.TempDir(), nil, nil).
WithRuntimeLookup(func(workspaceID string) (string, error) {
return "kimi-cli", nil
})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-kimi"}}
c.Request = httptest.NewRequest(
"POST",
"/workspaces/ws-kimi/plugins",
bytes.NewBufferString(`{"source":"local://my-plugin"}`),
)
c.Request.Header.Set("Content-Type", "application/json")
h.Install(c)
if w.Code != http.StatusUnprocessableEntity {
t.Errorf("expected 422 for runtime='kimi-cli', got %d: %s", w.Code, w.Body.String())
}
if !strings.Contains(w.Body.String(), "external runtimes") {
t.Errorf("expected error body to mention 'external runtimes', got: %s", w.Body.String())
}
}
// TestPluginInstall_ContainerBackedRuntime_FallsThroughGuard — the runtime
// guard MUST NOT short-circuit container-backed runtimes. With
// `runtime='claude-code'` the install proceeds past the guard; without a

View File

@ -158,7 +158,7 @@ func (h *RegistryHandler) resolveDeliveryMode(ctx context.Context, workspaceID,
if existing.Valid && existing.String != "" {
return existing.String, nil
}
if runtime.Valid && runtime.String == "external" {
if runtime.Valid && isExternalLikeRuntime(runtime.String) {
return models.DeliveryModePoll, nil
}
return models.DeliveryModePush, nil

View File

@ -1721,6 +1721,65 @@ func TestRegister_ExternalRuntime_DefaultsToPoll(t *testing.T) {
}
}
// TestRegister_KimiRuntime_DefaultsToPoll mirrors the external-runtime
// poll-default test: a workspace whose existing row has runtime=kimi-cli
// and empty delivery_mode must resolve to poll (laptop/NAT-safe default).
func TestRegister_KimiRuntime_DefaultsToPoll(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
const wsID = "ws-kimi-default-poll"
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode", "runtime"}).
AddRow(sql.NullString{}, "kimi-cli"))
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"url"}).AddRow(""))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
mock.ExpectExec("INSERT INTO workspace_auth_tokens").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/registry/register",
bytes.NewBufferString(`{"id":"`+wsID+`","agent_card":{"name":"a"}}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.Register(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
_ = json.Unmarshal(w.Body.Bytes(), &resp)
if resp["delivery_mode"] != "poll" {
t.Errorf("delivery_mode = %v, want %q (kimi runtime + empty mode → poll)",
resp["delivery_mode"], "poll")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestRegister_NonExternalRuntime_StillDefaultsToPush guards the
// inverse: a non-external runtime (langgraph, hermes, etc.) with
// empty delivery_mode keeps the historical push default. Catches

View File

@ -103,11 +103,11 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) {
// behavior agree, and surface a clear message instead of silently
// no-op'ing — the canvas can show the operator that the fix is on
// their side.
if dbRuntime == "external" {
if isExternalLikeRuntime(dbRuntime) {
c.JSON(http.StatusOK, gin.H{
"status": "noop",
"runtime": "external",
"message": "external workspaces are operator-driven — restart your local poller; platform has nothing to restart",
"runtime": dbRuntime,
"message": dbRuntime + " workspaces are operator-driven — restart your local agent; platform has nothing to restart",
})
return
}
@ -547,7 +547,7 @@ func (h *WorkspaceHandler) runRestartCycle(workspaceID string) {
// Don't auto-restart external workspaces (no Docker container)
// or mock workspaces (no container, every reply is canned —
// see workspace-server/internal/handlers/mock_runtime.go).
if dbRuntime == "external" || dbRuntime == "mock" {
if isExternalLikeRuntime(dbRuntime) || dbRuntime == "mock" {
return
}

View File

@ -179,6 +179,51 @@ func TestRestartHandler_ExternalRuntimeNoOps(t *testing.T) {
}
}
func TestRestartHandler_KimiRuntimeNoOps(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
mock.ExpectQuery("SELECT status, name, tier, COALESCE").
WithArgs("ws-kimi").
WillReturnRows(sqlmock.NewRows([]string{"status", "name", "tier", "runtime"}).
AddRow("offline", "Kimi Agent", 1, "kimi-cli"))
mock.ExpectQuery("SELECT parent_id FROM workspaces WHERE id =").
WithArgs("ws-kimi").
WillReturnRows(sqlmock.NewRows([]string{"parent_id"}))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-kimi"}}
c.Request = httptest.NewRequest("POST", "/workspaces/ws-kimi/restart", nil)
handler.Restart(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("decode response: %v", err)
}
if got, _ := resp["status"].(string); got != "noop" {
t.Errorf("expected status=noop, got %v", resp["status"])
}
if got, _ := resp["runtime"].(string); got != "kimi-cli" {
t.Errorf("expected runtime=kimi-cli, got %v", resp["runtime"])
}
if msg, _ := resp["message"].(string); !strings.Contains(msg, "operator-driven") {
t.Errorf("expected message about operator-driven, got %v", resp["message"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestRestartHandler_NilProvisionerReturns503(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)

View File

@ -187,27 +187,19 @@ def enrich_peer_metadata_nonblocking(
canon = _validate_peer_id(peer_id)
if canon is None:
return None
# Cache-first: return immediately on warm hit (same TTL logic as the
# sync path). This is the hot-path optimisation — every push from a
# warm peer must return the record without touching the in-flight set
# or the executor. A background fetch that races to fill the cache
# will find the entry already present when it calls
# enrich_peer_metadata (which does its own fresh-TTL check), so it
# exits as a no-op with no extra network traffic.
# Cache hit (fresh): return without blocking on a registry GET.
# This is the hot path for active peer conversations — avoids
# spawning a background thread for every push from a known peer.
current = time.monotonic()
cached = _peer_metadata_get(canon)
if cached is not None:
fetched_at, record = cached
if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
return record
# Cache miss or TTL expired: schedule background fetch unless one is
# already in flight for this peer. The synchronous version atomically
# reads-then-writes; the async version splits that into "schedule
# fetch" + "fetch fills cache later." The in-flight set keeps a
# flurry of pushes from one peer (e.g., a chatty agent) from
# spawning N parallel GETs.
# already in flight for this peer. The in-flight set keeps a flurry
# of pushes from one peer (e.g., a chatty agent) from spawning N
# parallel GETs.
with _enrich_in_flight_lock:
if canon in _enrich_in_flight:
return None

View File

@ -548,12 +548,7 @@ class LangGraphA2AExecutor(AgentExecutor):
# receive the error and stop polling.
await updater.failed(
message=new_text_message(
# Pass the exception string as stderr so sanitize_agent_error
# can include a ~1KB preview in the A2A error response.
# The function scrubs API keys / bearer tokens before including
# content, so callers never see secrets in the chat UI.
# Fixes: roadmap item "SDK executor stderr swallowing".
sanitize_agent_error(stderr=str(e)), task_id=task_id, context_id=context_id,
sanitize_agent_error(exc=e), task_id=task_id, context_id=context_id
)
)
finally:

View File

@ -163,15 +163,67 @@ async def handle_tool_call(name: str, arguments: dict) -> str:
# --- MCP Notification bridge ---
# `notifications/claude/channel` matches the contract used by the
# molecule-mcp-claude-channel bun bridge (server.ts:509). Claude Code's
# MCP runtime treats this method as a conversation interrupt — `content`
# becomes the agent turn, `meta` is structured metadata. Notification-
# capable hosts (Claude Code today; any compliant client tomorrow)
# get push UX automatically; pollers (`wait_for_message` / `inbox_peek`)
# still work unchanged. See task #46 + the deprecation path documented
# in workspace/inbox.py:set_notification_callback.
_CHANNEL_NOTIFICATION_METHOD = "notifications/claude/channel"
# Runtime-adaptive notification method. Each MCP host uses a different
# JSON-RPC notification method for inbound push. Detect at startup so
# the inbox poller emits the right shape for the host that spawned us.
#
# Detection order (first match wins):
# CLAUDE_CODE / CLAUDE_CODE_VERSION → notifications/claude/channel
# OPENCLAW_SESSION_ID / OPENCLAW_GATEWAY_PORT → notifications/openclaw/channel
# CURSOR_MCP / CURSOR_TRACE_ID → notifications/cursor/channel
# HERMES_RUNTIME / HERMES_WORKSPACE_ID → notifications/hermes/channel
# fallback → notifications/message
#
# The method is resolved once at startup and cached in
# _CHANNEL_NOTIFICATION_METHOD. Tests can override by patching
# _detect_runtime() or setting the env var before import.
_DETECTED_RUNTIME: str | None = None
def _detect_runtime() -> str:
"""Detect which MCP host spawned this process."""
global _DETECTED_RUNTIME
if _DETECTED_RUNTIME is not None:
return _DETECTED_RUNTIME
env = os.environ
if env.get("CLAUDE_CODE") or env.get("CLAUDE_CODE_VERSION"):
_DETECTED_RUNTIME = "claude"
elif env.get("OPENCLAW_SESSION_ID") or env.get("OPENCLAW_GATEWAY_PORT"):
_DETECTED_RUNTIME = "openclaw"
elif env.get("CURSOR_MCP") or env.get("CURSOR_TRACE_ID"):
_DETECTED_RUNTIME = "cursor"
elif env.get("HERMES_RUNTIME") or env.get("HERMES_WORKSPACE_ID"):
_DETECTED_RUNTIME = "hermes"
else:
_DETECTED_RUNTIME = "generic"
logger.debug(f"Detected MCP runtime: {_DETECTED_RUNTIME}")
return _DETECTED_RUNTIME
def _notification_method_for_runtime(runtime: str) -> str:
"""Return the JSON-RPC notification method for the given runtime."""
return {
"claude": "notifications/claude/channel",
"openclaw": "notifications/openclaw/channel",
"cursor": "notifications/cursor/channel",
"hermes": "notifications/hermes/channel",
"generic": "notifications/message",
}.get(runtime, "notifications/message")
# Lazily resolved so tests can patch _detect_runtime() before the first
# notification is built. The value is read once per process lifetime.
_CHANNEL_NOTIFICATION_METHOD: str | None = None
def _channel_notification_method() -> str:
"""Return the cached notification method for the detected runtime."""
global _CHANNEL_NOTIFICATION_METHOD
if _CHANNEL_NOTIFICATION_METHOD is None:
_CHANNEL_NOTIFICATION_METHOD = _notification_method_for_runtime(_detect_runtime())
return _CHANNEL_NOTIFICATION_METHOD
# ============= Trust-boundary gates for channel-notification meta ==============
@ -569,7 +621,7 @@ def _build_channel_notification(msg: dict) -> dict:
)
return {
"jsonrpc": "2.0",
"method": _CHANNEL_NOTIFICATION_METHOD,
"method": _channel_notification_method(),
"params": {
"content": content,
"meta": meta,
@ -632,66 +684,69 @@ def _format_channel_content(
# --- MCP Server (JSON-RPC over stdio) ---
def _assert_stdio_is_pipe_compatible(
stdin_fd: int = 0, stdout_fd: int = 1
) -> None:
"""Fail fast with a friendly message when stdio isn't pipe-compatible.
def _warn_if_stdio_not_pipe(stdin_fd: int = 0, stdout_fd: int = 1) -> None:
"""Warn when stdio isn't a pipe — but continue anyway.
asyncio.connect_read_pipe / connect_write_pipe accept only pipes,
sockets, and character devices. When molecule-mcp is launched with
stdout redirected to a regular file (CI smoke tests, ad-hoc local
debugging that captures output), the asyncio call later raises
``ValueError: Pipe transport is only for pipes, sockets and character
devices`` from inside the event loop surfaced to the operator as a
confusing traceback. Detect early and exit cleanly with guidance
instead. See molecule-ai-workspace-runtime#61.
The legacy asyncio.connect_read_pipe / connect_write_pipe transport
rejected regular files, PTYs, and sockets with:
ValueError: Pipe transport is only for pipes, sockets and
character devices
We now use direct buffer I/O which works with ANY file descriptor,
so this is a diagnostic-only warning for operators debugging setup
issues. See molecule-ai-workspace-runtime#61.
"""
for name, fd in (("stdin", stdin_fd), ("stdout", stdout_fd)):
try:
mode = os.fstat(fd).st_mode
except OSError as exc:
print(
f"molecule-mcp: cannot stat {name} (fd={fd}): {exc}.\n"
f" This MCP server expects bidirectional pipe stdio. Launch it from\n"
f" an MCP-aware client (Claude Code, Cursor, etc.) — not detached\n"
f" from a terminal or with stdio closed.",
file=sys.stderr,
except OSError:
continue
if not (stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)):
logger.warning(
f"molecule-mcp: {name} (fd={fd}) is not a pipe/socket/char-device. "
f"This is fine — the universal stdio transport handles regular files, "
f"PTYs, and sockets. If you see garbled output, launch from an "
f"MCP-aware client (Claude Code, Cursor, OpenClaw, etc.)."
)
sys.exit(2)
if not (
stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)
):
print(
f"molecule-mcp: {name} (fd={fd}) is a regular file, not a pipe,\n"
f" socket, or character device — asyncio's stdio transport rejects\n"
f" it with `ValueError: Pipe transport is only for pipes, sockets\n"
f" and character devices`. Common causes:\n"
f" molecule-mcp > out.txt # stdout → regular file (fails)\n"
f" molecule-mcp < input.json # stdin → regular file (fails)\n"
f" Launch molecule-mcp from an MCP-aware client (Claude Code, Cursor,\n"
f" hermes, OpenCode, etc.) so stdio is wired to a pipe pair, or use\n"
f" `tee`/process substitution if you need to capture output:\n"
f" molecule-mcp 2>&1 | tee out.txt # stdout stays a pipe",
file=sys.stderr,
)
sys.exit(2)
async def main(): # pragma: no cover
"""Run MCP server on stdio — reads JSON-RPC requests, writes responses."""
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
await asyncio.get_event_loop().connect_read_pipe(lambda: protocol, sys.stdin)
"""Run MCP server on stdio — reads JSON-RPC requests, writes responses.
writer_transport, writer_protocol = await asyncio.get_event_loop().connect_write_pipe(
asyncio.streams.FlowControlMixin, sys.stdout
)
writer = asyncio.StreamWriter(writer_transport, writer_protocol, None, asyncio.get_event_loop())
Uses sys.stdin.buffer / sys.stdout.buffer directly instead of
asyncio.connect_read_pipe / connect_write_pipe. The asyncio pipe
transport rejects regular files, PTYs, and sockets with:
ValueError: Pipe transport is only for pipes, sockets and
character devices
This breaks when the MCP host captures stdout (openclaw, CI tests,
ad-hoc debugging with tee). Reading/writing the buffer directly
works with ANY file descriptor.
See molecule-ai-workspace-runtime#61.
"""
loop = asyncio.get_event_loop()
# sys.stdin.buffer exists on text-mode streams (default); on binary
# streams (tests, some CI setups) stdin IS the buffer.
stdin = getattr(sys.stdin, "buffer", sys.stdin)
stdout = getattr(sys.stdout, "buffer", sys.stdout)
async def write_response(response: dict):
data = json.dumps(response) + "\n"
writer.write(data.encode())
await writer.drain()
stdout.write(data.encode())
stdout.flush()
# Build a StreamWriter-compatible wrapper for the inbox bridge.
# The bridge expects a writer with .write() and .drain() methods.
class _StdoutWriter:
def __init__(self, buf):
self._buf = buf
def write(self, data: bytes) -> None:
self._buf.write(data)
async def drain(self) -> None:
self._buf.flush()
writer = _StdoutWriter(stdout)
# Wire the inbox → MCP notification bridge. The bridge body lives
# in `_setup_inbox_bridge` so the threading + asyncio + stdout
@ -701,22 +756,27 @@ async def main(): # pragma: no cover
_setup_inbox_bridge(writer, asyncio.get_running_loop())
)
buffer = ""
# Log runtime detection for operator diagnostics
runtime = _detect_runtime()
logger.info(f"MCP stdio transport ready (runtime={runtime}, "
f"notification_method={_channel_notification_method()})")
buffer = b""
while True:
try:
chunk = await reader.read(65536)
chunk = await loop.run_in_executor(None, stdin.read, 65536)
if not chunk:
break
buffer += chunk.decode(errors="replace")
buffer += chunk
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
while b"\n" in buffer:
line, buffer = buffer.split(b"\n", 1)
line = line.strip()
if not line:
continue
try:
request = json.loads(line)
request = json.loads(line.decode(errors="replace"))
except json.JSONDecodeError:
continue
@ -780,7 +840,7 @@ def cli_main() -> None: # pragma: no cover
break every external-runtime operator's MCP install — the 0.1.16
``main_sync`` rename incident is the cautionary precedent.
"""
_assert_stdio_is_pipe_compatible()
_warn_if_stdio_not_pipe()
asyncio.run(main())

View File

@ -165,7 +165,10 @@ async def test_agent_error_handling():
eq.enqueue_event.assert_called_once()
error_msg = str(eq.enqueue_event.call_args[0][0])
assert "model crashed" in error_msg
# sanitize_agent_error strips the raw exception message from the UI;
# raw detail goes to workspace logs only. This is the secure behaviour.
assert "Agent error (RuntimeError)" in error_msg
assert "model crashed" not in error_msg
@pytest.mark.asyncio
@ -1200,7 +1203,10 @@ async def test_terminal_error_routes_via_updater_failed():
"terminal error Message must route via updater.failed() in task mode"
)
err_msg = eq._failed_calls[-1]
assert "model crashed" in str(err_msg)
# sanitize_agent_error strips the raw exception message from the UI;
# raw detail goes to workspace logs only.
assert "Agent error (RuntimeError)" in str(err_msg)
assert "model crashed" not in str(err_msg)
# And complete() must NOT have been called on the failure path.
assert not eq._complete_calls, (
"complete() should not fire when execute() raises"

View File

@ -252,23 +252,30 @@ def test_attachments_param_description_emphasizes_REQUIRED():
def test_build_channel_notification_method_matches_claude_contract():
"""Method MUST be `notifications/claude/channel` exactly — that's
what Claude Code's MCP runtime listens for as a conversation
"""Method MUST be `notifications/claude/channel` when runtime=claude —
that's what Claude Code's MCP runtime listens for as a conversation
interrupt. Same string as the bun channel bridge sends
(server.ts:509) so this is a drop-in replacement."""
from a2a_mcp_server import _build_channel_notification
payload = _build_channel_notification({
"activity_id": "act-1",
"text": "hello",
"peer_id": "",
"kind": "canvas_user",
"method": "message/send",
"created_at": "2026-05-01T00:00:00Z",
})
assert payload["method"] == "notifications/claude/channel"
assert payload["jsonrpc"] == "2.0"
with patch("a2a_mcp_server._detect_runtime", return_value="claude"):
# Reset the cached method so _channel_notification_method() re-resolves
import a2a_mcp_server as _mcp
old_method = _mcp._CHANNEL_NOTIFICATION_METHOD
_mcp._CHANNEL_NOTIFICATION_METHOD = None
try:
payload = _build_channel_notification({
"activity_id": "act-1",
"text": "hello",
"peer_id": "",
"kind": "canvas_user",
"method": "message/send",
"created_at": "2026-05-01T00:00:00Z",
})
assert payload["method"] == "notifications/claude/channel"
assert payload["jsonrpc"] == "2.0"
finally:
_mcp._CHANNEL_NOTIFICATION_METHOD = old_method
def test_build_channel_notification_content_wraps_text_with_identity_and_reply_hint():
@ -1618,80 +1625,91 @@ async def test_inbox_bridge_emits_channel_notification_to_writer():
import os
import threading
from unittest.mock import patch
from a2a_mcp_server import _setup_inbox_bridge
# Real asyncio writer backed by an os.pipe — same shape as
# main() but isolated so we can read what was written.
read_fd, write_fd = os.pipe()
loop = asyncio.get_running_loop()
transport, protocol = await loop.connect_write_pipe(
asyncio.streams.FlowControlMixin,
os.fdopen(write_fd, "wb"),
)
writer = asyncio.StreamWriter(transport, protocol, None, loop)
try:
cb = _setup_inbox_bridge(writer, loop)
msg = {
# Production-shape UUID per the trust-boundary gate (#2488)
"activity_id": "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff",
"text": "hello from peer",
"peer_id": "11111111-2222-3333-4444-555555555555",
"kind": "peer_agent",
"method": "message/send",
"created_at": "2026-05-01T22:00:00Z",
}
# Simulate the inbox poller daemon thread invoking the
# callback from a non-asyncio context — exactly the
# threading boundary the bridge has to cross.
threading.Thread(target=cb, args=(msg,), daemon=True).start()
# Give the scheduled coroutine a chance to run + drain
# without coupling the test to wall-clock timing.
for _ in range(20):
await asyncio.sleep(0.05)
data = os.read(read_fd, 65536) if _readable(read_fd) else b""
if data:
break
else:
data = b""
assert data, (
"no notification on stdout pipe — the bridge fired "
"but the write didn't reach the writer (writer.drain "
"swallowing or scheduling race)"
)
line = data.decode().strip()
payload = json.loads(line)
assert payload["jsonrpc"] == "2.0"
assert payload["method"] == "notifications/claude/channel"
# Content is wrapped with the identity header + reply hint —
# see _format_channel_content. The bridge test pins the full
# composition so a regression to "raw text only" surfaces here
# as well as in the per-formatter tests above.
assert payload["params"]["content"] == (
"[from peer-agent · peer_id=11111111-2222-3333-4444-555555555555]\n"
"hello from peer\n"
'↩ Reply: delegate_task({workspace_id: '
'"11111111-2222-3333-4444-555555555555", task: "..."})'
)
meta = payload["params"]["meta"]
assert meta["source"] == "molecule"
assert meta["kind"] == "peer_agent"
assert meta["peer_id"] == "11111111-2222-3333-4444-555555555555"
assert meta["activity_id"] == "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff"
assert meta["ts"] == "2026-05-01T22:00:00Z"
finally:
writer.close()
# Force claude runtime so the notification method is predictable
with patch("a2a_mcp_server._detect_runtime", return_value="claude"):
import a2a_mcp_server as _mcp
old_method = _mcp._CHANNEL_NOTIFICATION_METHOD
_mcp._CHANNEL_NOTIFICATION_METHOD = None
_mcp._channel_notification_method() # prime cache
try:
os.close(read_fd)
except OSError:
# read_fd may already be closed if writer.close() tore down the pair
# during teardown — best-effort cleanup, no signal worth surfacing.
pass
# Real asyncio writer backed by an os.pipe — same shape as
# main() but isolated so we can read what was written.
read_fd, write_fd = os.pipe()
loop = asyncio.get_running_loop()
transport, protocol = await loop.connect_write_pipe(
asyncio.streams.FlowControlMixin,
os.fdopen(write_fd, "wb"),
)
writer = asyncio.StreamWriter(transport, protocol, None, loop)
try:
cb = _setup_inbox_bridge(writer, loop)
msg = {
# Production-shape UUID per the trust-boundary gate (#2488)
"activity_id": "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff",
"text": "hello from peer",
"peer_id": "11111111-2222-3333-4444-555555555555",
"kind": "peer_agent",
"method": "message/send",
"created_at": "2026-05-01T22:00:00Z",
}
# Simulate the inbox poller daemon thread invoking the
# callback from a non-asyncio context — exactly the
# threading boundary the bridge has to cross.
threading.Thread(target=cb, args=(msg,), daemon=True).start()
# Give the scheduled coroutine a chance to run + drain
# without coupling the test to wall-clock timing.
for _ in range(20):
await asyncio.sleep(0.05)
data = os.read(read_fd, 65536) if _readable(read_fd) else b""
if data:
break
else:
data = b""
assert data, (
"no notification on stdout pipe — the bridge fired "
"but the write didn't reach the writer (writer.drain "
"swallowing or scheduling race)"
)
line = data.decode().strip()
payload = json.loads(line)
assert payload["jsonrpc"] == "2.0"
assert payload["method"] == "notifications/claude/channel"
# Content is wrapped with the identity header + reply hint —
# see _format_channel_content. The bridge test pins the full
# composition so a regression to "raw text only" surfaces here
# as well as in the per-formatter tests above.
assert payload["params"]["content"] == (
"[from peer-agent · peer_id=11111111-2222-3333-4444-555555555555]\n"
"hello from peer\n"
'↩ Reply: delegate_task({workspace_id: '
'"11111111-2222-3333-4444-555555555555", task: "..."})'
)
meta = payload["params"]["meta"]
assert meta["source"] == "molecule"
assert meta["kind"] == "peer_agent"
assert meta["peer_id"] == "11111111-2222-3333-4444-555555555555"
assert meta["activity_id"] == "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff"
assert meta["ts"] == "2026-05-01T22:00:00Z"
finally:
writer.close()
try:
os.close(read_fd)
except OSError:
# read_fd may already be closed if writer.close() tore down the pair
# during teardown — best-effort cleanup, no signal worth surfacing.
pass
finally:
_mcp._CHANNEL_NOTIFICATION_METHOD = old_method
async def test_inbox_bridge_swallows_closed_pipe_drain_error(monkeypatch):
@ -1808,99 +1826,75 @@ def test_inbox_bridge_swallows_closed_loop_runtime_error():
class TestStdioPipeAssertion:
"""Pin _assert_stdio_is_pipe_compatible — the friendly fail-fast guard
that turns asyncio's `ValueError: Pipe transport is only for pipes,
sockets and character devices` into a clear operator message + exit 2.
"""Pin _warn_if_stdio_not_pipe — the diagnostic warning that replaces
the old fatal _assert_stdio_is_pipe_compatible guard.
The universal stdio transport now works with ANY file descriptor
(pipes, regular files, PTYs, sockets), so the old exit-2 behavior
is gone. These tests verify the warning is emitted for non-pipe
stdio so operators still get diagnostic signal when debugging.
See molecule-ai-workspace-runtime#61.
"""
def test_pipe_pair_passes_silently(self):
"""Happy path — both fds are pipes (the production launch shape
from any MCP client). Should return None without printing or
exiting."""
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
def test_pipe_pair_passes_silently(self, caplog):
"""Happy path — both fds are pipes. No warning emitted."""
from a2a_mcp_server import _warn_if_stdio_not_pipe
r, w = os.pipe()
try:
# No exit, no stderr noise. We don't capture stderr here
# because pipe path should produce zero output.
_assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=w)
with caplog.at_level("WARNING"):
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w)
assert "not a pipe" not in caplog.text
finally:
os.close(r)
os.close(w)
def test_regular_file_stdout_exits_with_friendly_message(
self, tmp_path, capsys
):
def test_regular_file_stdout_warns(self, tmp_path, caplog):
"""Reproducer for runtime#61: stdout redirected to a regular file.
Pre-fix this would surface upstream as
`ValueError: Pipe transport is only for pipes...`. Post-fix we
exit with code 2 and a stderr message that names the symptom +
fix."""
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
Now emits a warning instead of exiting."""
from a2a_mcp_server import _warn_if_stdio_not_pipe
# stdin = pipe (so we isolate the stdout failure path);
# stdout = regular file (the bug condition).
r, _w = os.pipe()
regular = tmp_path / "captured.log"
f = open(regular, "wb")
try:
with pytest.raises(SystemExit) as excinfo:
_assert_stdio_is_pipe_compatible(
stdin_fd=r, stdout_fd=f.fileno()
)
assert excinfo.value.code == 2
err = capsys.readouterr().err
# Names the failing stream + the asyncio constraint that
# would otherwise crash. Don't pin the exact wording — the
# asserts pin the operator-recoverable signal only.
assert "stdout" in err
assert "regular file" in err
assert "pipe" in err
with caplog.at_level("WARNING"):
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=f.fileno())
assert "stdout" in caplog.text
assert "not a pipe" in caplog.text
finally:
f.close()
os.close(r)
def test_regular_file_stdin_exits_with_friendly_message(
self, tmp_path, capsys
):
"""Symmetric case — stdin redirected from a regular file. Same
asyncio constraint applies via connect_read_pipe."""
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
def test_regular_file_stdin_warns(self, tmp_path, caplog):
"""Symmetric case — stdin redirected from a regular file."""
from a2a_mcp_server import _warn_if_stdio_not_pipe
regular = tmp_path / "input.json"
regular.write_bytes(b'{"jsonrpc":"2.0","id":1,"method":"initialize"}\n')
f = open(regular, "rb")
_r, w = os.pipe()
try:
with pytest.raises(SystemExit) as excinfo:
_assert_stdio_is_pipe_compatible(
stdin_fd=f.fileno(), stdout_fd=w
)
assert excinfo.value.code == 2
err = capsys.readouterr().err
assert "stdin" in err
assert "regular file" in err
with caplog.at_level("WARNING"):
_warn_if_stdio_not_pipe(stdin_fd=f.fileno(), stdout_fd=w)
assert "stdin" in caplog.text
assert "not a pipe" in caplog.text
finally:
f.close()
os.close(w)
def test_closed_fd_exits_with_stat_error(self, capsys):
"""If stdio is closed (rare but seen in detached daemonized
contexts), os.fstat raises OSError. We catch it and exit 2 with
a guidance message instead of letting the traceback escape."""
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
def test_closed_fd_warns_about_stat_error(self, caplog):
"""If stdio is closed, os.fstat raises OSError. Warning is
skipped silently (can't stat the fd)."""
from a2a_mcp_server import _warn_if_stdio_not_pipe
r, w = os.pipe()
os.close(w) # Now `w` is a stale fd — fstat will fail.
try:
with pytest.raises(SystemExit) as excinfo:
_assert_stdio_is_pipe_compatible(
stdin_fd=r, stdout_fd=w
)
assert excinfo.value.code == 2
err = capsys.readouterr().err
assert "cannot stat stdout" in err
with caplog.at_level("WARNING"):
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w)
# No warning emitted because fstat failed before the check
assert "not a pipe" not in caplog.text
finally:
os.close(r)