Merge pull request #2479 from Molecule-AI/fix/molecule-mcp-non-pipe-stdout
fix(mcp): friendly fail-fast when stdio isn't pipe-compatible
This commit is contained in:
commit
6f0e914521
@ -16,6 +16,7 @@ import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import stat
|
||||
import sys
|
||||
from typing import Callable
|
||||
|
||||
@ -444,6 +445,52 @@ def _build_channel_notification(msg: dict) -> dict:
|
||||
|
||||
# --- MCP Server (JSON-RPC over stdio) ---
|
||||
|
||||
|
||||
def _assert_stdio_is_pipe_compatible(
|
||||
stdin_fd: int = 0, stdout_fd: int = 1
|
||||
) -> None:
|
||||
"""Fail fast with a friendly message when stdio isn't pipe-compatible.
|
||||
|
||||
asyncio.connect_read_pipe / connect_write_pipe accept only pipes,
|
||||
sockets, and character devices. When molecule-mcp is launched with
|
||||
stdout redirected to a regular file (CI smoke tests, ad-hoc local
|
||||
debugging that captures output), the asyncio call later raises
|
||||
``ValueError: Pipe transport is only for pipes, sockets and character
|
||||
devices`` from inside the event loop — surfaced to the operator as a
|
||||
confusing traceback. Detect early and exit cleanly with guidance
|
||||
instead. See molecule-ai-workspace-runtime#61.
|
||||
"""
|
||||
for name, fd in (("stdin", stdin_fd), ("stdout", stdout_fd)):
|
||||
try:
|
||||
mode = os.fstat(fd).st_mode
|
||||
except OSError as exc:
|
||||
print(
|
||||
f"molecule-mcp: cannot stat {name} (fd={fd}): {exc}.\n"
|
||||
f" This MCP server expects bidirectional pipe stdio. Launch it from\n"
|
||||
f" an MCP-aware client (Claude Code, Cursor, etc.) — not detached\n"
|
||||
f" from a terminal or with stdio closed.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(2)
|
||||
if not (
|
||||
stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)
|
||||
):
|
||||
print(
|
||||
f"molecule-mcp: {name} (fd={fd}) is a regular file, not a pipe,\n"
|
||||
f" socket, or character device — asyncio's stdio transport rejects\n"
|
||||
f" it with `ValueError: Pipe transport is only for pipes, sockets\n"
|
||||
f" and character devices`. Common causes:\n"
|
||||
f" molecule-mcp > out.txt # stdout → regular file (fails)\n"
|
||||
f" molecule-mcp < input.json # stdin → regular file (fails)\n"
|
||||
f" Launch molecule-mcp from an MCP-aware client (Claude Code, Cursor,\n"
|
||||
f" hermes, OpenCode, etc.) so stdio is wired to a pipe pair, or use\n"
|
||||
f" `tee`/process substitution if you need to capture output:\n"
|
||||
f" molecule-mcp 2>&1 | tee out.txt # stdout stays a pipe",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(2)
|
||||
|
||||
|
||||
async def main(): # pragma: no cover
|
||||
"""Run MCP server on stdio — reads JSON-RPC requests, writes responses."""
|
||||
reader = asyncio.StreamReader()
|
||||
@ -547,6 +594,7 @@ def cli_main() -> None: # pragma: no cover
|
||||
break every external-runtime operator's MCP install — the 0.1.16
|
||||
``main_sync`` rename incident is the cautionary precedent.
|
||||
"""
|
||||
_assert_stdio_is_pipe_compatible()
|
||||
asyncio.run(main())
|
||||
|
||||
|
||||
|
||||
@ -2,6 +2,7 @@
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
@ -1071,6 +1072,104 @@ def test_inbox_bridge_swallows_closed_loop_runtime_error():
|
||||
})
|
||||
|
||||
|
||||
class TestStdioPipeAssertion:
|
||||
"""Pin _assert_stdio_is_pipe_compatible — the friendly fail-fast guard
|
||||
that turns asyncio's `ValueError: Pipe transport is only for pipes,
|
||||
sockets and character devices` into a clear operator message + exit 2.
|
||||
See molecule-ai-workspace-runtime#61.
|
||||
"""
|
||||
|
||||
def test_pipe_pair_passes_silently(self):
|
||||
"""Happy path — both fds are pipes (the production launch shape
|
||||
from any MCP client). Should return None without printing or
|
||||
exiting."""
|
||||
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
|
||||
|
||||
r, w = os.pipe()
|
||||
try:
|
||||
# No exit, no stderr noise. We don't capture stderr here
|
||||
# because pipe path should produce zero output.
|
||||
_assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=w)
|
||||
finally:
|
||||
os.close(r)
|
||||
os.close(w)
|
||||
|
||||
def test_regular_file_stdout_exits_with_friendly_message(
|
||||
self, tmp_path, capsys
|
||||
):
|
||||
"""Reproducer for runtime#61: stdout redirected to a regular file.
|
||||
Pre-fix this would surface upstream as
|
||||
`ValueError: Pipe transport is only for pipes...`. Post-fix we
|
||||
exit with code 2 and a stderr message that names the symptom +
|
||||
fix."""
|
||||
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
|
||||
|
||||
# stdin = pipe (so we isolate the stdout failure path);
|
||||
# stdout = regular file (the bug condition).
|
||||
r, _w = os.pipe()
|
||||
regular = tmp_path / "captured.log"
|
||||
f = open(regular, "wb")
|
||||
try:
|
||||
with pytest.raises(SystemExit) as excinfo:
|
||||
_assert_stdio_is_pipe_compatible(
|
||||
stdin_fd=r, stdout_fd=f.fileno()
|
||||
)
|
||||
assert excinfo.value.code == 2
|
||||
err = capsys.readouterr().err
|
||||
# Names the failing stream + the asyncio constraint that
|
||||
# would otherwise crash. Don't pin the exact wording — the
|
||||
# asserts pin the operator-recoverable signal only.
|
||||
assert "stdout" in err
|
||||
assert "regular file" in err
|
||||
assert "pipe" in err
|
||||
finally:
|
||||
f.close()
|
||||
os.close(r)
|
||||
|
||||
def test_regular_file_stdin_exits_with_friendly_message(
|
||||
self, tmp_path, capsys
|
||||
):
|
||||
"""Symmetric case — stdin redirected from a regular file. Same
|
||||
asyncio constraint applies via connect_read_pipe."""
|
||||
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
|
||||
|
||||
regular = tmp_path / "input.json"
|
||||
regular.write_bytes(b'{"jsonrpc":"2.0","id":1,"method":"initialize"}\n')
|
||||
f = open(regular, "rb")
|
||||
_r, w = os.pipe()
|
||||
try:
|
||||
with pytest.raises(SystemExit) as excinfo:
|
||||
_assert_stdio_is_pipe_compatible(
|
||||
stdin_fd=f.fileno(), stdout_fd=w
|
||||
)
|
||||
assert excinfo.value.code == 2
|
||||
err = capsys.readouterr().err
|
||||
assert "stdin" in err
|
||||
assert "regular file" in err
|
||||
finally:
|
||||
f.close()
|
||||
os.close(w)
|
||||
|
||||
def test_closed_fd_exits_with_stat_error(self, capsys):
|
||||
"""If stdio is closed (rare but seen in detached daemonized
|
||||
contexts), os.fstat raises OSError. We catch it and exit 2 with
|
||||
a guidance message instead of letting the traceback escape."""
|
||||
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
|
||||
|
||||
r, w = os.pipe()
|
||||
os.close(w) # Now `w` is a stale fd — fstat will fail.
|
||||
try:
|
||||
with pytest.raises(SystemExit) as excinfo:
|
||||
_assert_stdio_is_pipe_compatible(
|
||||
stdin_fd=r, stdout_fd=w
|
||||
)
|
||||
assert excinfo.value.code == 2
|
||||
err = capsys.readouterr().err
|
||||
assert "cannot stat stdout" in err
|
||||
finally:
|
||||
os.close(r)
|
||||
|
||||
|
||||
def _readable(fd: int) -> bool:
|
||||
"""True iff ``fd`` has bytes available without blocking. Lets
|
||||
us poll the pipe in a loop without the test hanging when the
|
||||
|
||||
Loading…
Reference in New Issue
Block a user