fix(deploy): #2859 bounded retry + error surfacing for redeploy-fleet transient 502s #2862
@@ -26,6 +26,14 @@ DEFAULT_REQUIRED_CONTEXTS = [
|
||||
TERMINAL_FAILURE_STATES = {"failure", "error", "cancelled", "canceled", "skipped"}
REDEPLOY_PATH = "/cp/admin/tenants/redeploy-fleet"

# Bounded retry for transient CP/gateway failures (e.g. 502 from an upstream
# dependency like SSM during redeploy-fleet). A 502 on the canary should not
# hard-halt the whole fleet rollout if a quick retry succeeds.
REDEPLOY_RETRY_STATUSES = {502, 503, 504}
# Seconds to wait BEFORE each retry, in order; one entry per retry.
REDEPLOY_RETRY_DELAYS_SECONDS = [5, 10, 20]
# Initial attempt + this many retries. Derived from the delay schedule so the
# retry count and the list of delays cannot drift out of sync.
REDEPLOY_MAX_RETRIES = len(REDEPLOY_RETRY_DELAYS_SECONDS)
|
||||
|
||||
|
||||
def truthy_flag(value: str | None) -> bool:
|
||||
if value is None:
|
||||
@@ -214,15 +222,59 @@ def plan_rollout_slugs(cp_url: str, token: str, body: dict, redeploy=None) -> li
|
||||
return slugs
|
||||
|
||||
|
||||
def _redeploy_error_detail(body: dict, max_len: int = 200) -> str:
|
||||
"""Extract a short, safe diagnostic string from a CP error body."""
|
||||
detail = body.get("error") or body.get("message") or ""
|
||||
if not detail:
|
||||
detail = json.dumps(body)
|
||||
return detail[:max_len]
|
||||
|
||||
|
||||
def redeploy_scoped(cp_url: str, token: str, body: dict) -> tuple[int, dict]:
    """POST /cp/admin/tenants/redeploy-fleet with bounded transient retry.

    CP can return 502/503/504 when an upstream dependency (SSM, ECS, etc.)
    flakes. Retry a small number of times with increasing backoff before
    giving up and letting the caller surface the failure.

    Args:
        cp_url: Base URL of the control plane; ``REDEPLOY_PATH`` is appended.
        token: Credential handed through to ``cp_api_json``.
        body: Request payload. Its ``only_slugs`` entry (if any) is used only
            for the log messages.

    Returns:
        The ``(status, response_body)`` pair of the last attempt: the first
        response whose status is not in ``REDEPLOY_RETRY_STATUSES``, or the
        final retryable response once retries are exhausted.
    """
    url = f"{cp_url}{REDEPLOY_PATH}"
    slugs_text = ",".join(body.get("only_slugs") or [])
    total_attempts = 1 + REDEPLOY_MAX_RETRIES
    status = 0
    resp: dict = {}
    for attempt in range(total_attempts):
        status, resp = cp_api_json("POST", url, token, body)
        if status not in REDEPLOY_RETRY_STATUSES:
            return status, resp

        detail = _redeploy_error_detail(resp)
        delay = None
        if attempt < REDEPLOY_MAX_RETRIES:
            # Reuse the last configured delay if the schedule is shorter than
            # the retry budget, instead of failing with IndexError mid-rollout.
            if REDEPLOY_RETRY_DELAYS_SECONDS:
                last = len(REDEPLOY_RETRY_DELAYS_SECONDS) - 1
                delay = REDEPLOY_RETRY_DELAYS_SECONDS[min(attempt, last)]
            else:
                delay = 0
            outcome = f"retrying in {delay}s"
        else:
            outcome = "retries exhausted"

        # NOTE(review): the "::warning::" prefix looks like a CI
        # workflow-command annotation; confirm against the runner in use.
        print(
            f"::warning::redeploy-fleet returned HTTP {status} for "
            f"only_slugs={slugs_text} at {url} "
            f"(attempt {attempt + 1}/{total_attempts}, detail={detail!r}); "
            f"{outcome}"
        )
        if delay is not None:
            time.sleep(delay)
    return status, resp
|
||||
|
||||
|
||||
def _raise_for_redeploy_result(status: int, body: dict, slugs: list[str]) -> None:
    """Raise unless a scoped redeploy call succeeded.

    A call counts as successful only when ``status`` is 200 and the body's
    ``ok`` field is exactly ``True``.

    Args:
        status: HTTP status of the redeploy call.
        body: Decoded response body of the redeploy call.
        slugs: Tenant slugs the call targeted; named in the error message.

    Raises:
        RuntimeError: When the call did not succeed. The message carries the
            slugs, the HTTP status, the ``ok`` value and a truncated error
            detail taken from the body.
    """
    if status == 200 and body.get("ok") is True:
        return
    # Surface the CP error body when available so the operator sees the
    # tenant-level reason (e.g. SSM timeout) instead of just the status.
    detail = _redeploy_error_detail(body, max_len=500)
    raise RuntimeError(
        "redeploy scoped call failed for "
        f"{','.join(slugs)}: HTTP {status}, ok={body.get('ok')}, detail={detail!r}"
    )
|
||||
|
||||
|
||||
|
||||
@@ -2,6 +2,8 @@ import importlib.util
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
SCRIPT = Path(__file__).resolve().parents[1] / "prod-auto-deploy.py"
|
||||
spec = importlib.util.spec_from_file_location("prod_auto_deploy", SCRIPT)
|
||||
@@ -288,6 +290,98 @@ def test_plan_scoped_rollout_preserves_canary_then_batches():
|
||||
]
|
||||
|
||||
|
||||
def test_redeploy_scoped_retries_transient_502_then_succeeds(monkeypatch, capfd):
    """Two transient gateway errors are retried before the 200 is returned."""
    queued = [
        (502, {"error": "Bad Gateway"}),
        (503, {"error": "Service Unavailable"}),
        (200, {"ok": True, "results": [{"slug": "hongming"}]}),
    ]
    calls = []
    sleeps = []

    def fake_cp_api_json(_method, _url, _token, body):
        # Record each request body, then hand out the next canned response.
        calls.append(body)
        return queued.pop(0)

    monkeypatch.setattr(prod, "cp_api_json", fake_cp_api_json)
    monkeypatch.setattr(prod.time, "sleep", sleeps.append)

    payload = {"only_slugs": ["hongming"]}
    status, resp = prod.redeploy_scoped("https://api.moleculesai.app", "token", payload)

    assert status == 200
    assert resp["ok"] is True
    assert len(calls) == 3
    assert sleeps == [5, 10]

    captured = capfd.readouterr().out
    expected_fragments = (
        "attempt 1/4",
        "attempt 2/4",
        "Bad Gateway",
        "Service Unavailable",
        "/cp/admin/tenants/redeploy-fleet",
    )
    for fragment in expected_fragments:
        assert fragment in captured
