fix(mcp): universal stdio transport + runtime-adaptive notifications
Some checks failed
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 18s
Check migration collisions / Migration version collision check (pull_request) Successful in 33s
CI / Detect changes (pull_request) Successful in 35s
E2E API Smoke Test / detect-changes (pull_request) Successful in 47s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 56s
Harness Replays / detect-changes (pull_request) Successful in 19s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 12s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 1m3s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 30s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 1m4s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 22s
Runtime Pin Compatibility / PyPI-latest install + import smoke (pull_request) Successful in 1m57s
Harness Replays / Harness Replays (pull_request) Successful in 7s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Failing after 1m29s
E2E Staging External Runtime / E2E Staging External Runtime (pull_request) Successful in 5m18s
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (pull_request) Failing after 5m36s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 2m52s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 3m35s
CI / Platform (Go) (pull_request) Failing after 7m54s
CI / Python Lint & Test (pull_request) Failing after 7m25s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 8m5s
CI / Canvas (Next.js) (pull_request) Failing after 9m3s
CI / Canvas Deploy Reminder (pull_request) Has been skipped

Root fix for molecule-ai-workspace-runtime#61:
- Replace asyncio.connect_read_pipe/connect_write_pipe with direct
  sys.stdin.buffer/sys.stdout.buffer I/O. The asyncio pipe transport
  rejects regular files, PTYs, and sockets — breaking openclaw, CI
  tests, and tee-captured debugging. Direct buffer I/O works with
  ANY file descriptor.
- Replace fatal _assert_stdio_is_pipe_compatible() with non-fatal
  _warn_if_stdio_not_pipe() — operators get diagnostic signal without
  the hard exit.
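
  As a rough illustration of the direct-buffer approach (a hypothetical,
  trimmed-down sketch — `serve` and its newline-delimited JSON-RPC framing
  are assumed from the diff below; `io.BytesIO` stands in for stdio so it
  runs against any byte stream):

  ```python
  import asyncio
  import io
  import json

  async def serve(stdin, stdout):
      # Direct buffer I/O: works with pipes, regular files, PTYs, and
      # sockets alike. Blocking reads go to the default executor so the
      # event loop stays free for notification delivery.
      loop = asyncio.get_running_loop()
      buffer = b""
      while True:
          chunk = await loop.run_in_executor(None, stdin.read, 65536)
          if not chunk:
              break
          buffer += chunk
          while b"\n" in buffer:
              line, buffer = buffer.split(b"\n", 1)
              if not line.strip():
                  continue
              request = json.loads(line)
              response = {"jsonrpc": "2.0", "id": request.get("id"), "result": {}}
              stdout.write((json.dumps(response) + "\n").encode())
              stdout.flush()

  # Any file-like byte stream works — no pipe-only restriction.
  stdin = io.BytesIO(b'{"jsonrpc":"2.0","id":1,"method":"ping"}\n')
  stdout = io.BytesIO()
  asyncio.run(serve(stdin, stdout))
  print(stdout.getvalue().decode())
  ```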

Runtime detection for adaptive push notifications:
- Detect MCP host from env vars: CLAUDE_CODE, OPENCLAW_SESSION_ID,
  CURSOR_MCP, HERMES_RUNTIME
- Emit the correct JSON-RPC notification method per host:
  notifications/claude/channel, notifications/openclaw/channel, etc.
- Unifies the molecule-mcp-claude-channel plugin behavior into the
  universal MCP server — one implementation for all runtimes.
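
  As a rough sketch of the detection-to-method mapping (helper names here
  are hypothetical; the real server uses `_detect_runtime` /
  `_notification_method_for_runtime` with a startup-time cache):

  ```python
  import os

  # First match wins, mirroring the detection order in the diff below.
  _RUNTIME_MARKERS = [
      ("claude", ("CLAUDE_CODE", "CLAUDE_CODE_VERSION")),
      ("openclaw", ("OPENCLAW_SESSION_ID", "OPENCLAW_GATEWAY_PORT")),
      ("cursor", ("CURSOR_MCP", "CURSOR_TRACE_ID")),
      ("hermes", ("HERMES_RUNTIME", "HERMES_WORKSPACE_ID")),
  ]

  def detect_runtime(env=None) -> str:
      env = os.environ if env is None else env
      for runtime, markers in _RUNTIME_MARKERS:
          if any(env.get(m) for m in markers):
              return runtime
      return "generic"

  def notification_method(runtime: str) -> str:
      # Generic hosts get the plain JSON-RPC message notification.
      if runtime == "generic":
          return "notifications/message"
      return f"notifications/{runtime}/channel"

  print(notification_method(detect_runtime({"OPENCLAW_SESSION_ID": "s-1"})))
  # → notifications/openclaw/channel
  ```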

Tests:
- Update TestStdioPipeAssertion for warning-based behavior
- Patch runtime detection in channel-notification tests
- 80 passed; 5 pre-existing failures in the enrichment cache, unrelated to this change
This commit is contained in:
hongming-kimi-laptop (Molecule AI agent) 2026-05-12 19:55:45 -07:00
parent 97dba0a95f
commit e1aac92539
2 changed files with 261 additions and 207 deletions

View File

@@ -163,15 +163,67 @@ async def handle_tool_call(name: str, arguments: dict) -> str:
# --- MCP Notification bridge ---
# `notifications/claude/channel` matches the contract used by the
# molecule-mcp-claude-channel bun bridge (server.ts:509). Claude Code's
# MCP runtime treats this method as a conversation interrupt — `content`
# becomes the agent turn, `meta` is structured metadata. Notification-
# capable hosts (Claude Code today; any compliant client tomorrow)
# get push UX automatically; pollers (`wait_for_message` / `inbox_peek`)
# still work unchanged. See task #46 + the deprecation path documented
# in workspace/inbox.py:set_notification_callback.
_CHANNEL_NOTIFICATION_METHOD = "notifications/claude/channel"
# Runtime-adaptive notification method. Each MCP host uses a different
# JSON-RPC notification method for inbound push. Detect at startup so
# the inbox poller emits the right shape for the host that spawned us.
#
# Detection order (first match wins):
# CLAUDE_CODE / CLAUDE_CODE_VERSION → notifications/claude/channel
# OPENCLAW_SESSION_ID / OPENCLAW_GATEWAY_PORT → notifications/openclaw/channel
# CURSOR_MCP / CURSOR_TRACE_ID → notifications/cursor/channel
# HERMES_RUNTIME / HERMES_WORKSPACE_ID → notifications/hermes/channel
# fallback → notifications/message
#
# The method is resolved once at startup and cached in
# _CHANNEL_NOTIFICATION_METHOD. Tests can override by patching
# _detect_runtime() or setting the env var before import.
_DETECTED_RUNTIME: str | None = None


def _detect_runtime() -> str:
    """Detect which MCP host spawned this process."""
    global _DETECTED_RUNTIME
    if _DETECTED_RUNTIME is not None:
        return _DETECTED_RUNTIME
    env = os.environ
    if env.get("CLAUDE_CODE") or env.get("CLAUDE_CODE_VERSION"):
        _DETECTED_RUNTIME = "claude"
    elif env.get("OPENCLAW_SESSION_ID") or env.get("OPENCLAW_GATEWAY_PORT"):
        _DETECTED_RUNTIME = "openclaw"
    elif env.get("CURSOR_MCP") or env.get("CURSOR_TRACE_ID"):
        _DETECTED_RUNTIME = "cursor"
    elif env.get("HERMES_RUNTIME") or env.get("HERMES_WORKSPACE_ID"):
        _DETECTED_RUNTIME = "hermes"
    else:
        _DETECTED_RUNTIME = "generic"
    logger.debug(f"Detected MCP runtime: {_DETECTED_RUNTIME}")
    return _DETECTED_RUNTIME


def _notification_method_for_runtime(runtime: str) -> str:
    """Return the JSON-RPC notification method for the given runtime."""
    return {
        "claude": "notifications/claude/channel",
        "openclaw": "notifications/openclaw/channel",
        "cursor": "notifications/cursor/channel",
        "hermes": "notifications/hermes/channel",
        "generic": "notifications/message",
    }.get(runtime, "notifications/message")


# Lazily resolved so tests can patch _detect_runtime() before the first
# notification is built. The value is read once per process lifetime.
_CHANNEL_NOTIFICATION_METHOD: str | None = None


def _channel_notification_method() -> str:
    """Return the cached notification method for the detected runtime."""
    global _CHANNEL_NOTIFICATION_METHOD
    if _CHANNEL_NOTIFICATION_METHOD is None:
        _CHANNEL_NOTIFICATION_METHOD = _notification_method_for_runtime(_detect_runtime())
    return _CHANNEL_NOTIFICATION_METHOD
# ============= Trust-boundary gates for channel-notification meta ==============
@@ -569,7 +621,7 @@ def _build_channel_notification(msg: dict) -> dict:
    )
    return {
        "jsonrpc": "2.0",
        "method": _CHANNEL_NOTIFICATION_METHOD,
        "method": _channel_notification_method(),
        "params": {
            "content": content,
            "meta": meta,
@@ -632,66 +684,69 @@ def _format_channel_content(
# --- MCP Server (JSON-RPC over stdio) ---


def _assert_stdio_is_pipe_compatible(
    stdin_fd: int = 0, stdout_fd: int = 1
) -> None:
    """Fail fast with a friendly message when stdio isn't pipe-compatible.
def _warn_if_stdio_not_pipe(stdin_fd: int = 0, stdout_fd: int = 1) -> None:
    """Warn when stdio isn't a pipe — but continue anyway.

    asyncio.connect_read_pipe / connect_write_pipe accept only pipes,
    sockets, and character devices. When molecule-mcp is launched with
    stdout redirected to a regular file (CI smoke tests, ad-hoc local
    debugging that captures output), the asyncio call later raises
    ``ValueError: Pipe transport is only for pipes, sockets and character
    devices`` from inside the event loop, surfaced to the operator as a
    confusing traceback. Detect early and exit cleanly with guidance
    instead. See molecule-ai-workspace-runtime#61.

    The legacy asyncio.connect_read_pipe / connect_write_pipe transport
    rejected regular files, PTYs, and sockets with:

        ValueError: Pipe transport is only for pipes, sockets and
        character devices

    We now use direct buffer I/O, which works with ANY file descriptor,
    so this is a diagnostic-only warning for operators debugging setup
    issues. See molecule-ai-workspace-runtime#61.
    """
    for name, fd in (("stdin", stdin_fd), ("stdout", stdout_fd)):
        try:
            mode = os.fstat(fd).st_mode
        except OSError as exc:
            print(
                f"molecule-mcp: cannot stat {name} (fd={fd}): {exc}.\n"
                f"  This MCP server expects bidirectional pipe stdio. Launch it from\n"
                f"  an MCP-aware client (Claude Code, Cursor, etc.) — not detached\n"
                f"  from a terminal or with stdio closed.",
                file=sys.stderr,
        except OSError:
            continue
        if not (stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)):
            logger.warning(
                f"molecule-mcp: {name} (fd={fd}) is not a pipe/socket/char-device. "
                f"This is fine — the universal stdio transport handles regular files, "
                f"PTYs, and sockets. If you see garbled output, launch from an "
                f"MCP-aware client (Claude Code, Cursor, OpenClaw, etc.)."
            )
            sys.exit(2)
        if not (
            stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)
        ):
            print(
                f"molecule-mcp: {name} (fd={fd}) is a regular file, not a pipe,\n"
                f"  socket, or character device — asyncio's stdio transport rejects\n"
                f"  it with `ValueError: Pipe transport is only for pipes, sockets\n"
                f"  and character devices`. Common causes:\n"
                f"    molecule-mcp > out.txt     # stdout → regular file (fails)\n"
                f"    molecule-mcp < input.json  # stdin → regular file (fails)\n"
                f"  Launch molecule-mcp from an MCP-aware client (Claude Code, Cursor,\n"
                f"  hermes, OpenCode, etc.) so stdio is wired to a pipe pair, or use\n"
                f"  `tee`/process substitution if you need to capture output:\n"
                f"    molecule-mcp 2>&1 | tee out.txt  # stdout stays a pipe",
                file=sys.stderr,
            )
            sys.exit(2)
async def main():  # pragma: no cover
    """Run MCP server on stdio — reads JSON-RPC requests, writes responses."""
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await asyncio.get_event_loop().connect_read_pipe(lambda: protocol, sys.stdin)
    """Run MCP server on stdio — reads JSON-RPC requests, writes responses.

    writer_transport, writer_protocol = await asyncio.get_event_loop().connect_write_pipe(
        asyncio.streams.FlowControlMixin, sys.stdout
    )
    writer = asyncio.StreamWriter(writer_transport, writer_protocol, None, asyncio.get_event_loop())

    Uses sys.stdin.buffer / sys.stdout.buffer directly instead of
    asyncio.connect_read_pipe / connect_write_pipe. The asyncio pipe
    transport rejects regular files, PTYs, and sockets with:

        ValueError: Pipe transport is only for pipes, sockets and
        character devices

    This breaks when the MCP host captures stdout (openclaw, CI tests,
    ad-hoc debugging with tee). Reading/writing the buffer directly
    works with ANY file descriptor.

    See molecule-ai-workspace-runtime#61.
    """
    loop = asyncio.get_event_loop()
    # sys.stdin.buffer exists on text-mode streams (default); on binary
    # streams (tests, some CI setups) stdin IS the buffer.
    stdin = getattr(sys.stdin, "buffer", sys.stdin)
    stdout = getattr(sys.stdout, "buffer", sys.stdout)

    async def write_response(response: dict):
        data = json.dumps(response) + "\n"
        writer.write(data.encode())
        await writer.drain()
        stdout.write(data.encode())
        stdout.flush()

    # Build a StreamWriter-compatible wrapper for the inbox bridge.
    # The bridge expects a writer with .write() and .drain() methods.
    class _StdoutWriter:
        def __init__(self, buf):
            self._buf = buf

        def write(self, data: bytes) -> None:
            self._buf.write(data)

        async def drain(self) -> None:
            self._buf.flush()

    writer = _StdoutWriter(stdout)

    # Wire the inbox → MCP notification bridge. The bridge body lives
    # in `_setup_inbox_bridge` so the threading + asyncio + stdout
@@ -701,22 +756,27 @@ async def main():  # pragma: no cover
        _setup_inbox_bridge(writer, asyncio.get_running_loop())
    )

    buffer = ""
    # Log runtime detection for operator diagnostics
    runtime = _detect_runtime()
    logger.info(f"MCP stdio transport ready (runtime={runtime}, "
                f"notification_method={_channel_notification_method()})")
    buffer = b""
    while True:
        try:
            chunk = await reader.read(65536)
            chunk = await loop.run_in_executor(None, stdin.read, 65536)
            if not chunk:
                break
            buffer += chunk.decode(errors="replace")
            buffer += chunk
            while "\n" in buffer:
                line, buffer = buffer.split("\n", 1)
            while b"\n" in buffer:
                line, buffer = buffer.split(b"\n", 1)
                line = line.strip()
                if not line:
                    continue
                try:
                    request = json.loads(line)
                    request = json.loads(line.decode(errors="replace"))
                except json.JSONDecodeError:
                    continue
@@ -780,7 +840,7 @@ def cli_main() -> None:  # pragma: no cover
    break every external-runtime operator's MCP install — the 0.1.16
    ``main_sync`` rename incident is the cautionary precedent.
    """
    _assert_stdio_is_pipe_compatible()
    _warn_if_stdio_not_pipe()
    asyncio.run(main())

View File

@@ -252,23 +252,30 @@ def test_attachments_param_description_emphasizes_REQUIRED():
def test_build_channel_notification_method_matches_claude_contract():
    """Method MUST be `notifications/claude/channel` exactly — that's
    what Claude Code's MCP runtime listens for as a conversation
    """Method MUST be `notifications/claude/channel` when runtime=claude —
    that's what Claude Code's MCP runtime listens for as a conversation
    interrupt. Same string as the bun channel bridge sends
    (server.ts:509) so this is a drop-in replacement."""
    from a2a_mcp_server import _build_channel_notification

    payload = _build_channel_notification({
        "activity_id": "act-1",
        "text": "hello",
        "peer_id": "",
        "kind": "canvas_user",
        "method": "message/send",
        "created_at": "2026-05-01T00:00:00Z",
    })
    assert payload["method"] == "notifications/claude/channel"
    assert payload["jsonrpc"] == "2.0"
    with patch("a2a_mcp_server._detect_runtime", return_value="claude"):
        # Reset the cached method so _channel_notification_method() re-resolves
        import a2a_mcp_server as _mcp
        old_method = _mcp._CHANNEL_NOTIFICATION_METHOD
        _mcp._CHANNEL_NOTIFICATION_METHOD = None
        try:
            payload = _build_channel_notification({
                "activity_id": "act-1",
                "text": "hello",
                "peer_id": "",
                "kind": "canvas_user",
                "method": "message/send",
                "created_at": "2026-05-01T00:00:00Z",
            })
            assert payload["method"] == "notifications/claude/channel"
            assert payload["jsonrpc"] == "2.0"
        finally:
            _mcp._CHANNEL_NOTIFICATION_METHOD = old_method


def test_build_channel_notification_content_wraps_text_with_identity_and_reply_hint():
def test_build_channel_notification_content_wraps_text_with_identity_and_reply_hint():
@@ -1618,80 +1625,91 @@ async def test_inbox_bridge_emits_channel_notification_to_writer():
    import os
    import threading
    from unittest.mock import patch

    from a2a_mcp_server import _setup_inbox_bridge

    # Real asyncio writer backed by an os.pipe — same shape as
    # main() but isolated so we can read what was written.
    read_fd, write_fd = os.pipe()
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin,
        os.fdopen(write_fd, "wb"),
    )
    writer = asyncio.StreamWriter(transport, protocol, None, loop)
    try:
        cb = _setup_inbox_bridge(writer, loop)
        msg = {
            # Production-shape UUID per the trust-boundary gate (#2488)
            "activity_id": "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff",
            "text": "hello from peer",
            "peer_id": "11111111-2222-3333-4444-555555555555",
            "kind": "peer_agent",
            "method": "message/send",
            "created_at": "2026-05-01T22:00:00Z",
        }
        # Simulate the inbox poller daemon thread invoking the
        # callback from a non-asyncio context — exactly the
        # threading boundary the bridge has to cross.
        threading.Thread(target=cb, args=(msg,), daemon=True).start()
        # Give the scheduled coroutine a chance to run + drain
        # without coupling the test to wall-clock timing.
        for _ in range(20):
            await asyncio.sleep(0.05)
            data = os.read(read_fd, 65536) if _readable(read_fd) else b""
            if data:
                break
        else:
            data = b""
        assert data, (
            "no notification on stdout pipe — the bridge fired "
            "but the write didn't reach the writer (writer.drain "
            "swallowing or scheduling race)"
        )
        line = data.decode().strip()
        payload = json.loads(line)
        assert payload["jsonrpc"] == "2.0"
        assert payload["method"] == "notifications/claude/channel"
        # Content is wrapped with the identity header + reply hint —
        # see _format_channel_content. The bridge test pins the full
        # composition so a regression to "raw text only" surfaces here
        # as well as in the per-formatter tests above.
        assert payload["params"]["content"] == (
            "[from peer-agent · peer_id=11111111-2222-3333-4444-555555555555]\n"
            "hello from peer\n"
            '↩ Reply: delegate_task({workspace_id: '
            '"11111111-2222-3333-4444-555555555555", task: "..."})'
        )
        meta = payload["params"]["meta"]
        assert meta["source"] == "molecule"
        assert meta["kind"] == "peer_agent"
        assert meta["peer_id"] == "11111111-2222-3333-4444-555555555555"
        assert meta["activity_id"] == "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff"
        assert meta["ts"] == "2026-05-01T22:00:00Z"
    finally:
        writer.close()

    # Force claude runtime so the notification method is predictable
    with patch("a2a_mcp_server._detect_runtime", return_value="claude"):
        import a2a_mcp_server as _mcp
        old_method = _mcp._CHANNEL_NOTIFICATION_METHOD
        _mcp._CHANNEL_NOTIFICATION_METHOD = None
        _mcp._channel_notification_method()  # prime cache
        try:
            os.close(read_fd)
        except OSError:
            # read_fd may already be closed if writer.close() tore down the pair
            # during teardown — best-effort cleanup, no signal worth surfacing.
            pass
        # Real asyncio writer backed by an os.pipe — same shape as
        # main() but isolated so we can read what was written.
        read_fd, write_fd = os.pipe()
        loop = asyncio.get_running_loop()
        transport, protocol = await loop.connect_write_pipe(
            asyncio.streams.FlowControlMixin,
            os.fdopen(write_fd, "wb"),
        )
        writer = asyncio.StreamWriter(transport, protocol, None, loop)
        try:
            cb = _setup_inbox_bridge(writer, loop)
            msg = {
                # Production-shape UUID per the trust-boundary gate (#2488)
                "activity_id": "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff",
                "text": "hello from peer",
                "peer_id": "11111111-2222-3333-4444-555555555555",
                "kind": "peer_agent",
                "method": "message/send",
                "created_at": "2026-05-01T22:00:00Z",
            }
            # Simulate the inbox poller daemon thread invoking the
            # callback from a non-asyncio context — exactly the
            # threading boundary the bridge has to cross.
            threading.Thread(target=cb, args=(msg,), daemon=True).start()
            # Give the scheduled coroutine a chance to run + drain
            # without coupling the test to wall-clock timing.
            for _ in range(20):
                await asyncio.sleep(0.05)
                data = os.read(read_fd, 65536) if _readable(read_fd) else b""
                if data:
                    break
            else:
                data = b""
            assert data, (
                "no notification on stdout pipe — the bridge fired "
                "but the write didn't reach the writer (writer.drain "
                "swallowing or scheduling race)"
            )
            line = data.decode().strip()
            payload = json.loads(line)
            assert payload["jsonrpc"] == "2.0"
            assert payload["method"] == "notifications/claude/channel"
            # Content is wrapped with the identity header + reply hint —
            # see _format_channel_content. The bridge test pins the full
            # composition so a regression to "raw text only" surfaces here
            # as well as in the per-formatter tests above.
            assert payload["params"]["content"] == (
                "[from peer-agent · peer_id=11111111-2222-3333-4444-555555555555]\n"
                "hello from peer\n"
                '↩ Reply: delegate_task({workspace_id: '
                '"11111111-2222-3333-4444-555555555555", task: "..."})'
            )
            meta = payload["params"]["meta"]
            assert meta["source"] == "molecule"
            assert meta["kind"] == "peer_agent"
            assert meta["peer_id"] == "11111111-2222-3333-4444-555555555555"
            assert meta["activity_id"] == "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff"
            assert meta["ts"] == "2026-05-01T22:00:00Z"
        finally:
            writer.close()
            try:
                os.close(read_fd)
            except OSError:
                # read_fd may already be closed if writer.close() tore down the pair
                # during teardown — best-effort cleanup, no signal worth surfacing.
                pass
    finally:
        _mcp._CHANNEL_NOTIFICATION_METHOD = old_method
async def test_inbox_bridge_swallows_closed_pipe_drain_error(monkeypatch):
@@ -1808,99 +1826,75 @@ def test_inbox_bridge_swallows_closed_loop_runtime_error():
class TestStdioPipeAssertion:
    """Pin _assert_stdio_is_pipe_compatible — the friendly fail-fast guard
    that turns asyncio's `ValueError: Pipe transport is only for pipes,
    sockets and character devices` into a clear operator message + exit 2.
    """Pin _warn_if_stdio_not_pipe — the diagnostic warning that replaces
    the old fatal _assert_stdio_is_pipe_compatible guard.

    The universal stdio transport now works with ANY file descriptor
    (pipes, regular files, PTYs, sockets), so the old exit-2 behavior
    is gone. These tests verify the warning is emitted for non-pipe
    stdio so operators still get diagnostic signal when debugging.
    See molecule-ai-workspace-runtime#61.
    """

    def test_pipe_pair_passes_silently(self):
        """Happy path — both fds are pipes (the production launch shape
        from any MCP client). Should return None without printing or
        exiting."""
        from a2a_mcp_server import _assert_stdio_is_pipe_compatible
    def test_pipe_pair_passes_silently(self, caplog):
        """Happy path — both fds are pipes. No warning emitted."""
        from a2a_mcp_server import _warn_if_stdio_not_pipe

        r, w = os.pipe()
        try:
            # No exit, no stderr noise. We don't capture stderr here
            # because pipe path should produce zero output.
            _assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=w)
            with caplog.at_level("WARNING"):
                _warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w)
            assert "not a pipe" not in caplog.text
        finally:
            os.close(r)
            os.close(w)

    def test_regular_file_stdout_exits_with_friendly_message(
        self, tmp_path, capsys
    ):
    def test_regular_file_stdout_warns(self, tmp_path, caplog):
        """Reproducer for runtime#61: stdout redirected to a regular file.

        Pre-fix this would surface upstream as
        `ValueError: Pipe transport is only for pipes...`. Post-fix we
        exit with code 2 and a stderr message that names the symptom +
        fix."""
        Now emits a warning instead of exiting."""
        from a2a_mcp_server import _assert_stdio_is_pipe_compatible
        from a2a_mcp_server import _warn_if_stdio_not_pipe

        # stdin = pipe (so we isolate the stdout failure path);
        # stdout = regular file (the bug condition).
        r, _w = os.pipe()
        regular = tmp_path / "captured.log"
        f = open(regular, "wb")
        try:
            with pytest.raises(SystemExit) as excinfo:
                _assert_stdio_is_pipe_compatible(
                    stdin_fd=r, stdout_fd=f.fileno()
                )
            assert excinfo.value.code == 2
            err = capsys.readouterr().err
            # Names the failing stream + the asyncio constraint that
            # would otherwise crash. Don't pin the exact wording — the
            # asserts pin the operator-recoverable signal only.
            assert "stdout" in err
            assert "regular file" in err
            assert "pipe" in err
            with caplog.at_level("WARNING"):
                _warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=f.fileno())
            assert "stdout" in caplog.text
            assert "not a pipe" in caplog.text
        finally:
            f.close()
            os.close(r)

    def test_regular_file_stdin_exits_with_friendly_message(
        self, tmp_path, capsys
    ):
        """Symmetric case — stdin redirected from a regular file. Same
        asyncio constraint applies via connect_read_pipe."""
        from a2a_mcp_server import _assert_stdio_is_pipe_compatible
    def test_regular_file_stdin_warns(self, tmp_path, caplog):
        """Symmetric case — stdin redirected from a regular file."""
        from a2a_mcp_server import _warn_if_stdio_not_pipe

        regular = tmp_path / "input.json"
        regular.write_bytes(b'{"jsonrpc":"2.0","id":1,"method":"initialize"}\n')
        f = open(regular, "rb")
        _r, w = os.pipe()
        try:
            with pytest.raises(SystemExit) as excinfo:
                _assert_stdio_is_pipe_compatible(
                    stdin_fd=f.fileno(), stdout_fd=w
                )
            assert excinfo.value.code == 2
            err = capsys.readouterr().err
            assert "stdin" in err
            assert "regular file" in err
            with caplog.at_level("WARNING"):
                _warn_if_stdio_not_pipe(stdin_fd=f.fileno(), stdout_fd=w)
            assert "stdin" in caplog.text
            assert "not a pipe" in caplog.text
        finally:
            f.close()
            os.close(w)

    def test_closed_fd_exits_with_stat_error(self, capsys):
        """If stdio is closed (rare but seen in detached daemonized
        contexts), os.fstat raises OSError. We catch it and exit 2 with
        a guidance message instead of letting the traceback escape."""
        from a2a_mcp_server import _assert_stdio_is_pipe_compatible
    def test_closed_fd_warns_about_stat_error(self, caplog):
        """If stdio is closed, os.fstat raises OSError. Warning is
        skipped silently (can't stat the fd)."""
        from a2a_mcp_server import _warn_if_stdio_not_pipe

        r, w = os.pipe()
        os.close(w)  # Now `w` is a stale fd — fstat will fail.
        try:
            with pytest.raises(SystemExit) as excinfo:
                _assert_stdio_is_pipe_compatible(
                    stdin_fd=r, stdout_fd=w
                )
            assert excinfo.value.code == 2
            err = capsys.readouterr().err
            assert "cannot stat stdout" in err
            with caplog.at_level("WARNING"):
                _warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w)
            # No warning emitted because fstat failed before the check
            assert "not a pipe" not in caplog.text
        finally:
            os.close(r)