Merge pull request #2981 from Molecule-AI/staging

staging → main: auto-promote 9dd2988
This commit is contained in:
molecule-ai[bot] 2026-05-05 18:13:50 -07:00 committed by GitHub
commit d75b73e713
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 1743 additions and 36 deletions

View File

@ -0,0 +1,83 @@
name: auto-promote-stale-alarm
# Hourly cron + on-demand alarm for the silent-block failure mode that
# motivated issue #2975:
# - The auto-promote-staging.yml workflow opened a PR + armed
# auto-merge, but main's branch protection requires a human review
# (reviewDecision=REVIEW_REQUIRED). The PR sat BLOCKED with no
# surface-up-the-stack for 12+ hours, holding 25 commits hostage
# including the Memory v2 redesign and a reno-stars data-loss fix.
#
# This workflow runs `scripts/check-stale-promote-pr.sh` against the
# repo's open auto-promote PRs (base=main head=staging). When a PR has
# been BLOCKED on REVIEW_REQUIRED for >4h, it:
# 1. Emits a workflow-level warning (visible in run summary + the
# Actions UI feed).
# 2. Posts a comment on the PR (idempotent — one alarm per PR).
#
# The detection logic lives in scripts/check-stale-promote-pr.sh so
# it's unit-testable with stubbed `gh` (see test-check-stale-promote-pr.sh).
# This file is the schedule + invocation surface only — SSOT for the
# detector itself.
on:
schedule:
# Hourly. Cheap (one `gh pr list` + jq), and 1h granularity is
# plenty for a 4h staleness threshold — operators see the alarm
# within at most 1h of crossing the threshold.
- cron: "27 * * * *" # at :27 to dodge the cron herd at :00
workflow_dispatch:
inputs:
stale_hours:
description: "Hours after which a BLOCKED+REVIEW_REQUIRED PR is stale (default 4)"
required: false
default: "4"
post_comment:
description: "Post a comment on stale PRs (default true)"
required: false
default: "true"
permissions:
contents: read
pull-requests: write # post comments on stale PRs
# Serialize scheduled and on-demand runs so two scans never race to
# comment on the same PR. cancel-in-progress=false lets an in-flight
# scan finish; the queued run then simply re-lists the same PR set,
# which is harmless because the script is designed to skip PRs that
# already carry the alarm comment.
concurrency:
group: auto-promote-stale-alarm
cancel-in-progress: false
jobs:
scan:
runs-on: ubuntu-latest
steps:
- name: Checkout (need scripts/ only)
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
sparse-checkout: |
scripts/check-stale-promote-pr.sh
sparse-checkout-cone-mode: false
- name: Run stale-PR detector
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITHUB_REPOSITORY: ${{ github.repository }}
STALE_HOURS: ${{ inputs.stale_hours || '4' }}
POST_COMMENT: ${{ inputs.post_comment || 'true' }}
run: |
# The script's exit code reflects the count of stale PRs.
# We don't want a stale finding to fail the workflow run —
# the warning + comment are the signal, the green/red is
# noise. So convert any non-zero exit to a workflow notice
# and exit 0.
set +e
bash scripts/check-stale-promote-pr.sh
rc=$?
set -e
if [ "$rc" -ne 0 ]; then
echo "::notice::Stale PR detector found $rc PR(s) needing attention. See warnings above + comments on the PRs."
fi
# Always succeed — operator-facing surface is the warning,
# not the workflow status.
exit 0

View File

@ -1,5 +1,5 @@
import { describe, it, expect } from "vitest";
import { resolveAttachmentHref } from "../uploads";
import { isPlatformAttachment, resolveAttachmentHref } from "../uploads";
describe("resolveAttachmentHref — URI scheme normalisation", () => {
const wsId = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee";
@ -39,3 +39,128 @@ describe("resolveAttachmentHref — URI scheme normalisation", () => {
expect(resolveAttachmentHref(wsId, "s3://bucket/key")).toBe("s3://bucket/key");
});
});
// #2973 follow-up to #2968: cover the platform-pending: scheme branch
// (poll-mode chat uploads) + the isPlatformAttachment SSOT helper that
// the chip-download and markdown-link paths both consume.
//
// Pre-fix the platform-pending: URI fell through to the raw URI →
// browser saw an unhandled-protocol click → about:blank. The fix
// resolves it to the platform pending-uploads endpoint with auth
// headers attached.
describe("resolveAttachmentHref — platform-pending: scheme (poll-mode uploads)", () => {
  // Two deliberately different ids: the workspace whose chat renders the
  // link, and the workspace named inside the URI. Keeping them distinct
  // lets every assertion tell which of the two the resolver picked.
  const chatWs = "chat-ws-aaaaaaaa";
  const sourceWs = "source-ws-bbbbbbbb";

  it("resolves a well-formed platform-pending: URI to /pending-uploads/<file>/content", () => {
    const pendingUri = `platform-pending:${sourceWs}/file-12345`;
    const expectedPath = `/workspaces/${sourceWs}/pending-uploads/file-12345/content`;
    expect(resolveAttachmentHref(chatWs, pendingUri)).toContain(expectedPath);
  });

  it("uses the URI's wsid, NOT the chat workspace_id (cross-workspace forwarding)", () => {
    // Guards against a regression that routes the download to the chat
    // workspace's pending-uploads store instead of the one in the URI.
    const href = resolveAttachmentHref(chatWs, `platform-pending:${sourceWs}/file-xyz`);
    expect(href).toContain(`/workspaces/${sourceWs}/`);
    expect(href).not.toContain(`/workspaces/${chatWs}/`);
  });

  it("falls back to raw URI when platform-pending: is missing the slash", () => {
    // A URI that does not follow the wsid/fileid shape must come back
    // untouched rather than being turned into a /pending-uploads// path.
    const malformed = "platform-pending:no-slash";
    expect(resolveAttachmentHref(chatWs, malformed)).toBe(malformed);
  });

  it("falls back to raw URI when platform-pending: has empty fileID", () => {
    const malformed = "platform-pending:abc/";
    expect(resolveAttachmentHref(chatWs, malformed)).toBe(malformed);
  });

  it("falls back to raw URI when platform-pending: has empty wsid", () => {
    const malformed = "platform-pending:/file-xyz";
    expect(resolveAttachmentHref(chatWs, malformed)).toBe(malformed);
  });

  it("regression: exact production repro from #2968 (reno-stars)", () => {
    // Pins the resolved output for the URI shape reported in #2968 so a
    // later refactor cannot bring back the unresolved-link behaviour.
    const href = resolveAttachmentHref(
      "chat-ws",
      "platform-pending:d76977b1-uuid/bb0dcaf3-uuid",
    );
    expect(href).toContain("/workspaces/d76977b1-uuid/pending-uploads/bb0dcaf3-uuid/content");
    expect(href).not.toContain("chat-ws");
  });
});
describe("isPlatformAttachment", () => {
  // Asserts that every URI in the list yields the same verdict.
  const expectAll = (uris: readonly string[], expected: boolean): void => {
    for (const uri of uris) {
      expect(isPlatformAttachment(uri)).toBe(expected);
    }
  };

  it("returns true for platform-pending: URIs", () => {
    expectAll(["platform-pending:abc/file"], true);
  });

  it("returns true even for malformed platform-pending: URIs", () => {
    // The helper only checks the scheme/shape; a malformed wsid/fileID
    // is expected to be dealt with further down the download path.
    // Pinned so a stricter helper does not silently change routing.
    expectAll(["platform-pending:no-slash"], true);
  });

  it("returns true for workspace:<allowed-root> URIs", () => {
    expectAll(["workspace:/configs/foo", "workspace:/workspace/x.pdf"], true);
  });

  it("returns true for file:///<allowed-root> URIs", () => {
    expectAll(["file:///workspace/x"], true);
  });

  it("returns true for absolute paths under allowed roots", () => {
    expectAll(["/home/user/x", "/configs/y"], true);
  });

  it("returns FALSE for bare HTTPS URLs to other origins", () => {
    // Negative case pinned explicitly: a helper that always said "true"
    // would mark third-party URLs as platform attachments.
    expectAll(["https://example.com/file", "http://example.com/file"], false);
  });

  it("returns FALSE for non-allowlisted root paths", () => {
    expectAll(["/etc/passwd", "/var/log/x", "/tmp/x"], false);
  });

  it("returns FALSE for empty string", () => {
    expectAll([""], false);
  });

  it("returns FALSE for unrecognised schemes", () => {
    expectAll(["s3://bucket/key", "ftp://server/file"], false);
  });
});

View File

@ -54,6 +54,7 @@ TOP_LEVEL_MODULES = {
"a2a_client",
"a2a_executor",
"a2a_mcp_server",
"a2a_response",
"a2a_tools",
"a2a_tools_delegation",
"a2a_tools_inbox",

216
scripts/check-stale-promote-pr.sh Executable file
View File

@ -0,0 +1,216 @@
#!/usr/bin/env bash
# scripts/check-stale-promote-pr.sh
#
# Scan open auto-promote PRs (base=main head=staging) for the
# silent-block failure mode that motivated issue #2975:
# - PR sat for hours with mergeStateStatus=BLOCKED
# - reviewDecision=REVIEW_REQUIRED (auto-merge armed but waiting
# on a human approval that never comes)
#
# When found, emit:
# - GitHub Actions notice/warning lines (workflow summary surface)
# - Optionally post a comment on the PR (--comment)
#
# Exit code is the count of stale PRs found, capped at 125, so callers
# can detect "alarm fired" via `if ! check-stale-promote-pr.sh; then …`.
# Exit 0 = clean, exit ≥1 = N stale PRs need attention. Caveat: usage
# errors also exit non-zero (2 = no repo and no fixture, 64 = bad argument).
#
# Used by .github/workflows/auto-promote-stale-alarm.yml. Logic lives
# here (not inline in the workflow YAML) so we can:
# - Unit-test it with a stubbed `gh` (see test-check-stale-promote-pr.sh)
# - Run it ad-hoc by an operator: `scripts/check-stale-promote-pr.sh`
# - Reuse the same surface in any sibling workflow that needs the same
# check (SSOT — one detector, many callers).
#
# Requires: `gh` CLI, `jq`. `GH_TOKEN` env in the workflow context.
set -euo pipefail
# -----------------------------------------------------------------------------
# Inputs (environment defaults; the flag parser below may override them)
# -----------------------------------------------------------------------------
# Age, in hours, at which a BLOCKED+REVIEW_REQUIRED promote PR counts as
# stale. Defaults to 4 when unset or empty.
: "${STALE_HOURS:=4}"
# Target repository, taken from GITHUB_REPOSITORY; empty when that is
# unset, in which case --repo or a fixture must be supplied.
REPO="${GITHUB_REPOSITORY:-}"
# Comment posting is opt-in: anything other than the literal "true"
# leaves PRs untouched.
: "${POST_COMMENT:=false}"
# Path to a JSON file that stands in for the live `gh pr list` call.
# Empty means "query GitHub".
: "${PR_FIXTURE:=}"
# Fixed "now" value echoed by now_epoch. Empty means "use the system clock".
: "${NOW_OVERRIDE:=}"
# Parse command-line flags; each flag overrides the matching env default.
# Unknown flags and value-taking flags without a value exit 64 (EX_USAGE).
while [ $# -gt 0 ]; do
  case "$1" in
    --repo|--fixture|--stale-hours)
      # Without this guard a trailing `--repo` would expand an unset "$2"
      # and die under `set -u` with "unbound variable" instead of a
      # usage error.
      if [ $# -lt 2 ]; then
        echo "missing value for $1" >&2
        exit 64
      fi
      case "$1" in
        --repo)        REPO="$2" ;;
        --fixture)     PR_FIXTURE="$2" ;;
        --stale-hours) STALE_HOURS="$2" ;;
      esac
      shift 2
      ;;
    --comment)    POST_COMMENT="true"; shift ;;
    --no-comment) POST_COMMENT="false"; shift ;;
    -h|--help)
      # Help text is the script's own header: every "# " comment line
      # from the top of the file down to the `set` line.
      sed -n '1,/^set /p' "$0" | grep '^# ' | sed 's/^# //'
      exit 0
      ;;
    *) echo "unknown arg: $1" >&2; exit 64 ;;
  esac
