Merge pull request 'fix(a2a-mcp): use readline() not read(65536) for pipe-safe stdio (openclaw peer-visibility root cause)' (#1307) from fix/a2a-mcp-stdio-pipe-blocking-readline into main
Block internal-flavored paths / Block forbidden paths (push) Waiting to run
MCP Stdio Transport Regression / MCP stdio with regular-file stdout (push) Waiting to run
CI / Detect changes (push) Waiting to run
CI / Platform (Go) (push) Waiting to run
CI / Canvas (Next.js) (push) Waiting to run
CI / Shellcheck (E2E scripts) (push) Waiting to run
CI / Canvas Deploy Reminder (push) Blocked by required conditions
CI / Python Lint & Test (push) Waiting to run
CI / all-required (push) Waiting to run
E2E API Smoke Test / E2E API Smoke Test (push) Blocked by required conditions
E2E API Smoke Test / detect-changes (push) Waiting to run
E2E Chat / detect-changes (push) Waiting to run
E2E Chat / E2E Chat (push) Blocked by required conditions
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (push) Waiting to run
E2E Staging Canvas (Playwright) / detect-changes (push) Waiting to run
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Blocked by required conditions
Handlers Postgres Integration / detect-changes (push) Waiting to run
Handlers Postgres Integration / Handlers Postgres Integration (push) Blocked by required conditions
publish-runtime-autobump / pr-validate (push) Waiting to run
publish-runtime-autobump / bump-and-tag (push) Waiting to run
publish-workspace-server-image / Production auto-deploy (push) Blocked by required conditions
Runtime PR-Built Compatibility / detect-changes (push) Waiting to run
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Blocked by required conditions
Secret scan / Scan diff for credential-shaped strings (push) Waiting to run
publish-workspace-server-image / build-and-push (push) Has been cancelled
Lint curl status-code capture / Scan workflows for curl status-capture pollution (push) Successful in 11s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (push) Successful in 1m25s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (push) Successful in 1m34s
Block internal-flavored paths / Block forbidden paths (push) Waiting to run
MCP Stdio Transport Regression / MCP stdio with regular-file stdout (push) Waiting to run
CI / Detect changes (push) Waiting to run
CI / Platform (Go) (push) Waiting to run
CI / Canvas (Next.js) (push) Waiting to run
CI / Shellcheck (E2E scripts) (push) Waiting to run
CI / Canvas Deploy Reminder (push) Blocked by required conditions
CI / Python Lint & Test (push) Waiting to run
CI / all-required (push) Waiting to run
E2E API Smoke Test / E2E API Smoke Test (push) Blocked by required conditions
E2E API Smoke Test / detect-changes (push) Waiting to run
E2E Chat / detect-changes (push) Waiting to run
E2E Chat / E2E Chat (push) Blocked by required conditions
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (push) Waiting to run
E2E Staging Canvas (Playwright) / detect-changes (push) Waiting to run
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Blocked by required conditions
Handlers Postgres Integration / detect-changes (push) Waiting to run
Handlers Postgres Integration / Handlers Postgres Integration (push) Blocked by required conditions
publish-runtime-autobump / pr-validate (push) Waiting to run
publish-runtime-autobump / bump-and-tag (push) Waiting to run
publish-workspace-server-image / Production auto-deploy (push) Blocked by required conditions
Runtime PR-Built Compatibility / detect-changes (push) Waiting to run
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Blocked by required conditions
Secret scan / Scan diff for credential-shaped strings (push) Waiting to run
publish-workspace-server-image / build-and-push (push) Has been cancelled
Lint curl status-code capture / Scan workflows for curl status-capture pollution (push) Successful in 11s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (push) Successful in 1m25s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (push) Successful in 1m34s
This commit was merged in pull request #1307.
This commit is contained in:
@@ -158,8 +158,68 @@ jobs:
|
||||
echo "NOTE: No warning in output (may be suppressed by log level)"
|
||||
fi
|
||||
|
||||
- name: Reproduce openclaw failure — pipe held OPEN, no EOF
  run: |
    set -euo pipefail
    echo "=== keep-stdin-open pipe (the real openclaw / Claude Code case) ==="
    echo ""
    echo "Before the readline() fix this HANGS: main() did"
    echo " stdin.read(65536) -> on a pipe, blocks until 64KB OR EOF."
    echo "An MCP client sends one ~150B initialize and keeps stdin"
    echo "open waiting for the response, so the server never parsed"
    echo "the request and the client timed out (openclaw: 'MCP error"
    echo "-32000: Connection closed'). The earlier regular-file /"
    echo "heredoc-pipe steps PASSED through this bug because a file"
    echo "(or a closing heredoc) yields EOF immediately."
    echo ""

    # Drive the server through a real pipe that stays OPEN: write
    # one initialize, do NOT close stdin, and require a response
    # within a hard timeout. read(65536) -> no output -> timeout
    # kills it -> FAIL. readline() -> immediate response -> PASS.
    python - <<'PYEOF'
    import json
    import select
    import subprocess
    import sys
    import time

    # No env= argument: the child inherits this process's environment,
    # which is exactly what passing a copy of os.environ did.
    proc = subprocess.Popen(
        [sys.executable, "a2a_mcp_server.py"],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    req = json.dumps({
        "jsonrpc": "2.0", "id": 1, "method": "initialize",
        "params": {"protocolVersion": "2024-11-05",
                   "capabilities": {},
                   "clientInfo": {"name": "keepopen", "version": "1"}},
    }) + "\n"

    line = b""
    server_exited = False
    try:
        proc.stdin.write(req.encode())
        proc.stdin.flush()
        # Deliberately DO NOT close proc.stdin — mirror a live MCP client.

        # Monotonic clock: a wall-clock adjustment on the runner must not
        # shorten or stretch the 15s budget.
        deadline = time.monotonic() + 15
        while time.monotonic() < deadline:
            r, _, _ = select.select([proc.stdout], [], [], 1)
            if not r:
                continue
            line = proc.stdout.readline()
            # select() said "readable" but readline() returned nothing:
            # that is EOF, i.e. the server exited. Stop here instead of
            # spinning on an always-readable closed pipe until the deadline.
            server_exited = not line
            break
    finally:
        # Always kill AND reap the child, even if the write above failed.
        proc.kill()
        proc.wait()

    if not line:
        if server_exited:
            print("FAIL: server closed stdout before answering initialize "
                  "(process exited early, not a read timeout)")
        else:
            print("FAIL: no response within 15s on an open pipe — "
                  "stdin.read(65536) regression is back")
        sys.exit(1)
    resp = json.loads(line.decode())
    assert resp.get("id") == 1 and "result" in resp, \
        f"unexpected response: {line[:200]!r}"
    assert resp["result"]["serverInfo"]["name"] == "molecule", \
        f"wrong serverInfo: {line[:200]!r}"
    print("PASS: server answered initialize on a still-open pipe")
    PYEOF
|
||||
|
||||
- name: Run unit tests for stdio transport
  run: |
    set -euo pipefail
    echo "=== Running stdio transport unit tests ==="
    # One invocation covers both suites: the original file-backed stdout
    # assertions and the keep-stdin-open pipe regression. A separate
    # TestStdioPipeAssertion-only run would just execute that class twice.
    python -m pytest tests/test_a2a_mcp_server.py::TestStdioPipeAssertion tests/test_a2a_mcp_server.py::TestStdioKeepOpenPipe -v --no-cov
|
||||
|
||||
@@ -788,7 +788,23 @@ async def main(): # pragma: no cover
|
||||
buffer = b""
|
||||
while True:
|
||||
try:
|
||||
chunk = await loop.run_in_executor(None, stdin.read, 65536)
|
||||
# MUST be readline(), NOT read(65536). MCP is a line-delimited
|
||||
# JSON-RPC stream where the client (openclaw bundle-mcp,
|
||||
# Claude Code, Cursor, ...) sends one small (~150B) request
|
||||
# and keeps stdin OPEN waiting for the response. A fixed-size
|
||||
# `stdin.read(65536)` on a PIPE blocks until either 64KB
|
||||
# accumulate OR EOF — neither happens during a normal MCP
|
||||
# handshake — so the server never parses `initialize` and the
|
||||
# client times out (~30s; openclaw: "MCP error -32000:
|
||||
# Connection closed"). This made the stdio transport unusable
|
||||
# for every pipe-spawned MCP host while passing tests/manual
|
||||
# checks that fed stdin from a regular FILE (where read()
|
||||
# returns immediately at the short file's end). readline()
|
||||
# returns as soon as one newline-terminated line is available,
|
||||
# which is exactly the JSON-RPC framing. Diagnosed 2026-05-15
|
||||
# against a live openclaw workspace; see
|
||||
# molecule-ai-workspace-runtime#61 (same fd-compat lineage).
|
||||
chunk = await loop.run_in_executor(None, stdin.readline)
|
||||
if not chunk:
|
||||
break
|
||||
buffer += chunk
|
||||
|
||||
@@ -2097,3 +2097,124 @@ def test_peer_metadata_set_replaces_existing_entry_in_place(_reset_peer_metadata
|
||||
)
|
||||
cached = a2a_client._peer_metadata[peer]
|
||||
assert cached[1]["name"] == "v2", "re-write must update the value in place"
|
||||
|
||||
|
||||
class TestStdioKeepOpenPipe:
    """Regression for the openclaw peer-visibility outage (2026-05-15).

    main()'s read loop used `await loop.run_in_executor(None,
    stdin.read, 65536)`. On a PIPE, `read(n)` blocks until n bytes
    accumulate OR EOF. A real MCP client (openclaw bundle-mcp, Claude
    Code, Cursor) sends ONE ~150-byte newline-delimited request and
    keeps stdin OPEN waiting for the reply — so neither condition is
    met, the server never parses `initialize`, and the client times
    out (~30s; openclaw surfaced "MCP error -32000: Connection
    closed"). Every prior stdio test fed stdin from a regular file or
    a heredoc-pipe that CLOSES (EOF), masking the bug.

    These spawn the real a2a_mcp_server.py process, write one request
    over a pipe, and DELIBERATELY keep stdin open. With the buggy
    read(65536) the assertion times out and fails; with readline() it
    passes promptly. This is the literal user-facing path, not a
    mock — see feedback_smoke_test_vendor_truth_not_shape_match.
    """

    def _spawn(self):
        """Start a2a_mcp_server.py as a child with piped stdin/stdout.

        The server path is resolved as the parent of this test file's
        directory. stderr is merged into stdout, and WORKSPACE_ID gets a
        placeholder value only when the environment does not already set it.

        Returns:
            The running ``subprocess.Popen`` handle; the caller owns it and
            must kill/wait it.
        """
        import subprocess
        import sys

        env = dict(os.environ)
        env.setdefault("WORKSPACE_ID", "00000000-0000-0000-0000-000000000001")
        server = os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            "a2a_mcp_server.py",
        )
        return subprocess.Popen(
            # sys.executable, not a bare "python3": the child must run under
            # the same interpreter (and therefore the same installed
            # packages) as the pytest process that is driving it.
            [sys.executable, server],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env=env,
        )

    def _send(self, proc, message):
        """Write one newline-terminated JSON message to the child and flush.

        stdin is intentionally left open afterwards.
        """
        proc.stdin.write((json.dumps(message) + "\n").encode())
        proc.stdin.flush()

    def _read_line_with_deadline(self, proc, deadline_s=15):
        """Return the next stdout line from ``proc``, or ``b""`` if none came.

        Args:
            proc: Child process whose ``stdout`` is a pipe.
            deadline_s: Maximum number of seconds to wait for output.

        Returns:
            The raw line (bytes, including its newline), or ``b""`` when the
            deadline expires or the child closes stdout first.
        """
        import select
        import time

        # Monotonic clock so wall-clock adjustments cannot distort the wait.
        end = time.monotonic() + deadline_s
        while True:
            remaining = end - time.monotonic()
            if remaining <= 0:
                return b""
            # Poll in slices of at most 1s, never past the deadline itself.
            ready, _, _ = select.select(
                [proc.stdout], [], [], min(1.0, remaining)
            )
            if not ready:
                continue
            # If the child has exited, select() reports the pipe readable and
            # readline() yields b"" (EOF). Returning it right away avoids
            # spinning on a closed pipe until the deadline.
            # NOTE(review): select() watches the fd, not the BufferedReader's
            # internal buffer; a line already buffered by an earlier read
            # would not wake it. Fine while the server emits one line per
            # request — confirm if that ever changes.
            return proc.stdout.readline()

    def test_initialize_answered_on_still_open_pipe(self):
        """One initialize, stdin kept OPEN, response required <15s.

        FAILS (times out -> empty line) on stdin.read(65536).
        PASSES on stdin.readline().
        """
        proc = self._spawn()
        try:
            self._send(proc, {
                "jsonrpc": "2.0", "id": 1, "method": "initialize",
                "params": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {},
                    "clientInfo": {"name": "keepopen", "version": "1"},
                },
            })
            # NOTE: stdin is intentionally NOT closed — mirrors a live
            # MCP client. Closing it here would yield EOF and let the
            # buggy read(65536) return, hiding the regression.

            line = self._read_line_with_deadline(proc, 15)
        finally:
            proc.kill()
            proc.wait(timeout=5)

        assert line, (
            "no response within 15s on a still-open pipe — the "
            "stdin.read(65536) pipe-blocking regression is back "
            "(this is the exact openclaw peer-visibility outage)"
        )
        resp = json.loads(line.decode())
        assert resp.get("id") == 1, f"unexpected id: {line[:200]!r}"
        assert "result" in resp, f"no result envelope: {line[:200]!r}"
        assert resp["result"]["serverInfo"]["name"] == "molecule", (
            f"wrong serverInfo: {line[:200]!r}"
        )

    def test_two_sequential_requests_on_open_pipe(self):
        """initialize THEN tools/list on the same open pipe — proves
        the loop keeps reading line-by-line, not just the first 64KB
        chunk. tools/list must include list_peers (the peer-visibility
        tool the outage was about)."""
        proc = self._spawn()
        try:
            self._send(proc, {
                "jsonrpc": "2.0", "id": 1, "method": "initialize",
                "params": {"protocolVersion": "2024-11-05",
                           "capabilities": {},
                           "clientInfo": {"name": "x", "version": "1"}},
            })
            init = self._read_line_with_deadline(proc, 15)
            assert init, "initialize unanswered on open pipe"

            self._send(proc, {
                "jsonrpc": "2.0", "id": 2, "method": "tools/list",
            })
            tl = self._read_line_with_deadline(proc, 15)
        finally:
            proc.kill()
            proc.wait(timeout=5)

        assert tl, "tools/list unanswered — loop stopped after one read"
        resp = json.loads(tl.decode())
        names = {t["name"] for t in resp["result"]["tools"]}
        assert "list_peers" in names, (
            f"list_peers missing from tools/list: {sorted(names)}"
        )
|
||||
|
||||
Reference in New Issue
Block a user