fix(deploy): #2859 bounded retry + error surfacing for redeploy-fleet transient 502s #2862

Merged
devops-engineer merged 1 commits from fix/2859-redeploy-fleet-transient-retry into main 2026-06-14 16:28:15 +00:00
2 changed files with 148 additions and 2 deletions
+54 -2
View File
@@ -26,6 +26,14 @@ DEFAULT_REQUIRED_CONTEXTS = [
TERMINAL_FAILURE_STATES = {"failure", "error", "cancelled", "canceled", "skipped"}
REDEPLOY_PATH = "/cp/admin/tenants/redeploy-fleet"
# Bounded retry for transient CP/gateway failures (e.g. 502 from an upstream
# dependency like SSM during redeploy-fleet). A 502 on the canary should not
# hard-halt the whole fleet rollout if a quick retry succeeds.
REDEPLOY_RETRY_STATUSES = {502, 503, 504}
# Initial attempt + this many retries. Delays are applied BEFORE each retry.
REDEPLOY_MAX_RETRIES = 3
REDEPLOY_RETRY_DELAYS_SECONDS = [5, 10, 20]
def truthy_flag(value: str | None) -> bool:
if value is None:
@@ -214,15 +222,59 @@ def plan_rollout_slugs(cp_url: str, token: str, body: dict, redeploy=None) -> li
return slugs
def _redeploy_error_detail(body: dict, max_len: int = 200) -> str:
"""Extract a short, safe diagnostic string from a CP error body."""
detail = body.get("error") or body.get("message") or ""
if not detail:
detail = json.dumps(body)
return detail[:max_len]
def redeploy_scoped(cp_url: str, token: str, body: dict) -> tuple[int, dict]:
return cp_api_json("POST", f"{cp_url}{REDEPLOY_PATH}", token, body)
"""POST /cp/admin/tenants/redeploy-fleet with bounded transient retry.
CP can return 502/503/504 when an upstream dependency (SSM, ECS, etc.)
flakes. Retry a small number of times with increasing backoff before
giving up and letting the caller surface the failure.
"""
url = f"{cp_url}{REDEPLOY_PATH}"
slugs = body.get("only_slugs") or []
slugs_text = ",".join(slugs)
total_attempts = 1 + REDEPLOY_MAX_RETRIES
status = 0
resp: dict = {}
for attempt in range(total_attempts):
status, resp = cp_api_json("POST", url, token, body)
if status not in REDEPLOY_RETRY_STATUSES:
return status, resp
detail = _redeploy_error_detail(resp)
if attempt < REDEPLOY_MAX_RETRIES:
delay = REDEPLOY_RETRY_DELAYS_SECONDS[attempt]
print(
f"::warning::redeploy-fleet returned HTTP {status} for "
f"only_slugs={slugs_text} at {url} "
f"(attempt {attempt + 1}/{total_attempts}, detail={detail!r}); "
f"retrying in {delay}s"
)
time.sleep(delay)
else:
print(
f"::warning::redeploy-fleet returned HTTP {status} for "
f"only_slugs={slugs_text} at {url} "
f"(attempt {attempt + 1}/{total_attempts}, detail={detail!r}); "
f"retries exhausted"
)
return status, resp
def _raise_for_redeploy_result(status: int, body: dict, slugs: list[str]) -> None:
if status != 200 or body.get("ok") is not True:
# Surface the CP error body when available so the operator sees the
# tenant-level reason (e.g. SSM timeout) instead of just the status.
detail = _redeploy_error_detail(body, max_len=500)
raise RuntimeError(
"redeploy scoped call failed for "
f"{','.join(slugs)}: HTTP {status}, ok={body.get('ok')}"
f"{','.join(slugs)}: HTTP {status}, ok={body.get('ok')}, detail={detail!r}"
)
@@ -2,6 +2,8 @@ import importlib.util
import sys
from pathlib import Path
import pytest
SCRIPT = Path(__file__).resolve().parents[1] / "prod-auto-deploy.py"
spec = importlib.util.spec_from_file_location("prod_auto_deploy", SCRIPT)
@@ -288,6 +290,98 @@ def test_plan_scoped_rollout_preserves_canary_then_batches():
]
def test_redeploy_scoped_retries_transient_502_then_succeeds(monkeypatch, capfd):
responses = [
(502, {"error": "Bad Gateway"}),
(503, {"error": "Service Unavailable"}),
(200, {"ok": True, "results": [{"slug": "hongming"}]}),
]
calls = []
sleeps = []
def fake_cp_api_json(_method, _url, _token, body):
calls.append(body)
return responses.pop(0)
monkeypatch.setattr(prod, "cp_api_json", fake_cp_api_json)
monkeypatch.setattr(prod.time, "sleep", sleeps.append)
status, resp = prod.redeploy_scoped(
"https://api.moleculesai.app", "token", {"only_slugs": ["hongming"]}
)
assert status == 200
assert resp["ok"] is True
assert len(calls) == 3
assert sleeps == [5, 10]
captured = capfd.readouterr().out
assert "attempt 1/4" in captured
assert "attempt 2/4" in captured
assert "Bad Gateway" in captured
assert "Service Unavailable" in captured
assert "/cp/admin/tenants/redeploy-fleet" in captured
def test_redeploy_scoped_gives_up_after_max_retries(monkeypatch, capfd):
responses = [
(502, {"error": "Bad Gateway"}),
(504, {"error": "Gateway Timeout"}),
(503, {"error": "Service Unavailable"}),
(503, {"error": "Service Unavailable"}),
]
sleeps = []
def fake_cp_api_json(_method, _url, _token, _body):
return responses.pop(0)
monkeypatch.setattr(prod, "cp_api_json", fake_cp_api_json)
monkeypatch.setattr(prod.time, "sleep", sleeps.append)
status, resp = prod.redeploy_scoped(
"https://api.moleculesai.app", "token", {"only_slugs": ["hongming"]}
)
assert status == 503
assert resp["error"] == "Service Unavailable"
# No sleep after the final (4th) attempt.
assert sleeps == [5, 10, 20]
captured = capfd.readouterr().out
assert "attempt 4/4" in captured
assert "retries exhausted" in captured
assert "/cp/admin/tenants/redeploy-fleet" in captured
def test_redeploy_scoped_does_not_retry_non_transient_errors(monkeypatch):
calls = []
def fake_cp_api_json(_method, _url, _token, body):
calls.append(body)
return 500, {"error": "Internal Server Error"}
monkeypatch.setattr(prod, "cp_api_json", fake_cp_api_json)
monkeypatch.setattr(prod.time, "sleep", lambda _s: pytest.fail("should not sleep on 500"))
status, resp = prod.redeploy_scoped(
"https://api.moleculesai.app", "token", {"only_slugs": ["hongming"]}
)
assert status == 500
assert resp["error"] == "Internal Server Error"
assert len(calls) == 1
def test_raise_for_redeploy_result_surfaces_error_body():
with pytest.raises(RuntimeError) as exc_info:
prod._raise_for_redeploy_result(
502,
{"ok": False, "error": "upstream SSM throttled"},
["hongming"],
)
assert "HTTP 502" in str(exc_info.value)
assert "upstream SSM throttled" in str(exc_info.value)
assert "hongming" in str(exc_info.value)
def test_scoped_rollout_halts_after_failed_canary():
calls = []