done
# A live run needs a repository to query. REPO is populated only from
# GITHUB_REPOSITORY or --repo (a pre-existing REPO env var is overwritten
# by the defaults above), so the message names those two sources.
if [ -z "$REPO" ] && [ -z "$PR_FIXTURE" ]; then
  echo "::error::--repo (or GITHUB_REPOSITORY env) required when no fixture given" >&2
  exit 2
fi
# -----------------------------------------------------------------------------
# Clock helpers — split out so tests can freeze time
# -----------------------------------------------------------------------------
# Print the current time as epoch seconds. A non-empty NOW_OVERRIDE is
# echoed verbatim (frozen clock); otherwise the system UTC clock is used.
now_epoch() {
  if [ -z "$NOW_OVERRIDE" ]; then
    date -u +%s
    return
  fi
  printf '%s\n' "$NOW_OVERRIDE"
}
# Convert an RFC3339 timestamp as GitHub emits it (e.g.
# "2026-05-05T23:15:00Z") to epoch seconds on stdout.
#
# Arguments: $1 — the timestamp string.
# Returns:   0 with the epoch printed; 1 (plus an ::error:: line on
#            stderr) when the input is empty or cannot be parsed.
#
# GNU date (`-d`) is tried first, then BSD date (`-j -f`), so the same
# script works on ubuntu runners and on macOS.
to_epoch() {
  local ts="$1"
  # GNU `date -d ""` does not fail — it resolves to today at 00:00 — so
  # an empty timestamp has to be rejected before it reaches date.
  if [ -z "$ts" ]; then
    echo "::error::cannot parse timestamp: (empty)" >&2
    return 1
  fi
  # gnu-date path first.
  if date -u -d "$ts" +%s 2>/dev/null; then
    return 0
  fi
  # bsd-date fallback — drop optional fractional seconds, then make sure
  # exactly one trailing "Z" remains so the fixed format string matches.
  local ts_clean="${ts%%.*}"
  ts_clean="${ts_clean%Z}Z"
  date -u -j -f "%Y-%m-%dT%H:%M:%SZ" "$ts_clean" +%s 2>/dev/null || {
    echo "::error::cannot parse timestamp: $ts" >&2
    return 1
  }
}
# -----------------------------------------------------------------------------
# Fetch open auto-promote PRs
# -----------------------------------------------------------------------------
# Print the open staging→main PR list as JSON on stdout. With PR_FIXTURE
# set, the fixture file is printed instead of calling GitHub.
fetch_prs() {
  if [ -z "$PR_FIXTURE" ]; then
    gh pr list --repo "$REPO" \
      --base main --head staging --state open \
      --json number,title,createdAt,mergeStateStatus,reviewDecision,url
    return
  fi
  cat "$PR_FIXTURE"
  return 0
}
# -----------------------------------------------------------------------------
# Stale detection
# -----------------------------------------------------------------------------
# Read the PR list (JSON array) from stdin and emit one TSV line per
# stale PR:
#   <num>\t<age_hours>\t<url>\t<title>
# The caller decides what to do with each line (warn, comment, escalate).
#
# A PR is stale when mergeStateStatus=BLOCKED, reviewDecision=
# REVIEW_REQUIRED, and at least STALE_HOURS have passed since createdAt.
# Returns 2 (with an ::error:: line on stderr) when STALE_HOURS is not a
# plain non-negative integer.
detect_stale() {
  # STALE_HOURS can come from a workflow input or a CLI flag. Validate it
  # before it reaches $(( )), which would otherwise evaluate arbitrary
  # arithmetic expressions or die on input such as "4h".
  case "$STALE_HOURS" in
    ''|*[!0-9]*)
      echo "::error::STALE_HOURS must be a non-negative integer (got '${STALE_HOURS}')" >&2
      return 2
      ;;
  esac
  local now_ts
  now_ts="$(now_epoch)"
  # 10# forces base 10 so a value like "08" is not rejected as bad octal.
  local stale_seconds=$((10#$STALE_HOURS * 3600))
  # Fields are joined with the ASCII unit separator rather than a tab:
  # tab is IFS whitespace, so `read` would collapse an empty field
  # (e.g. reviewDecision "") and shift every later column one to the left.
  local sep=$'\x1f'
  jq -r '.[]
      | [.number, .createdAt, .mergeStateStatus, .reviewDecision, .url, .title]
      | map(if . == null then "" else (tostring | gsub("[\\t\\r\\n]"; " ")) end)
      | join("\u001f")' \
    | while IFS="$sep" read -r num created_at merge_state review_decision url title; do
        # Only alarm on the specific failure mode: BLOCKED + REVIEW_REQUIRED.
        # Other blocked reasons are the author's signal to fix; this
        # detector targets the silent "no human reviewed yet" wedge.
        [ "$merge_state" = "BLOCKED" ] || continue
        [ "$review_decision" = "REVIEW_REQUIRED" ] || continue
        # No creation time means no age to measure; skip the row.
        [ -n "$created_at" ] || continue
        local created_ts
        created_ts="$(to_epoch "$created_at")" || continue
        local age=$((now_ts - created_ts))
        if [ "$age" -ge "$stale_seconds" ]; then
          local age_h=$((age / 3600))
          printf '%s\t%d\t%s\t%s\n' "$num" "$age_h" "$url" "$title"
        fi
      done
}
# -----------------------------------------------------------------------------
# Reporting
# -----------------------------------------------------------------------------
# Print the markdown body of the stale-alarm PR comment on stdout.
#
# Arguments: $1 — age in whole hours, interpolated into the first line.
#
# Kept short; the issue body has the full design. The final line names
# this script's path; post_comment searches existing comments for that
# path to avoid posting twice, so keep it when editing the text.
comment_body() {
local age_h="$1"
cat <<EOF
⚠️ This auto-promote PR has been BLOCKED on \`REVIEW_REQUIRED\` for **${age_h}h**.
Auto-merge is armed, but main's branch protection requires 1 review and no human has approved. Until someone reviews, the staging→main promote chain is wedged and downstream consumers (canvas builds, tenant redeploys) won't see new code.
**Action**: a human reviewer on \`@Molecule-AI/maintainers\` should approve this PR (or mark it as not ready and close).
Detected by \`scripts/check-stale-promote-pr.sh\` per issue #2975.
EOF
}
# Post the stale-alarm comment on a PR, at most once per PR.
#
# Arguments: $1 — PR number; $2 — age in whole hours (for the body).
# No-op unless POST_COMMENT is exactly "true".
post_comment() {
  local pr_num="$1"
  local age_h="$2"
  if [ "$POST_COMMENT" != "true" ]; then
    return 0
  fi
  # Idempotency: only one alarm comment per PR. The marker is the script
  # path that the alarm body always contains. It is matched with a
  # literal `contains` on the path alone because the body wraps the path
  # in backticks, so a pattern spanning "…pr.sh per issue…" never matches
  # the text that was actually posted and every run would re-comment.
  # `first // empty` yields the first matching comment URL, or nothing.
  local existing
  existing="$(gh pr view "$pr_num" --repo "$REPO" --json comments \
    --jq '[.comments[] | select(.body | contains("scripts/check-stale-promote-pr.sh")) | .url] | first // empty')"
  if [ -n "$existing" ]; then
    echo "::notice::PR #$pr_num already has a stale-alarm comment ($existing) — not re-posting"
    return 0
  fi
  comment_body "$age_h" | gh pr comment "$pr_num" --repo "$REPO" --body-file -
  echo "::notice::Posted stale-alarm comment on PR #$pr_num (age=${age_h}h)"
}
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
stale_count=0

# Fetch and classify BEFORE the reporting loop, capturing each result.
# Running `fetch_prs | detect_stale` inside a process substitution hides
# their exit status: a failed `gh pr list` or `jq` would look exactly
# like "no stale PRs" and the alarm itself would fail silently. Exit 2
# matches the script's existing error code for a missing repo.
if ! prs_json="$(fetch_prs)"; then
  echo "::error::failed to list open auto-promote PRs — cannot evaluate staleness" >&2
  exit 2
fi
if ! stale_rows="$(detect_stale <<<"$prs_json")"; then
  echo "::error::failed to evaluate open auto-promote PRs for staleness" >&2
  exit 2
fi

# One TSV row per stale PR: warn, append to the step summary, and
# (when enabled) comment on the PR.
while IFS=$'\t' read -r num age_h url title; do
  # An empty result arrives as a single blank line; skip it.
  [ -n "$num" ] || continue
  stale_count=$((stale_count + 1))
  echo "::warning title=Stale auto-promote PR::PR #$num — BLOCKED on REVIEW_REQUIRED for ${age_h}h. $url"
  {
    echo "## ⚠️ Stale auto-promote PR detected"
    echo
    echo "- PR: #$num — \`$title\`"
    echo "- Age: ${age_h}h"
    echo "- State: BLOCKED on REVIEW_REQUIRED"
    echo "- URL: $url"
    echo
    echo "Auto-merge is armed but waiting on a human review. See issue #2975."
  } >> "${GITHUB_STEP_SUMMARY:-/dev/null}"
  post_comment "$num" "$age_h"
done <<<"$stale_rows"

if [ "$stale_count" -eq 0 ]; then
  echo "::notice::No stale auto-promote PRs detected (threshold: ${STALE_HOURS}h)"
fi
# Cap exit code so we don't accidentally break shells that interpret
# >125 as signal-style. 1..N maps to "1..N stale PRs".
exit $(( stale_count > 125 ? 125 : stale_count ))

View File

@ -0,0 +1,257 @@
#!/usr/bin/env bash
# scripts/test-check-stale-promote-pr.sh
#
# Exhaustive bash unit tests for check-stale-promote-pr.sh.
# Goal: 100% branch coverage on the detector logic.
#
# Each case writes a fixture JSON, freezes the clock with NOW_OVERRIDE,
# runs the script with --fixture + --no-comment (so we don't try to
# actually call `gh pr comment`), and asserts on stdout/exit code.
#
# Run: bash scripts/test-check-stale-promote-pr.sh
# Expected: "Tests: N passed, 0 failed", then "All tests passed." + exit 0.
set -euo pipefail
SCRIPT="$(cd "$(dirname "$0")" && pwd)/check-stale-promote-pr.sh"
TMP="$(mktemp -d)"
trap 'rm -rf "$TMP"' EXIT
PASS=0
FAIL=0
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
# Frozen "now" — 2026-05-06T05:00:00Z, as epoch seconds. The value is
# computed with `date` rather than hard-coded so a hand-calculated epoch
# cannot be wrong. GNU date syntax (-d) is tried first, then BSD date
# syntax (-j -f); if neither works the suite aborts, because every
# staleness assertion below depends on this value.
if FROZEN_NOW="$(date -u -d '2026-05-06T05:00:00Z' +%s 2>/dev/null)"; then
: # gnu-date worked
elif FROZEN_NOW="$(date -u -j -f '%Y-%m-%dT%H:%M:%SZ' '2026-05-06T05:00:00Z' +%s 2>/dev/null)"; then
: # bsd-date worked
else
echo "FATAL: cannot compute FROZEN_NOW on this platform" >&2
exit 1
fi
# Run the detector against a fixture with the clock frozen and comment
# posting disabled.
#
# Arguments: $1 — fixture file; remaining args are passed to the script.
# Output:    the script's combined stdout+stderr, followed by a final
#            "EXIT_CODE=<n>" line carrying its exit status.
run_script() {
  local fixture="$1"
  shift
  local rc=0
  # `|| rc=$?` records a non-zero status without tripping errexit.
  NOW_OVERRIDE="$FROZEN_NOW" POST_COMMENT="false" \
    bash "$SCRIPT" --fixture "$fixture" "$@" 2>&1 || rc=$?
  echo "EXIT_CODE=$rc"
}
# Record a pass when <got> matches the extended regex <want_pattern>,
# otherwise record a failure and print both for diagnosis.
#
# Arguments: $1 — test name; $2 — captured output; $3 — ERE to look for.
# Side effects: increments the global PASS or FAIL counter.
assert_pass() {
  local name="$1"
  local got="$2"
  local want_pattern="$3"
  # Feed grep through a here-string instead of `printf | grep -q`: under
  # `set -o pipefail`, grep -q exiting on the first match can leave the
  # writer with SIGPIPE on large output and turn a real match into a
  # failed pipeline.
  if grep -qE -- "$want_pattern" <<<"$got"; then
    PASS=$((PASS + 1))
    printf ' ✓ %s\n' "$name"
  else
    FAIL=$((FAIL + 1))
    printf ' ✗ %s\n want pattern: %s\n got:\n%s\n' "$name" "$want_pattern" "$got"
  fi
}
# Record a pass when <got> does NOT match the extended regex
# <bad_pattern>; a match is recorded as a failure and printed.
#
# Arguments: $1 — test name; $2 — captured output; $3 — ERE that must
#            be absent.
# Side effects: increments the global PASS or FAIL counter.
assert_no_match() {
  local name="$1"
  local got="$2"
  local bad_pattern="$3"
  # Here-string rather than `printf | grep -q`: with `set -o pipefail` a
  # SIGPIPE on the writer makes the pipeline non-zero even though grep
  # matched, which this function would misread as "pattern absent".
  if grep -qE -- "$bad_pattern" <<<"$got"; then
    FAIL=$((FAIL + 1))
    printf ' ✗ %s\n bad pattern matched: %s\n got:\n%s\n' "$name" "$bad_pattern" "$got"
  else
    PASS=$((PASS + 1))
    printf ' ✓ %s\n' "$name"
  fi
}
# ─────────────────────────────────────────────────────────────────────────────
# Test cases
# ─────────────────────────────────────────────────────────────────────────────
echo "1. Empty PR list — clean exit"
echo '[]' > "$TMP/empty.json"
got=$(run_script "$TMP/empty.json")
assert_pass "empty-no-warning" "$got" "No stale auto-promote PRs detected"
assert_pass "empty-exit-zero" "$got" "EXIT_CODE=0"
echo
echo "2. Single PR, BLOCKED+REVIEW_REQUIRED, 5h old — fires alarm"
cat > "$TMP/stale1.json" <<EOF
[{
"number": 2963,
"title": "staging → main",
"createdAt": "2026-05-06T00:00:00Z",
"mergeStateStatus": "BLOCKED",
"reviewDecision": "REVIEW_REQUIRED",
"url": "https://github.com/test/test/pull/2963"
}]
EOF
got=$(run_script "$TMP/stale1.json")
assert_pass "stale1-warning" "$got" "Stale auto-promote PR"
assert_pass "stale1-pr-number" "$got" "PR #2963"
assert_pass "stale1-age" "$got" "for 5h"
assert_pass "stale1-exit-1" "$got" "EXIT_CODE=1"
echo
echo "3. Same PR but only 3h old — under threshold, NO alarm"
cat > "$TMP/young.json" <<EOF
[{
"number": 100,
"title": "fresh promote",
"createdAt": "2026-05-06T02:00:00Z",
"mergeStateStatus": "BLOCKED",
"reviewDecision": "REVIEW_REQUIRED",
"url": "https://github.com/test/test/pull/100"
}]
EOF
got=$(run_script "$TMP/young.json")
assert_pass "young-no-alarm" "$got" "No stale auto-promote PRs"
assert_pass "young-exit-zero" "$got" "EXIT_CODE=0"
assert_no_match "young-no-warning" "$got" "Stale auto-promote PR"
echo
echo "4. PR is BLOCKED but for the wrong reason (DIRTY, not REVIEW_REQUIRED)"
cat > "$TMP/dirty.json" <<EOF
[{
"number": 200,
"title": "needs rebase",
"createdAt": "2026-05-06T00:00:00Z",
"mergeStateStatus": "BLOCKED",
"reviewDecision": "APPROVED",
"url": "https://github.com/test/test/pull/200"
}]
EOF
got=$(run_script "$TMP/dirty.json")
assert_pass "dirty-no-alarm" "$got" "No stale auto-promote PRs"
assert_pass "dirty-exit-zero" "$got" "EXIT_CODE=0"
echo
echo "5. PR is APPROVED but mergeStateStatus is CLEAN — NOT alarming"
cat > "$TMP/clean.json" <<EOF
[{
"number": 300,
"title": "all green",
"createdAt": "2026-05-06T00:00:00Z",
"mergeStateStatus": "CLEAN",
"reviewDecision": "APPROVED",
"url": "https://github.com/test/test/pull/300"
}]
EOF
got=$(run_script "$TMP/clean.json")
assert_pass "clean-no-alarm" "$got" "No stale auto-promote PRs"
echo
echo "6. Multiple PRs — only the BLOCKED+REVIEW_REQUIRED+old one alarms"
cat > "$TMP/mixed.json" <<EOF
[
{
"number": 100,
"title": "fresh",
"createdAt": "2026-05-06T04:00:00Z",
"mergeStateStatus": "BLOCKED",
"reviewDecision": "REVIEW_REQUIRED",
"url": "https://x/100"
},
{
"number": 200,
"title": "stale + alarming",
"createdAt": "2026-05-05T20:00:00Z",
"mergeStateStatus": "BLOCKED",
"reviewDecision": "REVIEW_REQUIRED",
"url": "https://x/200"
},
{
"number": 300,
"title": "approved + clean",
"createdAt": "2026-05-05T20:00:00Z",
"mergeStateStatus": "CLEAN",
"reviewDecision": "APPROVED",
"url": "https://x/300"
}
]
EOF
got=$(run_script "$TMP/mixed.json")
assert_pass "mixed-only-200" "$got" "PR #200"
assert_no_match "mixed-not-100" "$got" "PR #100"
assert_no_match "mixed-not-300" "$got" "PR #300"
assert_pass "mixed-exit-1" "$got" "EXIT_CODE=1"
echo
echo "7. Custom STALE_HOURS via --stale-hours overrides threshold"
got=$(run_script "$TMP/young.json" --stale-hours 1)
assert_pass "custom-threshold-fires" "$got" "PR #100"
assert_pass "custom-threshold-exit-1" "$got" "EXIT_CODE=1"
echo
echo "8. Two stale PRs — exit code reflects count"
cat > "$TMP/two-stale.json" <<EOF
[
{
"number": 200,
"title": "stale-A",
"createdAt": "2026-05-05T20:00:00Z",
"mergeStateStatus": "BLOCKED",
"reviewDecision": "REVIEW_REQUIRED",
"url": "https://x/200"
},
{
"number": 201,
"title": "stale-B",
"createdAt": "2026-05-05T19:00:00Z",
"mergeStateStatus": "BLOCKED",
"reviewDecision": "REVIEW_REQUIRED",
"url": "https://x/201"
}
]
EOF
got=$(run_script "$TMP/two-stale.json")
assert_pass "two-stale-exit-2" "$got" "EXIT_CODE=2"
echo
echo "9. Help text is shown for --help"
set +e
help_out=$(bash "$SCRIPT" --help 2>&1)
help_rc=$?
set -e
assert_pass "help-exits-zero" "EXIT_CODE=$help_rc" "EXIT_CODE=0"
assert_pass "help-mentions-issue" "$help_out" "issue #2975"
echo
echo "10. Unknown arg exits 64 (EX_USAGE)"
set +e
bad_out=$(bash "$SCRIPT" --bogus 2>&1)
bad_rc=$?
set -e
assert_pass "unknown-arg-rc" "EXIT_CODE=$bad_rc" "EXIT_CODE=64"
echo
echo "11. Missing repo + missing fixture exits 2"
# The script takes its repo from GITHUB_REPOSITORY (or --repo), not from
# a REPO env var, so that is the variable to blank. GitHub Actions sets
# it automatically; leaving it set would make this case query GitHub
# live. PR_FIXTURE is blanked too so an inherited value cannot satisfy
# the guard.
set +e
out=$(GITHUB_REPOSITORY="" PR_FIXTURE="" bash "$SCRIPT" 2>&1)
rc=$?
set -e
assert_pass "no-repo-exit-2" "EXIT_CODE=$rc" "EXIT_CODE=2"
assert_pass "no-repo-message" "$out" "required when no fixture given"
# ─────────────────────────────────────────────────────────────────────────────
# Summary
# ─────────────────────────────────────────────────────────────────────────────
# Print the pass/fail totals; any recorded failure makes the suite exit 1.
echo
echo "─────────────────────────────────────────────"
echo "Tests: $PASS passed, $FAIL failed"
if [ "$FAIL" -le 0 ]; then
  echo "All tests passed."
else
  exit 1
fi

View File

@ -157,6 +157,43 @@ A2A_RESP=$(curl -s --max-time "$TIMEOUT" -X POST "$BASE/workspaces/$POLL_WS_ID/a
}')
check "poll-mode A2A returns queued status" '"status":"queued"' "$A2A_RESP"
# ---------- Phase 3.5: Python parser classifies queued envelope correctly ----------
# (#2967) — server emits the queued envelope, the wheel's a2a_response.parse()
# MUST classify it as the Queued variant, not Malformed. Pre-#2967 the bare
# message/send parser in a2a_client.py:587 misclassified this and returned
# "[A2A_ERROR] unexpected response shape", which broke external↔external A2A
# on poll-mode peers.
#
# This phase exercises the actual on-the-wire response from a real
# workspace-server (NOT a mocked dict) through the same module the production
# wheel ships, so a regression in either the server emit shape OR the client
# parser fails this E2E.
echo ""
echo "--- Phase 3.5: Python parser classifies real server response (#2967) ---"
# Feed the queued response captured above through a2a_response.parse and
# print the variant's class name (plus method/delivery_mode for Queued).
# WORKSPACE_ID is required at module import time but irrelevant to this
# parsing call (any UUID is fine).
#
# The response body and the module directory reach Python through the
# environment, not by being spliced into the program text: a body
# containing ''' or a path containing a quote would otherwise break (or
# rewrite) the inline script. The program is single-quoted so bash
# expands nothing inside it.
PARSE_RESULT=$(WORKSPACE_ID="00000000-0000-0000-0000-000000000001" \
  A2A_WORKSPACE_DIR="$(cd "$(dirname "$0")/../../workspace" && pwd)" \
  A2A_RESP_JSON="$A2A_RESP" \
  python3 -c '
import json, os, sys
sys.path.insert(0, os.environ["A2A_WORKSPACE_DIR"])
import a2a_response
data = json.loads(os.environ["A2A_RESP_JSON"])
v = a2a_response.parse(data)
print(type(v).__name__)
if isinstance(v, a2a_response.Queued):
    print(f"method={v.method} delivery_mode={v.delivery_mode}")
')
check_eq "Python parser classifies real server response as Queued" \
"Queued" "$(printf '%s' "$PARSE_RESULT" | head -n1)"
check "Queued variant captures method=message/send" \
"method=message/send" "$PARSE_RESULT"
check "Queued variant captures delivery_mode=poll" \
"delivery_mode=poll" "$PARSE_RESULT"
check "queued response echoes delivery_mode=poll" '"delivery_mode":"poll"' "$A2A_RESP"
check "queued response echoes the JSON-RPC method" '"method":"message/send"' "$A2A_RESP"

View File

@ -17,6 +17,7 @@ from concurrent.futures import ThreadPoolExecutor
import httpx
import a2a_response
from platform_auth import auth_headers, self_source_headers
logger = logging.getLogger(__name__)
@ -353,6 +354,20 @@ def _agent_card_url_for(peer_id: str) -> str:
# Used by delegate_task to distinguish real errors from normal response text.
_A2A_ERROR_PREFIX = "[A2A_ERROR] "
# Sentinel prefix for queued-for-poll-mode-peer outcomes (#2967).
# When the target workspace is registered as delivery_mode=poll (no
# public URL — typical for external molecule-mcp standalone runtimes),
# the platform's a2a_proxy.go:402 short-circuit returns a synthetic
# {"status":"queued","delivery_mode":"poll","method":"..."} envelope
# instead of dispatching over HTTP. The message IS delivered (written
# to the platform's inbox queue); there's just no synchronous reply
# to relay. Pre-#2967 the client treated this as "unexpected response
# shape" → caller saw DELEGATION FAILED → retried → recipient saw
# duplicates. The Queued prefix lets callers branch on this outcome
# explicitly: "delivered async, no synchronous reply expected" is
# different from both success-with-text and failure.
_A2A_QUEUED_PREFIX = "[A2A_QUEUED] "
# Workspace IDs are UUIDs everywhere we generate them (platform's
# workspaces.id column, /registry/discover/:id route param, etc.) but
# the agent-facing tool surface receives them as free-form strings via
@ -564,17 +579,43 @@ async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str
},
)
data = resp.json()
if "result" in data:
parts = data["result"].get("parts", [])
text = parts[0].get("text", "") if parts else "(no response)"
# Tag child-reported errors so the caller can detect them reliably
# Dispatch via the SSOT response model (a2a_response.py).
# All shape detection lives in one place — the parser
# never raises and routes unknown shapes to Malformed
# so a future server-side change is loud, not silent.
variant = a2a_response.parse(data)
if isinstance(variant, a2a_response.Result):
# Match legacy semantics:
# parts non-empty + first part has no text → ""
# parts empty → "(no response)"
# Differentiation matters for callers that assert
# on the empty-string case (test_a2a_client).
if variant.parts:
text = variant.text
else:
text = "(no response)"
# Tag child-reported errors so the caller can
# detect them reliably — agent-side bug surfaces
# text like "Agent error: <traceback>" inside a
# JSON-RPC success envelope.
if text.startswith("Agent error:"):
return f"{_A2A_ERROR_PREFIX}{text}"
return text
elif "error" in data:
err = data["error"]
msg = (err.get("message") or "").strip()
code = err.get("code")
if isinstance(variant, a2a_response.Queued):
# Poll-mode peer — message accepted into the inbox
# queue, target agent will fetch via poll. NOT a
# failure. Return the queued sentinel so callers
# (delegate_task etc.) can render the outcome
# accurately instead of treating it as an error.
logger.info(
"send_a2a_message: queued for poll-mode peer (target=%s method=%s)",
target_url,
variant.method,
)
return f"{_A2A_QUEUED_PREFIX}target={safe_id} method={variant.method}"
if isinstance(variant, a2a_response.Error):
msg = variant.message
code = variant.code
if msg and code is not None:
detail = f"{msg} (code={code})"
elif msg:
@ -583,26 +624,33 @@ async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str
detail = f"JSON-RPC error with no message (code={code})"
else:
detail = "JSON-RPC error with no message"
return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
elif data.get("status") == "queued" and data.get("delivery_mode") == "poll":
# Workspace-server's poll-mode short-circuit envelope
# (workspace-server/internal/handlers/a2a_proxy.go ~line 402).
# The peer is poll-mode and has no URL to dispatch to, so
# the server queued the message for the peer's next inbox
# poll instead of forwarding it. Delivery is acknowledged
# but pending consumption.
#
# Pre-fix this fell through to the "unexpected response
# shape" error path → callers logged false failures, then
# delegate_task retried, and the peer received duplicate
# delegations. Issue #2967.
method = data.get("method") or "message/send"
logger.info(
"send_a2a_message: queued for poll-mode peer (method=%s, target=%s)",
method, target_url,
if variant.restarting:
# Surface platform-restart-in-progress
# explicitly — caller (UI / delegating agent)
# can render a softer "agent is restarting"
# message rather than a generic failure.
retry = (
f", retry_after={variant.retry_after}s"
if variant.retry_after is not None
else ""
)
detail = f"{detail} (restarting{retry})"
return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
# Malformed — log loud + surface as error so the
# operator notices a server change. SSOT refactor
# subsumes the inline "queued" check that landed in
# the #2972 hotfix; that branch is now the typed
# Queued variant above.
logger.warning(
"send_a2a_message: malformed response (target=%s body=%.200s)",
target_url,
str(variant.raw),
)
return (
f"{_A2A_ERROR_PREFIX}unexpected response shape "
f"(no result, error, or queued envelope): "
f"{str(variant.raw)[:200]} [target={target_url}]"
)
return f"queued for poll-mode peer (method={method})"
return f"{_A2A_ERROR_PREFIX}unexpected response shape (no result, no error): {str(data)[:200]} [target={target_url}]"
except _TRANSIENT_HTTP_ERRORS as e:
last_exc = e
attempts_remaining = _DELEGATE_MAX_ATTEMPTS - (attempt + 1)

246
workspace/a2a_response.py Normal file
View File

@ -0,0 +1,246 @@
"""Single source of truth for A2A ``/workspaces/<id>/a2a`` response shapes.
The workspace-server proxy at
``workspace-server/internal/handlers/a2a_proxy.go`` (the canonical
emitter) returns one of the following shapes for a single A2A call:
* **JSON-RPC success**
``{"jsonrpc": "2.0", "result": {...}, "id": "..."}``
The agent's reply, passed through unchanged.
* **JSON-RPC error**
``{"jsonrpc": "2.0", "error": {"message": "...", "code": ...}, "id": "..."}``
The agent reported a structured error.
* **Poll-queued** (synthesized at proxy, RFC #2339 PR 2 — see
``a2a_proxy.go:402-406``)
``{"status": "queued", "delivery_mode": "poll", "method": "..."}``
The target is a poll-mode workspace (no public URL); the message
was written to the platform's inbox queue. The target agent will
fetch it via ``GET /activity?since_id=`` polling. NOT a failure —
delivery succeeded; there is just no synchronous reply to relay.
* **Platform error** — ``{"error": "...", "restarting": true?, "retry_after": int?}``
HTTP-level failure synthesized by the proxy when the agent is
unreachable, the container is restarting, or some other infrastructure
failure happened. ``restarting=true`` flags the platform-initiated
container-restart path.
* **Malformed** — anything else. Surfaced explicitly so a future server
change is loud rather than silent.
The ``parse(data)`` function classifies a pre-decoded JSON body into a
typed variant. Callers ``match`` on the variant and never re-implement
shape detection — that's the SSOT discipline.
# SSOT contract
This file is the Python half. The Go server emits these shapes today
via inline ``gin.H{...}`` literals. A future PR can introduce a Go
mirror (e.g. ``workspace-server/internal/models/a2a_response.go``)
with a typed marshaller; until then, **any change to the wire shape
must be reflected here** and gated by ``test_a2a_response.py``'s
fixture corpus. The corpus exists specifically so a one-sided edit
breaks CI.
# Why a typed model (vs. dict-key sniffing at every site)
The pre-2967 client at ``a2a_client.py:567-587`` sniffed for ``result``
or ``error`` keys inline and treated everything else as malformed,
which silently broke poll-mode peers (the queued envelope has neither
key). Inline sniffing per call site multiplies the surface area where
a new shape gets misclassified. A single typed parser with an
explicit ``Malformed`` escape hatch makes shape additions a
one-line change here + a fixture entry in the test corpus, instead of
a hunt through every parsing site in the runtime.
"""
from __future__ import annotations
import dataclasses
import logging
from typing import Any, Optional, Union
logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True)
class Result:
    """Successful JSON-RPC response: the agent's reply arrived synchronously.

    Attributes:
        text: Convenience string taken from the first part's ``text``
            entry (the A2A multipart shape). Empty when there is no
            usable first part.
        parts: The complete parts list, for callers that need more than
            the first text part (several parts, non-text parts).
        raw_result: The untouched ``result`` payload, kept for callers
            that want the original (e.g. an activity-row response_body
            audit). ``None`` when no dict payload was available.
    """

    text: str
    # default_factory gives every instance its own list instead of a
    # shared mutable default.
    parts: list[dict[str, Any]] = dataclasses.field(default_factory=list)
    raw_result: Optional[dict[str, Any]] = None
