fix(deploy): rollout POST read budget must exceed worst-case batch (#41) #3020
@@ -34,6 +34,27 @@ REDEPLOY_RETRY_STATUSES = {502, 503, 504}
|
||||
REDEPLOY_MAX_RETRIES = 3
|
||||
REDEPLOY_RETRY_DELAYS_SECONDS = [5, 10, 20]
|
||||
|
||||
# HTTP read budgets for the CP calls.
|
||||
#
|
||||
# DEFAULT_CP_HTTP_TIMEOUT_SECONDS bounds the fast calls (CI-status reads, the
|
||||
# dry-run plan enumeration) — they return in well under a second.
|
||||
#
|
||||
# ROLLOUT_HTTP_TIMEOUT_DEFAULT_SECONDS bounds the REAL (non-dry-run) rollout
|
||||
# POST, which blocks server-side for the whole batch: the CP redeploys a batch's
|
||||
# tenants CONCURRENTLY and only responds once the SLOWEST one finishes — up to
|
||||
# the CP's PerTenantTimeout (5m SSM default) plus the /healthz settle (90s) for a
|
||||
# stuck or dead box. The old hardcoded 120s read timeout was SHORTER than that
|
||||
# worst case, so a single unreachable tenant (e.g. a CF-525 box whose SSM agent
|
||||
# never answers — philbrew-erton, RFC#2843 #41) made this client abandon the
|
||||
# call with an empty response BEFORE the CP could return the per-tenant results
|
||||
# that the max_stragglers quarantine acts on. Net effect: the healthy majority
|
||||
# shipped, but the deploy reported ok=false (result_count=0) and SKIPPED the
|
||||
# :latest promote — so brand-new provisions kept pulling the stale image. Give
|
||||
# the rollout POST a budget that comfortably exceeds the concurrent-batch worst
|
||||
# case so the designed quarantine path can actually run.
|
||||
DEFAULT_CP_HTTP_TIMEOUT_SECONDS = 120
|
||||
ROLLOUT_HTTP_TIMEOUT_DEFAULT_SECONDS = 600
|
||||
|
||||
|
||||
def truthy_flag(value: str | None) -> bool:
|
||||
if value is None:
|
||||
@@ -107,6 +128,15 @@ def build_plan(env: dict[str, str]) -> dict:
|
||||
"target_tag": target_tag,
|
||||
"cp_url": cp_url,
|
||||
"body": body,
|
||||
# Read budget for the real rollout POST (see ROLLOUT_HTTP_TIMEOUT_*).
|
||||
# Floor at the fast-call default so a misconfig can't shrink it below
|
||||
# the dry-run budget.
|
||||
"rollout_http_timeout": _int_env(
|
||||
env,
|
||||
"PROD_AUTO_DEPLOY_ROLLOUT_HTTP_TIMEOUT_SECONDS",
|
||||
ROLLOUT_HTTP_TIMEOUT_DEFAULT_SECONDS,
|
||||
minimum=DEFAULT_CP_HTTP_TIMEOUT_SECONDS,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
@@ -184,7 +214,13 @@ def scoped_redeploy_body(base: dict, slugs: list[str]) -> dict:
|
||||
return body
|
||||
|
||||
|
||||
def cp_api_json(method: str, url: str, token: str, body: dict | None = None) -> tuple[int, dict]:
|
||||
def cp_api_json(
|
||||
method: str,
|
||||
url: str,
|
||||
token: str,
|
||||
body: dict | None = None,
|
||||
timeout: int = DEFAULT_CP_HTTP_TIMEOUT_SECONDS,
|
||||
) -> tuple[int, dict]:
|
||||
data = None
|
||||
headers = {
|
||||
"Authorization": f"Bearer {token}",
|
||||
@@ -195,7 +231,7 @@ def cp_api_json(method: str, url: str, token: str, body: dict | None = None) ->
|
||||
headers["Content-Type"] = "application/json"
|
||||
req = urllib.request.Request(url, data=data, headers=headers, method=method)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=120) as resp:
|
||||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||||
return resp.status, json.loads(resp.read())
|
||||
except urllib.error.HTTPError as exc:
|
||||
raw = exc.read().decode("utf-8", errors="replace")
|
||||
@@ -204,6 +240,14 @@ def cp_api_json(method: str, url: str, token: str, body: dict | None = None) ->
|
||||
except json.JSONDecodeError:
|
||||
parsed = {"error": raw[:500]}
|
||||
return exc.code, parsed
|
||||
except (TimeoutError, urllib.error.URLError) as exc:
|
||||
# A socket read timeout (or transient connection drop) is NOT an HTTP
|
||||
# status — without this branch it propagates as an unhandled exception
|
||||
# and crashes the run with a bare "read operation timed out" (the
|
||||
# RFC#2843 #41 symptom). Surface it as a synthetic 504 so the bounded
|
||||
# redeploy retry treats it like an upstream gateway timeout.
|
||||
reason = getattr(exc, "reason", exc)
|
||||
return 504, {"error": f"request to {url} timed out or failed: {reason}"}
|
||||
|
||||
|
||||
def plan_rollout_slugs(cp_url: str, token: str, body: dict, redeploy=None) -> list[str]:
|
||||
@@ -230,12 +274,22 @@ def _redeploy_error_detail(body: dict, max_len: int = 200) -> str:
|
||||
return detail[:max_len]
|
||||
|
||||
|
||||
def redeploy_scoped(cp_url: str, token: str, body: dict) -> tuple[int, dict]:
|
||||
def redeploy_scoped(
|
||||
cp_url: str,
|
||||
token: str,
|
||||
body: dict,
|
||||
timeout: int = DEFAULT_CP_HTTP_TIMEOUT_SECONDS,
|
||||
) -> tuple[int, dict]:
|
||||
"""POST /cp/admin/tenants/redeploy-fleet with bounded transient retry.
|
||||
|
||||
CP can return 502/503/504 when an upstream dependency (SSM, ECS, etc.)
|
||||
flakes. Retry a small number of times with increasing backoff before
|
||||
giving up and letting the caller surface the failure.
|
||||
|
||||
``timeout`` is the per-attempt HTTP read budget. The real rollout passes
|
||||
ROLLOUT_HTTP_TIMEOUT_* so a slow/dead tenant can't time the client out
|
||||
before the CP returns the per-tenant results (RFC#2843 #41); dry-run /
|
||||
plan calls keep the fast default.
|
||||
"""
|
||||
url = f"{cp_url}{REDEPLOY_PATH}"
|
||||
slugs = body.get("only_slugs") or []
|
||||
@@ -244,7 +298,7 @@ def redeploy_scoped(cp_url: str, token: str, body: dict) -> tuple[int, dict]:
|
||||
status = 0
|
||||
resp: dict = {}
|
||||
for attempt in range(total_attempts):
|
||||
status, resp = cp_api_json("POST", url, token, body)
|
||||
status, resp = cp_api_json("POST", url, token, body, timeout=timeout)
|
||||
if status not in REDEPLOY_RETRY_STATUSES:
|
||||
return status, resp
|
||||
detail = _redeploy_error_detail(resp)
|
||||
@@ -357,18 +411,30 @@ def execute_scoped_rollout(
|
||||
) -> dict:
|
||||
cp_url = plan["cp_url"]
|
||||
base_body = plan["body"]
|
||||
rollout_timeout = int(plan.get("rollout_http_timeout") or ROLLOUT_HTTP_TIMEOUT_DEFAULT_SECONDS)
|
||||
all_slugs = list_slugs(cp_url, token, base_body)
|
||||
batch_size = int(base_body.get("batch_size") or 1)
|
||||
canary_slug = str(base_body.get("canary_slug") or "").strip()
|
||||
dry_run = bool(base_body.get("dry_run"))
|
||||
aggregate = {"ok": True, "results": []}
|
||||
|
||||
# The real rollout POST blocks for the whole concurrent batch; give it the
|
||||
# elevated read budget so a slow/dead tenant can't time the client out
|
||||
# before the CP returns results (RFC#2843 #41). Only the production
|
||||
# redeploy_scoped accepts a timeout; an injected test double keeps its
|
||||
# 3-arg shape, so wrap only when we hold the real callable.
|
||||
if redeploy is redeploy_scoped:
|
||||
def rollout_redeploy(u, t, b):
|
||||
return redeploy_scoped(u, t, b, timeout=rollout_timeout)
|
||||
else:
|
||||
rollout_redeploy = redeploy
|
||||
|
||||
if canary_slug:
|
||||
if canary_slug not in all_slugs:
|
||||
raise RuntimeError(f"configured canary slug {canary_slug!r} is not a running tenant")
|
||||
body = scoped_redeploy_body(base_body, [canary_slug])
|
||||
print(f"POST {cp_url}{REDEPLOY_PATH} only_slugs={','.join(body['only_slugs'])}")
|
||||
status, resp = redeploy(cp_url, token, body)
|
||||
status, resp = rollout_redeploy(cp_url, token, body)
|
||||
aggregate["results"].extend(resp.get("results") or [])
|
||||
try:
|
||||
_raise_for_redeploy_result(status, resp, [canary_slug])
|
||||
@@ -385,7 +451,7 @@ def execute_scoped_rollout(
|
||||
for group in chunks(remaining, batch_size):
|
||||
body = scoped_redeploy_body(base_body, group)
|
||||
print(f"POST {cp_url}{REDEPLOY_PATH} only_slugs={','.join(group)}")
|
||||
status, resp = redeploy(cp_url, token, body)
|
||||
status, resp = rollout_redeploy(cp_url, token, body)
|
||||
aggregate["results"].extend(resp.get("results") or [])
|
||||
try:
|
||||
_raise_for_redeploy_result(status, resp, group)
|
||||
|
||||
@@ -290,6 +290,61 @@ def test_plan_scoped_rollout_preserves_canary_then_batches():
|
||||
]
|
||||
|
||||
|
||||
def test_rollout_uses_elevated_http_timeout(monkeypatch):
|
||||
"""RFC#2843 #41: the real rollout POST must use the elevated read budget so
|
||||
a slow/dead tenant can't time the client out before the CP returns results.
|
||||
"""
|
||||
seen_timeouts = []
|
||||
|
||||
def fake_cp_api_json(_method, _url, _token, body, timeout=None):
|
||||
seen_timeouts.append(timeout)
|
||||
# Return a verified result so coverage passes and the loop completes.
|
||||
return 200, {"ok": True, "results": [{"slug": s, "verified_on_target": True}
|
||||
for s in (body.get("only_slugs") or [])]}
|
||||
|
||||
monkeypatch.setattr(prod, "cp_api_json", fake_cp_api_json)
|
||||
|
||||
plan = {
|
||||
"cp_url": "https://api.moleculesai.app",
|
||||
"rollout_http_timeout": 600,
|
||||
"body": {"target_tag": "staging-abc", "batch_size": 3, "max_stragglers": 1},
|
||||
}
|
||||
prod.execute_scoped_rollout(
|
||||
plan,
|
||||
"token",
|
||||
list_slugs=lambda _u, _t, _b: ["reno-stars", "philbrew-erton"],
|
||||
)
|
||||
# Every real rollout POST went through redeploy_scoped → cp_api_json with the
|
||||
# elevated budget, never the fast 120s default.
|
||||
assert seen_timeouts, "expected at least one rollout POST"
|
||||
assert all(t == 600 for t in seen_timeouts), seen_timeouts
|
||||
|
||||
|
||||
def test_cp_api_json_socket_timeout_becomes_retryable_504(monkeypatch):
|
||||
"""A bare socket read timeout must surface as a synthetic 504 (retryable),
|
||||
not crash the run with an unhandled exception (RFC#2843 #41).
|
||||
"""
|
||||
def boom(_req, timeout=None):
|
||||
raise TimeoutError("read operation timed out")
|
||||
|
||||
monkeypatch.setattr(prod.urllib.request, "urlopen", boom)
|
||||
status, body = prod.cp_api_json("POST", "https://api.moleculesai.app/x", "tok", {"a": 1}, timeout=5)
|
||||
assert status == 504
|
||||
assert "timed out" in body["error"]
|
||||
assert status in prod.REDEPLOY_RETRY_STATUSES
|
||||
|
||||
|
||||
def test_build_plan_sets_rollout_timeout_default_and_floor():
|
||||
base_env = {"GITHUB_SHA": "deadbeef0000"}
|
||||
plan = prod.build_plan(dict(base_env))
|
||||
assert plan["rollout_http_timeout"] == prod.ROLLOUT_HTTP_TIMEOUT_DEFAULT_SECONDS
|
||||
# A configured value below the fast-call floor is rejected (keeps the rollout
|
||||
# budget from ever shrinking below the dry-run budget).
|
||||
import pytest as _pytest
|
||||
with _pytest.raises(ValueError):
|
||||
prod.build_plan({**base_env, "PROD_AUTO_DEPLOY_ROLLOUT_HTTP_TIMEOUT_SECONDS": "30"})
|
||||
|
||||
|
||||
def test_redeploy_scoped_retries_transient_502_then_succeeds(monkeypatch, capfd):
|
||||
responses = [
|
||||
(502, {"error": "Bad Gateway"}),
|
||||
@@ -299,7 +354,7 @@ def test_redeploy_scoped_retries_transient_502_then_succeeds(monkeypatch, capfd)
|
||||
calls = []
|
||||
sleeps = []
|
||||
|
||||
def fake_cp_api_json(_method, _url, _token, body):
|
||||
def fake_cp_api_json(_method, _url, _token, body, timeout=None):
|
||||
calls.append(body)
|
||||
return responses.pop(0)
|
||||
|
||||
@@ -331,7 +386,7 @@ def test_redeploy_scoped_gives_up_after_max_retries(monkeypatch, capfd):
|
||||
]
|
||||
sleeps = []
|
||||
|
||||
def fake_cp_api_json(_method, _url, _token, _body):
|
||||
def fake_cp_api_json(_method, _url, _token, _body, timeout=None):
|
||||
return responses.pop(0)
|
||||
|
||||
monkeypatch.setattr(prod, "cp_api_json", fake_cp_api_json)
|
||||
@@ -354,7 +409,7 @@ def test_redeploy_scoped_gives_up_after_max_retries(monkeypatch, capfd):
|
||||
def test_redeploy_scoped_does_not_retry_non_transient_errors(monkeypatch):
|
||||
calls = []
|
||||
|
||||
def fake_cp_api_json(_method, _url, _token, body):
|
||||
def fake_cp_api_json(_method, _url, _token, body, timeout=None):
|
||||
calls.append(body)
|
||||
return 500, {"error": "Internal Server Error"}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user