fix(e2e): poll GetA2AQueueStatus on 202-queued A2A responses (core#2437 part B) #2708
+151
-160
@@ -1088,6 +1088,152 @@ done
|
||||
# externally routable readiness boundary again.
|
||||
wait_workspaces_online_routable "7d/11 Waiting for workspace(s) to recover routing after config.yaml PUT..." "${WS_TO_CHECK[@]}"
|
||||
|
||||
# ─── A2A send with 202-queued poll helper (core#2437 part B) ───────────
|
||||
# Sends POST /workspaces/:id/a2a. If the agent is busy/starting and returns
|
||||
# a 2xx with queued:true + queue_id, poll GetA2AQueueStatus until the durable
|
||||
# result is available. Handles curl rc 28 / http 000 / 404 retryable while the
|
||||
# queue row is still materializing, and transient 502/503/504 cold-start.
|
||||
# Prints the final A2A JSON-RPC response body to stdout; logs to stderr.
|
||||
a2a_send_or_poll_queue() {
|
||||
local ws_id="$1"; shift
|
||||
local payload="$1"; shift
|
||||
local label="$1"
|
||||
local tmp qid resp code rc attempt poll_attempt poll_tmp
|
||||
tmp=$(mktemp -t a2a_poll.XXXXXX)
|
||||
qid=""
|
||||
|
||||
for attempt in $(seq 1 12); do
|
||||
if [ -n "$qid" ]; then
|
||||
# We have a queue_id — poll GetA2AQueueStatus for the durable result.
|
||||
poll_tmp=$(mktemp -t a2a_qpoll.XXXXXX)
|
||||
for poll_attempt in $(seq 1 30); do
|
||||
: >"$poll_tmp"
|
||||
set +e
|
||||
code=$(tenant_call GET "/workspaces/$ws_id/a2a/queue/$qid" \
|
||||
--max-time 30 \
|
||||
-H "X-Workspace-ID: $ws_id" \
|
||||
-o "$poll_tmp" \
|
||||
-w '%{http_code}' \
|
||||
2>/dev/null)
|
||||
rc=$?
|
||||
set -e
|
||||
code=${code:-000}
|
||||
resp=$(cat "$poll_tmp" 2>/dev/null || echo "")
|
||||
|
||||
if [ "$rc" != "0" ] || [ "$code" = "000" ] || [ "$code" = "404" ]; then
|
||||
echo " $label queue poll attempt $poll_attempt/30: curl_rc=$rc http=$code — retryable, backing off 2s" >&2
|
||||
sleep 2
|
||||
continue
|
||||
fi
|
||||
if [ "$code" -lt 200 ] || [ "$code" -ge 300 ]; then
|
||||
rm -f "$poll_tmp" "$tmp"
|
||||
fail "$label queue poll failed (http=$code): $(printf '%s' "$resp" | sanitize_http_body)"
|
||||
fi
|
||||
|
||||
local qstatus
|
||||
qstatus=$(printf '%s' "$resp" | python3 -c "
|
||||
import json,sys
|
||||
try:
|
||||
print(json.load(sys.stdin).get('status',''))
|
||||
except Exception:
|
||||
print('')" 2>/dev/null || echo "")
|
||||
case "$qstatus" in
|
||||
completed)
|
||||
resp=$(printf '%s' "$resp" | python3 -c "
|
||||
import json,sys
|
||||
try:
|
||||
rb=json.load(sys.stdin).get('response_body')
|
||||
print(json.dumps(rb) if rb is not None else '')
|
||||
except Exception:
|
||||
print('')" 2>/dev/null || echo "")
|
||||
if [ -n "$resp" ]; then
|
||||
code=200
|
||||
break 2
|
||||
fi
|
||||
;;
|
||||
failed|dropped)
|
||||
rm -f "$poll_tmp" "$tmp"
|
||||
fail "$label queue item $qid terminal status=$qstatus: $(printf '%s' "$resp" | sanitize_http_body)"
|
||||
;;
|
||||
queued|dispatched|in_progress|"")
|
||||
echo " $label queue poll attempt $poll_attempt/30 status=$qstatus — backing off 2s" >&2
|
||||
sleep 2
|
||||
;;
|
||||
*)
|
||||
rm -f "$poll_tmp" "$tmp"
|
||||
fail "$label queue poll unexpected status=$qstatus: $(printf '%s' "$resp" | sanitize_http_body)"
|
||||
;;
|
||||
esac
|
||||
done
|
||||
rm -f "$poll_tmp"
|
||||
# Ran out of queue poll attempts.
|
||||
fail "$label queue poll timed out waiting for $qid to complete"
|
||||
fi
|
||||
|
||||
# Initial POST (or retry before we have a queue_id).
|
||||
: >"$tmp"
|
||||
set +e
|
||||
code=$(tenant_call POST "/workspaces/$ws_id/a2a" \
|
||||
--max-time 90 \
|
||||
-H "Content-Type: application/json" \
|
||||
-H "X-Workspace-ID: $ws_id" \
|
||||
-d "$payload" \
|
||||
-o "$tmp" \
|
||||
-w '%{http_code}' \
|
||||
2>/dev/null)
|
||||
rc=$?
|
||||
set -e
|
||||
code=${code:-000}
|
||||
resp=$(cat "$tmp" 2>/dev/null || echo "")
|
||||
|
||||
if [ "$rc" = "0" ] && [ "$code" -ge 200 ] && [ "$code" -lt 300 ]; then
|
||||
local is_queued
|
||||
is_queued=$(printf '%s' "$resp" | python3 -c "
|
||||
import json,sys
|
||||
try:
|
||||
d=json.load(sys.stdin)
|
||||
print('true' if d.get('queued') is True or (d.get('status') or '').lower() == 'queued' else 'false')
|
||||
except Exception:
|
||||
print('false')" 2>/dev/null || echo "false")
|
||||
if [ "$is_queued" = "true" ]; then
|
||||
qid=$(printf '%s' "$resp" | python3 -c "
|
||||
import json,sys
|
||||
try:
|
||||
print(json.load(sys.stdin).get('queue_id',''))
|
||||
except Exception:
|
||||
print('')" 2>/dev/null || echo "")
|
||||
if [ -n "$qid" ]; then
|
||||
echo " $label A2A queued (queue_id=$qid); switching to poll" >&2
|
||||
continue
|
||||
fi
|
||||
else
|
||||
break
|
||||
fi
|
||||
fi
|
||||
|
||||
local safe_body
|
||||
safe_body=$(printf '%s' "$resp" | sanitize_http_body)
|
||||
if echo "$code" | grep -Eq '^(502|503|504)$' && echo "$safe_body" | grep -Eqi 'Service Unavailable|Bad Gateway|Gateway Timeout|error code: 502|error code: 504|workspace agent unreachable|connection refused|no healthy upstream|workspace agent busy|native_session|restarting|restart triggered'; then
|
||||
echo " $label A2A transient $code attempt $attempt/12: $safe_body" >&2
|
||||
if [ "$attempt" -lt 12 ]; then
|
||||
local sleep_sec=10
|
||||
if echo "$safe_body" | grep -Eqi 'workspace agent busy|native_session|restarting|restart triggered'; then
|
||||
sleep_sec=30
|
||||
fi
|
||||
sleep "$sleep_sec"
|
||||
continue
|
||||
fi
|
||||
fi
|
||||
break
|
||||
done
|
||||
|
||||
rm -f "$tmp"
|
||||
if [ "$rc" != "0" ] || [ "$code" -lt 200 ] || [ "$code" -ge 300 ]; then
|
||||
fail "$label failed after $attempt attempt(s) (curl_rc=$rc, http=$code): $(printf '%s' "$resp" | sanitize_http_body)"
|
||||
fi
|
||||
printf '%s' "$resp"
|
||||
}
|
||||
|
||||
# ─── 8. A2A round-trip on parent ───────────────────────────────────────
|
||||
log "8/11 Sending A2A message to parent — expecting agent response..."
|
||||
# Smoke prompt phrasing — DO NOT trim back to the bare "Reply with exactly: PONG"
|
||||
@@ -1127,44 +1273,9 @@ print(json.dumps({
|
||||
# 90s gives ~3x headroom over observed cold-call P95 (~25-30s).
|
||||
# Subsequent A2A turns hit the same workspace and are sub-second, so
|
||||
# this only widens the window for step 8/11 of the canary's first turn.
|
||||
A2A_TMP=$(mktemp -t synth_a2a.XXXXXX)
|
||||
for A2A_ATTEMPT in $(seq 1 12); do
|
||||
: >"$A2A_TMP"
|
||||
set +e
|
||||
A2A_CODE=$(tenant_call POST "/workspaces/$PARENT_ID/a2a" \
|
||||
--max-time 90 \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$A2A_PAYLOAD" \
|
||||
-o "$A2A_TMP" \
|
||||
-w '%{http_code}' \
|
||||
2>/dev/null)
|
||||
A2A_RC=$?
|
||||
set -e
|
||||
A2A_CODE=${A2A_CODE:-000}
|
||||
A2A_RESP=$(cat "$A2A_TMP" 2>/dev/null || echo "")
|
||||
if [ "$A2A_RC" = "0" ] && [ "$A2A_CODE" -ge 200 ] && [ "$A2A_CODE" -lt 300 ]; then
|
||||
break
|
||||
fi
|
||||
|
||||
A2A_SAFE_BODY=$(printf '%s' "$A2A_RESP" | sanitize_http_body)
|
||||
if echo "$A2A_CODE" | grep -Eq '^(502|503|504)$' && echo "$A2A_SAFE_BODY" | grep -Eqi 'Service Unavailable|Bad Gateway|Gateway Timeout|error code: 502|error code: 504|workspace agent unreachable|connection refused|no healthy upstream|workspace agent busy|native_session'; then
|
||||
log " A2A cold-start probe attempt $A2A_ATTEMPT/12 returned $A2A_CODE: $A2A_SAFE_BODY"
|
||||
if [ "$A2A_ATTEMPT" -lt 12 ]; then
|
||||
A2A_SLEEP=10
|
||||
if echo "$A2A_SAFE_BODY" | grep -Eqi 'workspace agent busy|native_session'; then
|
||||
A2A_SLEEP=30
|
||||
fi
|
||||
sleep "$A2A_SLEEP"
|
||||
continue
|
||||
fi
|
||||
fi
|
||||
break
|
||||
done
|
||||
rm -f "$A2A_TMP"
|
||||
if [ "$A2A_RC" != "0" ] || [ "$A2A_CODE" -lt 200 ] || [ "$A2A_CODE" -ge 300 ]; then
|
||||
A2A_SAFE_BODY=$(printf '%s' "$A2A_RESP" | sanitize_http_body)
|
||||
fail "A2A POST /workspaces/$PARENT_ID/a2a failed after $A2A_ATTEMPT attempt(s) (curl_rc=$A2A_RC, http=$A2A_CODE): $A2A_SAFE_BODY"
|
||||
fi
|
||||
# core#2437 part B: send A2A and, if the agent is busy/starting, poll the
|
||||
# queue status endpoint until the durable result is available.
|
||||
A2A_RESP=$(a2a_send_or_poll_queue "$PARENT_ID" "$A2A_PAYLOAD" "A2A parent")
|
||||
AGENT_TEXT=$(echo "$A2A_RESP" | python3 -c "
|
||||
import json, sys
|
||||
d = json.load(sys.stdin)
|
||||
@@ -1280,128 +1391,8 @@ print(json.dumps({
|
||||
}
|
||||
}))
|
||||
")
|
||||
KA_TMP=$(mktemp -t known_answer_a2a.XXXXXX)
|
||||
KA_RESP=""
|
||||
KA_QUEUE_ID=""
|
||||
for KA_ATTEMPT in $(seq 1 12); do
|
||||
: >"$KA_TMP"
|
||||
if [ -n "$KA_QUEUE_ID" ]; then
|
||||
# We already have a queued work item — poll its status, don't re-POST.
|
||||
# Re-POSTing while the original queued work is still pending collides with
|
||||
# the native-session busy/drain window and reproduces the 502/empty failure.
|
||||
set +e
|
||||
KA_CODE=$(tenant_call GET "/workspaces/$PARENT_ID/a2a/queue/$KA_QUEUE_ID" \
|
||||
--max-time 90 \
|
||||
-o "$KA_TMP" \
|
||||
-w '%{http_code}' \
|
||||
2>/dev/null)
|
||||
KA_RC=$?
|
||||
set -e
|
||||
KA_CODE=${KA_CODE:-000}
|
||||
KA_RESP=$(cat "$KA_TMP" 2>/dev/null || echo "")
|
||||
if [ "$KA_RC" = "0" ] && [ "$KA_CODE" -ge 200 ] && [ "$KA_CODE" -lt 300 ]; then
|
||||
KA_Q_STATUS=$(echo "$KA_RESP" | python3 -c "
|
||||
import json,sys
|
||||
try:
|
||||
print(json.load(sys.stdin).get('status',''))
|
||||
except Exception:
|
||||
print('')
|
||||
" 2>/dev/null || echo "")
|
||||
if [ "$KA_Q_STATUS" = "completed" ]; then
|
||||
# The queue row stores the agent's A2A response in response_body.
|
||||
KA_RESP=$(echo "$KA_RESP" | python3 -c "
|
||||
import json,sys
|
||||
try:
|
||||
rb=json.load(sys.stdin).get('response_body')
|
||||
print(json.dumps(rb) if rb is not None else '')
|
||||
except Exception:
|
||||
print('')
|
||||
" 2>/dev/null || echo "")
|
||||
if [ -n "$KA_RESP" ]; then
|
||||
KA_CODE=200
|
||||
break
|
||||
fi
|
||||
fi
|
||||
# Still pending — poll again.
|
||||
if echo "$KA_Q_STATUS" | grep -Eqi 'queued|dispatched|in_progress'; then
|
||||
KA_SAFE_BODY=$(printf '%s' "$KA_RESP" | sanitize_http_body)
|
||||
log " known-answer queue poll attempt $KA_ATTEMPT/12 status=$KA_Q_STATUS: $KA_SAFE_BODY"
|
||||
if [ "$KA_ATTEMPT" -lt 12 ]; then sleep 10; continue; fi
|
||||
fi
|
||||
fi
|
||||
KA_SAFE_BODY=$(printf '%s' "$KA_RESP" | sanitize_http_body)
|
||||
if echo "$KA_CODE" | grep -Eq '^(502|503|504)$' && echo "$KA_SAFE_BODY" | grep -Eqi 'Service Unavailable|Bad Gateway|Gateway Timeout|error code: 502|error code: 504|workspace agent unreachable|connection refused|no healthy upstream|workspace agent busy|native_session|restarting|restart triggered'; then
|
||||
log " known-answer queue poll transient $KA_CODE attempt $KA_ATTEMPT/12: $KA_SAFE_BODY"
|
||||
if [ "$KA_ATTEMPT" -lt 12 ]; then
|
||||
KA_SLEEP=10
|
||||
if echo "$KA_SAFE_BODY" | grep -Eqi 'workspace agent busy|native_session|restarting|restart triggered'; then
|
||||
KA_SLEEP=30
|
||||
fi
|
||||
sleep "$KA_SLEEP"
|
||||
continue
|
||||
fi
|
||||
fi
|
||||
break
|
||||
else
|
||||
# Initial POST (or retry before we have a queue_id). A 202 queued response
|
||||
# transitions to the poll path above; a terminal 2xx breaks immediately.
|
||||
set +e
|
||||
KA_CODE=$(tenant_call POST "/workspaces/$PARENT_ID/a2a" \
|
||||
--max-time 90 \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$KA_PAYLOAD" \
|
||||
-o "$KA_TMP" \
|
||||
-w '%{http_code}' \
|
||||
2>/dev/null)
|
||||
KA_RC=$?
|
||||
set -e
|
||||
KA_CODE=${KA_CODE:-000}
|
||||
KA_RESP=$(cat "$KA_TMP" 2>/dev/null || echo "")
|
||||
if [ "$KA_RC" = "0" ] && [ "$KA_CODE" -ge 200 ] && [ "$KA_CODE" -lt 300 ]; then
|
||||
KA_IS_QUEUED=$(echo "$KA_RESP" | python3 -c "
|
||||
import json,sys
|
||||
try:
|
||||
d=json.load(sys.stdin)
|
||||
print('true' if d.get('queued') is True or (d.get('status') or '').lower() == 'queued' else 'false')
|
||||
except Exception:
|
||||
print('false')
|
||||
" 2>/dev/null || echo "false")
|
||||
if [ "$KA_IS_QUEUED" = "true" ]; then
|
||||
KA_QUEUE_ID=$(echo "$KA_RESP" | python3 -c "
|
||||
import json,sys
|
||||
try:
|
||||
print(json.load(sys.stdin).get('queue_id',''))
|
||||
except Exception:
|
||||
print('')
|
||||
" 2>/dev/null || echo "")
|
||||
if [ -n "$KA_QUEUE_ID" ]; then
|
||||
log " known-answer A2A queued (queue_id=$KA_QUEUE_ID); switching to poll"
|
||||
continue
|
||||
fi
|
||||
else
|
||||
break
|
||||
fi
|
||||
fi
|
||||
KA_SAFE_BODY=$(printf '%s' "$KA_RESP" | sanitize_http_body)
|
||||
if echo "$KA_CODE" | grep -Eq '^(502|503|504)$' && echo "$KA_SAFE_BODY" | grep -Eqi 'Service Unavailable|Bad Gateway|Gateway Timeout|error code: 502|error code: 504|workspace agent unreachable|connection refused|no healthy upstream|workspace agent busy|native_session|restarting|restart triggered'; then
|
||||
log " known-answer A2A transient $KA_CODE attempt $KA_ATTEMPT/12: $KA_SAFE_BODY"
|
||||
if [ "$KA_ATTEMPT" -lt 12 ]; then
|
||||
KA_SLEEP=10
|
||||
if echo "$KA_SAFE_BODY" | grep -Eqi 'workspace agent busy|native_session|restarting|restart triggered'; then
|
||||
KA_SLEEP=30
|
||||
fi
|
||||
sleep "$KA_SLEEP"
|
||||
continue
|
||||
fi
|
||||
fi
|
||||
break
|
||||
fi
|
||||
done
|
||||
rm -f "$KA_TMP"
|
||||
if [ "$KA_RC" != "0" ] || [ "$KA_CODE" -lt 200 ] || [ "$KA_CODE" -ge 300 ]; then
|
||||
KA_SAFE_BODY=$(printf '%s' "$KA_RESP" | sanitize_http_body)
|
||||
fail "Known-answer A2A failed after $KA_ATTEMPT attempt(s) (curl_rc=$KA_RC, http=$KA_CODE): $KA_SAFE_BODY"
|
||||
fi
|
||||
# core#2437 part B: send A2A and poll queue status if the agent queues it.
|
||||
KA_RESP=$(a2a_send_or_poll_queue "$PARENT_ID" "$KA_PAYLOAD" "A2A known-answer")
|
||||
KA_TEXT=$(echo "$KA_RESP" | python3 -c "
|
||||
import json, sys
|
||||
def extract_text(d):
|
||||
|
||||
Reference in New Issue
Block a user