diff --git a/tests/e2e/test_staging_full_saas.sh b/tests/e2e/test_staging_full_saas.sh index a2e9d5fd0..5d105e511 100755 --- a/tests/e2e/test_staging_full_saas.sh +++ b/tests/e2e/test_staging_full_saas.sh @@ -1088,6 +1088,152 @@ done # externally routable readiness boundary again. wait_workspaces_online_routable "7d/11 Waiting for workspace(s) to recover routing after config.yaml PUT..." "${WS_TO_CHECK[@]}" +# ─── A2A send with 202-queued poll helper (core#2437 part B) ─────────── +# Sends POST /workspaces/:id/a2a. If the agent is busy/starting and returns +# a 2xx with queued:true + queue_id, poll GetA2AQueueStatus until the durable +# result is available. Handles curl rc 28 / http 000 / 404 retryable while the +# queue row is still materializing, and transient 502/503/504 cold-start. +# Prints the final A2A JSON-RPC response body to stdout; logs to stderr. +a2a_send_or_poll_queue() { + local ws_id="$1"; shift + local payload="$1"; shift + local label="$1" + local tmp qid resp code rc attempt poll_attempt poll_tmp + tmp=$(mktemp -t a2a_poll.XXXXXX) + qid="" + + for attempt in $(seq 1 12); do + if [ -n "$qid" ]; then + # We have a queue_id — poll GetA2AQueueStatus for the durable result. + poll_tmp=$(mktemp -t a2a_qpoll.XXXXXX) + for poll_attempt in $(seq 1 30); do + : >"$poll_tmp" + set +e + code=$(tenant_call GET "/workspaces/$ws_id/a2a/queue/$qid" \ + --max-time 30 \ + -H "X-Workspace-ID: $ws_id" \ + -o "$poll_tmp" \ + -w '%{http_code}' \ + 2>/dev/null) + rc=$? + set -e + code=${code:-000} + resp=$(cat "$poll_tmp" 2>/dev/null || echo "") + + if [ "$rc" != "0" ] || [ "$code" = "000" ] || [ "$code" = "404" ]; then + echo " $label queue poll attempt $poll_attempt/30: curl_rc=$rc http=$code — retryable, backing off 2s" >&2 + sleep 2 + continue + fi + if [ "$code" -lt 200 ] || [ "$code" -ge 300 ]; then + rm -f "$poll_tmp" "$tmp" + fail "$label queue poll failed (http=$code): $(printf '%s' "$resp" | sanitize_http_body)" + fi + + local qstatus + qstatus=$(printf '%s' "$resp" | python3 -c " +import json,sys +try: + print(json.load(sys.stdin).get('status','')) +except Exception: + print('')" 2>/dev/null || echo "") + case "$qstatus" in + completed) + resp=$(printf '%s' "$resp" | python3 -c " +import json,sys +try: + rb=json.load(sys.stdin).get('response_body') + print(json.dumps(rb) if rb is not None else '') +except Exception: + print('')" 2>/dev/null || echo "") + if [ -n "$resp" ]; then + code=200 + break 2 + fi + ;; + failed|dropped) + rm -f "$poll_tmp" "$tmp" + fail "$label queue item $qid terminal status=$qstatus: $(printf '%s' "$resp" | sanitize_http_body)" + ;; + queued|dispatched|in_progress|"") + echo " $label queue poll attempt $poll_attempt/30 status=$qstatus — backing off 2s" >&2 + sleep 2 + ;; + *) + rm -f "$poll_tmp" "$tmp" + fail "$label queue poll unexpected status=$qstatus: $(printf '%s' "$resp" | sanitize_http_body)" + ;; + esac + done + rm -f "$poll_tmp" + # Ran out of queue poll attempts. + fail "$label queue poll timed out waiting for $qid to complete" + fi + + # Initial POST (or retry before we have a queue_id). + : >"$tmp" + set +e + code=$(tenant_call POST "/workspaces/$ws_id/a2a" \ + --max-time 90 \ + -H "Content-Type: application/json" \ + -H "X-Workspace-ID: $ws_id" \ + -d "$payload" \ + -o "$tmp" \ + -w '%{http_code}' \ + 2>/dev/null) + rc=$? + set -e + code=${code:-000} + resp=$(cat "$tmp" 2>/dev/null || echo "") + + if [ "$rc" = "0" ] && [ "$code" -ge 200 ] && [ "$code" -lt 300 ]; then + local is_queued + is_queued=$(printf '%s' "$resp" | python3 -c " +import json,sys +try: + d=json.load(sys.stdin) + print('true' if d.get('queued') is True or (d.get('status') or '').lower() == 'queued' else 'false') +except Exception: + print('false')" 2>/dev/null || echo "false") + if [ "$is_queued" = "true" ]; then + qid=$(printf '%s' "$resp" | python3 -c " +import json,sys +try: + print(json.load(sys.stdin).get('queue_id','')) +except Exception: + print('')" 2>/dev/null || echo "") + if [ -n "$qid" ]; then + echo " $label A2A queued (queue_id=$qid); switching to poll" >&2 + continue + fi + else + break + fi + fi + + local safe_body + safe_body=$(printf '%s' "$resp" | sanitize_http_body) + if echo "$code" | grep -Eq '^(502|503|504)$' && echo "$safe_body" | grep -Eqi 'Service Unavailable|Bad Gateway|Gateway Timeout|error code: 502|error code: 504|workspace agent unreachable|connection refused|no healthy upstream|workspace agent busy|native_session|restarting|restart triggered'; then + echo " $label A2A transient $code attempt $attempt/12: $safe_body" >&2 + if [ "$attempt" -lt 12 ]; then + local sleep_sec=10 + if echo "$safe_body" | grep -Eqi 'workspace agent busy|native_session|restarting|restart triggered'; then + sleep_sec=30 + fi + sleep "$sleep_sec" + continue + fi + fi + break + done + + rm -f "$tmp" + if [ "$rc" != "0" ] || [ "$code" -lt 200 ] || [ "$code" -ge 300 ]; then + fail "$label failed after $attempt attempt(s) (curl_rc=$rc, http=$code): $(printf '%s' "$resp" | sanitize_http_body)" + fi + printf '%s' "$resp" +} + # ─── 8. A2A round-trip on parent ─────────────────────────────────────── log "8/11 Sending A2A message to parent — expecting agent response..." # Smoke prompt phrasing — DO NOT trim back to the bare "Reply with exactly: PONG" @@ -1127,44 +1273,9 @@ print(json.dumps({ # 90s gives ~3x headroom over observed cold-call P95 (~25-30s). # Subsequent A2A turns hit the same workspace and are sub-second, so # this only widens the window for step 8/11 of the canary's first turn. -A2A_TMP=$(mktemp -t synth_a2a.XXXXXX) -for A2A_ATTEMPT in $(seq 1 12); do - : >"$A2A_TMP" - set +e - A2A_CODE=$(tenant_call POST "/workspaces/$PARENT_ID/a2a" \ - --max-time 90 \ - -H "Content-Type: application/json" \ - -d "$A2A_PAYLOAD" \ - -o "$A2A_TMP" \ - -w '%{http_code}' \ - 2>/dev/null) - A2A_RC=$? - set -e - A2A_CODE=${A2A_CODE:-000} - A2A_RESP=$(cat "$A2A_TMP" 2>/dev/null || echo "") - if [ "$A2A_RC" = "0" ] && [ "$A2A_CODE" -ge 200 ] && [ "$A2A_CODE" -lt 300 ]; then - break - fi - - A2A_SAFE_BODY=$(printf '%s' "$A2A_RESP" | sanitize_http_body) - if echo "$A2A_CODE" | grep -Eq '^(502|503|504)$' && echo "$A2A_SAFE_BODY" | grep -Eqi 'Service Unavailable|Bad Gateway|Gateway Timeout|error code: 502|error code: 504|workspace agent unreachable|connection refused|no healthy upstream|workspace agent busy|native_session'; then - log " A2A cold-start probe attempt $A2A_ATTEMPT/12 returned $A2A_CODE: $A2A_SAFE_BODY" - if [ "$A2A_ATTEMPT" -lt 12 ]; then - A2A_SLEEP=10 - if echo "$A2A_SAFE_BODY" | grep -Eqi 'workspace agent busy|native_session'; then - A2A_SLEEP=30 - fi - sleep "$A2A_SLEEP" - continue - fi - fi - break -done -rm -f "$A2A_TMP" -if [ "$A2A_RC" != "0" ] || [ "$A2A_CODE" -lt 200 ] || [ "$A2A_CODE" -ge 300 ]; then - A2A_SAFE_BODY=$(printf '%s' "$A2A_RESP" | sanitize_http_body) - fail "A2A POST /workspaces/$PARENT_ID/a2a failed after $A2A_ATTEMPT attempt(s) (curl_rc=$A2A_RC, http=$A2A_CODE): $A2A_SAFE_BODY" -fi +# core#2437 part B: send A2A and, if the agent is busy/starting, poll the +# queue status endpoint until the durable result is available. +A2A_RESP=$(a2a_send_or_poll_queue "$PARENT_ID" "$A2A_PAYLOAD" "A2A parent") AGENT_TEXT=$(echo "$A2A_RESP" | python3 -c " import json, sys d = json.load(sys.stdin) @@ -1280,128 +1391,8 @@ print(json.dumps({ } })) ") -KA_TMP=$(mktemp -t known_answer_a2a.XXXXXX) -KA_RESP="" -KA_QUEUE_ID="" -for KA_ATTEMPT in $(seq 1 12); do - : >"$KA_TMP" - if [ -n "$KA_QUEUE_ID" ]; then - # We already have a queued work item — poll its status, don't re-POST. - # Re-POSTing while the original queued work is still pending collides with - # the native-session busy/drain window and reproduces the 502/empty failure. - set +e - KA_CODE=$(tenant_call GET "/workspaces/$PARENT_ID/a2a/queue/$KA_QUEUE_ID" \ - --max-time 90 \ - -o "$KA_TMP" \ - -w '%{http_code}' \ - 2>/dev/null) - KA_RC=$? - set -e - KA_CODE=${KA_CODE:-000} - KA_RESP=$(cat "$KA_TMP" 2>/dev/null || echo "") - if [ "$KA_RC" = "0" ] && [ "$KA_CODE" -ge 200 ] && [ "$KA_CODE" -lt 300 ]; then - KA_Q_STATUS=$(echo "$KA_RESP" | python3 -c " -import json,sys -try: - print(json.load(sys.stdin).get('status','')) -except Exception: - print('') -" 2>/dev/null || echo "") - if [ "$KA_Q_STATUS" = "completed" ]; then - # The queue row stores the agent's A2A response in response_body. - KA_RESP=$(echo "$KA_RESP" | python3 -c " -import json,sys -try: - rb=json.load(sys.stdin).get('response_body') - print(json.dumps(rb) if rb is not None else '') -except Exception: - print('') -" 2>/dev/null || echo "") - if [ -n "$KA_RESP" ]; then - KA_CODE=200 - break - fi - fi - # Still pending — poll again. - if echo "$KA_Q_STATUS" | grep -Eqi 'queued|dispatched|in_progress'; then - KA_SAFE_BODY=$(printf '%s' "$KA_RESP" | sanitize_http_body) - log " known-answer queue poll attempt $KA_ATTEMPT/12 status=$KA_Q_STATUS: $KA_SAFE_BODY" - if [ "$KA_ATTEMPT" -lt 12 ]; then sleep 10; continue; fi - fi - fi - KA_SAFE_BODY=$(printf '%s' "$KA_RESP" | sanitize_http_body) - if echo "$KA_CODE" | grep -Eq '^(502|503|504)$' && echo "$KA_SAFE_BODY" | grep -Eqi 'Service Unavailable|Bad Gateway|Gateway Timeout|error code: 502|error code: 504|workspace agent unreachable|connection refused|no healthy upstream|workspace agent busy|native_session|restarting|restart triggered'; then - log " known-answer queue poll transient $KA_CODE attempt $KA_ATTEMPT/12: $KA_SAFE_BODY" - if [ "$KA_ATTEMPT" -lt 12 ]; then - KA_SLEEP=10 - if echo "$KA_SAFE_BODY" | grep -Eqi 'workspace agent busy|native_session|restarting|restart triggered'; then - KA_SLEEP=30 - fi - sleep "$KA_SLEEP" - continue - fi - fi - break - else - # Initial POST (or retry before we have a queue_id). A 202 queued response - # transitions to the poll path above; a terminal 2xx breaks immediately. - set +e - KA_CODE=$(tenant_call POST "/workspaces/$PARENT_ID/a2a" \ - --max-time 90 \ - -H "Content-Type: application/json" \ - -d "$KA_PAYLOAD" \ - -o "$KA_TMP" \ - -w '%{http_code}' \ - 2>/dev/null) - KA_RC=$? - set -e - KA_CODE=${KA_CODE:-000} - KA_RESP=$(cat "$KA_TMP" 2>/dev/null || echo "") - if [ "$KA_RC" = "0" ] && [ "$KA_CODE" -ge 200 ] && [ "$KA_CODE" -lt 300 ]; then - KA_IS_QUEUED=$(echo "$KA_RESP" | python3 -c " -import json,sys -try: - d=json.load(sys.stdin) - print('true' if d.get('queued') is True or (d.get('status') or '').lower() == 'queued' else 'false') -except Exception: - print('false') -" 2>/dev/null || echo "false") - if [ "$KA_IS_QUEUED" = "true" ]; then - KA_QUEUE_ID=$(echo "$KA_RESP" | python3 -c " -import json,sys -try: - print(json.load(sys.stdin).get('queue_id','')) -except Exception: - print('') -" 2>/dev/null || echo "") - if [ -n "$KA_QUEUE_ID" ]; then - log " known-answer A2A queued (queue_id=$KA_QUEUE_ID); switching to poll" - continue - fi - else - break - fi - fi - KA_SAFE_BODY=$(printf '%s' "$KA_RESP" | sanitize_http_body) - if echo "$KA_CODE" | grep -Eq '^(502|503|504)$' && echo "$KA_SAFE_BODY" | grep -Eqi 'Service Unavailable|Bad Gateway|Gateway Timeout|error code: 502|error code: 504|workspace agent unreachable|connection refused|no healthy upstream|workspace agent busy|native_session|restarting|restart triggered'; then - log " known-answer A2A transient $KA_CODE attempt $KA_ATTEMPT/12: $KA_SAFE_BODY" - if [ "$KA_ATTEMPT" -lt 12 ]; then - KA_SLEEP=10 - if echo "$KA_SAFE_BODY" | grep -Eqi 'workspace agent busy|native_session|restarting|restart triggered'; then - KA_SLEEP=30 - fi - sleep "$KA_SLEEP" - continue - fi - fi - break - fi -done -rm -f "$KA_TMP" -if [ "$KA_RC" != "0" ] || [ "$KA_CODE" -lt 200 ] || [ "$KA_CODE" -ge 300 ]; then - KA_SAFE_BODY=$(printf '%s' "$KA_RESP" | sanitize_http_body) - fail "Known-answer A2A failed after $KA_ATTEMPT attempt(s) (curl_rc=$KA_RC, http=$KA_CODE): $KA_SAFE_BODY" -fi +# core#2437 part B: send A2A and poll queue status if the agent queues it. +KA_RESP=$(a2a_send_or_poll_queue "$PARENT_ID" "$KA_PAYLOAD" "A2A known-answer") KA_TEXT=$(echo "$KA_RESP" | python3 -c " import json, sys def extract_text(d):