diff --git a/workspace/a2a_mcp_server.py b/workspace/a2a_mcp_server.py index 0bf2d095..f15a2777 100644 --- a/workspace/a2a_mcp_server.py +++ b/workspace/a2a_mcp_server.py @@ -16,6 +16,7 @@ import asyncio import json import logging import os +import stat import sys from typing import Callable @@ -444,6 +445,52 @@ def _build_channel_notification(msg: dict) -> dict: # --- MCP Server (JSON-RPC over stdio) --- + +def _assert_stdio_is_pipe_compatible( + stdin_fd: int = 0, stdout_fd: int = 1 +) -> None: + """Fail fast with a friendly message when stdio isn't pipe-compatible. + + asyncio.connect_read_pipe / connect_write_pipe accept only pipes, + sockets, and character devices. When molecule-mcp is launched with + stdout redirected to a regular file (CI smoke tests, ad-hoc local + debugging that captures output), the asyncio call later raises + ``ValueError: Pipe transport is only for pipes, sockets and character + devices`` from inside the event loop — surfaced to the operator as a + confusing traceback. Detect early and exit cleanly with guidance + instead. See molecule-ai-workspace-runtime#61. + """ + for name, fd in (("stdin", stdin_fd), ("stdout", stdout_fd)): + try: + mode = os.fstat(fd).st_mode + except OSError as exc: + print( + f"molecule-mcp: cannot stat {name} (fd={fd}): {exc}.\n" + f" This MCP server expects bidirectional pipe stdio. Launch it from\n" + f" an MCP-aware client (Claude Code, Cursor, etc.) — not detached\n" + f" from a terminal or with stdio closed.", + file=sys.stderr, + ) + sys.exit(2) + if not ( + stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode) + ): + print( + f"molecule-mcp: {name} (fd={fd}) is a regular file, not a pipe,\n" + f" socket, or character device — asyncio's stdio transport rejects\n" + f" it with `ValueError: Pipe transport is only for pipes, sockets\n" + f" and character devices`. Common causes:\n" + f" molecule-mcp > out.txt # stdout → regular file (fails)\n" + f" molecule-mcp < input.json # stdin → regular file (fails)\n" + f" Launch molecule-mcp from an MCP-aware client (Claude Code, Cursor,\n" + f" hermes, OpenCode, etc.) so stdio is wired to a pipe pair, or use\n" + f" `tee`/process substitution if you need to capture output:\n" + f" molecule-mcp 2>&1 | tee out.txt # stdout stays a pipe", + file=sys.stderr, + ) + sys.exit(2) + + async def main(): # pragma: no cover """Run MCP server on stdio — reads JSON-RPC requests, writes responses.""" reader = asyncio.StreamReader() @@ -547,6 +594,7 @@ def cli_main() -> None: # pragma: no cover break every external-runtime operator's MCP install — the 0.1.16 ``main_sync`` rename incident is the cautionary precedent. """ + _assert_stdio_is_pipe_compatible() asyncio.run(main()) diff --git a/workspace/tests/test_a2a_mcp_server.py b/workspace/tests/test_a2a_mcp_server.py index 85b14dd1..6d3799fc 100644 --- a/workspace/tests/test_a2a_mcp_server.py +++ b/workspace/tests/test_a2a_mcp_server.py @@ -2,6 +2,7 @@ import asyncio import json +import os from unittest.mock import AsyncMock, MagicMock, patch @@ -1071,6 +1072,104 @@ def test_inbox_bridge_swallows_closed_loop_runtime_error(): }) +class TestStdioPipeAssertion: + """Pin _assert_stdio_is_pipe_compatible — the friendly fail-fast guard + that turns asyncio's `ValueError: Pipe transport is only for pipes, + sockets and character devices` into a clear operator message + exit 2. + See molecule-ai-workspace-runtime#61. + """ + + def test_pipe_pair_passes_silently(self): + """Happy path — both fds are pipes (the production launch shape + from any MCP client). Should return None without printing or + exiting.""" + from a2a_mcp_server import _assert_stdio_is_pipe_compatible + + r, w = os.pipe() + try: + # No exit, no stderr noise. We don't capture stderr here + # because pipe path should produce zero output. + _assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=w) + finally: + os.close(r) + os.close(w) + + def test_regular_file_stdout_exits_with_friendly_message( + self, tmp_path, capsys + ): + """Reproducer for runtime#61: stdout redirected to a regular file. + Pre-fix this would surface upstream as + `ValueError: Pipe transport is only for pipes...`. Post-fix we + exit with code 2 and a stderr message that names the symptom + + fix.""" + from a2a_mcp_server import _assert_stdio_is_pipe_compatible + + # stdin = pipe (so we isolate the stdout failure path); + # stdout = regular file (the bug condition). + r, _w = os.pipe() + regular = tmp_path / "captured.log" + f = open(regular, "wb") + try: + with pytest.raises(SystemExit) as excinfo: + _assert_stdio_is_pipe_compatible( + stdin_fd=r, stdout_fd=f.fileno() + ) + assert excinfo.value.code == 2 + err = capsys.readouterr().err + # Names the failing stream + the asyncio constraint that + # would otherwise crash. Don't pin the exact wording — the + # asserts pin the operator-recoverable signal only. + assert "stdout" in err + assert "regular file" in err + assert "pipe" in err + finally: + f.close() + os.close(r) + + def test_regular_file_stdin_exits_with_friendly_message( + self, tmp_path, capsys + ): + """Symmetric case — stdin redirected from a regular file. Same + asyncio constraint applies via connect_read_pipe.""" + from a2a_mcp_server import _assert_stdio_is_pipe_compatible + + regular = tmp_path / "input.json" + regular.write_bytes(b'{"jsonrpc":"2.0","id":1,"method":"initialize"}\n') + f = open(regular, "rb") + _r, w = os.pipe() + try: + with pytest.raises(SystemExit) as excinfo: + _assert_stdio_is_pipe_compatible( + stdin_fd=f.fileno(), stdout_fd=w + ) + assert excinfo.value.code == 2 + err = capsys.readouterr().err + assert "stdin" in err + assert "regular file" in err + finally: + f.close() + os.close(w) + + def test_closed_fd_exits_with_stat_error(self, capsys): + """If stdio is closed (rare but seen in detached daemonized + contexts), os.fstat raises OSError. We catch it and exit 2 with + a guidance message instead of letting the traceback escape.""" + from a2a_mcp_server import _assert_stdio_is_pipe_compatible + + r, w = os.pipe() + os.close(w) # Now `w` is a stale fd — fstat will fail. + try: + with pytest.raises(SystemExit) as excinfo: + _assert_stdio_is_pipe_compatible( + stdin_fd=r, stdout_fd=w + ) + assert excinfo.value.code == 2 + err = capsys.readouterr().err + assert "cannot stat stdout" in err + finally: + os.close(r) + + def _readable(fd: int) -> bool: """True iff ``fd`` has bytes available without blocking. Lets us poll the pipe in a loop without the test hanging when the