@dataclasses.dataclass(frozen=True)
class Error:
    """A failure reported by the agent (JSON-RPC) or synthesized by the platform.

    Attributes:
        message: Error text; may be empty when the server supplied none.
        code: The JSON-RPC integer error code, or ``None`` if absent.
        restarting: Platform-restart-in-progress flag.
        retry_after: Platform-supplied retry hint, or ``None`` if absent.

    ``restarting`` and ``retry_after`` travel together as restart
    metadata: when both are set the container is being recycled, and the
    caller may choose to show the user a softer error.
    """

    message: str
    code: Optional[int] = None
    restarting: bool = False
    retry_after: Optional[int] = None
@dataclasses.dataclass(frozen=True)
class Queued:
    """Poll-mode short-circuit: the message was accepted for async pickup.

    Produced when the target workspace is registered with
    ``delivery_mode=poll`` (it has no public URL — typical for external
    standalone ``molecule-mcp`` runtimes). The platform wrote the message
    to its inbox queue, and the target agent collects it by polling
    ``GET /activity?since_id=``.

    This is NOT a failure. There is simply no synchronous reply text, so
    a caller that wants one has two options:

    * Accept that no reply comes back (fire-and-forget semantics).
    * Switch to the durable ``/workspaces/:id/delegate`` +
      ``/delegations`` polling path (see ``a2a_tools_delegation``'s
      ``_delegate_sync_via_polling``), which sends the same A2A request
      through the platform's executeDelegation goroutine and lets the
      caller poll for the result row.

    Attributes:
        method: Echo of the request method (``message/send``,
            ``notify``, etc.) so callers can correlate.
        delivery_mode: Delivery mode of the envelope; defaults to
            ``"poll"``.
    """

    method: str
    delivery_mode: str = "poll"
