diff --git a/.github/workflows/auto-promote-stale-alarm.yml b/.github/workflows/auto-promote-stale-alarm.yml new file mode 100644 index 00000000..58667c6f --- /dev/null +++ b/.github/workflows/auto-promote-stale-alarm.yml @@ -0,0 +1,83 @@ +name: auto-promote-stale-alarm + +# Hourly cron + on-demand alarm for the silent-block failure mode that +# motivated issue #2975: +# - The auto-promote-staging.yml workflow opened a PR + armed +# auto-merge, but main's branch protection requires a human review +# (reviewDecision=REVIEW_REQUIRED). The PR sat BLOCKED with no +# surface-up-the-stack for 12+ hours, holding 25 commits hostage +# including the Memory v2 redesign and a reno-stars data-loss fix. +# +# This workflow runs `scripts/check-stale-promote-pr.sh` against the +# repo's open auto-promote PRs (base=main head=staging). When a PR has +# been BLOCKED on REVIEW_REQUIRED for >4h, it: +# 1. Emits a workflow-level warning (visible in run summary + the +# Actions UI feed). +# 2. Posts a comment on the PR (idempotent — one alarm per PR). +# +# The detection logic lives in scripts/check-stale-promote-pr.sh so +# it's unit-testable with stubbed `gh` (see test-check-stale-promote-pr.sh). +# This file is the schedule + invocation surface only — SSOT for the +# detector itself. + +on: + schedule: + # Hourly. Cheap (one `gh pr list` + jq), and 1h granularity is + # plenty for a 4h staleness threshold — operators see the alarm + # within at most 1h of crossing the threshold. + - cron: "27 * * * *" # at :27 to dodge the cron herd at :00 + workflow_dispatch: + inputs: + stale_hours: + description: "Hours after which a BLOCKED+REVIEW_REQUIRED PR is stale (default 4)" + required: false + default: "4" + post_comment: + description: "Post a comment on stale PRs (default true)" + required: false + default: "true" + +permissions: + contents: read + pull-requests: write # post comments on stale PRs + +# Serialize so the on-demand and scheduled runs don't double-comment +# the same PR. 
cancel-in-progress=false because the script is idempotent +# (existing comment marker prevents dupes), but a scheduled run firing +# while a manual one runs would just re-list the same PR set. +concurrency: + group: auto-promote-stale-alarm + cancel-in-progress: false + +jobs: + scan: + runs-on: ubuntu-latest + steps: + - name: Checkout (need scripts/ only) + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + sparse-checkout: | + scripts/check-stale-promote-pr.sh + sparse-checkout-cone-mode: false + - name: Run stale-PR detector + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GITHUB_REPOSITORY: ${{ github.repository }} + STALE_HOURS: ${{ inputs.stale_hours || '4' }} + POST_COMMENT: ${{ inputs.post_comment || 'true' }} + run: | + # The script's exit code reflects the count of stale PRs. + # We don't want a stale finding to fail the workflow run — + # the warning + comment are the signal, the green/red is + # noise. So convert any non-zero exit to a workflow notice + # and exit 0. + set +e + bash scripts/check-stale-promote-pr.sh + rc=$? + set -e + if [ "$rc" -ne 0 ]; then + echo "::notice::Stale PR detector found $rc PR(s) needing attention. See warnings above + comments on the PRs." + fi + # Always succeed — operator-facing surface is the warning, + # not the workflow status. 
+ exit 0 diff --git a/canvas/src/components/tabs/chat/__tests__/uploads.test.ts b/canvas/src/components/tabs/chat/__tests__/uploads.test.ts index a08d5d19..54a298a1 100644 --- a/canvas/src/components/tabs/chat/__tests__/uploads.test.ts +++ b/canvas/src/components/tabs/chat/__tests__/uploads.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect } from "vitest"; -import { resolveAttachmentHref } from "../uploads"; +import { isPlatformAttachment, resolveAttachmentHref } from "../uploads"; describe("resolveAttachmentHref — URI scheme normalisation", () => { const wsId = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"; @@ -39,3 +39,128 @@ describe("resolveAttachmentHref — URI scheme normalisation", () => { expect(resolveAttachmentHref(wsId, "s3://bucket/key")).toBe("s3://bucket/key"); }); }); + +// #2973 follow-up to #2968: cover the platform-pending: scheme branch +// (poll-mode chat uploads) + the isPlatformAttachment SSOT helper that +// the chip-download and markdown-link paths both consume. +// +// Pre-fix the platform-pending: URI fell through to the raw URI → +// browser saw an unhandled-protocol click → about:blank. The fix +// resolves it to the platform pending-uploads endpoint with auth +// headers attached. +describe("resolveAttachmentHref — platform-pending: scheme (poll-mode uploads)", () => { + // Use a chat workspace ID that DIFFERS from the one in the URI, so + // tests can verify which one the resolver uses. The forward-across- + // workspace case is real production behavior — files dragged into one + // workspace's chat can be referenced from another. 
+ const chatWs = "chat-ws-aaaaaaaa"; + const sourceWs = "source-ws-bbbbbbbb"; + + it("resolves a well-formed platform-pending: URI to /pending-uploads//content", () => { + const url = resolveAttachmentHref( + chatWs, + `platform-pending:${sourceWs}/file-12345`, + ); + expect(url).toContain(`/workspaces/${sourceWs}/pending-uploads/file-12345/content`); + }); + + it("uses the URI's wsid, NOT the chat workspace_id (cross-workspace forwarding)", () => { + // The two ids differ — this is the case PR #2968's commit + // explicitly calls out. A regression that flipped this would + // silently mis-route the download to the WRONG workspace's + // pending-uploads store, returning 404 (or worse, leaking). + const url = resolveAttachmentHref( + chatWs, + `platform-pending:${sourceWs}/file-xyz`, + ); + expect(url).toContain(`/workspaces/${sourceWs}/`); + expect(url).not.toContain(`/workspaces/${chatWs}/`); + }); + + it("falls back to raw URI when platform-pending: is missing the slash", () => { + // Defensive: a URI that drifted from the expected wsid/fileid shape + // returns raw rather than producing a broken /pending-uploads// + // path. Pinned to detect a regression where a future "helpful" + // change synthesizes empty wsid/fileID. + expect(resolveAttachmentHref(chatWs, "platform-pending:no-slash")).toBe( + "platform-pending:no-slash", + ); + }); + + it("falls back to raw URI when platform-pending: has empty fileID", () => { + expect(resolveAttachmentHref(chatWs, "platform-pending:abc/")).toBe( + "platform-pending:abc/", + ); + }); + + it("falls back to raw URI when platform-pending: has empty wsid", () => { + expect(resolveAttachmentHref(chatWs, "platform-pending:/file-xyz")).toBe( + "platform-pending:/file-xyz", + ); + }); + + it("regression: exact production repro from #2968 (reno-stars)", () => { + // From the original PR #2968 body: the chat's markdown-link + // override fell through on this exact shape and the browser + // navigated to about:blank. 
Pin the post-fix output so a future + // refactor can't reintroduce the original bug. + const url = resolveAttachmentHref( + "chat-ws", + "platform-pending:d76977b1-uuid/bb0dcaf3-uuid", + ); + expect(url).toContain("/workspaces/d76977b1-uuid/pending-uploads/bb0dcaf3-uuid/content"); + expect(url).not.toContain("chat-ws"); + }); +}); + +describe("isPlatformAttachment", () => { + it("returns true for platform-pending: URIs", () => { + expect(isPlatformAttachment("platform-pending:abc/file")).toBe(true); + }); + + it("returns true even for malformed platform-pending: URIs", () => { + // The helper is a SHAPE check — caller routes through + // downloadChatFile and downloadChatFile handles the malformed case + // downstream. Pinning so a future helper that "validates" the + // wsid/fileID shape doesn't silently break the auth-attached + // download flow for in-flight URIs. + expect(isPlatformAttachment("platform-pending:no-slash")).toBe(true); + }); + + it("returns true for workspace: URIs", () => { + expect(isPlatformAttachment("workspace:/configs/foo")).toBe(true); + expect(isPlatformAttachment("workspace:/workspace/x.pdf")).toBe(true); + }); + + it("returns true for file:/// URIs", () => { + expect(isPlatformAttachment("file:///workspace/x")).toBe(true); + }); + + it("returns true for absolute paths under allowed roots", () => { + expect(isPlatformAttachment("/home/user/x")).toBe(true); + expect(isPlatformAttachment("/configs/y")).toBe(true); + }); + + it("returns FALSE for bare HTTPS URLs to other origins", () => { + // Auth-leak class regression: a helper that always returned true + // would attach workspace tokens to third-party requests. Pin + // the negative case explicitly. 
+ expect(isPlatformAttachment("https://example.com/file")).toBe(false); + expect(isPlatformAttachment("http://example.com/file")).toBe(false); + }); + + it("returns FALSE for non-allowlisted root paths", () => { + expect(isPlatformAttachment("/etc/passwd")).toBe(false); + expect(isPlatformAttachment("/var/log/x")).toBe(false); + expect(isPlatformAttachment("/tmp/x")).toBe(false); + }); + + it("returns FALSE for empty string", () => { + expect(isPlatformAttachment("")).toBe(false); + }); + + it("returns FALSE for unrecognised schemes", () => { + expect(isPlatformAttachment("s3://bucket/key")).toBe(false); + expect(isPlatformAttachment("ftp://server/file")).toBe(false); + }); +}); diff --git a/scripts/build_runtime_package.py b/scripts/build_runtime_package.py index 82b1090c..d4eedde2 100755 --- a/scripts/build_runtime_package.py +++ b/scripts/build_runtime_package.py @@ -54,6 +54,7 @@ TOP_LEVEL_MODULES = { "a2a_client", "a2a_executor", "a2a_mcp_server", + "a2a_response", "a2a_tools", "a2a_tools_delegation", "a2a_tools_inbox", diff --git a/scripts/check-stale-promote-pr.sh b/scripts/check-stale-promote-pr.sh new file mode 100755 index 00000000..bcc5afe6 --- /dev/null +++ b/scripts/check-stale-promote-pr.sh @@ -0,0 +1,216 @@ +#!/usr/bin/env bash +# scripts/check-stale-promote-pr.sh +# +# Scan open auto-promote PRs (base=main head=staging) for the +# silent-block failure mode that motivated issue #2975: +# - PR sat for hours with mergeStateStatus=BLOCKED +# - reviewDecision=REVIEW_REQUIRED (auto-merge armed but waiting +# on a human approval that never comes) +# +# When found, emit: +# - GitHub Actions notice/warning lines (workflow summary surface) +# - Optionally post a comment on the PR (--comment) +# +# Exit code is the count of stale PRs found, capped at 125 so callers +# can detect "alarm fired" via `if ! check-stale-promote-pr.sh; then …`. +# Exit 0 = clean, exit ≥1 = at least N stale PRs need attention. 
+#
+# Used by .github/workflows/auto-promote-stale-alarm.yml. Logic lives
+# here (not inline in the workflow YAML) so we can:
+#   - Unit-test it with a stubbed `gh` (see test-check-stale-promote-pr.sh)
+#   - Run it ad-hoc by an operator: `scripts/check-stale-promote-pr.sh`
+#   - Reuse the same surface in any sibling workflow that needs the same
+#     check (SSOT — one detector, many callers).
+#
+# Requires: `gh` CLI, `jq`. `GH_TOKEN` env in the workflow context.
+
+set -euo pipefail
+
+# -----------------------------------------------------------------------------
+# Inputs
+# -----------------------------------------------------------------------------
+
+# Threshold beyond which a BLOCKED+REVIEW_REQUIRED promote PR is "stale"
+# enough to alarm. 4 hours is the floor: most legitimate gates clear
+# inside an hour, so 4× headroom is plenty for slow CI without false-
+# alarming. Override via env for tests + edge ops.
+STALE_HOURS="${STALE_HOURS:-4}"
+
+# Repo: honor an explicit REPO env first (ad-hoc runs, unit tests), then
+# fall back to the workflow-provided GITHUB_REPOSITORY. The error path
+# below and test 11 of test-check-stale-promote-pr.sh both treat REPO as
+# an input, so it must actually be read here. Tests pass --repo explicitly.
+REPO="${REPO:-${GITHUB_REPOSITORY:-}}"
+
+# Whether to post a comment to the PR. Off by default to avoid noise on
+# manual ad-hoc runs; the cron workflow turns it on.
+POST_COMMENT="${POST_COMMENT:-false}"
+
+# Where to read the open-PR JSON from. Empty = call `gh` live. Tests
+# point this at a fixture file.
+PR_FIXTURE="${PR_FIXTURE:-}"
+
+# Where to read "now" from. Empty = real clock. Tests freeze time so
+# the staleness math is deterministic. 
+NOW_OVERRIDE="${NOW_OVERRIDE:-}" + +while [ $# -gt 0 ]; do + case "$1" in + --repo) REPO="$2"; shift 2 ;; + --comment) POST_COMMENT="true"; shift ;; + --no-comment) POST_COMMENT="false"; shift ;; + --fixture) PR_FIXTURE="$2"; shift 2 ;; + --stale-hours) STALE_HOURS="$2"; shift 2 ;; + -h|--help) + sed -n '1,/^set /p' "$0" | grep '^# ' | sed 's/^# //' + exit 0 + ;; + *) echo "unknown arg: $1" >&2; exit 64 ;; + esac +done + +if [ -z "$REPO" ] && [ -z "$PR_FIXTURE" ]; then + echo "::error::REPO env (or GITHUB_REPOSITORY) required when no fixture given" >&2 + exit 2 +fi + +# ----------------------------------------------------------------------------- +# Clock helpers — split out so tests can freeze time +# ----------------------------------------------------------------------------- + +now_epoch() { + if [ -n "$NOW_OVERRIDE" ]; then + printf '%s\n' "$NOW_OVERRIDE" + else + date -u +%s + fi +} + +# Parse RFC3339 timestamps the way GitHub emits them (e.g. +# "2026-05-05T23:15:00Z"). gnu-date uses -d, bsd-date uses -j -f. Cover +# both because the workflow runs on ubuntu-latest (gnu) but operators +# may run this script on macOS (bsd). +to_epoch() { + local ts="$1" + # gnu-date path first. + if date -u -d "$ts" +%s 2>/dev/null; then + return 0 + fi + # bsd-date fallback — strip optional fractional seconds before %S. 
+ local ts_clean="${ts%%.*}" + ts_clean="${ts_clean%Z}Z" + date -u -j -f "%Y-%m-%dT%H:%M:%SZ" "$ts_clean" +%s 2>/dev/null || { + echo "::error::cannot parse timestamp: $ts" >&2 + return 1 + } +} + +# ----------------------------------------------------------------------------- +# Fetch open auto-promote PRs +# ----------------------------------------------------------------------------- + +fetch_prs() { + if [ -n "$PR_FIXTURE" ]; then + cat "$PR_FIXTURE" + return 0 + fi + gh pr list --repo "$REPO" \ + --base main --head staging --state open \ + --json number,title,createdAt,mergeStateStatus,reviewDecision,url +} + +# ----------------------------------------------------------------------------- +# Stale detection +# ----------------------------------------------------------------------------- + +# Read PR list from stdin, emit one TSV line per stale PR: +# \t\t\t +# Caller decides what to do (warn, comment, escalate). +detect_stale() { + local now_ts + now_ts="$(now_epoch)" + local stale_seconds=$((STALE_HOURS * 3600)) + + jq -r '.[] | [.number, .createdAt, .mergeStateStatus, .reviewDecision, .url, .title] | @tsv' \ + | while IFS=$'\t' read -r num created_at merge_state review_decision url title; do + # Only alarm on the specific failure mode: BLOCKED + REVIEW_REQUIRED. + # Other BLOCKED reasons (DIRTY, BEHIND, failed checks) are the + # author's signal-to-fix; this script targets the silent + # "no human reviewed yet" wedge specifically. 
+ [ "$merge_state" = "BLOCKED" ] || continue + [ "$review_decision" = "REVIEW_REQUIRED" ] || continue + + local created_ts + created_ts="$(to_epoch "$created_at")" || continue + local age=$((now_ts - created_ts)) + if [ "$age" -ge "$stale_seconds" ]; then + local age_h=$((age / 3600)) + printf '%s\t%d\t%s\t%s\n' "$num" "$age_h" "$url" "$title" + fi + done +} + +# ----------------------------------------------------------------------------- +# Reporting +# ----------------------------------------------------------------------------- + +# Comment body — kept short; the issue body has the full design. +comment_body() { + local age_h="$1" + cat <<EOF +⚠️ This auto-promote PR has been BLOCKED on \`REVIEW_REQUIRED\` for **${age_h}h**. + +Auto-merge is armed, but main's branch protection requires 1 review and no human has approved. Until someone reviews, the staging→main promote chain is wedged and downstream consumers (canvas builds, tenant redeploys) won't see new code. + +**Action**: a human reviewer on \`@Molecule-AI/maintainers\` should approve this PR (or mark it as not ready and close). + +Detected by \`scripts/check-stale-promote-pr.sh\` per issue #2975. +EOF +} + +post_comment() { + local pr_num="$1" + local age_h="$2" + if [ "$POST_COMMENT" != "true" ]; then + return 0 + fi + # Idempotency: only one alarm comment per PR. Look for the marker + # string in existing comments before posting a new one. 
+ local existing + existing="$(gh pr view "$pr_num" --repo "$REPO" --json comments \ + --jq '.comments[] | select(.body | test("scripts/check-stale-promote-pr.sh per issue #2975")) | .databaseId' \ + | head -n1)" + if [ -n "$existing" ]; then + echo "::notice::PR #$pr_num already has a stale-alarm comment ($existing) — not re-posting" + return 0 + fi + comment_body "$age_h" | gh pr comment "$pr_num" --repo "$REPO" --body-file - + echo "::notice::Posted stale-alarm comment on PR #$pr_num (age=${age_h}h)" +} + +# ----------------------------------------------------------------------------- +# Main +# ----------------------------------------------------------------------------- + +stale_count=0 +while IFS=$'\t' read -r num age_h url title; do + [ -n "$num" ] || continue + stale_count=$((stale_count + 1)) + echo "::warning title=Stale auto-promote PR::PR #$num — BLOCKED on REVIEW_REQUIRED for ${age_h}h. $url" + { + echo "## ⚠️ Stale auto-promote PR detected" + echo + echo "- PR: #$num — \`$title\`" + echo "- Age: ${age_h}h" + echo "- State: BLOCKED on REVIEW_REQUIRED" + echo "- URL: $url" + echo + echo "Auto-merge is armed but waiting on a human review. See issue #2975." + } >> "${GITHUB_STEP_SUMMARY:-/dev/null}" + post_comment "$num" "$age_h" +done < <(fetch_prs | detect_stale) + +if [ "$stale_count" -eq 0 ]; then + echo "::notice::No stale auto-promote PRs detected (threshold: ${STALE_HOURS}h)" +fi + +# Cap exit code so we don't accidentally break shells that interpret +# >125 as signal-style. 1..N maps to "1..N stale PRs". +exit $(( stale_count > 125 ? 125 : stale_count )) diff --git a/scripts/test-check-stale-promote-pr.sh b/scripts/test-check-stale-promote-pr.sh new file mode 100755 index 00000000..3b8caba9 --- /dev/null +++ b/scripts/test-check-stale-promote-pr.sh @@ -0,0 +1,257 @@ +#!/usr/bin/env bash +# scripts/test-check-stale-promote-pr.sh +# +# Exhaustive bash unit tests for check-stale-promote-pr.sh. +# Goal: 100% branch coverage on the detector logic. 
+# +# Each case writes a fixture JSON, freezes the clock with NOW_OVERRIDE, +# runs the script with --fixture + --no-comment (so we don't try to +# actually call `gh pr comment`), and asserts on stdout/exit code. +# +# Run: bash scripts/test-check-stale-promote-pr.sh +# Expected: "All N tests passed" + exit 0. + +set -euo pipefail + +SCRIPT="$(cd "$(dirname "$0")" && pwd)/check-stale-promote-pr.sh" +TMP="$(mktemp -d)" +trap 'rm -rf "$TMP"' EXIT + +PASS=0 +FAIL=0 + +# ───────────────────────────────────────────────────────────────────────────── +# Helpers +# ───────────────────────────────────────────────────────────────────────────── + +# Frozen "now" — 2026-05-06T05:00:00Z. Compute dynamically so the +# tests stay correct regardless of platform-specific date semantics +# (gnu vs bsd) and any author math errors on the epoch. +if FROZEN_NOW="$(date -u -d '2026-05-06T05:00:00Z' +%s 2>/dev/null)"; then + : # gnu-date worked +elif FROZEN_NOW="$(date -u -j -f '%Y-%m-%dT%H:%M:%SZ' '2026-05-06T05:00:00Z' +%s 2>/dev/null)"; then + : # bsd-date worked +else + echo "FATAL: cannot compute FROZEN_NOW on this platform" >&2 + exit 1 +fi + +run_script() { + # Args: <fixture-file> + # Returns stdout + exit code via a known marker. + local fixture="$1" + shift + set +e + NOW_OVERRIDE="$FROZEN_NOW" \ + POST_COMMENT="false" \ + bash "$SCRIPT" --fixture "$fixture" "$@" 2>&1 + local rc=$? 
+ set -e + echo "EXIT_CODE=$rc" +} + +assert_pass() { + local name="$1" + local got="$2" + local want_pattern="$3" + if printf '%s' "$got" | grep -qE "$want_pattern"; then + PASS=$((PASS + 1)) + printf ' ✓ %s\n' "$name" + else + FAIL=$((FAIL + 1)) + printf ' ✗ %s\n want pattern: %s\n got:\n%s\n' "$name" "$want_pattern" "$got" + fi +} + +assert_no_match() { + local name="$1" + local got="$2" + local bad_pattern="$3" + if printf '%s' "$got" | grep -qE "$bad_pattern"; then + FAIL=$((FAIL + 1)) + printf ' ✗ %s\n bad pattern matched: %s\n got:\n%s\n' "$name" "$bad_pattern" "$got" + else + PASS=$((PASS + 1)) + printf ' ✓ %s\n' "$name" + fi +} + +# ───────────────────────────────────────────────────────────────────────────── +# Test cases +# ───────────────────────────────────────────────────────────────────────────── + +echo "1. Empty PR list — clean exit" +echo '[]' > "$TMP/empty.json" +got=$(run_script "$TMP/empty.json") +assert_pass "empty-no-warning" "$got" "No stale auto-promote PRs detected" +assert_pass "empty-exit-zero" "$got" "EXIT_CODE=0" + +echo +echo "2. Single PR, BLOCKED+REVIEW_REQUIRED, 5h old — fires alarm" +cat > "$TMP/stale1.json" <<EOF +[{ + "number": 2963, + "title": "staging → main", + "createdAt": "2026-05-06T00:00:00Z", + "mergeStateStatus": "BLOCKED", + "reviewDecision": "REVIEW_REQUIRED", + "url": "https://github.com/test/test/pull/2963" +}] +EOF +got=$(run_script "$TMP/stale1.json") +assert_pass "stale1-warning" "$got" "Stale auto-promote PR" +assert_pass "stale1-pr-number" "$got" "PR #2963" +assert_pass "stale1-age" "$got" "for 5h" +assert_pass "stale1-exit-1" "$got" "EXIT_CODE=1" + +echo +echo "3. 
Same PR but only 3h old — under threshold, NO alarm" +cat > "$TMP/young.json" <<EOF +[{ + "number": 100, + "title": "fresh promote", + "createdAt": "2026-05-06T02:00:00Z", + "mergeStateStatus": "BLOCKED", + "reviewDecision": "REVIEW_REQUIRED", + "url": "https://github.com/test/test/pull/100" +}] +EOF +got=$(run_script "$TMP/young.json") +assert_pass "young-no-alarm" "$got" "No stale auto-promote PRs" +assert_pass "young-exit-zero" "$got" "EXIT_CODE=0" +assert_no_match "young-no-warning" "$got" "Stale auto-promote PR" + +echo +echo "4. PR is BLOCKED but for the wrong reason (DIRTY, not REVIEW_REQUIRED)" +cat > "$TMP/dirty.json" <<EOF +[{ + "number": 200, + "title": "needs rebase", + "createdAt": "2026-05-06T00:00:00Z", + "mergeStateStatus": "BLOCKED", + "reviewDecision": "APPROVED", + "url": "https://github.com/test/test/pull/200" +}] +EOF +got=$(run_script "$TMP/dirty.json") +assert_pass "dirty-no-alarm" "$got" "No stale auto-promote PRs" +assert_pass "dirty-exit-zero" "$got" "EXIT_CODE=0" + +echo +echo "5. PR is APPROVED but mergeStateStatus is CLEAN — NOT alarming" +cat > "$TMP/clean.json" <<EOF +[{ + "number": 300, + "title": "all green", + "createdAt": "2026-05-06T00:00:00Z", + "mergeStateStatus": "CLEAN", + "reviewDecision": "APPROVED", + "url": "https://github.com/test/test/pull/300" +}] +EOF +got=$(run_script "$TMP/clean.json") +assert_pass "clean-no-alarm" "$got" "No stale auto-promote PRs" + +echo +echo "6. 
Multiple PRs — only the BLOCKED+REVIEW_REQUIRED+old one alarms" +cat > "$TMP/mixed.json" <<EOF +[ + { + "number": 100, + "title": "fresh", + "createdAt": "2026-05-06T04:00:00Z", + "mergeStateStatus": "BLOCKED", + "reviewDecision": "REVIEW_REQUIRED", + "url": "https://x/100" + }, + { + "number": 200, + "title": "stale + alarming", + "createdAt": "2026-05-05T20:00:00Z", + "mergeStateStatus": "BLOCKED", + "reviewDecision": "REVIEW_REQUIRED", + "url": "https://x/200" + }, + { + "number": 300, + "title": "approved + clean", + "createdAt": "2026-05-05T20:00:00Z", + "mergeStateStatus": "CLEAN", + "reviewDecision": "APPROVED", + "url": "https://x/300" + } +] +EOF +got=$(run_script "$TMP/mixed.json") +assert_pass "mixed-only-200" "$got" "PR #200" +assert_no_match "mixed-not-100" "$got" "PR #100" +assert_no_match "mixed-not-300" "$got" "PR #300" +assert_pass "mixed-exit-1" "$got" "EXIT_CODE=1" + +echo +echo "7. Custom STALE_HOURS via --stale-hours overrides threshold" +got=$(run_script "$TMP/young.json" --stale-hours 1) +assert_pass "custom-threshold-fires" "$got" "PR #100" +assert_pass "custom-threshold-exit-1" "$got" "EXIT_CODE=1" + +echo +echo "8. Two stale PRs — exit code reflects count" +cat > "$TMP/two-stale.json" <<EOF +[ + { + "number": 200, + "title": "stale-A", + "createdAt": "2026-05-05T20:00:00Z", + "mergeStateStatus": "BLOCKED", + "reviewDecision": "REVIEW_REQUIRED", + "url": "https://x/200" + }, + { + "number": 201, + "title": "stale-B", + "createdAt": "2026-05-05T19:00:00Z", + "mergeStateStatus": "BLOCKED", + "reviewDecision": "REVIEW_REQUIRED", + "url": "https://x/201" + } +] +EOF +got=$(run_script "$TMP/two-stale.json") +assert_pass "two-stale-exit-2" "$got" "EXIT_CODE=2" + +echo +echo "9. Help text is shown for --help" +set +e +help_out=$(bash "$SCRIPT" --help 2>&1) +help_rc=$? +set -e +assert_pass "help-exits-zero" "EXIT_CODE=$help_rc" "EXIT_CODE=0" +assert_pass "help-mentions-issue" "$help_out" "issue #2975" + +echo +echo "10. 
Unknown arg exits 64 (EX_USAGE)" +set +e +bad_out=$(bash "$SCRIPT" --bogus 2>&1) +bad_rc=$? +set -e +assert_pass "unknown-arg-rc" "EXIT_CODE=$bad_rc" "EXIT_CODE=64" + +echo +echo "11. Missing repo + missing fixture exits 2" +set +e +out=$(REPO="" bash "$SCRIPT" 2>&1) +rc=$? +set -e +assert_pass "no-repo-exit-2" "EXIT_CODE=$rc" "EXIT_CODE=2" + +# ───────────────────────────────────────────────────────────────────────────── +# Summary +# ───────────────────────────────────────────────────────────────────────────── + +echo +echo "─────────────────────────────────────────────" +echo "Tests: $PASS passed, $FAIL failed" +if [ "$FAIL" -gt 0 ]; then + exit 1 +fi +echo "All tests passed." diff --git a/tests/e2e/test_poll_mode_e2e.sh b/tests/e2e/test_poll_mode_e2e.sh index e4dd22bc..766ec3c7 100755 --- a/tests/e2e/test_poll_mode_e2e.sh +++ b/tests/e2e/test_poll_mode_e2e.sh @@ -157,6 +157,43 @@ A2A_RESP=$(curl -s --max-time "$TIMEOUT" -X POST "$BASE/workspaces/$POLL_WS_ID/a }') check "poll-mode A2A returns queued status" '"status":"queued"' "$A2A_RESP" + +# ---------- Phase 3.5: Python parser classifies queued envelope correctly ---------- +# (#2967) — server emits the queued envelope, the wheel's a2a_response.parse() +# MUST classify it as the Queued variant, not Malformed. Pre-#2967 the bare +# message/send parser in a2a_client.py:587 misclassified this and returned +# "[A2A_ERROR] unexpected response shape", which broke external↔external A2A +# on poll-mode peers. +# +# This phase exercises the actual on-the-wire response from a real +# workspace-server (NOT a mocked dict) through the same module the production +# wheel ships, so a regression in either the server emit shape OR the client +# parser fails this E2E. + +echo "" +echo "--- Phase 3.5: Python parser classifies real server response (#2967) ---" + +# Pipe the queued response captured above through a2a_response.parse and +# assert the classification. 
WORKSPACE_ID is required at module import +# time but irrelevant to this parsing call (any UUID is fine). +PARSE_RESULT=$(WORKSPACE_ID="00000000-0000-0000-0000-000000000001" \ + python3 -c " +import json, sys +sys.path.insert(0, '$(cd "$(dirname "$0")/../../workspace" && pwd)') +import a2a_response +data = json.loads(r'''$A2A_RESP''') +v = a2a_response.parse(data) +print(type(v).__name__) +if isinstance(v, a2a_response.Queued): + print(f'method={v.method} delivery_mode={v.delivery_mode}') +") + +check_eq "Python parser classifies real server response as Queued" \ + "Queued" "$(printf '%s' "$PARSE_RESULT" | head -n1)" +check "Queued variant captures method=message/send" \ + "method=message/send" "$PARSE_RESULT" +check "Queued variant captures delivery_mode=poll" \ + "delivery_mode=poll" "$PARSE_RESULT" check "queued response echoes delivery_mode=poll" '"delivery_mode":"poll"' "$A2A_RESP" check "queued response echoes the JSON-RPC method" '"method":"message/send"' "$A2A_RESP" diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py index 16eb4b9c..8e499f40 100644 --- a/workspace/a2a_client.py +++ b/workspace/a2a_client.py @@ -17,6 +17,7 @@ from concurrent.futures import ThreadPoolExecutor import httpx +import a2a_response from platform_auth import auth_headers, self_source_headers logger = logging.getLogger(__name__) @@ -353,6 +354,20 @@ def _agent_card_url_for(peer_id: str) -> str: # Used by delegate_task to distinguish real errors from normal response text. _A2A_ERROR_PREFIX = "[A2A_ERROR] " +# Sentinel prefix for queued-for-poll-mode-peer outcomes (#2967). +# When the target workspace is registered as delivery_mode=poll (no +# public URL — typical for external molecule-mcp standalone runtimes), +# the platform's a2a_proxy.go:402 short-circuit returns a synthetic +# {"status":"queued","delivery_mode":"poll","method":"..."} envelope +# instead of dispatching over HTTP. 
The message IS delivered (written +# to the platform's inbox queue); there's just no synchronous reply +# to relay. Pre-#2967 the client treated this as "unexpected response +# shape" → caller saw DELEGATION FAILED → retried → recipient saw +# duplicates. The Queued prefix lets callers branch on this outcome +# explicitly: "delivered async, no synchronous reply expected" is +# different from both success-with-text and failure. +_A2A_QUEUED_PREFIX = "[A2A_QUEUED] " + # Workspace IDs are UUIDs everywhere we generate them (platform's # workspaces.id column, /registry/discover/:id route param, etc.) but # the agent-facing tool surface receives them as free-form strings via @@ -564,17 +579,43 @@ async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str }, ) data = resp.json() - if "result" in data: - parts = data["result"].get("parts", []) - text = parts[0].get("text", "") if parts else "(no response)" - # Tag child-reported errors so the caller can detect them reliably + # Dispatch via the SSOT response model (a2a_response.py). + # All shape detection lives in one place — the parser + # never raises and routes unknown shapes to Malformed + # so a future server-side change is loud, not silent. + variant = a2a_response.parse(data) + if isinstance(variant, a2a_response.Result): + # Match legacy semantics: + # parts non-empty + first part has no text → "" + # parts empty → "(no response)" + # Differentiation matters for callers that assert + # on the empty-string case (test_a2a_client). + if variant.parts: + text = variant.text + else: + text = "(no response)" + # Tag child-reported errors so the caller can + # detect them reliably — agent-side bug surfaces + # text like "Agent error: <traceback>" inside a + # JSON-RPC success envelope. 
if text.startswith("Agent error:"): return f"{_A2A_ERROR_PREFIX}{text}" return text - elif "error" in data: - err = data["error"] - msg = (err.get("message") or "").strip() - code = err.get("code") + if isinstance(variant, a2a_response.Queued): + # Poll-mode peer — message accepted into the inbox + # queue, target agent will fetch via poll. NOT a + # failure. Return the queued sentinel so callers + # (delegate_task etc.) can render the outcome + # accurately instead of treating it as an error. + logger.info( + "send_a2a_message: queued for poll-mode peer (target=%s method=%s)", + target_url, + variant.method, + ) + return f"{_A2A_QUEUED_PREFIX}target={safe_id} method={variant.method}" + if isinstance(variant, a2a_response.Error): + msg = variant.message + code = variant.code if msg and code is not None: detail = f"{msg} (code={code})" elif msg: @@ -583,26 +624,33 @@ async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str detail = f"JSON-RPC error with no message (code={code})" else: detail = "JSON-RPC error with no message" + if variant.restarting: + # Surface platform-restart-in-progress + # explicitly — caller (UI / delegating agent) + # can render a softer "agent is restarting" + # message rather than a generic failure. + retry = ( + f", retry_after={variant.retry_after}s" + if variant.retry_after is not None + else "" + ) + detail = f"{detail} (restarting{retry})" return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]" - elif data.get("status") == "queued" and data.get("delivery_mode") == "poll": - # Workspace-server's poll-mode short-circuit envelope - # (workspace-server/internal/handlers/a2a_proxy.go ~line 402). - # The peer is poll-mode and has no URL to dispatch to, so - # the server queued the message for the peer's next inbox - # poll instead of forwarding it. Delivery is acknowledged - # but pending consumption. 
- # - # Pre-fix this fell through to the "unexpected response - # shape" error path → callers logged false failures, then - # delegate_task retried, and the peer received duplicate - # delegations. Issue #2967. - method = data.get("method") or "message/send" - logger.info( - "send_a2a_message: queued for poll-mode peer (method=%s, target=%s)", - method, target_url, - ) - return f"queued for poll-mode peer (method={method})" - return f"{_A2A_ERROR_PREFIX}unexpected response shape (no result, no error): {str(data)[:200]} [target={target_url}]" + # Malformed — log loud + surface as error so the + # operator notices a server change. SSOT refactor + # subsumes the inline "queued" check that landed in + # the #2972 hotfix; that branch is now the typed + # Queued variant above. + logger.warning( + "send_a2a_message: malformed response (target=%s body=%.200s)", + target_url, + str(variant.raw), + ) + return ( + f"{_A2A_ERROR_PREFIX}unexpected response shape " + f"(no result, error, or queued envelope): " + f"{str(variant.raw)[:200]} [target={target_url}]" + ) except _TRANSIENT_HTTP_ERRORS as e: last_exc = e attempts_remaining = _DELEGATE_MAX_ATTEMPTS - (attempt + 1) diff --git a/workspace/a2a_response.py b/workspace/a2a_response.py new file mode 100644 index 00000000..ae48465a --- /dev/null +++ b/workspace/a2a_response.py @@ -0,0 +1,246 @@ +"""Single source of truth for A2A ``/workspaces/<id>/a2a`` response shapes. + +The workspace-server proxy at +``workspace-server/internal/handlers/a2a_proxy.go`` (the canonical +emitter) returns one of the following shapes for a single A2A call: + + * **JSON-RPC success** — + ``{"jsonrpc": "2.0", "result": {...}, "id": "..."}`` + The agent's reply, passed through unchanged. + + * **JSON-RPC error** — + ``{"jsonrpc": "2.0", "error": {"message": "...", "code": ...}, "id": "..."}`` + The agent reported a structured error. 
+ + * **Poll-queued** (synthesized at proxy, RFC #2339 PR 2 — see + ``a2a_proxy.go:402-406``) — + ``{"status": "queued", "delivery_mode": "poll", "method": "..."}`` + The target is a poll-mode workspace (no public URL); the message + was written to the platform's inbox queue. The target agent will + fetch it via ``GET /activity?since_id=`` polling. NOT a failure — + delivery succeeded, there's just no synchronous reply to relay. + + * **Platform error** — ``{"error": "...", "restarting": true?, "retry_after": int?}`` + HTTP-level failure synthesized by the proxy when the agent is + unreachable, the container is restarting, or some other infrastructure + failure happened. ``restarting=true`` flags the platform-initiated + container-restart path. + + * **Malformed** — anything else. Surfaced explicitly so a future server + change is loud rather than silent. + +The ``parse(data)`` function classifies a pre-decoded JSON body into a +typed variant. Callers ``match`` on the variant and never re-implement +shape detection — that's the SSOT discipline. + +# SSOT contract + +This file is the Python half. The Go server emits these shapes today +via inline ``gin.H{...}`` literals. A future PR can introduce a Go +mirror (e.g. ``workspace-server/internal/models/a2a_response.go``) +with a typed marshaller — until then, **any change to the wire shape +must be reflected here** and gated by ``test_a2a_response.py``'s +fixture corpus. The corpus exists specifically so a one-sided edit +breaks CI. + +# Why a typed model (vs. dict-key sniffing at every site) + +The pre-2967 client at ``a2a_client.py:567-587`` sniffed for ``result`` +or ``error`` keys inline and treated everything else as malformed — +which silently broke poll-mode peers (the queued envelope has neither +key). Inline sniffing per call site multiplies the surface area where +a new shape gets misclassified. 
A single typed parser with an +explicit ``Malformed`` escape hatch makes shape additions a +one-line change here + a fixture entry in the test corpus, instead of +a hunt through every parsing site in the runtime. +""" +from __future__ import annotations + +import dataclasses +import logging +from typing import Any, Optional, Union + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass(frozen=True) +class Result: + """JSON-RPC success — agent's reply available synchronously. + + ``text`` is the convenience extraction from ``parts[0].text`` (the + A2A multipart shape). ``parts`` is the full list, available for + callers that need richer rendering (multiple parts, non-text parts). + ``raw_result`` preserves the unparsed ``result`` field for any + caller that needs it (e.g. activity-row response_body audit). + """ + + text: str + parts: list[dict[str, Any]] = dataclasses.field(default_factory=list) + raw_result: Optional[dict[str, Any]] = None + + +@dataclasses.dataclass(frozen=True) +class Error: + """JSON-RPC error or platform-level error response. + + ``code`` is the JSON-RPC integer code when present, else None. + ``restarting`` / ``retry_after`` are platform-restart-in-progress + metadata: when both are set, the caller knows the container is + being recycled and may surface a softer error to the user. + """ + + message: str + code: Optional[int] = None + restarting: bool = False + retry_after: Optional[int] = None + + +@dataclasses.dataclass(frozen=True) +class Queued: + """Platform poll-mode short-circuit — message accepted, peer will pick up async. + + Returned when the target workspace is registered as + ``delivery_mode=poll`` (no public URL — typical for external + standalone ``molecule-mcp`` runtimes). The message was written to + the platform's inbox queue; the target agent will fetch it via + ``GET /activity?since_id=`` polling. + + NOT a failure. 
Callers that expect a synchronous reply (the agent's + response text) won't get one here — they should either: + + * Tolerate the absence of a reply (fire-and-forget semantics). + * Fall back to the durable ``/workspaces/:id/delegate`` + + ``/delegations`` polling path (see ``a2a_tools_delegation``'s + ``_delegate_sync_via_polling``), which writes the same A2A + request through the platform's executeDelegation goroutine + and lets the caller poll for the result row. + + ``method`` echoes the request method (``message/send``, ``notify``, + etc.) so callers can correlate. + """ + + method: str + delivery_mode: str = "poll" + + +@dataclasses.dataclass(frozen=True) +class Malformed: + """Server returned a body the parser can't classify. + + Carries the raw decoded payload for diagnostic logging. Callers + typically render this as an error to the user (see + ``send_a2a_message``) — but the Malformed variant is a separate + type so logging / metrics can distinguish it from genuine + JSON-RPC ``Error`` responses. + """ + + raw: Any # whatever the server returned: dict / list / str / number / etc. + + +Variant = Union[Result, Error, Queued, Malformed] + + +# Field-name constants — the wire vocabulary. Single source of truth; +# the parser references these by name so a change here is a +# one-line edit instead of a hunt through string literals. +_KEY_RESULT = "result" +_KEY_ERROR = "error" +_KEY_STATUS = "status" +_KEY_DELIVERY_MODE = "delivery_mode" +_KEY_METHOD = "method" +_KEY_RESTARTING = "restarting" +_KEY_RETRY_AFTER = "retry_after" + +_STATUS_QUEUED = "queued" +_DELIVERY_MODE_POLL = "poll" + + +def parse(data: Any) -> Variant: + """Classify a pre-decoded ``/a2a`` JSON response into a typed variant. + + Never raises. Every branch is total: any input that doesn't match a + known shape routes to ``Malformed`` so the caller can decide how + to surface it. + + The order of checks matters: + + 1. Non-dict input → Malformed (server contract is dict-shaped). + 2. 
Poll-queued envelope is checked BEFORE result/error so the
+       poll short-circuit always wins: a body that sets both
+       ``status=queued`` and ``result`` classifies as Queued.
+    3. ``result`` → Result (the JSON-RPC success path).
+    4. ``error`` → Error (JSON-RPC error or platform error).
+    5. Anything else → Malformed.
+    """
+    if not isinstance(data, dict):
+        logger.warning(
+            "a2a_response.parse: non-dict body — got %s",
+            type(data).__name__,
+        )
+        return Malformed(raw=data)
+
+    # Poll-queued envelope. Both keys must be present — the workspace
+    # server sets them together; if only one is present the body is
+    # ambiguous and we route to Malformed for visibility.
+    if (
+        data.get(_KEY_STATUS) == _STATUS_QUEUED
+        and data.get(_KEY_DELIVERY_MODE) == _DELIVERY_MODE_POLL
+    ):
+        method_raw = data.get(_KEY_METHOD)
+        method = str(method_raw) if method_raw is not None else "unknown"
+        logger.info(
+            "a2a_response.parse: queued for poll-mode peer (method=%s)",
+            method,
+        )
+        return Queued(method=method)
+
+    # JSON-RPC success.
+    if _KEY_RESULT in data:
+        result = data[_KEY_RESULT]
+        if isinstance(result, dict):
+            parts_raw = result.get("parts")
+            parts = parts_raw if isinstance(parts_raw, list) else []
+            text = ""
+            if parts:
+                first = parts[0]
+                if isinstance(first, dict):
+                    text_raw = first.get("text")
+                    text = str(text_raw) if text_raw is not None else ""
+            return Result(text=text, parts=parts, raw_result=result)
+        # ``result`` present but not a dict — unusual but not an error;
+        # surface as a Result with the value rendered to text.
+        return Result(text=str(result), parts=[], raw_result=None)
+
+    # JSON-RPC error or platform error.
+ if _KEY_ERROR in data: + err_raw = data[_KEY_ERROR] + message = "" + code: Optional[int] = None + if isinstance(err_raw, dict): + msg_raw = err_raw.get("message") + if msg_raw is not None: + message = str(msg_raw).strip() + code_raw = err_raw.get("code") + if isinstance(code_raw, int): + code = code_raw + elif isinstance(err_raw, str): + message = err_raw.strip() + else: + message = str(err_raw) + + restarting = bool(data.get(_KEY_RESTARTING, False)) + retry_after_raw = data.get(_KEY_RETRY_AFTER) + retry_after = retry_after_raw if isinstance(retry_after_raw, int) else None + + return Error( + message=message, + code=code, + restarting=restarting, + retry_after=retry_after, + ) + + logger.warning( + "a2a_response.parse: unrecognized shape — keys=%s", + sorted(data.keys()), + ) + return Malformed(raw=data) diff --git a/workspace/a2a_tools_delegation.py b/workspace/a2a_tools_delegation.py index 170a5333..79f42fd1 100644 --- a/workspace/a2a_tools_delegation.py +++ b/workspace/a2a_tools_delegation.py @@ -29,14 +29,18 @@ from __future__ import annotations import hashlib import json +import logging import os import httpx +logger = logging.getLogger(__name__) + from a2a_client import ( PLATFORM_URL, WORKSPACE_ID, _A2A_ERROR_PREFIX, + _A2A_QUEUED_PREFIX, _peer_names, _peer_to_source, discover_peer, @@ -245,6 +249,29 @@ async def tool_delegate_task( # (the platform proxy) so the same code works for in-container and # external (standalone molecule-mcp) callers. result = await send_a2a_message(workspace_id, task, source_workspace_id=src) + # #2967: when the target is a poll-mode peer, the platform's + # a2a_proxy short-circuits and returns a queued envelope — + # send_a2a_message surfaces that as the _A2A_QUEUED_PREFIX + # sentinel. 
The synchronous proxy path can't deliver a reply + # because the target has no public URL; fall back to the + # durable /delegate + /delegations polling path which DOES + # work for poll-mode peers (the executeDelegation goroutine + # writes to the inbox queue and the result row arrives when + # the target picks it up + replies). + # + # This is what makes external-runtime-to-external-runtime + # A2A actually deliver synchronous replies — without the + # fallback the calling agent sees the queued sentinel as + # success-with-no-text and never gets the peer's response. + if result.startswith(_A2A_QUEUED_PREFIX): + logger.info( + "tool_delegate_task: target=%s is poll-mode; " + "falling back from message/send to /delegate-poll path", + workspace_id, + ) + result = await _delegate_sync_via_polling( + workspace_id, task, src or WORKSPACE_ID, + ) # Detect delegation failures — wrap them clearly so the calling agent # can decide to retry, use another peer, or handle the task itself. diff --git a/workspace/tests/test_a2a_client.py b/workspace/tests/test_a2a_client.py index 068fbffd..39e3ae04 100644 --- a/workspace/tests/test_a2a_client.py +++ b/workspace/tests/test_a2a_client.py @@ -281,11 +281,11 @@ class TestSendA2AMessage: to the 'unexpected response shape' error path → callers retried, peer got duplicate delegations. - Pin: poll-queued envelope returns a clean success string that does - NOT start with _A2A_ERROR_PREFIX, so callers route it through the - normal-outcome path. Verified discriminating: assert_NOT_startswith - the error prefix would FAIL on the old code (which returned an - error-prefixed string) and PASSES on the new code. + Pin: poll-queued envelope returns a string tagged with the + _A2A_QUEUED_PREFIX sentinel (not _A2A_ERROR_PREFIX), so callers + can branch on the typed outcome without substring-sniffing. + Verified discriminating: pre-fix returned _A2A_ERROR_PREFIX so + the not-startswith assertion would FAIL on the old code. 
""" import a2a_client @@ -301,12 +301,13 @@ class TestSendA2AMessage: # Discriminating: pre-fix returned a string that startswith # _A2A_ERROR_PREFIX, so this assertion would have FAILED on the - # old code. New code returns a queued-success string. + # old code. New code returns the queued-success sentinel. assert not result.startswith(a2a_client._A2A_ERROR_PREFIX), ( f"poll-queued envelope must not be tagged as A2A error; got: {result!r}" ) - assert "queued" in result.lower() - assert "poll" in result.lower() + assert result.startswith(a2a_client._A2A_QUEUED_PREFIX), ( + f"poll-queued envelope must use the queued sentinel; got: {result!r}" + ) # The method is included so a structured-log scraper can route by # protocol verb if needed. assert "message/send" in result @@ -329,6 +330,7 @@ class TestSendA2AMessage: result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task") assert not result.startswith(a2a_client._A2A_ERROR_PREFIX) + assert result.startswith(a2a_client._A2A_QUEUED_PREFIX) assert "message/sendStream" in result async def test_status_queued_without_poll_mode_still_falls_through(self): @@ -462,6 +464,98 @@ def _make_seq_mock_client(post_side_effect): return mock_client +class TestSendA2AMessagePollMode: + """Pin the #2967 fix: send_a2a_message recognizes the platform's + poll-mode short-circuit envelope and returns a queued sentinel + instead of an "unexpected response shape" error. + + Pre-#2967 the client treated the queued envelope as malformed, + causing the calling agent to retry, which delivered the same + message twice to the (poll-mode) recipient. The Queued sentinel + lets delegate_task fall back to the durable polling path + transparently — see test_delegation_sync_via_polling for the + fallback verification. 
+ """ + + async def test_poll_queued_envelope_returns_queued_sentinel(self): + # Workspace-server returns this shape (a2a_proxy.go:402-406) + # when the target workspace is registered as delivery_mode=poll + # (no public URL, typical for external molecule-mcp standalone + # runtimes). + import a2a_client + + resp = _make_response(200, { + "status": "queued", + "delivery_mode": "poll", + "method": "message/send", + }) + mock_client = _make_mock_client(post_resp=resp) + with patch("a2a_client.httpx.AsyncClient", return_value=mock_client): + result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task") + + # Sentinel + structured payload so callers can branch on it. + assert result.startswith(a2a_client._A2A_QUEUED_PREFIX) + # Critically: NOT the error sentinel. Pre-#2967 it was the error path. + assert not result.startswith(a2a_client._A2A_ERROR_PREFIX) + # Carries enough info for the caller to log meaningfully. + assert _TEST_PEER_ID in result + assert "message/send" in result + + async def test_poll_queued_envelope_method_is_recorded(self): + import a2a_client + + resp = _make_response(200, { + "status": "queued", + "delivery_mode": "poll", + "method": "notify", + }) + mock_client = _make_mock_client(post_resp=resp) + with patch("a2a_client.httpx.AsyncClient", return_value=mock_client): + result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task") + + assert result.startswith(a2a_client._A2A_QUEUED_PREFIX) + assert "notify" in result + + async def test_status_queued_without_delivery_mode_is_unexpected_shape(self): + # Server bug: only ``status=queued`` set, ``delivery_mode`` + # missing. Surface as the malformed branch (not Queued) — the + # SSOT parser treats this as Malformed because the documented + # contract requires both keys. 
+        import a2a_client
+
+        resp = _make_response(200, {"status": "queued", "method": "message/send"})
+        mock_client = _make_mock_client(post_resp=resp)
+        with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
+            result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
+
+        assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
+        assert "unexpected response shape" in result
+        # Must explicitly mention "or queued envelope" so an operator
+        # debugging this knows the parser HAS a Queued branch and the
+        # body just didn't match — not that the parser is missing the
+        # logic entirely (the pre-#2967 confusion).
+        assert "queued envelope" in result
+
+    async def test_platform_error_with_restart_metadata_surfaces_in_message(self):
+        # Platform error envelope with restart metadata — surfaced as
+        # an error string containing "restarting". The client parses
+        # the JSON body without checking HTTP status, hence mock 200.
+        import a2a_client
+
+        resp = _make_response(200, {
+            "error": "workspace agent unreachable — container restart triggered",
+            "restarting": True,
+            "retry_after": 15,
+        })
+        mock_client = _make_mock_client(post_resp=resp)
+        with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
+            result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
+
+        assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
+        assert "restarting" in result
+        assert "retry_after=15" in result
+
+
 class TestSendA2AMessageRetry:
     """Verify auto-retry on transient transport errors (RemoteProtocolError,
     ConnectError, ReadTimeout, etc.) up to _DELEGATE_MAX_ATTEMPTS times.
diff --git a/workspace/tests/test_a2a_response.py b/workspace/tests/test_a2a_response.py
new file mode 100644
index 00000000..cf254b36
--- /dev/null
+++ b/workspace/tests/test_a2a_response.py
@@ -0,0 +1,455 @@
+"""Tests for the A2A response SSOT parser (workspace/a2a_response.py).
+
+Branch coverage target: 100%.
Each variant of ``parse()`` exercised in
+isolation, plus adversarial-input fuzzing to assert the parser never
+raises.
+
+Pre-#2967, the response shape was sniffed inline at every call site
+(``a2a_client.py:567-587`` had hard-coded ``"result" in data`` /
+``"error" in data`` checks). The bare ``else`` returned an
+"unexpected response shape" error — which silently broke poll-mode
+peers because the workspace-server's poll-queued envelope has neither
+``result`` nor ``error``. The SSOT parser has an explicit ``Queued``
+variant for that path and routes anything truly unrecognized to
+``Malformed`` so a future server-side change fails loudly.
+
+The regression is pinned in ``TestRegressionGate``: it reproduces
+the pre-#2967 sniffer inline
+(``test_legacy_sniffer_would_return_neither_branch``) and shows it
+routes the poll-queued envelope to the "unexpected response shape"
+branch, while the SSOT parser classifies it as ``Queued``.
+"""
+from __future__ import annotations
+
+import logging
+from typing import Any
+
+import pytest
+
+import a2a_response
+
+
+# ============== Fixture corpus — the canonical wire shapes ==============
+
+
+# Every shape below mirrors a path the workspace-server's a2a_proxy.go
+# can return. When you add a new server-side response shape, add a
+# fixture entry here and a corresponding test method below.
+_FIXTURES = { + "jsonrpc_success_with_text": { + "jsonrpc": "2.0", + "id": "abc-123", + "result": { + "parts": [{"kind": "text", "text": "hello world"}], + }, + }, + "jsonrpc_success_multipart": { + "jsonrpc": "2.0", + "id": "abc-123", + "result": { + "parts": [ + {"kind": "text", "text": "first"}, + {"kind": "text", "text": "second"}, + ], + }, + }, + "jsonrpc_success_no_parts": { + "jsonrpc": "2.0", + "id": "abc-123", + "result": {}, + }, + "jsonrpc_success_part_no_text_key": { + "jsonrpc": "2.0", + "id": "abc-123", + "result": {"parts": [{"kind": "text"}]}, + }, + "jsonrpc_error_with_message_and_code": { + "jsonrpc": "2.0", + "id": "abc-123", + "error": {"message": "rate limited", "code": -32003}, + }, + "jsonrpc_error_message_only": { + "jsonrpc": "2.0", + "id": "abc-123", + "error": {"message": "rate limited"}, + }, + "jsonrpc_error_code_only": { + "jsonrpc": "2.0", + "id": "abc-123", + "error": {"code": -32603}, + }, + "jsonrpc_error_string_form": { + "jsonrpc": "2.0", + "id": "abc-123", + "error": "string-shaped error", + }, + "platform_error_with_restart": { + "error": "workspace agent unreachable — container restart triggered", + "restarting": True, + "retry_after": 15, + }, + "platform_error_plain": { + "error": "workspace not found", + }, + "poll_queued_full": { + "status": "queued", + "delivery_mode": "poll", + "method": "message/send", + }, + "poll_queued_notify": { + "status": "queued", + "delivery_mode": "poll", + "method": "notify", + }, + "poll_queued_no_method": { + "status": "queued", + "delivery_mode": "poll", + }, + "malformed_empty_dict": {}, + "malformed_unexpected_keys": {"foo": "bar", "baz": 42}, + "malformed_status_queued_no_delivery_mode": { + # Server bug — status set but delivery_mode missing. + # Should be Malformed, not Queued, because the contract says both. 
+ "status": "queued", + }, + "malformed_delivery_mode_no_status": { + "delivery_mode": "poll", + }, +} + + +# ============== Variant-by-variant coverage ============== + + +class TestQueuedVariant: + """``parse()`` recognizes the workspace-server poll-mode short-circuit + envelope (a2a_proxy.go:402-406) and returns ``Queued``.""" + + def test_full_envelope_with_method_message_send(self): + v = a2a_response.parse(_FIXTURES["poll_queued_full"]) + assert isinstance(v, a2a_response.Queued) + assert v.method == "message/send" + assert v.delivery_mode == "poll" + + def test_envelope_with_method_notify(self): + v = a2a_response.parse(_FIXTURES["poll_queued_notify"]) + assert isinstance(v, a2a_response.Queued) + assert v.method == "notify" + + def test_envelope_missing_method_uses_unknown_sentinel(self): + # Envelope without ``method`` key — server contract should + # always set it, but the parser must not raise on absence. + v = a2a_response.parse(_FIXTURES["poll_queued_no_method"]) + assert isinstance(v, a2a_response.Queued) + assert v.method == "unknown" + + def test_status_queued_alone_is_malformed_not_queued(self): + # ``status=queued`` without ``delivery_mode=poll`` does not match + # the documented envelope. Surface as Malformed for visibility. + v = a2a_response.parse(_FIXTURES["malformed_status_queued_no_delivery_mode"]) + assert isinstance(v, a2a_response.Malformed) + + def test_delivery_mode_alone_is_malformed_not_queued(self): + v = a2a_response.parse(_FIXTURES["malformed_delivery_mode_no_status"]) + assert isinstance(v, a2a_response.Malformed) + + def test_logs_info_on_queued(self, caplog): + # Comprehensive logging — operator should see queued events at INFO. 
+ with caplog.at_level(logging.INFO, logger="a2a_response"): + a2a_response.parse(_FIXTURES["poll_queued_full"]) + assert any("queued for poll-mode peer" in r.message for r in caplog.records) + + +class TestResultVariant: + """``parse()`` extracts the JSON-RPC ``result`` envelope into + ``Result(text, parts, raw_result)``.""" + + def test_simple_text_result(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_success_with_text"]) + assert isinstance(v, a2a_response.Result) + assert v.text == "hello world" + assert len(v.parts) == 1 + assert v.raw_result == {"parts": [{"kind": "text", "text": "hello world"}]} + + def test_multipart_result_extracts_first_part_text(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_success_multipart"]) + assert isinstance(v, a2a_response.Result) + assert v.text == "first" + assert len(v.parts) == 2 + + def test_result_with_no_parts(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_success_no_parts"]) + assert isinstance(v, a2a_response.Result) + assert v.text == "" + assert v.parts == [] + + def test_part_without_text_key(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_success_part_no_text_key"]) + assert isinstance(v, a2a_response.Result) + # No "text" key — extracted text is empty, parts list intact. + assert v.text == "" + assert len(v.parts) == 1 + + def test_result_non_dict_returns_text_form(self): + # Pathological but legal: ``result`` is a string instead of a dict. + v = a2a_response.parse({"result": "hello"}) + assert isinstance(v, a2a_response.Result) + assert v.text == "hello" + assert v.parts == [] + + def test_result_takes_precedence_when_no_queued_envelope(self): + # Both ``result`` and ``error`` keys present — result wins + # because it's checked first after the Queued path. 
+ v = a2a_response.parse({ + "result": {"parts": [{"kind": "text", "text": "ok"}]}, + "error": {"message": "should-be-ignored"}, + }) + assert isinstance(v, a2a_response.Result) + assert v.text == "ok" + + def test_part_with_non_dict_first_entry(self): + # ``parts[0]`` is a string instead of a dict — parser tolerates it, + # text falls back to empty. + v = a2a_response.parse({"result": {"parts": ["bare-string"]}}) + assert isinstance(v, a2a_response.Result) + assert v.text == "" + assert v.parts == ["bare-string"] + + def test_part_text_value_none(self): + # ``parts[0].text`` is explicitly None — extracted as "". + v = a2a_response.parse({"result": {"parts": [{"text": None}]}}) + assert isinstance(v, a2a_response.Result) + assert v.text == "" + + def test_parts_not_a_list(self): + # Server bug: ``parts`` is a dict instead of a list. Parser falls + # back to empty parts rather than raising. + v = a2a_response.parse({"result": {"parts": {"oops": True}}}) + assert isinstance(v, a2a_response.Result) + assert v.parts == [] + assert v.text == "" + + +class TestErrorVariant: + """``parse()`` extracts ``error`` envelopes into ``Error`` and + annotates platform-restart metadata when present.""" + + def test_message_and_code(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_error_with_message_and_code"]) + assert isinstance(v, a2a_response.Error) + assert v.message == "rate limited" + assert v.code == -32003 + assert v.restarting is False + assert v.retry_after is None + + def test_message_only(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_error_message_only"]) + assert isinstance(v, a2a_response.Error) + assert v.message == "rate limited" + assert v.code is None + + def test_code_only(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_error_code_only"]) + assert isinstance(v, a2a_response.Error) + assert v.message == "" + assert v.code == -32603 + + def test_error_string_form(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_error_string_form"]) + assert 
isinstance(v, a2a_response.Error) + assert v.message == "string-shaped error" + assert v.code is None + + def test_error_non_dict_non_string(self): + v = a2a_response.parse({"error": 12345}) + assert isinstance(v, a2a_response.Error) + assert v.message == "12345" + + def test_platform_error_with_restart_metadata(self): + v = a2a_response.parse(_FIXTURES["platform_error_with_restart"]) + assert isinstance(v, a2a_response.Error) + assert "workspace agent unreachable" in v.message + assert v.restarting is True + assert v.retry_after == 15 + + def test_platform_error_without_restart(self): + v = a2a_response.parse(_FIXTURES["platform_error_plain"]) + assert isinstance(v, a2a_response.Error) + assert v.message == "workspace not found" + assert v.restarting is False + assert v.retry_after is None + + def test_error_message_with_whitespace_stripped(self): + v = a2a_response.parse({"error": {"message": " trimmed "}}) + assert isinstance(v, a2a_response.Error) + assert v.message == "trimmed" + + def test_non_int_code_dropped(self): + v = a2a_response.parse({"error": {"message": "x", "code": "not-a-number"}}) + assert isinstance(v, a2a_response.Error) + assert v.code is None + + def test_non_int_retry_after_dropped(self): + v = a2a_response.parse({"error": "x", "restarting": True, "retry_after": "30s"}) + assert isinstance(v, a2a_response.Error) + assert v.retry_after is None + + +class TestMalformedVariant: + """``parse()`` returns ``Malformed`` for any shape it can't classify + and logs at WARNING so operators see new server response shapes.""" + + def test_empty_dict(self): + v = a2a_response.parse(_FIXTURES["malformed_empty_dict"]) + assert isinstance(v, a2a_response.Malformed) + assert v.raw == {} + + def test_unexpected_keys(self): + v = a2a_response.parse(_FIXTURES["malformed_unexpected_keys"]) + assert isinstance(v, a2a_response.Malformed) + assert v.raw == {"foo": "bar", "baz": 42} + + def test_non_dict_input_list(self): + v = a2a_response.parse([1, 2, 3]) + assert 
isinstance(v, a2a_response.Malformed) + assert v.raw == [1, 2, 3] + + def test_non_dict_input_string(self): + v = a2a_response.parse("plain string") + assert isinstance(v, a2a_response.Malformed) + assert v.raw == "plain string" + + def test_non_dict_input_none(self): + v = a2a_response.parse(None) + assert isinstance(v, a2a_response.Malformed) + assert v.raw is None + + def test_logs_warning_on_malformed(self, caplog): + with caplog.at_level(logging.WARNING, logger="a2a_response"): + a2a_response.parse(_FIXTURES["malformed_unexpected_keys"]) + assert any(r.levelno == logging.WARNING for r in caplog.records) + + def test_logs_warning_on_non_dict(self, caplog): + with caplog.at_level(logging.WARNING, logger="a2a_response"): + a2a_response.parse("not a dict") + assert any("non-dict" in r.message for r in caplog.records) + + +# ============== Robustness — parser never raises ============== + + +_ADVERSARIAL_INPUTS: list[Any] = [ + None, + True, + False, + 0, + -1, + 3.14, + "", + "string", + [], + [1, 2, 3], + {}, + {"random": "garbage"}, + {"result": None}, + {"result": [1, 2, 3]}, + {"result": {"parts": None}}, + {"result": {"parts": [None]}}, + {"result": {"parts": [{"text": []}]}}, + {"error": None}, + {"error": []}, + {"error": {"message": None, "code": None}}, + {"error": {"message": ["nested", "list"]}}, + {"status": None, "delivery_mode": None, "method": None}, + {"status": "queued", "delivery_mode": "push", "method": "x"}, # wrong delivery_mode + {"status": "running", "delivery_mode": "poll"}, # wrong status + {"status": 42, "delivery_mode": "poll"}, # non-string status + # Deeply-nested junk + {"result": {"parts": [{"text": {"deeply": {"nested": "object"}}}]}}, + # Bytes (not really JSON-decodable but parser shouldn't raise) + {"result": {"parts": [{"text": b"bytes" if False else "x"}]}}, +] + + +class TestRobustness: + """Parser must never raise on adversarial input — every branch is total. 
+ + These cases catch regressions where a future change adds a key + access that doesn't tolerate ``None`` / wrong-type values. + """ + + @pytest.mark.parametrize("payload", _ADVERSARIAL_INPUTS) + def test_parse_never_raises(self, payload): + # Single contract: parse must return one of the four variants + # regardless of input. No exception classes propagated. + v = a2a_response.parse(payload) + assert isinstance(v, (a2a_response.Result, a2a_response.Error, + a2a_response.Queued, a2a_response.Malformed)) + + +# ============== Regression gate — pre-#2967 misclassified queued ============== + + +class TestRegressionGate: + """Pin the bug that prompted the SSOT abstraction. + + Before #2967, ``a2a_client.py:567-587`` sniffed only ``"result" in + data`` and ``"error" in data`` — the poll-queued envelope (no + result key, no error key) hit the bare-else and returned the + "unexpected response shape" error string. This test simulates the + pre-fix code path and confirms the SSOT parser correctly + distinguishes Queued from Malformed. + """ + + def test_legacy_sniffer_would_return_neither_branch(self): + # The pre-#2967 logic — provided here so the regression is + # reproducible from this file alone, no archaeology needed. + envelope = _FIXTURES["poll_queued_full"] + legacy_branch = ( + "result" if "result" in envelope + else "error" if "error" in envelope + else "unexpected_shape" + ) + # Legacy sniff: hits the malformed branch. + assert legacy_branch == "unexpected_shape" + + def test_ssot_parser_classifies_correctly(self): + # New parser: classifies as Queued. + v = a2a_response.parse(_FIXTURES["poll_queued_full"]) + assert isinstance(v, a2a_response.Queued) + assert v.method == "message/send" + + def test_every_fixture_classifies_to_expected_variant(self): + # Defense in depth — pin the variant for every fixture so a + # future shape addition has to update the table here too. 
+ expected: dict[str, type] = { + "jsonrpc_success_with_text": a2a_response.Result, + "jsonrpc_success_multipart": a2a_response.Result, + "jsonrpc_success_no_parts": a2a_response.Result, + "jsonrpc_success_part_no_text_key": a2a_response.Result, + "jsonrpc_error_with_message_and_code": a2a_response.Error, + "jsonrpc_error_message_only": a2a_response.Error, + "jsonrpc_error_code_only": a2a_response.Error, + "jsonrpc_error_string_form": a2a_response.Error, + "platform_error_with_restart": a2a_response.Error, + "platform_error_plain": a2a_response.Error, + "poll_queued_full": a2a_response.Queued, + "poll_queued_notify": a2a_response.Queued, + "poll_queued_no_method": a2a_response.Queued, + "malformed_empty_dict": a2a_response.Malformed, + "malformed_unexpected_keys": a2a_response.Malformed, + "malformed_status_queued_no_delivery_mode": a2a_response.Malformed, + "malformed_delivery_mode_no_status": a2a_response.Malformed, + } + # Every fixture must be enumerated — keeps this gate honest. + assert set(expected.keys()) == set(_FIXTURES.keys()), ( + f"fixture/expected mismatch: " + f"missing-from-expected={set(_FIXTURES) - set(expected)} " + f"extra-in-expected={set(expected) - set(_FIXTURES)}" + ) + for name, payload in _FIXTURES.items(): + v = a2a_response.parse(payload) + assert isinstance(v, expected[name]), ( + f"fixture {name!r} classified as {type(v).__name__}, " + f"expected {expected[name].__name__}" + ) diff --git a/workspace/tests/test_delegation_sync_via_polling.py b/workspace/tests/test_delegation_sync_via_polling.py index 7f6b2918..018d572a 100644 --- a/workspace/tests/test_delegation_sync_via_polling.py +++ b/workspace/tests/test_delegation_sync_via_polling.py @@ -93,6 +93,124 @@ class TestFlagOffLegacyPath: poll_mock.assert_not_called() +# --------------------------------------------------------------------------- +# #2967: Auto-fallback to polling path when target is poll-mode +# --------------------------------------------------------------------------- 
+ +class TestPollModeAutoFallback: + """Pin the #2967 behavior: when send_a2a_message returns the queued + sentinel (target is poll-mode), tool_delegate_task transparently + falls back to _delegate_sync_via_polling — which DOES work for + poll-mode peers (the executeDelegation goroutine writes to the + inbox queue and the result row arrives when the target replies). + + Pre-#2967 behavior: queued sentinel was never returned (the parser + misclassified the envelope as malformed), and the calling agent + saw a DELEGATION FAILED / unexpected-response-shape error. This + test guards both against the parser regression (sentinel-emission) + and the fallback regression (sentinel-handling). + """ + + async def test_queued_sentinel_triggers_polling_fallback(self, monkeypatch): + # Flag OFF — legacy send_a2a_message path. send returns the + # queued sentinel because the target is poll-mode. delegate_task + # must auto-route to _delegate_sync_via_polling so the agent + # eventually gets a real reply. 
+ monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False) + + import a2a_tools + from a2a_client import _A2A_QUEUED_PREFIX + + send_calls = [] + poll_calls = [] + + async def fake_send(workspace_id, task, source_workspace_id=None): + send_calls.append((workspace_id, task, source_workspace_id)) + return f"{_A2A_QUEUED_PREFIX}target={workspace_id} method=message/send" + + async def fake_polling(workspace_id, task, src): + poll_calls.append((workspace_id, task, src)) + return "real response from poll-mode peer" + + async def fake_discover(*_a, **_kw): + return {"name": "poll-peer", "status": "online"} + + async def fake_report_activity(*_a, **_kw): + return None + + with patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \ + patch("a2a_tools_delegation._delegate_sync_via_polling", side_effect=fake_polling), \ + patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \ + patch("a2a_tools.report_activity", side_effect=fake_report_activity): + result = await a2a_tools.tool_delegate_task( + "ws-target", "task body", source_workspace_id="ws-self" + ) + + # send was tried first + assert len(send_calls) == 1 + # …then fallback fired automatically + assert len(poll_calls) == 1 + assert poll_calls[0] == ("ws-target", "task body", "ws-self") + # Caller sees the real reply, NOT the queued sentinel and NOT + # a DELEGATION FAILED string. + assert result == "real response from poll-mode peer" + + async def test_non_queued_send_result_does_not_trigger_fallback(self, monkeypatch): + # Push-mode peer returns a normal text reply — fallback path + # MUST NOT fire (no extra round-trip cost). 
+ monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False) + + import a2a_tools + + async def fake_send(*_a, **_kw): + return "normal reply" + + async def fake_discover(*_a, **_kw): + return {"name": "push-peer", "status": "online"} + + async def fake_report_activity(*_a, **_kw): + return None + + with patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \ + patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \ + patch("a2a_tools.report_activity", side_effect=fake_report_activity), \ + patch("a2a_tools_delegation._delegate_sync_via_polling", new=AsyncMock()) as poll_mock: + result = await a2a_tools.tool_delegate_task( + "ws-target", "task", source_workspace_id="ws-self" + ) + + assert result == "normal reply" + poll_mock.assert_not_called() + + async def test_error_send_result_does_not_trigger_fallback(self, monkeypatch): + # Genuine error (not queued) — must surface as DELEGATION FAILED, + # not silently retried via the polling path. + monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False) + + import a2a_tools + from a2a_client import _A2A_ERROR_PREFIX + + async def fake_send(*_a, **_kw): + return f"{_A2A_ERROR_PREFIX}HTTP 500 [target=...]" + + async def fake_discover(*_a, **_kw): + return {"name": "broken-peer", "status": "online"} + + async def fake_report_activity(*_a, **_kw): + return None + + with patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \ + patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \ + patch("a2a_tools.report_activity", side_effect=fake_report_activity), \ + patch("a2a_tools_delegation._delegate_sync_via_polling", new=AsyncMock()) as poll_mock: + result = await a2a_tools.tool_delegate_task( + "ws-target", "task", source_workspace_id="ws-self" + ) + + assert "DELEGATION FAILED" in result + poll_mock.assert_not_called() + + # --------------------------------------------------------------------------- # Flag-on: dispatch failures # 
---------------------------------------------------------------------------