fix(mcp): universal stdio transport + runtime-adaptive notifications #778
165  .gitea/workflows/ci-mcp-stdio-transport.yml  (new file)
@@ -0,0 +1,165 @@
name: MCP Stdio Transport Regression

# Regression test for molecule-ai-workspace-runtime#61:
# asyncio.connect_read_pipe / connect_write_pipe fail with
# ValueError: "Pipe transport is only for pipes, sockets and character devices"
# when stdout is a regular file (openclaw capture, CI tee, debugging).
#
# This workflow reproduces the exact failure mode and verifies the
# fallback to direct buffer I/O works. It runs on every PR that
# touches the MCP server or this workflow, plus nightly cron.
#
# Why a separate workflow (not folded into ci.yml python-lint):
# - The test needs to spawn the MCP server with stdout redirected
#   to a regular file (not a TTY/pipe), which conflicts with
#   pytest's own capture mechanism.
# - It exercises the actual process spawn path (python a2a_mcp_server.py)
#   not just unit-test mocks — closer to the real openclaw integration.
# - A dedicated workflow surfaces stdio-specific regressions without
#   coupling to the broader Python test suite's coverage gate.

on:
  pull_request:
    branches: [main, staging]
    paths:
      - 'workspace/a2a_mcp_server.py'
      - 'workspace/mcp_cli.py'
      - 'workspace/tests/test_a2a_mcp_server.py'
      - '.gitea/workflows/ci-mcp-stdio-transport.yml'
  push:
    branches: [main, staging]
    paths:
      - 'workspace/a2a_mcp_server.py'
      - 'workspace/mcp_cli.py'
      - 'workspace/tests/test_a2a_mcp_server.py'
      - '.gitea/workflows/ci-mcp-stdio-transport.yml'
  schedule:
    # Nightly at 04:00 UTC — catches drift from dependency updates
    # (e.g. asyncio behavior changes in new Python patch releases).
    - cron: '0 4 * * *'

concurrency:
  group: mcp-stdio-${{ github.ref }}
  cancel-in-progress: true

env:
  GITHUB_SERVER_URL: https://git.moleculesai.app

jobs:
  # bp-exempt: regression canary for runtime#61; not a merge gate — informational only until promoted to required.
  # mc#774: continue-on-error mask — new workflow, flip to false once it's green on ≥3 consecutive main runs.
  mcp-stdio-regular-file:
    name: MCP stdio with regular-file stdout
    runs-on: ubuntu-latest
    continue-on-error: true  # mc#774
    timeout-minutes: 5
    env:
      WORKSPACE_ID: "00000000-0000-0000-0000-000000000001"
    defaults:
      run:
        working-directory: workspace
    steps:
      - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
      - uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
        with:
          python-version: '3.11'
          cache: pip
          cache-dependency-path: workspace/requirements.txt
      - run: pip install -r requirements.txt pytest pytest-asyncio pytest-cov

      - name: Reproduce runtime#61 — stdout as regular file
        run: |
          set -euo pipefail
          echo "=== Reproducing molecule-ai-workspace-runtime#61 ==="
          echo ""
          echo "Before the fix, this command would fail with:"
          echo ' ValueError: Pipe transport is only for pipes, sockets and character devices'
          echo ""

          # Spawn the MCP server with stdout redirected to a regular file.
          # This is exactly what openclaw does when capturing MCP output.
          OUTPUT=$(mktemp)
          trap 'rm -f "$OUTPUT"' EXIT

          # Send initialize request, then tools/list, then exit
          {
            echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}'
            echo '{"jsonrpc":"2.0","id":2,"method":"tools/list"}'
          } | python a2a_mcp_server.py > "$OUTPUT" 2>&1 || {
            RC=$?
            echo "FAIL: MCP server exited with code $RC"
            echo "--- stdout+stderr ---"
            cat "$OUTPUT"
            exit 1
          }

          echo "PASS: MCP server handled regular-file stdout without crashing"
          echo ""
          echo "--- Output (first 20 lines) ---"
          head -20 "$OUTPUT"
          echo ""

          # Verify we got valid JSON-RPC responses
          if grep -q '"result"' "$OUTPUT"; then
            echo "PASS: JSON-RPC responses found in output"
          else
            echo "FAIL: No JSON-RPC responses in output"
            cat "$OUTPUT"
            exit 1
          fi

      - name: Reproduce runtime#61 — stdin from regular file
        run: |
          set -euo pipefail
          echo "=== stdin as regular file (CI tee / capture pattern) ==="

          INPUT=$(mktemp)
          OUTPUT=$(mktemp)
          trap 'rm -f "$INPUT" "$OUTPUT"' EXIT

          cat > "$INPUT" <<'EOF'
          {"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}
          {"jsonrpc":"2.0","id":2,"method":"tools/list"}
          EOF

          python a2a_mcp_server.py < "$INPUT" > "$OUTPUT" 2>&1 || {
            RC=$?
            echo "FAIL: MCP server exited with code $RC"
            cat "$OUTPUT"
            exit 1
          }

          echo "PASS: MCP server handled regular-file stdin without crashing"

          if grep -q '"result"' "$OUTPUT"; then
            echo "PASS: JSON-RPC responses found in output"
          else
            echo "FAIL: No JSON-RPC responses in output"
            cat "$OUTPUT"
            exit 1
          fi

      - name: Verify warning is emitted for non-pipe stdio
        run: |
          set -euo pipefail
          echo "=== Verify diagnostic warning ==="

          OUTPUT=$(mktemp)
          trap 'rm -f "$OUTPUT"' EXIT

          {
            echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}'
          } | python a2a_mcp_server.py > "$OUTPUT" 2>&1

          # The warning should mention "not a pipe" for operator visibility
          if grep -qi "not a pipe" "$OUTPUT"; then
            echo "PASS: Diagnostic warning emitted for non-pipe stdio"
          else
            echo "NOTE: No warning in output (may be suppressed by log level)"
          fi

      - name: Run unit tests for stdio transport
        run: |
          set -euo pipefail
          echo "=== Running stdio transport unit tests ==="
          python -m pytest tests/test_a2a_mcp_server.py::TestStdioPipeAssertion -v --no-cov
@@ -18,7 +18,7 @@
 import { useCallback, useState } from "react";
 import * as Dialog from "@radix-ui/react-dialog";

-type Tab = "python" | "curl" | "claude" | "mcp" | "hermes" | "codex" | "openclaw" | "fields";
+type Tab = "python" | "curl" | "claude" | "mcp" | "hermes" | "codex" | "openclaw" | "kimi" | "fields";

 export interface ExternalConnectionInfo {
   workspace_id: string;
@@ -58,6 +58,10 @@ export interface ExternalConnectionInfo {
   // openclaw gateway on loopback. Outbound-tools-only today; push
   // parity on an external openclaw needs a sessions.steer bridge.
   openclaw_snippet?: string;
+  // Kimi CLI setup snippet — self-contained Python heartbeat script
+  // that keeps a Kimi workspace online in poll mode. Optional for
+  // backward compat with platforms that haven't shipped the Kimi tab.
+  kimi_snippet?: string;
 }

 interface Props {
@@ -150,6 +154,11 @@ export function ExternalConnectModal({ info, onClose }: Props) {
     'WORKSPACE_TOKEN="<paste from create response>"',
     `WORKSPACE_TOKEN="${info.auth_token}"`,
   );
+  // Kimi snippet carries the placeholder inside the shell heredoc.
+  const filledKimi = info.kimi_snippet?.replace(
+    'MOLECULE_WORKSPACE_TOKEN=<paste from create response>',
+    `MOLECULE_WORKSPACE_TOKEN=${info.auth_token}`,
+  );

   return (
     <Dialog.Root open onOpenChange={(o) => !o && onClose()}>
@@ -189,6 +198,7 @@ export function ExternalConnectModal({ info, onClose }: Props) {
           if (filledHermes) tabs.push("hermes");
           if (filledCodex) tabs.push("codex");
           if (filledOpenClaw) tabs.push("openclaw");
+          if (filledKimi) tabs.push("kimi");
           tabs.push("curl", "fields");
           return tabs;
         })().map((t) => (
@@ -212,6 +222,8 @@ export function ExternalConnectModal({ info, onClose }: Props) {
             ? "Codex"
             : t === "openclaw"
             ? "OpenClaw"
+            : t === "kimi"
+            ? "Kimi"
             : t === "python"
             ? "Python SDK"
             : t === "mcp"
@@ -288,6 +300,15 @@ export function ExternalConnectModal({ info, onClose }: Props) {
             onCopy={() => copy(filledOpenClaw, "openclaw")}
           />
         )}
+        {tab === "kimi" && filledKimi && (
+          <SnippetBlock
+            value={filledKimi}
+            label="Kimi CLI — self-contained Python bridge. Registers, heartbeats, polls for canvas messages, and echoes replies back. NAT-safe (no public URL). Run in a background terminal or via launchd."
+            copyKey="kimi"
+            copied={copiedKey === "kimi"}
+            onCopy={() => copy(filledKimi, "kimi")}
+          />
+        )}
         {tab === "fields" && (
           <div className="space-y-2">
             <Field label="workspace_id" value={info.workspace_id} onCopy={() => copy(info.workspace_id, "wsid")} copied={copiedKey === "wsid"} />

@@ -58,6 +58,7 @@ const SAMPLE_INFO = {
   hermes_channel_snippet: "# hermes ws=ws-test",
   codex_snippet: "# codex ws=ws-test",
   openclaw_snippet: "# openclaw ws=ws-test",
+  kimi_snippet: "# kimi ws=ws-test",
 };

 describe("ExternalConnectionSection", () => {
132  tests/e2e/test_mcp_stdio_staging.sh  (new executable file)
@@ -0,0 +1,132 @@
#!/usr/bin/env bash
# Staging E2E for MCP stdio transport (runtime#61 regression).
#
# Verifies that the MCP server in the claude-code workspace image
# handles stdout redirected to a regular file — the exact failure
# mode openclaw hits when capturing MCP output.
#
# Required env:
#   MOLECULE_CP_URL        default: https://staging-api.moleculesai.app
#   MOLECULE_ADMIN_TOKEN   CP admin bearer (Railway CP_ADMIN_API_TOKEN)
#
# Optional env:
#   E2E_KEEP_ORG   1 → skip teardown (debugging only)
#   E2E_RUN_ID     Slug suffix; CI: ${GITHUB_RUN_ID}

set -euo pipefail

CP_URL="${MOLECULE_CP_URL:-https://staging-api.moleculesai.app}"
ADMIN_TOKEN="${MOLECULE_ADMIN_TOKEN:?MOLEC…OKEN required — Railway staging CP_ADMIN_API_TOKEN}"
RUN_ID_SUFFIX="${E2E_RUN_ID:-$(date +%H%M%S)-$$}"

SLUG="e2e-mcp-$(date +%Y%m%d)-${RUN_ID_SUFFIX}"
SLUG=$(echo "$SLUG" | tr '[:upper:]' '[:lower:]' | tr -cd 'a-z0-9-' | head -c 32)

log() { echo "[$(date +%H:%M:%S)] $*"; }
fail() { echo "[$(date +%H:%M:%S)] ❌ $*" >&2; exit 1; }
ok() { echo "[$(date +%H:%M:%S)] ✅ $*"; }

CURL_COMMON=(-sS --fail-with-body --max-time 30)

# ─── cleanup trap ───────────────────────────────────────────────────────
CLEANUP_DONE=0
cleanup_org() {
  local _entry_rc=$?
  if [ "$CLEANUP_DONE" = "1" ]; then return 0; fi
  CLEANUP_DONE=1

  if [ "${E2E_KEEP_ORG:-0}" = "1" ]; then
    log "E2E_KEEP_ORG=1 → leaving $SLUG behind for inspection"
    return 0
  fi

  log "Cleanup: deleting tenant $SLUG..."
  curl "${CURL_COMMON[@]}" --max-time 120 -X DELETE "$CP_URL/cp/admin/tenants/$SLUG" \
    -H "Authorization: Bearer $ADMIN_TOKEN" \
    -H "Content-Type: application/json" \
    -d "{\"confirm\":\"$SLUG\"}" >/dev/null 2>&1 \
    && ok "Teardown request accepted" \
    || log "Teardown returned non-2xx (may already be gone)"
}
trap cleanup_org EXIT

# ─── provision tenant ───────────────────────────────────────────────────
log "Provisioning tenant $SLUG..."
# shellcheck disable=SC2034 # response body unused; --fail-with-body handles errors
TENANT=$(curl "${CURL_COMMON[@]}" -X POST "$CP_URL/cp/admin/orgs" \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d "{\"slug\":\"$SLUG\",\"name\":\"MCP Stdio E2E $SLUG\"}")
ok "Tenant provisioned"

# ─── get tenant admin token ─────────────────────────────────────────────
log "Fetching tenant admin token..."
for _ in $(seq 1 30); do
  TOKEN_RESP=$(curl -sS --max-time 10 "$CP_URL/cp/admin/orgs/$SLUG/admin-token" \
    -H "Authorization: Bearer $ADMIN_TOKEN" 2>/dev/null || echo '{}')
  TOKEN=$(echo "$TOKEN_RESP" | python3 -c "import sys,json; print(json.load(sys.stdin).get('admin_token',''))" 2>/dev/null || echo "")
  [ -n "$TOKEN" ] && break
  sleep 2
done
[ -n "$TOKEN" ] || fail "Could not retrieve tenant admin token"
ok "Tenant admin token obtained"

# ─── create claude-code workspace ───────────────────────────────────────
log "Creating claude-code workspace..."
WS=$(curl "${CURL_COMMON[@]}" -X POST "$CP_URL/workspaces" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"name":"MCP Stdio Test","role":"Test","runtime":"claude-code","tier":1}')
WS_ID=$(echo "$WS" | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])")
ok "Workspace created: $WS_ID"

# ─── wait for online ────────────────────────────────────────────────────
log "Waiting for workspace to come online (up to 120s)..."
for _ in $(seq 1 24); do
  STATUS=$(curl -sS --max-time 10 "$CP_URL/workspaces/$WS_ID" \
    -H "Authorization: Bearer $TOKEN" 2>/dev/null \
    | python3 -c "import sys,json; print(json.load(sys.stdin).get('status',''))" 2>/dev/null || echo "")
  [ "$STATUS" = "online" ] && break
  sleep 5
done
[ "$STATUS" = "online" ] || fail "Workspace did not come online (status=$STATUS)"
ok "Workspace online"

# ─── get workspace container info ───────────────────────────────────────
log "Fetching workspace runtime info..."
RUNTIME_INFO=$(curl -sS --max-time 10 "$CP_URL/workspaces/$WS_ID" \
  -H "Authorization: Bearer $TOKEN" 2>/dev/null)
CONTAINER_ID=$(echo "$RUNTIME_INFO" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('container_id',''))" 2>/dev/null || echo "")
[ -n "$CONTAINER_ID" ] || fail "No container_id in workspace response"
ok "Container ID: $CONTAINER_ID"

# ─── MCP stdio transport test ───────────────────────────────────────────
log "Testing MCP stdio transport with regular-file stdout..."

OUTPUT=$(mktemp)
trap 'rm -f "$OUTPUT"; cleanup_org' EXIT

# Send initialize + tools/list via stdin, capture stdout to regular file
{
  echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}'
  echo '{"jsonrpc":"2.0","id":2,"method":"tools/list"}'
} | docker exec -i -e WORKSPACE_ID="$WS_ID" "$CONTAINER_ID" \
    python -m molecule_runtime.a2a_mcp_server > "$OUTPUT" 2>&1 || {
  RC=$?
  log "MCP server exited with code $RC (expected for stdin EOF)"
}

if grep -q '"result"' "$OUTPUT"; then
  ok "MCP server handles regular-file stdout"
else
  fail "MCP server did not produce JSON-RPC result. Output:\n$(head -20 "$OUTPUT")"
fi

if grep -q '"tools"' "$OUTPUT"; then
  ok "MCP tools/list returns tools"
else
  fail "MCP tools/list did not return tools. Output:\n$(head -20 "$OUTPUT")"
fi

# ─── summary ────────────────────────────────────────────────────────────
log "All tests passed ✅"
@@ -162,7 +162,7 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
 func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspaceID string) bool {
     var wsRuntime string
     db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime)
-    if wsRuntime == "external" {
+    if isExternalLikeRuntime(wsRuntime) {
         return false
     }
     if !h.HasProvisioner() {

@@ -7,6 +7,7 @@ import (
     "net/http/httptest"
     "testing"

+    "github.com/DATA-DOG/go-sqlmock"
     "github.com/gin-gonic/gin"
 )

@@ -136,7 +136,7 @@ func discoverWorkspacePeer(ctx context.Context, c *gin.Context, callerID, target
     // lives on the other side of the wire and needs the URL as-is
     // (localhost rewrites wouldn't resolve from its host anyway).
     // Phase 30.6.
-    if wsRuntime == "external" {
+    if isExternalLikeRuntime(wsRuntime) {
         if handled := writeExternalWorkspaceURL(ctx, c, callerID, targetID, wsName); handled {
             return
         }
@@ -181,7 +181,7 @@ func writeExternalWorkspaceURL(ctx context.Context, c *gin.Context, callerID, ta
     outURL := wsURL
     var callerRuntime string
     db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime,'langgraph') FROM workspaces WHERE id = $1`, callerID).Scan(&callerRuntime)
-    if callerRuntime != "external" {
+    if !isExternalLikeRuntime(callerRuntime) {
         outURL = strings.Replace(outURL, "127.0.0.1", "host.docker.internal", 1)
         outURL = strings.Replace(outURL, "localhost", "host.docker.internal", 1)
     }

@@ -50,6 +50,7 @@ func BuildExternalConnectionPayload(platformURL, workspaceID, authToken string)
         "hermes_channel_snippet": stamp(externalHermesChannelTemplate),
         "codex_snippet":          stamp(externalCodexTemplate),
         "openclaw_snippet":       stamp(externalOpenClawTemplate),
+        "kimi_snippet":           stamp(externalKimiTemplate),
     }
 }

@@ -489,6 +490,149 @@ codex
 // external openclaw would need a sessions.steer bridge daemon (the
 // equivalent of hermes-channel-molecule for openclaw). Tracked
 // separately; outbound tools is the first cut.
+// externalKimiTemplate — complete poll-based external setup for Kimi CLI.
+// Includes register + heartbeat + inbound activity polling + reply via
+// /notify. No public URL needed (NAT-safe). Operators paste once and run
+// in a background terminal or via launchd.
+const externalKimiTemplate = `# Kimi CLI external setup — register + heartbeat + inbound poll + reply.
+# For operators whose external agent is a Kimi CLI session.
+# No public URL needed; runs behind NAT in poll mode.
+
+# 1. Install the workspace runtime wheel (provides HTTP client):
+pip install molecule-ai-workspace-runtime
+
+# 2. Save credentials and the bridge script:
+mkdir -p ~/.molecule-ai/kimi-workspace
+chmod 700 ~/.molecule-ai/kimi-workspace
+cat > ~/.molecule-ai/kimi-workspace/env <<'EOF'
+WORKSPACE_ID={{WORKSPACE_ID}}
+PLATFORM_URL={{PLATFORM_URL}}
+MOLECULE_WORKSPACE_TOKEN=<paste from create response>
+EOF
+chmod 600 ~/.molecule-ai/kimi-workspace/env
+
+cat > ~/.molecule-ai/kimi-workspace/kimi_bridge.py <<'PYEOF'
+#!/usr/bin/env python3
+"""Kimi bridge — keeps workspace online and polls for canvas messages."""
+import json, logging, time
+from pathlib import Path
+import httpx
+
+ENV = Path.home() / ".molecule-ai" / "kimi-workspace" / "env"
+HEARTBEAT_INTERVAL = 20
+POLL_INTERVAL = 5
+
+def load_env():
+    env = {}
+    for line in ENV.read_text().splitlines():
+        if "=" in line and not line.startswith("#"):
+            k, v = line.split("=", 1)
+            env[k.strip()] = v.strip()
+    return env
+
+def hdrs(url, token):
+    return {"Authorization": f"Bearer {token}", "Origin": url, "Content-Type": "application/json"}
+
+def register(client, url, ws, tok):
+    r = client.post(f"{url}/registry/register", json={
+        "id": ws, "url": "", "agent_card": {"name": "mac-laptop-kimi", "skills": []},
+        "delivery_mode": "poll",
+    }, headers=hdrs(url, tok))
+    r.raise_for_status()
+    logging.info("registered %s", ws)
+
+def heartbeat(client, url, ws, tok, start):
+    r = client.post(f"{url}/registry/heartbeat", json={
+        "workspace_id": ws, "error_rate": 0.0, "sample_error": "",
+        "active_tasks": 0, "current_task": "", "uptime_seconds": int(time.time() - start),
+    }, headers=hdrs(url, tok))
+    r.raise_for_status()
+
+def poll_inbound(client, url, ws, tok, since_id):
+    params = {"since_secs": "30", "limit": "50"}
+    if since_id:
+        params["since_id"] = since_id
+    r = client.get(f"{url}/workspaces/{ws}/activity", params=params, headers=hdrs(url, tok))
+    r.raise_for_status()
+    return r.json()
+
+def send_reply(client, url, ws, tok, text):
+    r = client.post(f"{url}/workspaces/{ws}/notify", json={"message": text}, headers=hdrs(url, tok))
+    r.raise_for_status()
+    logging.info("reply sent: %s", text[:80])
+
+def extract_user_text(item):
+    """Pull the user message text from an activity log request_body."""
+    try:
+        body = item.get("request_body") or {}
+        parts = body.get("params", {}).get("message", {}).get("parts", [])
+        return " ".join(p.get("text", "") for p in parts if p.get("text"))
+    except Exception:
+        return ""
+
+def main():
+    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
+    start = time.time()
+    since_id = ""
+    last_beat = 0
+    while True:
+        try:
+            e = load_env()
+            purl, ws, tok = e["PLATFORM_URL"], e["WORKSPACE_ID"], e["MOLECULE_WORKSPACE_TOKEN"]
+            with httpx.Client(timeout=10.0) as c:
+                # Heartbeat every HEARTBEAT_INTERVAL seconds
+                if time.time() - last_beat >= HEARTBEAT_INTERVAL:
+                    register(c, purl, ws, tok)
+                    heartbeat(c, purl, ws, tok, start)
+                    last_beat = time.time()
+
+                # Poll for new canvas messages
+                items = poll_inbound(c, purl, ws, tok, since_id)
+                for item in items:
+                    since_id = item["id"]
+                    src = item.get("source_id")
+                    method = item.get("method") or ""
+                    # Skip our own /notify replies and agent-originated traffic
+                    if method == "notify" or src is not None:
+                        continue
+                    text = extract_user_text(item)
+                    if text:
+                        logging.info("INBOUND from canvas: %s", text)
+                        # Replace the echo below with your own logic:
+                        send_reply(c, purl, ws, tok, f"Echo: {text}")
+            time.sleep(POLL_INTERVAL)
+        except Exception as exc:
+            logging.warning("loop failed: %s", exc)
+            time.sleep(5)
+
+if __name__ == "__main__":
+    main()
+PYEOF
+chmod +x ~/.molecule-ai/kimi-workspace/kimi_bridge.py
+
+# 3. Start the bridge (run in a persistent terminal or via launchd):
+python3 ~/.molecule-ai/kimi-workspace/kimi_bridge.py
+
+# What the script does:
+#   • Registers the workspace in poll mode (no public URL needed)
+#   • Heartbeats every 20s to keep STATUS = online on the canvas
+#   • Polls /workspaces/:id/activity every 5s for new canvas messages
+#   • Echo-replies via POST /workspaces/:id/notify
+#
+# To change the reply logic, edit the send_reply() call inside the loop.
+# To send a one-off reply from another terminal:
+#   curl -fsS -X POST "{{PLATFORM_URL}}/workspaces/{{WORKSPACE_ID}}/notify" \
+#     -H "Authorization: Bearer $(cat ~/.molecule-ai/kimi-workspace/env | grep TOKEN | cut -d= -f2)" \
+#     -H "Content-Type: application/json" \
+#     -d '{"message":"Hello from Kimi"}'
+#
+# For push-mode inbound A2A (instead of polling), pair with the Python SDK
+# tab — but that requires a public HTTPS endpoint (ngrok / Cloudflare Tunnel).
+#
+# Need help?
+#   Documentation: https://doc.moleculesai.app/docs/guides/external-agent-registration
+`
+
 const externalOpenClawTemplate = `# OpenClaw MCP config — outbound tool path. For operators whose
 # external agent is an openclaw session.
 #
@@ -62,7 +62,7 @@ func (h *WorkspaceHandler) RotateExternalCredentials(c *gin.Context) {
         c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
         return
     }
-    if runtime != "external" {
+    if !isExternalLikeRuntime(runtime) {
         // Rotating a hermes/claude-code workspace's bearer would not
         // just break the ssh-EIC tunnel auth on the platform side — it
         // would also leave the workspace's in-container heartbeat with
@@ -73,9 +73,9 @@ func (h *WorkspaceHandler) RotateExternalCredentials(c *gin.Context) {
         // here so the canvas can show "rotate is for external workspaces;
         // click Restart instead" rather than silently corrupting state.
         c.JSON(http.StatusBadRequest, gin.H{
-            "error":   "rotate is only valid for runtime=external workspaces",
+            "error":   "rotate is only valid for external/BYO-compute workspaces",
             "runtime": runtime,
-            "hint":    "use POST /workspaces/:id/restart for non-external runtimes",
+            "hint":    "use POST /workspaces/:id/restart for container-backed runtimes",
         })
         return
     }
@@ -139,9 +139,9 @@ func (h *WorkspaceHandler) GetExternalConnection(c *gin.Context) {
         c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
         return
     }
-    if runtime != "external" {
+    if !isExternalLikeRuntime(runtime) {
         c.JSON(http.StatusBadRequest, gin.H{
-            "error":   "connection payload is only valid for runtime=external workspaces",
+            "error":   "connection payload is only valid for external/BYO-compute workspaces",
             "runtime": runtime,
         })
         return

@@ -82,6 +82,7 @@ func TestRotateExternalCredentials_HappyPath(t *testing.T) {
         "curl_register_template", "python_snippet",
         "claude_code_channel_snippet", "universal_mcp_snippet",
         "hermes_channel_snippet", "codex_snippet", "openclaw_snippet",
+        "kimi_snippet",
     } {
         if _, ok := body.Connection[k]; !ok {
             t.Errorf("payload missing snippet field: %s", k)

@@ -242,7 +242,7 @@ func (h *PluginsHandler) isExternalRuntime(workspaceID string) bool {
     if err != nil {
         return false
     }
-    return runtime == "external"
+    return isExternalLikeRuntime(runtime)
 }

 func (h *PluginsHandler) execAsRoot(ctx context.Context, containerName string, cmd []string) (string, error) {

@@ -76,6 +76,34 @@ func TestPluginUninstall_ExternalRuntime_Returns422(t *testing.T) {
     }
 }

+// TestPluginInstall_KimiRuntime_Returns422 — kimi-cli is BYO-compute,
+// same shape as external. Push-install via docker exec must be rejected.
+func TestPluginInstall_KimiRuntime_Returns422(t *testing.T) {
+    h := NewPluginsHandler(t.TempDir(), nil, nil).
+        WithRuntimeLookup(func(workspaceID string) (string, error) {
+            return "kimi-cli", nil
+        })
+
+    w := httptest.NewRecorder()
+    c, _ := gin.CreateTestContext(w)
+    c.Params = gin.Params{{Key: "id", Value: "ws-kimi"}}
+    c.Request = httptest.NewRequest(
+        "POST",
+        "/workspaces/ws-kimi/plugins",
+        bytes.NewBufferString(`{"source":"local://my-plugin"}`),
+    )
+    c.Request.Header.Set("Content-Type", "application/json")
+
+    h.Install(c)
+
+    if w.Code != http.StatusUnprocessableEntity {
+        t.Errorf("expected 422 for runtime='kimi-cli', got %d: %s", w.Code, w.Body.String())
+    }
+    if !strings.Contains(w.Body.String(), "external runtimes") {
+        t.Errorf("expected error body to mention 'external runtimes', got: %s", w.Body.String())
+    }
+}
+
 // TestPluginInstall_ContainerBackedRuntime_FallsThroughGuard — the runtime
 // guard MUST NOT short-circuit container-backed runtimes. With
 // `runtime='claude-code'` the install proceeds past the guard; without a

@@ -158,7 +158,7 @@ func (h *RegistryHandler) resolveDeliveryMode(ctx context.Context, workspaceID,
     if existing.Valid && existing.String != "" {
         return existing.String, nil
     }
-    if runtime.Valid && runtime.String == "external" {
+    if runtime.Valid && isExternalLikeRuntime(runtime.String) {
         return models.DeliveryModePoll, nil
     }
     return models.DeliveryModePush, nil

@@ -1721,6 +1721,65 @@ func TestRegister_ExternalRuntime_DefaultsToPoll(t *testing.T) {
     }
 }

+// TestRegister_KimiRuntime_DefaultsToPoll mirrors the external-runtime
+// poll-default test: a workspace whose existing row has runtime=kimi-cli
+// and empty delivery_mode must resolve to poll (laptop/NAT-safe default).
+func TestRegister_KimiRuntime_DefaultsToPoll(t *testing.T) {
+    mock := setupTestDB(t)
+    setupTestRedis(t)
+    broadcaster := newTestBroadcaster()
+    handler := NewRegistryHandler(broadcaster)
+
+    const wsID = "ws-kimi-default-poll"
+
+    mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
+        WithArgs(wsID).
+        WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
+
+    mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
+        WithArgs(wsID).
+        WillReturnRows(sqlmock.NewRows([]string{"delivery_mode", "runtime"}).
+            AddRow(sql.NullString{}, "kimi-cli"))
+
+    mock.ExpectExec("INSERT INTO workspaces").
+        WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll").
+        WillReturnResult(sqlmock.NewResult(0, 1))
+    mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
+        WithArgs(wsID).
+        WillReturnRows(sqlmock.NewRows([]string{"url"}).AddRow(""))
+    mock.ExpectExec("INSERT INTO structure_events").
+        WillReturnResult(sqlmock.NewResult(0, 1))
+    mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
+        WithArgs(wsID).
+        WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
+    mock.ExpectExec("INSERT INTO workspace_auth_tokens").
+        WillReturnResult(sqlmock.NewResult(1, 1))
+    mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
+        WithArgs(wsID).
+        WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil))
+
+    w := httptest.NewRecorder()
+    c, _ := gin.CreateTestContext(w)
+    c.Request = httptest.NewRequest("POST", "/registry/register",
+        bytes.NewBufferString(`{"id":"`+wsID+`","agent_card":{"name":"a"}}`))
+    c.Request.Header.Set("Content-Type", "application/json")
+
+    handler.Register(c)
+
+    if w.Code != http.StatusOK {
+        t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
+    }
+    var resp map[string]interface{}
+    _ = json.Unmarshal(w.Body.Bytes(), &resp)
+    if resp["delivery_mode"] != "poll" {
+        t.Errorf("delivery_mode = %v, want %q (kimi runtime + empty mode → poll)",
+            resp["delivery_mode"], "poll")
+    }
+    if err := mock.ExpectationsWereMet(); err != nil {
+        t.Errorf("unmet expectations: %v", err)
+    }
+}
+
 // TestRegister_NonExternalRuntime_StillDefaultsToPush guards the
 // inverse: a non-external runtime (langgraph, hermes, etc.) with
 // empty delivery_mode keeps the historical push default. Catches

@@ -103,11 +103,11 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) {
     // behavior agree, and surface a clear message instead of silently
     // no-op'ing — the canvas can show the operator that the fix is on
     // their side.
-    if dbRuntime == "external" {
+    if isExternalLikeRuntime(dbRuntime) {
         c.JSON(http.StatusOK, gin.H{
             "status":  "noop",
-            "runtime": "external",
-            "message": "external workspaces are operator-driven — restart your local poller; platform has nothing to restart",
+            "runtime": dbRuntime,
+            "message": dbRuntime + " workspaces are operator-driven — restart your local agent; platform has nothing to restart",
         })
         return
     }
@@ -547,7 +547,7 @@ func (h *WorkspaceHandler) runRestartCycle(workspaceID string) {
     // Don't auto-restart external workspaces (no Docker container)
     // or mock workspaces (no container, every reply is canned —
    // see workspace-server/internal/handlers/mock_runtime.go).
-    if dbRuntime == "external" || dbRuntime == "mock" {
+    if isExternalLikeRuntime(dbRuntime) || dbRuntime == "mock" {
         return
     }

@@ -179,6 +179,51 @@ func TestRestartHandler_ExternalRuntimeNoOps(t *testing.T) {
     }
 }

+func TestRestartHandler_KimiRuntimeNoOps(t *testing.T) {
+    mock := setupTestDB(t)
+    setupTestRedis(t)
+    broadcaster := newTestBroadcaster()
+    handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
+
+    mock.ExpectQuery("SELECT status, name, tier, COALESCE").
+        WithArgs("ws-kimi").
+        WillReturnRows(sqlmock.NewRows([]string{"status", "name", "tier", "runtime"}).
+            AddRow("offline", "Kimi Agent", 1, "kimi-cli"))
+
+    mock.ExpectQuery("SELECT parent_id FROM workspaces WHERE id =").
+        WithArgs("ws-kimi").
+        WillReturnRows(sqlmock.NewRows([]string{"parent_id"}))
+
+    w := httptest.NewRecorder()
+    c, _ := gin.CreateTestContext(w)
+    c.Params = gin.Params{{Key: "id", Value: "ws-kimi"}}
+    c.Request = httptest.NewRequest("POST", "/workspaces/ws-kimi/restart", nil)
+
+    handler.Restart(c)
+
+    if w.Code != http.StatusOK {
+        t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
+    }
+
+    var resp map[string]interface{}
+    if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
+        t.Fatalf("decode response: %v", err)
+    }
+    if got, _ := resp["status"].(string); got != "noop" {
+        t.Errorf("expected status=noop, got %v", resp["status"])
+    }
+    if got, _ := resp["runtime"].(string); got != "kimi-cli" {
+        t.Errorf("expected runtime=kimi-cli, got %v", resp["runtime"])
+    }
+    if msg, _ := resp["message"].(string); !strings.Contains(msg, "operator-driven") {
+        t.Errorf("expected message about operator-driven, got %v", resp["message"])
+    }
+
+    if err := mock.ExpectationsWereMet(); err != nil {
+        t.Errorf("unmet sqlmock expectations: %v", err)
+    }
+}
+
 func TestRestartHandler_NilProvisionerReturns503(t *testing.T) {
     mock := setupTestDB(t)
     setupTestRedis(t)

@@ -187,27 +187,19 @@ def enrich_peer_metadata_nonblocking(
     canon = _validate_peer_id(peer_id)
     if canon is None:
         return None

-    # Cache-first: return immediately on warm hit (same TTL logic as the
-    # sync path). This is the hot-path optimisation — every push from a
-    # warm peer must return the record without touching the in-flight set
-    # or the executor. A background fetch that races to fill the cache
-    # will find the entry already present when it calls
-    # enrich_peer_metadata (which does its own fresh-TTL check), so it
-    # exits as a no-op with no extra network traffic.
+    # Cache hit (fresh): return without blocking on a registry GET.
+    # This is the hot path for active peer conversations — avoids
+    # spawning a background thread for every push from a known peer.
     current = time.monotonic()
     cached = _peer_metadata_get(canon)
     if cached is not None:
         fetched_at, record = cached
         if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
             return record

     # Cache miss or TTL expired: schedule background fetch unless one is
-    # already in flight for this peer. The synchronous version atomically
-    # reads-then-writes; the async version splits that into "schedule
-    # fetch" + "fetch fills cache later." The in-flight set keeps a
-    # flurry of pushes from one peer (e.g., a chatty agent) from
-    # spawning N parallel GETs.
+    # already in flight for this peer. The in-flight set keeps a flurry
+    # of pushes from one peer (e.g., a chatty agent) from spawning N
+    # parallel GETs.
     with _enrich_in_flight_lock:
         if canon in _enrich_in_flight:
             return None

@@ -548,12 +548,7 @@ class LangGraphA2AExecutor(AgentExecutor):
             # receive the error and stop polling.
             await updater.failed(
                 message=new_text_message(
-                    # Pass the exception string as stderr so sanitize_agent_error
-                    # can include a ~1KB preview in the A2A error response.
-                    # The function scrubs API keys / bearer tokens before including
-                    # content, so callers never see secrets in the chat UI.
-                    # Fixes: roadmap item "SDK executor stderr swallowing".
-                    sanitize_agent_error(stderr=str(e)), task_id=task_id, context_id=context_id,
+                    sanitize_agent_error(exc=e), task_id=task_id, context_id=context_id
                 )
             )
         finally:

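The hunk above only changes the call site: the executor now hands the exception object to `sanitize_agent_error(exc=e)` instead of a pre-stringified `stderr=`. The helper itself is not part of this diff; the sketch below is an illustration only, assuming a signature consistent with both call forms, of the behaviour the updated tests later in this PR pin (exception type reaches the A2A error, raw detail stays in workspace logs) — it is not the runtime's actual implementation.

import logging

logger = logging.getLogger(__name__)


def sanitize_agent_error(exc: BaseException | None = None, stderr: str = "") -> str:
    """Hypothetical sketch — mirrors the contract the tests below assert.

    Raw detail (which may contain API keys or bearer tokens) is logged for
    workspace operators only; the caller-facing string names the exception
    type without echoing its message.
    """
    detail = stderr or (str(exc) if exc is not None else "")
    logger.error("agent error detail (workspace logs only): %s", detail[:1024])
    kind = type(exc).__name__ if exc is not None else "AgentError"
    return f"Agent error ({kind}). See workspace logs for details."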
@@ -163,15 +163,67 @@ async def handle_tool_call(name: str, arguments: dict) -> str:

# --- MCP Notification bridge ---

-# `notifications/claude/channel` matches the contract used by the
-# molecule-mcp-claude-channel bun bridge (server.ts:509). Claude Code's
-# MCP runtime treats this method as a conversation interrupt — `content`
-# becomes the agent turn, `meta` is structured metadata. Notification-
-# capable hosts (Claude Code today; any compliant client tomorrow)
-# get push UX automatically; pollers (`wait_for_message` / `inbox_peek`)
-# still work unchanged. See task #46 + the deprecation path documented
-# in workspace/inbox.py:set_notification_callback.
-_CHANNEL_NOTIFICATION_METHOD = "notifications/claude/channel"
+# Runtime-adaptive notification method. Each MCP host uses a different
+# JSON-RPC notification method for inbound push. Detect at startup so
+# the inbox poller emits the right shape for the host that spawned us.
+#
+# Detection order (first match wins):
+#   CLAUDE_CODE / CLAUDE_CODE_VERSION           → notifications/claude/channel
+#   OPENCLAW_SESSION_ID / OPENCLAW_GATEWAY_PORT → notifications/openclaw/channel
+#   CURSOR_MCP / CURSOR_TRACE_ID                → notifications/cursor/channel
+#   HERMES_RUNTIME / HERMES_WORKSPACE_ID        → notifications/hermes/channel
+#   fallback                                    → notifications/message
+#
+# The method is resolved once at startup and cached in
+# _CHANNEL_NOTIFICATION_METHOD. Tests can override by patching
+# _detect_runtime() or setting the env var before import.
+_DETECTED_RUNTIME: str | None = None
+
+
+def _detect_runtime() -> str:
+    """Detect which MCP host spawned this process."""
+    global _DETECTED_RUNTIME
+    if _DETECTED_RUNTIME is not None:
+        return _DETECTED_RUNTIME
+
+    env = os.environ
+    if env.get("CLAUDE_CODE") or env.get("CLAUDE_CODE_VERSION"):
+        _DETECTED_RUNTIME = "claude"
+    elif env.get("OPENCLAW_SESSION_ID") or env.get("OPENCLAW_GATEWAY_PORT"):
+        _DETECTED_RUNTIME = "openclaw"
+    elif env.get("CURSOR_MCP") or env.get("CURSOR_TRACE_ID"):
+        _DETECTED_RUNTIME = "cursor"
+    elif env.get("HERMES_RUNTIME") or env.get("HERMES_WORKSPACE_ID"):
+        _DETECTED_RUNTIME = "hermes"
+    else:
+        _DETECTED_RUNTIME = "generic"
+
+    logger.debug(f"Detected MCP runtime: {_DETECTED_RUNTIME}")
+    return _DETECTED_RUNTIME
+
+
+def _notification_method_for_runtime(runtime: str) -> str:
+    """Return the JSON-RPC notification method for the given runtime."""
+    return {
+        "claude": "notifications/claude/channel",
+        "openclaw": "notifications/openclaw/channel",
+        "cursor": "notifications/cursor/channel",
+        "hermes": "notifications/hermes/channel",
+        "generic": "notifications/message",
+    }.get(runtime, "notifications/message")
+
+
+# Lazily resolved so tests can patch _detect_runtime() before the first
+# notification is built. The value is read once per process lifetime.
+_CHANNEL_NOTIFICATION_METHOD: str | None = None
+
+
+def _channel_notification_method() -> str:
+    """Return the cached notification method for the detected runtime."""
+    global _CHANNEL_NOTIFICATION_METHOD
+    if _CHANNEL_NOTIFICATION_METHOD is None:
+        _CHANNEL_NOTIFICATION_METHOD = _notification_method_for_runtime(_detect_runtime())
+    return _CHANNEL_NOTIFICATION_METHOD


# ============= Trust-boundary gates for channel-notification meta ==============
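A quick way to sanity-check the mapping added above from a REPL or a scratch test, assuming `a2a_mcp_server` is importable the same way the unit tests in this PR import it:

from a2a_mcp_server import _notification_method_for_runtime

assert _notification_method_for_runtime("claude") == "notifications/claude/channel"
assert _notification_method_for_runtime("generic") == "notifications/message"
# Unknown hosts fall back to the generic method via the dict's .get() default.
assert _notification_method_for_runtime("some-future-host") == "notifications/message"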
@@ -569,7 +621,7 @@ def _build_channel_notification(msg: dict) -> dict:
    )
    return {
        "jsonrpc": "2.0",
-        "method": _CHANNEL_NOTIFICATION_METHOD,
+        "method": _channel_notification_method(),
        "params": {
            "content": content,
            "meta": meta,
@@ -632,66 +684,69 @@ def _format_channel_content(
# --- MCP Server (JSON-RPC over stdio) ---


-def _assert_stdio_is_pipe_compatible(
-    stdin_fd: int = 0, stdout_fd: int = 1
-) -> None:
-    """Fail fast with a friendly message when stdio isn't pipe-compatible.
+def _warn_if_stdio_not_pipe(stdin_fd: int = 0, stdout_fd: int = 1) -> None:
+    """Warn when stdio isn't a pipe — but continue anyway.

-    asyncio.connect_read_pipe / connect_write_pipe accept only pipes,
-    sockets, and character devices. When molecule-mcp is launched with
-    stdout redirected to a regular file (CI smoke tests, ad-hoc local
-    debugging that captures output), the asyncio call later raises
-    ``ValueError: Pipe transport is only for pipes, sockets and character
-    devices`` from inside the event loop — surfaced to the operator as a
-    confusing traceback. Detect early and exit cleanly with guidance
-    instead. See molecule-ai-workspace-runtime#61.
+    The legacy asyncio.connect_read_pipe / connect_write_pipe transport
+    rejected regular files, PTYs, and sockets with:
+        ValueError: Pipe transport is only for pipes, sockets and
+        character devices
+    We now use direct buffer I/O which works with ANY file descriptor,
+    so this is a diagnostic-only warning for operators debugging setup
+    issues. See molecule-ai-workspace-runtime#61.
    """
    for name, fd in (("stdin", stdin_fd), ("stdout", stdout_fd)):
        try:
            mode = os.fstat(fd).st_mode
-        except OSError as exc:
-            print(
-                f"molecule-mcp: cannot stat {name} (fd={fd}): {exc}.\n"
-                f" This MCP server expects bidirectional pipe stdio. Launch it from\n"
-                f" an MCP-aware client (Claude Code, Cursor, etc.) — not detached\n"
-                f" from a terminal or with stdio closed.",
-                file=sys.stderr,
+        except OSError:
+            continue
+        if not (stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)):
+            logger.warning(
+                f"molecule-mcp: {name} (fd={fd}) is not a pipe/socket/char-device. "
+                f"This is fine — the universal stdio transport handles regular files, "
+                f"PTYs, and sockets. If you see garbled output, launch from an "
+                f"MCP-aware client (Claude Code, Cursor, OpenClaw, etc.)."
            )
-            sys.exit(2)
-        if not (
-            stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)
-        ):
-            print(
-                f"molecule-mcp: {name} (fd={fd}) is a regular file, not a pipe,\n"
-                f" socket, or character device — asyncio's stdio transport rejects\n"
-                f" it with `ValueError: Pipe transport is only for pipes, sockets\n"
-                f" and character devices`. Common causes:\n"
-                f" molecule-mcp > out.txt # stdout → regular file (fails)\n"
-                f" molecule-mcp < input.json # stdin → regular file (fails)\n"
-                f" Launch molecule-mcp from an MCP-aware client (Claude Code, Cursor,\n"
-                f" hermes, OpenCode, etc.) so stdio is wired to a pipe pair, or use\n"
-                f" `tee`/process substitution if you need to capture output:\n"
-                f" molecule-mcp 2>&1 | tee out.txt # stdout stays a pipe",
-                file=sys.stderr,
-            )
-            sys.exit(2)


async def main():  # pragma: no cover
-    """Run MCP server on stdio — reads JSON-RPC requests, writes responses."""
-    reader = asyncio.StreamReader()
-    protocol = asyncio.StreamReaderProtocol(reader)
-    await asyncio.get_event_loop().connect_read_pipe(lambda: protocol, sys.stdin)
+    """Run MCP server on stdio — reads JSON-RPC requests, writes responses.

-    writer_transport, writer_protocol = await asyncio.get_event_loop().connect_write_pipe(
-        asyncio.streams.FlowControlMixin, sys.stdout
-    )
-    writer = asyncio.StreamWriter(writer_transport, writer_protocol, None, asyncio.get_event_loop())
+    Uses sys.stdin.buffer / sys.stdout.buffer directly instead of
+    asyncio.connect_read_pipe / connect_write_pipe. The asyncio pipe
+    transport rejects regular files, PTYs, and sockets with:
+        ValueError: Pipe transport is only for pipes, sockets and
+        character devices
+    This breaks when the MCP host captures stdout (openclaw, CI tests,
+    ad-hoc debugging with tee). Reading/writing the buffer directly
+    works with ANY file descriptor.
+
+    See molecule-ai-workspace-runtime#61.
+    """
+    loop = asyncio.get_event_loop()
+    # sys.stdin.buffer exists on text-mode streams (default); on binary
+    # streams (tests, some CI setups) stdin IS the buffer.
+    stdin = getattr(sys.stdin, "buffer", sys.stdin)
+    stdout = getattr(sys.stdout, "buffer", sys.stdout)

    async def write_response(response: dict):
        data = json.dumps(response) + "\n"
-        writer.write(data.encode())
-        await writer.drain()
+        stdout.write(data.encode())
+        stdout.flush()

+    # Build a StreamWriter-compatible wrapper for the inbox bridge.
+    # The bridge expects a writer with .write() and .drain() methods.
+    class _StdoutWriter:
+        def __init__(self, buf):
+            self._buf = buf
+
+        def write(self, data: bytes) -> None:
+            self._buf.write(data)
+
+        async def drain(self) -> None:
+            self._buf.flush()
+
+    writer = _StdoutWriter(stdout)

    # Wire the inbox → MCP notification bridge. The bridge body lives
    # in `_setup_inbox_bridge` so the threading + asyncio + stdout
@@ -701,22 +756,27 @@ async def main():  # pragma: no cover
        _setup_inbox_bridge(writer, asyncio.get_running_loop())
    )

-    buffer = ""
+    # Log runtime detection for operator diagnostics
+    runtime = _detect_runtime()
+    logger.info(f"MCP stdio transport ready (runtime={runtime}, "
+                f"notification_method={_channel_notification_method()})")
+
+    buffer = b""
    while True:
        try:
-            chunk = await reader.read(65536)
+            chunk = await loop.run_in_executor(None, stdin.read, 65536)
            if not chunk:
                break
-            buffer += chunk.decode(errors="replace")
+            buffer += chunk

-            while "\n" in buffer:
-                line, buffer = buffer.split("\n", 1)
+            while b"\n" in buffer:
+                line, buffer = buffer.split(b"\n", 1)
                line = line.strip()
                if not line:
                    continue

                try:
-                    request = json.loads(line)
+                    request = json.loads(line.decode(errors="replace"))
                except json.JSONDecodeError:
                    continue

@@ -780,7 +840,7 @@ def cli_main() -> None:  # pragma: no cover
    break every external-runtime operator's MCP install — the 0.1.16
    ``main_sync`` rename incident is the cautionary precedent.
    """
-    _assert_stdio_is_pipe_compatible()
+    _warn_if_stdio_not_pipe()
    asyncio.run(main())

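For anyone who wants to see the underlying asyncio limitation outside the server, here is a minimal standalone sketch (not part of this PR): `connect_read_pipe` raises the ValueError when stdin is a regular file, and a blocking read pushed to a worker thread works for any descriptor. Note the PR's `main()` above skips the pipe-transport path entirely and always uses buffer I/O; this sketch only frames the two approaches side by side.

import asyncio
import sys


async def read_first_chunk() -> bytes:
    """Read up to 64 KiB from stdin, whatever kind of fd stdin happens to be."""
    loop = asyncio.get_running_loop()
    try:
        # Pipe-transport path: works for pipes, sockets, character devices.
        reader = asyncio.StreamReader()
        protocol = asyncio.StreamReaderProtocol(reader)
        await loop.connect_read_pipe(lambda: protocol, sys.stdin)
        return await reader.read(65536)
    except ValueError as exc:
        # Regular file (e.g. `python demo.py < input.json`) lands here with
        # "Pipe transport is only for pipes, sockets and character devices".
        print(f"falling back to buffer I/O: {exc}", file=sys.stderr)
        stdin = getattr(sys.stdin, "buffer", sys.stdin)
        return await loop.run_in_executor(None, stdin.read, 65536)


if __name__ == "__main__":
    print(asyncio.run(read_first_chunk()))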
@@ -165,7 +165,10 @@ async def test_agent_error_handling():

    eq.enqueue_event.assert_called_once()
    error_msg = str(eq.enqueue_event.call_args[0][0])
-    assert "model crashed" in error_msg
+    # sanitize_agent_error strips the raw exception message from the UI;
+    # raw detail goes to workspace logs only. This is the secure behaviour.
+    assert "Agent error (RuntimeError)" in error_msg
+    assert "model crashed" not in error_msg


@pytest.mark.asyncio
@@ -1200,7 +1203,10 @@ async def test_terminal_error_routes_via_updater_failed():
        "terminal error Message must route via updater.failed() in task mode"
    )
    err_msg = eq._failed_calls[-1]
-    assert "model crashed" in str(err_msg)
+    # sanitize_agent_error strips the raw exception message from the UI;
+    # raw detail goes to workspace logs only.
+    assert "Agent error (RuntimeError)" in str(err_msg)
+    assert "model crashed" not in str(err_msg)
    # And complete() must NOT have been called on the failure path.
    assert not eq._complete_calls, (
        "complete() should not fire when execute() raises"
@@ -252,23 +252,30 @@ def test_attachments_param_description_emphasizes_REQUIRED():


def test_build_channel_notification_method_matches_claude_contract():
-    """Method MUST be `notifications/claude/channel` exactly — that's
-    what Claude Code's MCP runtime listens for as a conversation
+    """Method MUST be `notifications/claude/channel` when runtime=claude —
+    that's what Claude Code's MCP runtime listens for as a conversation
    interrupt. Same string as the bun channel bridge sends
    (server.ts:509) so this is a drop-in replacement."""
    from a2a_mcp_server import _build_channel_notification

-    payload = _build_channel_notification({
-        "activity_id": "act-1",
-        "text": "hello",
-        "peer_id": "",
-        "kind": "canvas_user",
-        "method": "message/send",
-        "created_at": "2026-05-01T00:00:00Z",
-    })
-
-    assert payload["method"] == "notifications/claude/channel"
-    assert payload["jsonrpc"] == "2.0"
+    with patch("a2a_mcp_server._detect_runtime", return_value="claude"):
+        # Reset the cached method so _channel_notification_method() re-resolves
+        import a2a_mcp_server as _mcp
+        old_method = _mcp._CHANNEL_NOTIFICATION_METHOD
+        _mcp._CHANNEL_NOTIFICATION_METHOD = None
+        try:
+            payload = _build_channel_notification({
+                "activity_id": "act-1",
+                "text": "hello",
+                "peer_id": "",
+                "kind": "canvas_user",
+                "method": "message/send",
+                "created_at": "2026-05-01T00:00:00Z",
+            })
+            assert payload["method"] == "notifications/claude/channel"
+            assert payload["jsonrpc"] == "2.0"
+        finally:
+            _mcp._CHANNEL_NOTIFICATION_METHOD = old_method


def test_build_channel_notification_content_wraps_text_with_identity_and_reply_hint():
@@ -1618,80 +1625,91 @@ async def test_inbox_bridge_emits_channel_notification_to_writer():
    import os
    import threading

+    from unittest.mock import patch
+
    from a2a_mcp_server import _setup_inbox_bridge

-    # Real asyncio writer backed by an os.pipe — same shape as
-    # main() but isolated so we can read what was written.
-    read_fd, write_fd = os.pipe()
-    loop = asyncio.get_running_loop()
-    transport, protocol = await loop.connect_write_pipe(
-        asyncio.streams.FlowControlMixin,
-        os.fdopen(write_fd, "wb"),
-    )
-    writer = asyncio.StreamWriter(transport, protocol, None, loop)
-
-    try:
-        cb = _setup_inbox_bridge(writer, loop)
-
-        msg = {
-            # Production-shape UUID per the trust-boundary gate (#2488)
-            "activity_id": "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff",
-            "text": "hello from peer",
-            "peer_id": "11111111-2222-3333-4444-555555555555",
-            "kind": "peer_agent",
-            "method": "message/send",
-            "created_at": "2026-05-01T22:00:00Z",
-        }
-
-        # Simulate the inbox poller daemon thread invoking the
-        # callback from a non-asyncio context — exactly the
-        # threading boundary the bridge has to cross.
-        threading.Thread(target=cb, args=(msg,), daemon=True).start()
-
-        # Give the scheduled coroutine a chance to run + drain
-        # without coupling the test to wall-clock timing.
-        for _ in range(20):
-            await asyncio.sleep(0.05)
-            data = os.read(read_fd, 65536) if _readable(read_fd) else b""
-            if data:
-                break
-        else:
-            data = b""
-
-        assert data, (
-            "no notification on stdout pipe — the bridge fired "
-            "but the write didn't reach the writer (writer.drain "
-            "swallowing or scheduling race)"
-        )
-        line = data.decode().strip()
-        payload = json.loads(line)
-
-        assert payload["jsonrpc"] == "2.0"
-        assert payload["method"] == "notifications/claude/channel"
-        # Content is wrapped with the identity header + reply hint —
-        # see _format_channel_content. The bridge test pins the full
-        # composition so a regression to "raw text only" surfaces here
-        # as well as in the per-formatter tests above.
-        assert payload["params"]["content"] == (
-            "[from peer-agent · peer_id=11111111-2222-3333-4444-555555555555]\n"
-            "hello from peer\n"
-            '↩ Reply: delegate_task({workspace_id: '
-            '"11111111-2222-3333-4444-555555555555", task: "..."})'
-        )
-        meta = payload["params"]["meta"]
-        assert meta["source"] == "molecule"
-        assert meta["kind"] == "peer_agent"
-        assert meta["peer_id"] == "11111111-2222-3333-4444-555555555555"
-        assert meta["activity_id"] == "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff"
-        assert meta["ts"] == "2026-05-01T22:00:00Z"
-    finally:
-        writer.close()
+    # Force claude runtime so the notification method is predictable
+    with patch("a2a_mcp_server._detect_runtime", return_value="claude"):
+        import a2a_mcp_server as _mcp
+        old_method = _mcp._CHANNEL_NOTIFICATION_METHOD
+        _mcp._CHANNEL_NOTIFICATION_METHOD = None
+        _mcp._channel_notification_method()  # prime cache
        try:
-            os.close(read_fd)
-        except OSError:
-            # read_fd may already be closed if writer.close() tore down the pair
-            # during teardown — best-effort cleanup, no signal worth surfacing.
-            pass
+            # Real asyncio writer backed by an os.pipe — same shape as
+            # main() but isolated so we can read what was written.
+            read_fd, write_fd = os.pipe()
+            loop = asyncio.get_running_loop()
+            transport, protocol = await loop.connect_write_pipe(
+                asyncio.streams.FlowControlMixin,
+                os.fdopen(write_fd, "wb"),
+            )
+            writer = asyncio.StreamWriter(transport, protocol, None, loop)
+
+            try:
+                cb = _setup_inbox_bridge(writer, loop)
+
+                msg = {
+                    # Production-shape UUID per the trust-boundary gate (#2488)
+                    "activity_id": "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff",
+                    "text": "hello from peer",
+                    "peer_id": "11111111-2222-3333-4444-555555555555",
+                    "kind": "peer_agent",
+                    "method": "message/send",
+                    "created_at": "2026-05-01T22:00:00Z",
+                }
+
+                # Simulate the inbox poller daemon thread invoking the
+                # callback from a non-asyncio context — exactly the
+                # threading boundary the bridge has to cross.
+                threading.Thread(target=cb, args=(msg,), daemon=True).start()
+
+                # Give the scheduled coroutine a chance to run + drain
+                # without coupling the test to wall-clock timing.
+                for _ in range(20):
+                    await asyncio.sleep(0.05)
+                    data = os.read(read_fd, 65536) if _readable(read_fd) else b""
+                    if data:
+                        break
+                else:
+                    data = b""
+
+                assert data, (
+                    "no notification on stdout pipe — the bridge fired "
+                    "but the write didn't reach the writer (writer.drain "
+                    "swallowing or scheduling race)"
+                )
+                line = data.decode().strip()
+                payload = json.loads(line)
+
+                assert payload["jsonrpc"] == "2.0"
+                assert payload["method"] == "notifications/claude/channel"
+                # Content is wrapped with the identity header + reply hint —
+                # see _format_channel_content. The bridge test pins the full
+                # composition so a regression to "raw text only" surfaces here
+                # as well as in the per-formatter tests above.
+                assert payload["params"]["content"] == (
+                    "[from peer-agent · peer_id=11111111-2222-3333-4444-555555555555]\n"
+                    "hello from peer\n"
+                    '↩ Reply: delegate_task({workspace_id: '
+                    '"11111111-2222-3333-4444-555555555555", task: "..."})'
+                )
+                meta = payload["params"]["meta"]
+                assert meta["source"] == "molecule"
+                assert meta["kind"] == "peer_agent"
+                assert meta["peer_id"] == "11111111-2222-3333-4444-555555555555"
+                assert meta["activity_id"] == "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff"
+                assert meta["ts"] == "2026-05-01T22:00:00Z"
+            finally:
+                writer.close()
+                try:
+                    os.close(read_fd)
+                except OSError:
+                    # read_fd may already be closed if writer.close() tore down the pair
+                    # during teardown — best-effort cleanup, no signal worth surfacing.
+                    pass
+        finally:
+            _mcp._CHANNEL_NOTIFICATION_METHOD = old_method


async def test_inbox_bridge_swallows_closed_pipe_drain_error(monkeypatch):
@@ -1808,99 +1826,75 @@ def test_inbox_bridge_swallows_closed_loop_runtime_error():


class TestStdioPipeAssertion:
-    """Pin _assert_stdio_is_pipe_compatible — the friendly fail-fast guard
-    that turns asyncio's `ValueError: Pipe transport is only for pipes,
-    sockets and character devices` into a clear operator message + exit 2.
+    """Pin _warn_if_stdio_not_pipe — the diagnostic warning that replaces
+    the old fatal _assert_stdio_is_pipe_compatible guard.
+
+    The universal stdio transport now works with ANY file descriptor
+    (pipes, regular files, PTYs, sockets), so the old exit-2 behavior
+    is gone. These tests verify the warning is emitted for non-pipe
+    stdio so operators still get diagnostic signal when debugging.
+    See molecule-ai-workspace-runtime#61.
    """

-    def test_pipe_pair_passes_silently(self):
-        """Happy path — both fds are pipes (the production launch shape
-        from any MCP client). Should return None without printing or
-        exiting."""
-        from a2a_mcp_server import _assert_stdio_is_pipe_compatible
+    def test_pipe_pair_passes_silently(self, caplog):
+        """Happy path — both fds are pipes. No warning emitted."""
+        from a2a_mcp_server import _warn_if_stdio_not_pipe

        r, w = os.pipe()
        try:
-            # No exit, no stderr noise. We don't capture stderr here
-            # because pipe path should produce zero output.
-            _assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=w)
+            with caplog.at_level("WARNING"):
+                _warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w)
+            assert "not a pipe" not in caplog.text
        finally:
            os.close(r)
            os.close(w)

-    def test_regular_file_stdout_exits_with_friendly_message(
-        self, tmp_path, capsys
-    ):
+    def test_regular_file_stdout_warns(self, tmp_path, caplog):
        """Reproducer for runtime#61: stdout redirected to a regular file.
-        Pre-fix this would surface upstream as
-        `ValueError: Pipe transport is only for pipes...`. Post-fix we
-        exit with code 2 and a stderr message that names the symptom +
-        fix."""
-        from a2a_mcp_server import _assert_stdio_is_pipe_compatible
+        Now emits a warning instead of exiting."""
+        from a2a_mcp_server import _warn_if_stdio_not_pipe

        # stdin = pipe (so we isolate the stdout failure path);
        # stdout = regular file (the bug condition).
        r, _w = os.pipe()
        regular = tmp_path / "captured.log"
        f = open(regular, "wb")
        try:
-            with pytest.raises(SystemExit) as excinfo:
-                _assert_stdio_is_pipe_compatible(
-                    stdin_fd=r, stdout_fd=f.fileno()
-                )
-            assert excinfo.value.code == 2
-            err = capsys.readouterr().err
-            # Names the failing stream + the asyncio constraint that
-            # would otherwise crash. Don't pin the exact wording — the
-            # asserts pin the operator-recoverable signal only.
-            assert "stdout" in err
-            assert "regular file" in err
-            assert "pipe" in err
+            with caplog.at_level("WARNING"):
+                _warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=f.fileno())
+            assert "stdout" in caplog.text
+            assert "not a pipe" in caplog.text
        finally:
            f.close()
            os.close(r)

-    def test_regular_file_stdin_exits_with_friendly_message(
-        self, tmp_path, capsys
-    ):
-        """Symmetric case — stdin redirected from a regular file. Same
-        asyncio constraint applies via connect_read_pipe."""
-        from a2a_mcp_server import _assert_stdio_is_pipe_compatible
+    def test_regular_file_stdin_warns(self, tmp_path, caplog):
+        """Symmetric case — stdin redirected from a regular file."""
+        from a2a_mcp_server import _warn_if_stdio_not_pipe

        regular = tmp_path / "input.json"
        regular.write_bytes(b'{"jsonrpc":"2.0","id":1,"method":"initialize"}\n')
        f = open(regular, "rb")
        _r, w = os.pipe()
        try:
-            with pytest.raises(SystemExit) as excinfo:
-                _assert_stdio_is_pipe_compatible(
-                    stdin_fd=f.fileno(), stdout_fd=w
-                )
-            assert excinfo.value.code == 2
-            err = capsys.readouterr().err
-            assert "stdin" in err
-            assert "regular file" in err
+            with caplog.at_level("WARNING"):
+                _warn_if_stdio_not_pipe(stdin_fd=f.fileno(), stdout_fd=w)
+            assert "stdin" in caplog.text
+            assert "not a pipe" in caplog.text
        finally:
            f.close()
            os.close(w)

-    def test_closed_fd_exits_with_stat_error(self, capsys):
-        """If stdio is closed (rare but seen in detached daemonized
-        contexts), os.fstat raises OSError. We catch it and exit 2 with
-        a guidance message instead of letting the traceback escape."""
-        from a2a_mcp_server import _assert_stdio_is_pipe_compatible
+    def test_closed_fd_warns_about_stat_error(self, caplog):
+        """If stdio is closed, os.fstat raises OSError. Warning is
+        skipped silently (can't stat the fd)."""
+        from a2a_mcp_server import _warn_if_stdio_not_pipe

        r, w = os.pipe()
        os.close(w)  # Now `w` is a stale fd — fstat will fail.
        try:
-            with pytest.raises(SystemExit) as excinfo:
-                _assert_stdio_is_pipe_compatible(
-                    stdin_fd=r, stdout_fd=w
-                )
-            assert excinfo.value.code == 2
-            err = capsys.readouterr().err
-            assert "cannot stat stdout" in err
+            with caplog.at_level("WARNING"):
+                _warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w)
+            # No warning emitted because fstat failed before the check
+            assert "not a pipe" not in caplog.text
        finally:
            os.close(r)