@dataclasses.dataclass(frozen=True)
class Malformed:
    """A response body that matches none of the known shapes.

    Holds the decoded payload verbatim so it can be logged for
    diagnosis. Callers usually present it to the user as an error (see
    ``send_a2a_message``), yet it stays a distinct type so that logging
    and metrics can tell it apart from a real JSON-RPC ``Error``.

    Attributes:
        raw: The payload exactly as decoded — may be a dict, list, str,
            number, or anything else the server sent.
    """

    raw: Any
# Every outcome ``parse`` can produce; callers branch on the concrete type.
Variant = Union[Result, Error, Queued, Malformed]
# Field-name constants — the wire vocabulary. Single source of truth;
# the parser references these by name so a change here is a
# one-line edit instead of a hunt through string literals.
# Top-level keys of the response body:
_KEY_RESULT = "result"
_KEY_ERROR = "error"
_KEY_STATUS = "status"
_KEY_DELIVERY_MODE = "delivery_mode"
_KEY_METHOD = "method"
_KEY_RESTARTING = "restarting"
_KEY_RETRY_AFTER = "retry_after"
# Values that together identify the poll-queued envelope
# (``status`` == queued AND ``delivery_mode`` == poll):
_STATUS_QUEUED = "queued"
_DELIVERY_MODE_POLL = "poll"
def parse(data: Any) -> Variant:
    """Classify a pre-decoded ``/a2a`` JSON response into a typed variant.

    Never raises. Every branch is total: any input that doesn't match a
    known shape is returned as ``Malformed`` so the caller can decide
    how to surface it.

    Checks run in this order and the first match wins:

    1. Non-dict input -> ``Malformed`` (the server contract is
       dict-shaped).
    2. Poll-queued envelope (``status == "queued"`` AND
       ``delivery_mode == "poll"``) -> ``Queued``. This runs BEFORE the
       result/error checks, so a body carrying the full envelope is
       classified as ``Queued`` even if it also has a ``result`` or
       ``error`` key.
    3. ``result`` key present -> ``Result`` (the JSON-RPC success path).
    4. ``error`` key present -> ``Error`` (JSON-RPC error or platform
       error).
    5. Anything else -> ``Malformed``.

    Args:
        data: The already-decoded response body. Expected to be a dict,
            but any value is accepted.

    Returns:
        One of ``Result``, ``Error``, ``Queued`` or ``Malformed``.
    """
    if not isinstance(data, dict):
        logger.warning(
            "a2a_response.parse: non-dict body — got %s",
            type(data).__name__,
        )
        return Malformed(raw=data)

    # Poll-queued envelope. Both keys must be present with the expected
    # values — if only one matches, the body is ambiguous and falls
    # through to the later checks (ending in Malformed when it has no
    # result/error key either).
    if (
        data.get(_KEY_STATUS) == _STATUS_QUEUED
        and data.get(_KEY_DELIVERY_MODE) == _DELIVERY_MODE_POLL
    ):
        method_raw = data.get(_KEY_METHOD)
        # A missing method must not raise; fall back to a placeholder.
        method = str(method_raw) if method_raw is not None else "unknown"
        logger.info(
            "a2a_response.parse: queued for poll-mode peer (method=%s)",
            method,
        )
        return Queued(method=method)

    # JSON-RPC success.
    if _KEY_RESULT in data:
        result = data[_KEY_RESULT]
        if isinstance(result, dict):
            parts_raw = result.get("parts")
            # Anything other than a list is treated as "no parts".
            parts = parts_raw if isinstance(parts_raw, list) else []
            text = ""
            if parts:
                first = parts[0]
                if isinstance(first, dict):
                    text_raw = first.get("text")
                    text = str(text_raw) if text_raw is not None else ""
            return Result(text=text, parts=parts, raw_result=result)
        # ``result`` present but not a dict — unusual but not an error;
        # surface as a Result with the value rendered to text.
        return Result(text=str(result), parts=[], raw_result=None)

    # JSON-RPC error or platform error.
    if _KEY_ERROR in data:
        err_raw = data[_KEY_ERROR]
        message = ""
        code: Optional[int] = None
        if isinstance(err_raw, dict):
            msg_raw = err_raw.get("message")
            if msg_raw is not None:
                message = str(msg_raw).strip()
            code_raw = err_raw.get("code")
            # ``bool`` is a subclass of ``int``; a JSON ``true``/``false``
            # is not an error code, so drop it like any other non-int.
            if isinstance(code_raw, int) and not isinstance(code_raw, bool):
                code = code_raw
        elif isinstance(err_raw, str):
            message = err_raw.strip()
        else:
            message = str(err_raw)
        # Truthiness coercion: any truthy value counts as restarting.
        restarting = bool(data.get(_KEY_RESTARTING, False))
        retry_after_raw = data.get(_KEY_RETRY_AFTER)
        # Same bool exclusion as for ``code``: ``True`` is not a delay.
        retry_after = (
            retry_after_raw
            if isinstance(retry_after_raw, int)
            and not isinstance(retry_after_raw, bool)
            else None
        )
        return Error(
            message=message,
            code=code,
            restarting=restarting,
            retry_after=retry_after,
        )

    # Keys are stringified before sorting: a dict with mixed-type keys
    # (possible because ``data`` is ``Any``) would otherwise make
    # ``sorted`` raise TypeError and break the never-raises contract.
    logger.warning(
        "a2a_response.parse: unrecognized shape — keys=%s",
        sorted(str(key) for key in data),
    )
    return Malformed(raw=data)