|
||||
|
||||
|
||||
def test_redeploy_scoped_gives_up_after_max_retries(monkeypatch, capfd):
    """Four retryable responses in a row end with the last one returned."""
    queued = [
        (502, {"error": "Bad Gateway"}),
        (504, {"error": "Gateway Timeout"}),
        (503, {"error": "Service Unavailable"}),
        (503, {"error": "Service Unavailable"}),
    ]
    sleeps = []

    def fake_cp_api_json(_method, _url, _token, _body):
        return queued.pop(0)

    monkeypatch.setattr(prod, "cp_api_json", fake_cp_api_json)
    monkeypatch.setattr(prod.time, "sleep", sleeps.append)

    payload = {"only_slugs": ["hongming"]}
    status, resp = prod.redeploy_scoped("https://api.moleculesai.app", "token", payload)

    assert status == 503
    assert resp["error"] == "Service Unavailable"
    # No sleep after the final (4th) attempt.
    assert sleeps == [5, 10, 20]

    captured = capfd.readouterr().out
    for fragment in (
        "attempt 4/4",
        "retries exhausted",
        "/cp/admin/tenants/redeploy-fleet",
    ):
        assert fragment in captured
|
||||
|
||||
|
||||
def test_redeploy_scoped_does_not_retry_non_transient_errors(monkeypatch):
    """A 500 is returned after a single call, with no sleep in between."""
    calls = []

    def fake_cp_api_json(_method, _url, _token, body):
        calls.append(body)
        return 500, {"error": "Internal Server Error"}

    def forbidden_sleep(_s):
        # Any sleep means the 500 was wrongly treated as retryable.
        return pytest.fail("should not sleep on 500")

    monkeypatch.setattr(prod, "cp_api_json", fake_cp_api_json)
    monkeypatch.setattr(prod.time, "sleep", forbidden_sleep)

    payload = {"only_slugs": ["hongming"]}
    status, resp = prod.redeploy_scoped("https://api.moleculesai.app", "token", payload)

    assert status == 500
    assert resp["error"] == "Internal Server Error"
    assert len(calls) == 1
|
||||
|
||||
|
||||
def test_raise_for_redeploy_result_surfaces_error_body():
    """The raised error names the status, the CP error text and the slug."""
    failed_body = {"ok": False, "error": "upstream SSM throttled"}

    with pytest.raises(RuntimeError) as exc_info:
        prod._raise_for_redeploy_result(502, failed_body, ["hongming"])

    message = str(exc_info.value)
    for fragment in ("HTTP 502", "upstream SSM throttled", "hongming"):
        assert fragment in message
|
||||
|
||||
|
||||
def test_scoped_rollout_halts_after_failed_canary():
|
||||
calls = []
|
||||
|
||||
|
||||
Reference in New Issue
Block a user