fix(deploy): rollout POST read budget must exceed worst-case batch (#41) #3020

Merged
core-devops merged 1 commits from fix/rfc2843-41-rollout-http-timeout into main 2026-06-17 20:41:28 +00:00
2 changed files with 130 additions and 9 deletions
+72 -6
View File
@@ -34,6 +34,27 @@ REDEPLOY_RETRY_STATUSES = {502, 503, 504}
REDEPLOY_MAX_RETRIES = 3
REDEPLOY_RETRY_DELAYS_SECONDS = [5, 10, 20]
# HTTP read budgets for the CP calls.
#
# DEFAULT_CP_HTTP_TIMEOUT_SECONDS bounds the fast calls (CI-status reads, the
# dry-run plan enumeration) — they return in well under a second.
#
# ROLLOUT_HTTP_TIMEOUT_DEFAULT_SECONDS bounds the REAL (non-dry-run) rollout
# POST, which blocks server-side for the whole batch: the CP redeploys a batch's
# tenants CONCURRENTLY and only responds once the SLOWEST one finishes — up to
# the CP's PerTenantTimeout (5m SSM default) plus the /healthz settle (90s) for a
# stuck or dead box. The old hardcoded 120s read timeout was SHORTER than that
# worst case, so a single unreachable tenant (e.g. a CF-525 box whose SSM agent
# never answers — philbrew-erton, RFC#2843 #41) made this client abandon the
# call with an empty response BEFORE the CP could return the per-tenant results
# that the max_stragglers quarantine acts on. Net effect: the healthy majority
# shipped, but the deploy reported ok=false (result_count=0) and SKIPPED the
# :latest promote — so brand-new provisions kept pulling the stale image. Give
# the rollout POST a budget that comfortably exceeds the concurrent-batch worst
# case so the designed quarantine path can actually run.
DEFAULT_CP_HTTP_TIMEOUT_SECONDS = 120
ROLLOUT_HTTP_TIMEOUT_DEFAULT_SECONDS = 600
def truthy_flag(value: str | None) -> bool:
if value is None:
@@ -107,6 +128,15 @@ def build_plan(env: dict[str, str]) -> dict:
"target_tag": target_tag,
"cp_url": cp_url,
"body": body,
# Read budget for the real rollout POST (see ROLLOUT_HTTP_TIMEOUT_*).
# Floor at the fast-call default so a misconfig can't shrink it below
# the dry-run budget.
"rollout_http_timeout": _int_env(
env,
"PROD_AUTO_DEPLOY_ROLLOUT_HTTP_TIMEOUT_SECONDS",
ROLLOUT_HTTP_TIMEOUT_DEFAULT_SECONDS,
minimum=DEFAULT_CP_HTTP_TIMEOUT_SECONDS,
),
}
@@ -184,7 +214,13 @@ def scoped_redeploy_body(base: dict, slugs: list[str]) -> dict:
return body
def cp_api_json(method: str, url: str, token: str, body: dict | None = None) -> tuple[int, dict]:
def cp_api_json(
method: str,
url: str,
token: str,
body: dict | None = None,
timeout: int = DEFAULT_CP_HTTP_TIMEOUT_SECONDS,
) -> tuple[int, dict]:
data = None
headers = {
"Authorization": f"Bearer {token}",
@@ -195,7 +231,7 @@ def cp_api_json(method: str, url: str, token: str, body: dict | None = None) ->
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=data, headers=headers, method=method)
try:
with urllib.request.urlopen(req, timeout=120) as resp:
with urllib.request.urlopen(req, timeout=timeout) as resp:
return resp.status, json.loads(resp.read())
except urllib.error.HTTPError as exc:
raw = exc.read().decode("utf-8", errors="replace")
@@ -204,6 +240,14 @@ def cp_api_json(method: str, url: str, token: str, body: dict | None = None) ->
except json.JSONDecodeError:
parsed = {"error": raw[:500]}
return exc.code, parsed
except (TimeoutError, urllib.error.URLError) as exc:
# A socket read timeout (or transient connection drop) is NOT an HTTP
# status — without this branch it propagates as an unhandled exception
# and crashes the run with a bare "read operation timed out" (the
# RFC#2843 #41 symptom). Surface it as a synthetic 504 so the bounded
# redeploy retry treats it like an upstream gateway timeout.
reason = getattr(exc, "reason", exc)
return 504, {"error": f"request to {url} timed out or failed: {reason}"}
def plan_rollout_slugs(cp_url: str, token: str, body: dict, redeploy=None) -> list[str]:
@@ -230,12 +274,22 @@ def _redeploy_error_detail(body: dict, max_len: int = 200) -> str:
return detail[:max_len]
def redeploy_scoped(cp_url: str, token: str, body: dict) -> tuple[int, dict]:
def redeploy_scoped(
cp_url: str,
token: str,
body: dict,
timeout: int = DEFAULT_CP_HTTP_TIMEOUT_SECONDS,
) -> tuple[int, dict]:
"""POST /cp/admin/tenants/redeploy-fleet with bounded transient retry.
CP can return 502/503/504 when an upstream dependency (SSM, ECS, etc.)
flakes. Retry a small number of times with increasing backoff before
giving up and letting the caller surface the failure.
``timeout`` is the per-attempt HTTP read budget. The real rollout passes
ROLLOUT_HTTP_TIMEOUT_* so a slow/dead tenant can't time the client out
before the CP returns the per-tenant results (RFC#2843 #41); dry-run /
plan calls keep the fast default.
"""
url = f"{cp_url}{REDEPLOY_PATH}"
slugs = body.get("only_slugs") or []
@@ -244,7 +298,7 @@ def redeploy_scoped(cp_url: str, token: str, body: dict) -> tuple[int, dict]:
status = 0
resp: dict = {}
for attempt in range(total_attempts):
status, resp = cp_api_json("POST", url, token, body)
status, resp = cp_api_json("POST", url, token, body, timeout=timeout)
if status not in REDEPLOY_RETRY_STATUSES:
return status, resp
detail = _redeploy_error_detail(resp)
@@ -357,18 +411,30 @@ def execute_scoped_rollout(
) -> dict:
cp_url = plan["cp_url"]
base_body = plan["body"]
rollout_timeout = int(plan.get("rollout_http_timeout") or ROLLOUT_HTTP_TIMEOUT_DEFAULT_SECONDS)
all_slugs = list_slugs(cp_url, token, base_body)
batch_size = int(base_body.get("batch_size") or 1)
canary_slug = str(base_body.get("canary_slug") or "").strip()
dry_run = bool(base_body.get("dry_run"))
aggregate = {"ok": True, "results": []}
# The real rollout POST blocks for the whole concurrent batch; give it the
# elevated read budget so a slow/dead tenant can't time the client out
# before the CP returns results (RFC#2843 #41). Only the production
# redeploy_scoped accepts a timeout; an injected test double keeps its
# 3-arg shape, so wrap only when we hold the real callable.
if redeploy is redeploy_scoped:
def rollout_redeploy(u, t, b):
return redeploy_scoped(u, t, b, timeout=rollout_timeout)
else:
rollout_redeploy = redeploy
if canary_slug:
if canary_slug not in all_slugs:
raise RuntimeError(f"configured canary slug {canary_slug!r} is not a running tenant")
body = scoped_redeploy_body(base_body, [canary_slug])
print(f"POST {cp_url}{REDEPLOY_PATH} only_slugs={','.join(body['only_slugs'])}")
status, resp = redeploy(cp_url, token, body)
status, resp = rollout_redeploy(cp_url, token, body)
aggregate["results"].extend(resp.get("results") or [])
try:
_raise_for_redeploy_result(status, resp, [canary_slug])
@@ -385,7 +451,7 @@ def execute_scoped_rollout(
for group in chunks(remaining, batch_size):
body = scoped_redeploy_body(base_body, group)
print(f"POST {cp_url}{REDEPLOY_PATH} only_slugs={','.join(group)}")
status, resp = redeploy(cp_url, token, body)
status, resp = rollout_redeploy(cp_url, token, body)
aggregate["results"].extend(resp.get("results") or [])
try:
_raise_for_redeploy_result(status, resp, group)
+58 -3
View File
@@ -290,6 +290,61 @@ def test_plan_scoped_rollout_preserves_canary_then_batches():
]
def test_rollout_uses_elevated_http_timeout(monkeypatch):
"""RFC#2843 #41: the real rollout POST must use the elevated read budget so
a slow/dead tenant can't time the client out before the CP returns results.
"""
seen_timeouts = []
def fake_cp_api_json(_method, _url, _token, body, timeout=None):
seen_timeouts.append(timeout)
# Return a verified result so coverage passes and the loop completes.
return 200, {"ok": True, "results": [{"slug": s, "verified_on_target": True}
for s in (body.get("only_slugs") or [])]}
monkeypatch.setattr(prod, "cp_api_json", fake_cp_api_json)
plan = {
"cp_url": "https://api.moleculesai.app",
"rollout_http_timeout": 600,
"body": {"target_tag": "staging-abc", "batch_size": 3, "max_stragglers": 1},
}
prod.execute_scoped_rollout(
plan,
"token",
list_slugs=lambda _u, _t, _b: ["reno-stars", "philbrew-erton"],
)
# Every real rollout POST went through redeploy_scoped → cp_api_json with the
# elevated budget, never the fast 120s default.
assert seen_timeouts, "expected at least one rollout POST"
assert all(t == 600 for t in seen_timeouts), seen_timeouts
def test_cp_api_json_socket_timeout_becomes_retryable_504(monkeypatch):
"""A bare socket read timeout must surface as a synthetic 504 (retryable),
not crash the run with an unhandled exception (RFC#2843 #41).
"""
def boom(_req, timeout=None):
raise TimeoutError("read operation timed out")
monkeypatch.setattr(prod.urllib.request, "urlopen", boom)
status, body = prod.cp_api_json("POST", "https://api.moleculesai.app/x", "tok", {"a": 1}, timeout=5)
assert status == 504
assert "timed out" in body["error"]
assert status in prod.REDEPLOY_RETRY_STATUSES
def test_build_plan_sets_rollout_timeout_default_and_floor():
base_env = {"GITHUB_SHA": "deadbeef0000"}
plan = prod.build_plan(dict(base_env))
assert plan["rollout_http_timeout"] == prod.ROLLOUT_HTTP_TIMEOUT_DEFAULT_SECONDS
# A configured value below the fast-call floor is rejected (keeps the rollout
# budget from ever shrinking below the dry-run budget).
import pytest as _pytest
with _pytest.raises(ValueError):
prod.build_plan({**base_env, "PROD_AUTO_DEPLOY_ROLLOUT_HTTP_TIMEOUT_SECONDS": "30"})
def test_redeploy_scoped_retries_transient_502_then_succeeds(monkeypatch, capfd):
responses = [
(502, {"error": "Bad Gateway"}),
@@ -299,7 +354,7 @@ def test_redeploy_scoped_retries_transient_502_then_succeeds(monkeypatch, capfd)
calls = []
sleeps = []
def fake_cp_api_json(_method, _url, _token, body):
def fake_cp_api_json(_method, _url, _token, body, timeout=None):
calls.append(body)
return responses.pop(0)
@@ -331,7 +386,7 @@ def test_redeploy_scoped_gives_up_after_max_retries(monkeypatch, capfd):
]
sleeps = []
def fake_cp_api_json(_method, _url, _token, _body):
def fake_cp_api_json(_method, _url, _token, _body, timeout=None):
return responses.pop(0)
monkeypatch.setattr(prod, "cp_api_json", fake_cp_api_json)
@@ -354,7 +409,7 @@ def test_redeploy_scoped_gives_up_after_max_retries(monkeypatch, capfd):
def test_redeploy_scoped_does_not_retry_non_transient_errors(monkeypatch):
calls = []
def fake_cp_api_json(_method, _url, _token, body):
def fake_cp_api_json(_method, _url, _token, body, timeout=None):
calls.append(body)
return 500, {"error": "Internal Server Error"}