View File

@ -29,14 +29,18 @@ from __future__ import annotations
import hashlib
import json
import logging
import os
import httpx
logger = logging.getLogger(__name__)
from a2a_client import (
PLATFORM_URL,
WORKSPACE_ID,
_A2A_ERROR_PREFIX,
_A2A_QUEUED_PREFIX,
_peer_names,
_peer_to_source,
discover_peer,
@ -245,6 +249,29 @@ async def tool_delegate_task(
# (the platform proxy) so the same code works for in-container and
# external (standalone molecule-mcp) callers.
result = await send_a2a_message(workspace_id, task, source_workspace_id=src)
# #2967: when the target is a poll-mode peer, the platform's
# a2a_proxy short-circuits and returns a queued envelope —
# send_a2a_message surfaces that as the _A2A_QUEUED_PREFIX
# sentinel. The synchronous proxy path can't deliver a reply
# because the target has no public URL; fall back to the
# durable /delegate + /delegations polling path which DOES
# work for poll-mode peers (the executeDelegation goroutine
# writes to the inbox queue and the result row arrives when
# the target picks it up + replies).
#
# This is what makes external-runtime-to-external-runtime
# A2A actually deliver synchronous replies — without the
# fallback the calling agent sees the queued sentinel as
# success-with-no-text and never gets the peer's response.
if result.startswith(_A2A_QUEUED_PREFIX):
logger.info(
"tool_delegate_task: target=%s is poll-mode; "
"falling back from message/send to /delegate-poll path",
workspace_id,
)
result = await _delegate_sync_via_polling(
workspace_id, task, src or WORKSPACE_ID,
)
# Detect delegation failures — wrap them clearly so the calling agent
# can decide to retry, use another peer, or handle the task itself.

View File

@ -281,11 +281,11 @@ class TestSendA2AMessage:
to the 'unexpected response shape' error path callers retried,
peer got duplicate delegations.
Pin: poll-queued envelope returns a clean success string that does
NOT start with _A2A_ERROR_PREFIX, so callers route it through the
normal-outcome path. Verified discriminating: assert_NOT_startswith
the error prefix would FAIL on the old code (which returned an
error-prefixed string) and PASSES on the new code.
Pin: poll-queued envelope returns a string tagged with the
_A2A_QUEUED_PREFIX sentinel (not _A2A_ERROR_PREFIX), so callers
can branch on the typed outcome without substring-sniffing.
Verified discriminating: pre-fix returned _A2A_ERROR_PREFIX so
the not-startswith assertion would FAIL on the old code.
"""
import a2a_client
@ -301,12 +301,13 @@ class TestSendA2AMessage:
# Discriminating: pre-fix returned a string that startswith
# _A2A_ERROR_PREFIX, so this assertion would have FAILED on the
# old code. New code returns a queued-success string.
# old code. New code returns the queued-success sentinel.
assert not result.startswith(a2a_client._A2A_ERROR_PREFIX), (
f"poll-queued envelope must not be tagged as A2A error; got: {result!r}"
)
assert "queued" in result.lower()
assert "poll" in result.lower()
assert result.startswith(a2a_client._A2A_QUEUED_PREFIX), (
f"poll-queued envelope must use the queued sentinel; got: {result!r}"
)
# The method is included so a structured-log scraper can route by
# protocol verb if needed.
assert "message/send" in result
@ -329,6 +330,7 @@ class TestSendA2AMessage:
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert not result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert result.startswith(a2a_client._A2A_QUEUED_PREFIX)
assert "message/sendStream" in result
async def test_status_queued_without_poll_mode_still_falls_through(self):
@ -462,6 +464,98 @@ def _make_seq_mock_client(post_side_effect):
return mock_client
class TestSendA2AMessagePollMode:
    """Regression coverage for #2967 in ``send_a2a_message``.

    The platform answers with a poll-mode short-circuit envelope when the
    target is a poll-mode workspace. These tests pin that the client
    returns a queued sentinel for it rather than an "unexpected response
    shape" error.

    Before #2967 the client read the queued envelope as malformed, the
    calling agent retried, and the (poll-mode) recipient received the
    same message twice. With the queued sentinel, delegate_task can fall
    back to the durable polling path transparently — see
    test_delegation_sync_via_polling for that half.
    """

    async def test_poll_queued_envelope_returns_queued_sentinel(self):
        # Shape returned by the workspace-server (a2a_proxy.go:402-406)
        # for a target registered as delivery_mode=poll (no public URL,
        # typical for external molecule-mcp standalone runtimes).
        import a2a_client

        queued_body = {
            "status": "queued",
            "delivery_mode": "poll",
            "method": "message/send",
        }
        client = _make_mock_client(post_resp=_make_response(200, queued_body))
        with patch("a2a_client.httpx.AsyncClient", return_value=client):
            outcome = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")

        # Tagged with the queued sentinel so callers can branch on it…
        assert outcome.startswith(a2a_client._A2A_QUEUED_PREFIX)
        # …and, critically, not with the error sentinel (the pre-#2967 path).
        assert not outcome.startswith(a2a_client._A2A_ERROR_PREFIX)
        # Enough context for the caller to log something meaningful.
        assert _TEST_PEER_ID in outcome
        assert "message/send" in outcome

    async def test_poll_queued_envelope_method_is_recorded(self):
        # The envelope's method is echoed into the returned string.
        import a2a_client

        queued_body = {
            "status": "queued",
            "delivery_mode": "poll",
            "method": "notify",
        }
        client = _make_mock_client(post_resp=_make_response(200, queued_body))
        with patch("a2a_client.httpx.AsyncClient", return_value=client):
            outcome = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")

        assert outcome.startswith(a2a_client._A2A_QUEUED_PREFIX)
        assert "notify" in outcome

    async def test_status_queued_without_delivery_mode_is_unexpected_shape(self):
        # Server bug scenario: ``status=queued`` without ``delivery_mode``.
        # The documented contract requires both keys, so this must land
        # on the malformed branch, not Queued.
        import a2a_client

        partial_body = {"status": "queued", "method": "message/send"}
        client = _make_mock_client(post_resp=_make_response(200, partial_body))
        with patch("a2a_client.httpx.AsyncClient", return_value=client):
            outcome = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")

        assert outcome.startswith(a2a_client._A2A_ERROR_PREFIX)
        assert "unexpected response shape" in outcome
        # The message has to name the queued envelope, so an operator can
        # tell the parser HAS a Queued branch and this body merely failed
        # to match it — not that the logic is absent (the pre-#2967
        # confusion).
        assert "queued envelope" in outcome

    async def test_platform_error_with_restart_metadata_surfaces_in_message(self):
        # Platform error envelope carrying restart metadata (the mocked
        # response here uses status 200). The resulting error string must
        # mention "restarting" so the caller / agent can render a softer
        # error to the user.
        import a2a_client

        restart_body = {
            "error": "workspace agent unreachable — container restart triggered",
            "restarting": True,
            "retry_after": 15,
        }
        client = _make_mock_client(post_resp=_make_response(200, restart_body))
        with patch("a2a_client.httpx.AsyncClient", return_value=client):
            outcome = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")

        assert outcome.startswith(a2a_client._A2A_ERROR_PREFIX)
        assert "restarting" in outcome
        assert "retry_after=15" in outcome
class TestSendA2AMessageRetry:
"""Verify auto-retry on transient transport errors (RemoteProtocolError,
ConnectError, ReadTimeout, etc.) up to _DELEGATE_MAX_ATTEMPTS times.

View File

@ -0,0 +1,455 @@
"""Tests for the A2A response SSOT parser (workspace/a2a_response.py).
Branch coverage target: 100%. Each variant of ``parse()`` exercised in
isolation, plus adversarial-input fuzzing to assert the parser never
raises.
Pre-#2967, the response shape was sniffed inline at every call site
(``a2a_client.py:567-587`` had hard-coded ``"result" in data`` /
``"error" in data`` checks). The bare ``else`` returned an
"unexpected response shape" error which silently broke poll-mode
peers because the workspace-server's poll-queued envelope has neither
``result`` nor ``error``. The SSOT parser has an explicit ``Queued``
variant for that path and routes anything truly unrecognized to
``Malformed`` so a future server-side change fails loudly.
The regression that motivated the parser is pinned in
``TestRegressionGate``: ``test_legacy_sniffer_would_return_neither_branch``
reproduces the pre-#2967 key-sniffing logic and shows that it lands on
the unexpected-shape path for the Queued envelope, while
``test_ssot_parser_classifies_correctly`` asserts that the SSOT parser
classifies the same envelope as ``Queued``.
"""
from __future__ import annotations
import logging
from typing import Any
import pytest
import a2a_response
# ============== Fixture corpus — the canonical wire shapes ==============
# Every shape below mirrors a path the workspace-server's a2a_proxy.go
# can return. When you add a new server-side response shape, add a
# fixture entry here and a corresponding test method below.
# Keys are fixture names; the name prefix (jsonrpc_success / jsonrpc_error /
# platform_error / poll_queued / malformed) indicates the variant the
# payload is expected to parse into.
_FIXTURES = {
    # --- JSON-RPC success bodies (have a ``result`` key) ---
    "jsonrpc_success_with_text": {
        "jsonrpc": "2.0",
        "id": "abc-123",
        "result": {
            "parts": [{"kind": "text", "text": "hello world"}],
        },
    },
    "jsonrpc_success_multipart": {
        "jsonrpc": "2.0",
        "id": "abc-123",
        "result": {
            "parts": [
                {"kind": "text", "text": "first"},
                {"kind": "text", "text": "second"},
            ],
        },
    },
    "jsonrpc_success_no_parts": {
        "jsonrpc": "2.0",
        "id": "abc-123",
        "result": {},
    },
    "jsonrpc_success_part_no_text_key": {
        "jsonrpc": "2.0",
        "id": "abc-123",
        "result": {"parts": [{"kind": "text"}]},
    },
    # --- JSON-RPC error bodies (have an ``error`` key plus jsonrpc/id) ---
    "jsonrpc_error_with_message_and_code": {
        "jsonrpc": "2.0",
        "id": "abc-123",
        "error": {"message": "rate limited", "code": -32003},
    },
    "jsonrpc_error_message_only": {
        "jsonrpc": "2.0",
        "id": "abc-123",
        "error": {"message": "rate limited"},
    },
    "jsonrpc_error_code_only": {
        "jsonrpc": "2.0",
        "id": "abc-123",
        "error": {"code": -32603},
    },
    "jsonrpc_error_string_form": {
        "jsonrpc": "2.0",
        "id": "abc-123",
        "error": "string-shaped error",
    },
    # --- Platform error bodies (string ``error``, optional restart metadata) ---
    "platform_error_with_restart": {
        "error": "workspace agent unreachable — container restart triggered",
        "restarting": True,
        "retry_after": 15,
    },
    "platform_error_plain": {
        "error": "workspace not found",
    },
    # --- Poll-queued envelopes (status=queued AND delivery_mode=poll) ---
    "poll_queued_full": {
        "status": "queued",
        "delivery_mode": "poll",
        "method": "message/send",
    },
    "poll_queued_notify": {
        "status": "queued",
        "delivery_mode": "poll",
        "method": "notify",
    },
    "poll_queued_no_method": {
        "status": "queued",
        "delivery_mode": "poll",
    },
    # --- Bodies that match no known shape ---
    "malformed_empty_dict": {},
    "malformed_unexpected_keys": {"foo": "bar", "baz": 42},
    "malformed_status_queued_no_delivery_mode": {
        # Server bug — status set but delivery_mode missing.
        # Should be Malformed, not Queued, because the contract says both.
        "status": "queued",
    },
    "malformed_delivery_mode_no_status": {
        # Mirror case: delivery_mode set but status missing.
        "delivery_mode": "poll",
    },
}
# ============== Variant-by-variant coverage ==============
class TestQueuedVariant:
    """The poll-mode short-circuit envelope (a2a_proxy.go:402-406) must
    come back from ``parse()`` as ``Queued``; partial envelopes must not."""

    def test_full_envelope_with_method_message_send(self):
        outcome = a2a_response.parse(_FIXTURES["poll_queued_full"])
        assert isinstance(outcome, a2a_response.Queued)
        assert outcome.method == "message/send"
        assert outcome.delivery_mode == "poll"

    def test_envelope_with_method_notify(self):
        outcome = a2a_response.parse(_FIXTURES["poll_queued_notify"])
        assert isinstance(outcome, a2a_response.Queued)
        assert outcome.method == "notify"

    def test_envelope_missing_method_uses_unknown_sentinel(self):
        # The server is expected to always send ``method``; if it is
        # absent the parser must still succeed and report "unknown".
        outcome = a2a_response.parse(_FIXTURES["poll_queued_no_method"])
        assert isinstance(outcome, a2a_response.Queued)
        assert outcome.method == "unknown"

    def test_status_queued_alone_is_malformed_not_queued(self):
        # Only half of the documented envelope (no ``delivery_mode=poll``):
        # must show up as Malformed for visibility.
        payload = _FIXTURES["malformed_status_queued_no_delivery_mode"]
        outcome = a2a_response.parse(payload)
        assert isinstance(outcome, a2a_response.Malformed)

    def test_delivery_mode_alone_is_malformed_not_queued(self):
        # The other half on its own is equally not a queued envelope.
        payload = _FIXTURES["malformed_delivery_mode_no_status"]
        outcome = a2a_response.parse(payload)
        assert isinstance(outcome, a2a_response.Malformed)

    def test_logs_info_on_queued(self, caplog):
        # Operators should be able to see queued events at INFO level.
        with caplog.at_level(logging.INFO, logger="a2a_response"):
            a2a_response.parse(_FIXTURES["poll_queued_full"])
        messages = (record.message for record in caplog.records)
        assert any("queued for poll-mode peer" in text for text in messages)
class TestResultVariant:
    """JSON-RPC ``result`` envelopes must parse into
    ``Result(text, parts, raw_result)``, tolerating odd part shapes."""

    def test_simple_text_result(self):
        outcome = a2a_response.parse(_FIXTURES["jsonrpc_success_with_text"])
        assert isinstance(outcome, a2a_response.Result)
        assert outcome.text == "hello world"
        assert len(outcome.parts) == 1
        expected_raw = {"parts": [{"kind": "text", "text": "hello world"}]}
        assert outcome.raw_result == expected_raw

    def test_multipart_result_extracts_first_part_text(self):
        outcome = a2a_response.parse(_FIXTURES["jsonrpc_success_multipart"])
        assert isinstance(outcome, a2a_response.Result)
        # Only the first part feeds ``text``; both parts are retained.
        assert outcome.text == "first"
        assert len(outcome.parts) == 2

    def test_result_with_no_parts(self):
        outcome = a2a_response.parse(_FIXTURES["jsonrpc_success_no_parts"])
        assert isinstance(outcome, a2a_response.Result)
        assert outcome.text == ""
        assert outcome.parts == []

    def test_part_without_text_key(self):
        payload = _FIXTURES["jsonrpc_success_part_no_text_key"]
        outcome = a2a_response.parse(payload)
        assert isinstance(outcome, a2a_response.Result)
        # Without a "text" key the text is empty but the part is kept.
        assert outcome.text == ""
        assert len(outcome.parts) == 1

    def test_result_non_dict_returns_text_form(self):
        # Pathological yet legal: a string ``result`` rather than a dict.
        outcome = a2a_response.parse({"result": "hello"})
        assert isinstance(outcome, a2a_response.Result)
        assert outcome.text == "hello"
        assert outcome.parts == []

    def test_result_takes_precedence_when_no_queued_envelope(self):
        # With both ``result`` and ``error`` present, result wins: it is
        # the first check after the Queued path.
        payload = {
            "result": {"parts": [{"kind": "text", "text": "ok"}]},
            "error": {"message": "should-be-ignored"},
        }
        outcome = a2a_response.parse(payload)
        assert isinstance(outcome, a2a_response.Result)
        assert outcome.text == "ok"

    def test_part_with_non_dict_first_entry(self):
        # A bare string as ``parts[0]`` is tolerated; text stays empty.
        payload = {"result": {"parts": ["bare-string"]}}
        outcome = a2a_response.parse(payload)
        assert isinstance(outcome, a2a_response.Result)
        assert outcome.text == ""
        assert outcome.parts == ["bare-string"]

    def test_part_text_value_none(self):
        # An explicit ``None`` for ``parts[0].text`` yields "".
        payload = {"result": {"parts": [{"text": None}]}}
        outcome = a2a_response.parse(payload)
        assert isinstance(outcome, a2a_response.Result)
        assert outcome.text == ""

    def test_parts_not_a_list(self):
        # Server bug: a dict where the parts list should be. The parser
        # substitutes empty parts instead of raising.
        payload = {"result": {"parts": {"oops": True}}}
        outcome = a2a_response.parse(payload)
        assert isinstance(outcome, a2a_response.Result)
        assert outcome.parts == []
        assert outcome.text == ""
class TestErrorVariant:
    """``error`` envelopes must parse into ``Error``, including the
    platform-restart metadata when the body carries it."""

    def test_message_and_code(self):
        payload = _FIXTURES["jsonrpc_error_with_message_and_code"]
        outcome = a2a_response.parse(payload)
        assert isinstance(outcome, a2a_response.Error)
        assert outcome.message == "rate limited"
        assert outcome.code == -32003
        assert outcome.restarting is False
        assert outcome.retry_after is None

    def test_message_only(self):
        outcome = a2a_response.parse(_FIXTURES["jsonrpc_error_message_only"])
        assert isinstance(outcome, a2a_response.Error)
        assert outcome.message == "rate limited"
        assert outcome.code is None

    def test_code_only(self):
        outcome = a2a_response.parse(_FIXTURES["jsonrpc_error_code_only"])
        assert isinstance(outcome, a2a_response.Error)
        assert outcome.message == ""
        assert outcome.code == -32603

    def test_error_string_form(self):
        outcome = a2a_response.parse(_FIXTURES["jsonrpc_error_string_form"])
        assert isinstance(outcome, a2a_response.Error)
        assert outcome.message == "string-shaped error"
        assert outcome.code is None

    def test_error_non_dict_non_string(self):
        # Neither dict nor str: the value is rendered with ``str()``.
        outcome = a2a_response.parse({"error": 12345})
        assert isinstance(outcome, a2a_response.Error)
        assert outcome.message == "12345"

    def test_platform_error_with_restart_metadata(self):
        outcome = a2a_response.parse(_FIXTURES["platform_error_with_restart"])
        assert isinstance(outcome, a2a_response.Error)
        assert "workspace agent unreachable" in outcome.message
        assert outcome.restarting is True
        assert outcome.retry_after == 15

    def test_platform_error_without_restart(self):
        outcome = a2a_response.parse(_FIXTURES["platform_error_plain"])
        assert isinstance(outcome, a2a_response.Error)
        assert outcome.message == "workspace not found"
        assert outcome.restarting is False
        assert outcome.retry_after is None

    def test_error_message_with_whitespace_stripped(self):
        payload = {"error": {"message": " trimmed "}}
        outcome = a2a_response.parse(payload)
        assert isinstance(outcome, a2a_response.Error)
        assert outcome.message == "trimmed"

    def test_non_int_code_dropped(self):
        # A non-integer ``code`` is discarded rather than passed along.
        payload = {"error": {"message": "x", "code": "not-a-number"}}
        outcome = a2a_response.parse(payload)
        assert isinstance(outcome, a2a_response.Error)
        assert outcome.code is None

    def test_non_int_retry_after_dropped(self):
        # Likewise for a non-integer ``retry_after``.
        payload = {"error": "x", "restarting": True, "retry_after": "30s"}
        outcome = a2a_response.parse(payload)
        assert isinstance(outcome, a2a_response.Error)
        assert outcome.retry_after is None
class TestMalformedVariant:
    """Unclassifiable bodies must come back as ``Malformed`` and be logged
    at WARNING, so operators notice new server response shapes."""

    def test_empty_dict(self):
        outcome = a2a_response.parse(_FIXTURES["malformed_empty_dict"])
        assert isinstance(outcome, a2a_response.Malformed)
        assert outcome.raw == {}

    def test_unexpected_keys(self):
        outcome = a2a_response.parse(_FIXTURES["malformed_unexpected_keys"])
        assert isinstance(outcome, a2a_response.Malformed)
        assert outcome.raw == {"foo": "bar", "baz": 42}

    def test_non_dict_input_list(self):
        payload = [1, 2, 3]
        outcome = a2a_response.parse(payload)
        assert isinstance(outcome, a2a_response.Malformed)
        assert outcome.raw == [1, 2, 3]

    def test_non_dict_input_string(self):
        payload = "plain string"
        outcome = a2a_response.parse(payload)
        assert isinstance(outcome, a2a_response.Malformed)
        assert outcome.raw == "plain string"

    def test_non_dict_input_none(self):
        outcome = a2a_response.parse(None)
        assert isinstance(outcome, a2a_response.Malformed)
        assert outcome.raw is None

    def test_logs_warning_on_malformed(self, caplog):
        # At least one WARNING record must be emitted for an unknown shape.
        with caplog.at_level(logging.WARNING, logger="a2a_response"):
            a2a_response.parse(_FIXTURES["malformed_unexpected_keys"])
        levels = (record.levelno for record in caplog.records)
        assert any(level == logging.WARNING for level in levels)

    def test_logs_warning_on_non_dict(self, caplog):
        # The non-dict path logs a message that mentions "non-dict".
        with caplog.at_level(logging.WARNING, logger="a2a_response"):
            a2a_response.parse("not a dict")
        messages = (record.message for record in caplog.records)
        assert any("non-dict" in text for text in messages)
# ============== Robustness — parser never raises ==============
# Inputs chosen to provoke type errors in a careless parser: scalars,
# empty containers, and dicts whose known keys hold wrong-typed values.
# Consumed by the parametrized never-raises test.
_ADVERSARIAL_INPUTS: list[Any] = [
    # Non-dict bodies
    None,
    True,
    False,
    0,
    -1,
    3.14,
    "",
    "string",
    [],
    [1, 2, 3],
    # Dicts without any recognized shape
    {},
    {"random": "garbage"},
    # ``result`` with wrong-typed contents
    {"result": None},
    {"result": [1, 2, 3]},
    {"result": {"parts": None}},
    {"result": {"parts": [None]}},
    {"result": {"parts": [{"text": []}]}},
    # ``error`` with wrong-typed contents
    {"error": None},
    {"error": []},
    {"error": {"message": None, "code": None}},
    {"error": {"message": ["nested", "list"]}},
    # Near-miss queued envelopes
    {"status": None, "delivery_mode": None, "method": None},
    {"status": "queued", "delivery_mode": "push", "method": "x"},  # wrong delivery_mode
    {"status": "running", "delivery_mode": "poll"},  # wrong status
    {"status": 42, "delivery_mode": "poll"},  # non-string status
    # Deeply-nested junk
    {"result": {"parts": [{"text": {"deeply": {"nested": "object"}}}]}},
    # Bytes (not really JSON-decodable but parser shouldn't raise).
    # This is a real ``bytes`` value: the previous expression
    # ``b"bytes" if False else "x"`` always evaluated to the str "x",
    # so the bytes case was never actually exercised.
    {"result": {"parts": [{"text": b"bytes"}]}},
]
class TestRobustness:
    """``parse()`` is total: no adversarial input may make it raise.

    Guards against a future edit that introduces a key access which
    cannot cope with ``None`` or a wrong-typed value.
    """

    @pytest.mark.parametrize("payload", _ADVERSARIAL_INPUTS)
    def test_parse_never_raises(self, payload):
        # The whole contract in one check: whatever goes in, one of the
        # four variants comes out and no exception propagates.
        known_variants = (
            a2a_response.Result,
            a2a_response.Error,
            a2a_response.Queued,
            a2a_response.Malformed,
        )
        outcome = a2a_response.parse(payload)
        assert isinstance(outcome, known_variants)
# ============== Regression gate — pre-#2967 misclassified queued ==============
class TestRegressionGate:
    """Pins the bug that led to the SSOT parser.

    Prior to #2967, ``a2a_client.py:567-587`` only looked for
    ``"result" in data`` and ``"error" in data``. The poll-queued
    envelope has neither key, so it reached the bare else and produced
    the "unexpected response shape" error string. The tests here replay
    that legacy decision and confirm the SSOT parser tells Queued apart
    from Malformed.
    """

    def test_legacy_sniffer_would_return_neither_branch(self):
        # Re-creation of the pre-#2967 logic, kept inline so the
        # regression can be reproduced from this file alone.
        envelope = _FIXTURES["poll_queued_full"]
        if "result" in envelope:
            legacy_branch = "result"
        elif "error" in envelope:
            legacy_branch = "error"
        else:
            legacy_branch = "unexpected_shape"
        # The legacy sniff ends up on the malformed branch.
        assert legacy_branch == "unexpected_shape"

    def test_ssot_parser_classifies_correctly(self):
        # The new parser recognizes the same envelope as Queued.
        outcome = a2a_response.parse(_FIXTURES["poll_queued_full"])
        assert isinstance(outcome, a2a_response.Queued)
        assert outcome.method == "message/send"

    def test_every_fixture_classifies_to_expected_variant(self):
        # Defense in depth: the expected variant of every fixture is
        # spelled out, so adding a shape forces an update here as well.
        result_cls = a2a_response.Result
        error_cls = a2a_response.Error
        queued_cls = a2a_response.Queued
        malformed_cls = a2a_response.Malformed
        expected: dict[str, type] = {
            "jsonrpc_success_with_text": result_cls,
            "jsonrpc_success_multipart": result_cls,
            "jsonrpc_success_no_parts": result_cls,
            "jsonrpc_success_part_no_text_key": result_cls,
            "jsonrpc_error_with_message_and_code": error_cls,
            "jsonrpc_error_message_only": error_cls,
            "jsonrpc_error_code_only": error_cls,
            "jsonrpc_error_string_form": error_cls,
            "platform_error_with_restart": error_cls,
            "platform_error_plain": error_cls,
            "poll_queued_full": queued_cls,
            "poll_queued_notify": queued_cls,
            "poll_queued_no_method": queued_cls,
            "malformed_empty_dict": malformed_cls,
            "malformed_unexpected_keys": malformed_cls,
            "malformed_status_queued_no_delivery_mode": malformed_cls,
            "malformed_delivery_mode_no_status": malformed_cls,
        }
        # The table has to list exactly the fixtures that exist —
        # otherwise the gate could silently skip one.
        assert set(expected) == set(_FIXTURES), (
            f"fixture/expected mismatch: "
            f"missing-from-expected={set(_FIXTURES) - set(expected)} "
            f"extra-in-expected={set(expected) - set(_FIXTURES)}"
        )
        for name, payload in _FIXTURES.items():
            outcome = a2a_response.parse(payload)
            wanted = expected[name]
            assert isinstance(outcome, wanted), (
                f"fixture {name!r} classified as {type(outcome).__name__}, "
                f"expected {wanted.__name__}"
            )

View File

@ -93,6 +93,124 @@ class TestFlagOffLegacyPath:
poll_mock.assert_not_called()
# ---------------------------------------------------------------------------
# #2967: Auto-fallback to polling path when target is poll-mode
# ---------------------------------------------------------------------------
class TestPollModeAutoFallback:
    """Regression tests for #2967: auto-fallback to polling for poll-mode targets.

    Contract being pinned: if ``send_a2a_message`` hands back the queued
    sentinel (i.e. the target is poll-mode), ``tool_delegate_task``
    transparently re-routes through ``_delegate_sync_via_polling``, which
    DOES work for poll-mode peers (the executeDelegation goroutine writes
    to the inbox queue and the result row arrives once the target replies).

    Before #2967 the queued sentinel was never returned — the parser
    misclassified the envelope as malformed — and the calling agent saw a
    DELEGATION FAILED / unexpected-response-shape error. The suite is
    intended to guard against both the parser regression
    (sentinel-emission) and the fallback regression (sentinel-handling).
    """

    @staticmethod
    def _online_peer(name):
        """Return an async ``discover_peer`` stand-in reporting ``name`` as online."""

        async def _discover(*_a, **_kw):
            return {"name": name, "status": "online"}

        return _discover

    @staticmethod
    async def _silent_report_activity(*_a, **_kw):
        """Async ``report_activity`` stand-in: accepts any arguments, returns None."""
        return None

    async def test_queued_sentinel_triggers_polling_fallback(self, monkeypatch):
        # Flag OFF, so the legacy send_a2a_message path is taken. The stubbed
        # send answers with the queued sentinel (poll-mode target), and
        # delegate_task is expected to hand off to _delegate_sync_via_polling
        # so the agent eventually receives a real reply.
        monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
        import a2a_tools
        from a2a_client import _A2A_QUEUED_PREFIX

        sent = []
        polled = []

        async def queued_send(workspace_id, task, source_workspace_id=None):
            # Record the attempt, then reply with the queued sentinel.
            sent.append((workspace_id, task, source_workspace_id))
            return f"{_A2A_QUEUED_PREFIX}target={workspace_id} method=message/send"

        async def polling_reply(workspace_id, task, src):
            # Record the fallback invocation and produce the final answer.
            polled.append((workspace_id, task, src))
            return "real response from poll-mode peer"

        with patch("a2a_tools_delegation.send_a2a_message", side_effect=queued_send), \
             patch("a2a_tools_delegation._delegate_sync_via_polling", side_effect=polling_reply), \
             patch("a2a_tools_delegation.discover_peer", side_effect=self._online_peer("poll-peer")), \
             patch("a2a_tools.report_activity", side_effect=self._silent_report_activity):
            result = await a2a_tools.tool_delegate_task(
                "ws-target", "task body", source_workspace_id="ws-self"
            )

        # The direct send was attempted exactly once ...
        assert len(sent) == 1
        # ... and the polling fallback then fired exactly once, with the
        # original target, task body and source workspace.
        assert polled == [("ws-target", "task body", "ws-self")]
        # The caller gets the real reply — neither the queued sentinel nor
        # a DELEGATION FAILED string.
        assert result == "real response from poll-mode peer"

    async def test_non_queued_send_result_does_not_trigger_fallback(self, monkeypatch):
        # A push-mode peer answers with ordinary text, so the polling path
        # MUST stay untouched (no extra round-trip cost).
        monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
        import a2a_tools

        async def plain_send(*_a, **_kw):
            return "normal reply"

        with patch("a2a_tools_delegation.send_a2a_message", side_effect=plain_send), \
             patch("a2a_tools_delegation.discover_peer", side_effect=self._online_peer("push-peer")), \
             patch("a2a_tools.report_activity", side_effect=self._silent_report_activity), \
             patch("a2a_tools_delegation._delegate_sync_via_polling", new=AsyncMock()) as poll_mock:
            result = await a2a_tools.tool_delegate_task(
                "ws-target", "task", source_workspace_id="ws-self"
            )

        assert result == "normal reply"
        poll_mock.assert_not_called()

    async def test_error_send_result_does_not_trigger_fallback(self, monkeypatch):
        # A genuine (non-queued) error has to surface as DELEGATION FAILED
        # rather than being silently retried through the polling path.
        monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
        import a2a_tools
        from a2a_client import _A2A_ERROR_PREFIX

        async def failing_send(*_a, **_kw):
            return f"{_A2A_ERROR_PREFIX}HTTP 500 [target=...]"

        with patch("a2a_tools_delegation.send_a2a_message", side_effect=failing_send), \
             patch("a2a_tools_delegation.discover_peer", side_effect=self._online_peer("broken-peer")), \
             patch("a2a_tools.report_activity", side_effect=self._silent_report_activity), \
             patch("a2a_tools_delegation._delegate_sync_via_polling", new=AsyncMock()) as poll_mock:
            result = await a2a_tools.tool_delegate_task(
                "ws-target", "task", source_workspace_id="ws-self"
            )

        assert "DELEGATION FAILED" in result
        poll_mock.assert_not_called()
# ---------------------------------------------------------------------------
# Flag-on: dispatch failures
# ---------------------------------------------------------------------------