diff --git a/.gitea/scripts/prod-auto-deploy.py b/.gitea/scripts/prod-auto-deploy.py index 67fd46d43..9fff1205a 100644 --- a/.gitea/scripts/prod-auto-deploy.py +++ b/.gitea/scripts/prod-auto-deploy.py @@ -25,6 +25,7 @@ DEFAULT_REQUIRED_CONTEXTS = [ "Secret scan / Scan diff for credential-shaped strings (push)", ] TERMINAL_FAILURE_STATES = {"failure", "error", "cancelled", "canceled", "skipped"} +REDEPLOY_PATH = "/cp/admin/tenants/redeploy-fleet" def truthy_flag(value: str | None) -> bool: @@ -130,6 +131,154 @@ def required_contexts(env: dict[str, str]) -> list[str]: return [line.strip() for line in raw.replace(",", "\n").splitlines() if line.strip()] +def chunks(items: list[str], size: int) -> list[list[str]]: + return [items[i : i + size] for i in range(0, len(items), size)] + + +class RolloutFailed(RuntimeError): + def __init__(self, message: str, response: dict): + super().__init__(message) + self.response = response + + +def slugs_from_redeploy_response(body: dict) -> list[str]: + slugs: list[str] = [] + for row in body.get("results") or []: + slug = str(row.get("slug") or "").strip() + if slug: + slugs.append(slug) + return slugs + + +def scoped_redeploy_body(base: dict, slugs: list[str]) -> dict: + body = dict(base) + body.pop("canary_slug", None) + body["only_slugs"] = slugs + body["soak_seconds"] = 0 + body["batch_size"] = max(1, len(slugs)) + return body + + +def cp_api_json(method: str, url: str, token: str, body: dict | None = None) -> tuple[int, dict]: + data = None + headers = { + "Authorization": f"Bearer {token}", + "Accept": "application/json", + } + if body is not None: + data = json.dumps(body).encode("utf-8") + headers["Content-Type"] = "application/json" + req = urllib.request.Request(url, data=data, headers=headers, method=method) + try: + with urllib.request.urlopen(req, timeout=120) as resp: + return resp.status, json.loads(resp.read()) + except urllib.error.HTTPError as exc: + raw = exc.read().decode("utf-8", errors="replace") + try: + 
parsed = json.loads(raw) + except json.JSONDecodeError: + parsed = {"error": raw[:500]} + return exc.code, parsed + + +def plan_rollout_slugs(cp_url: str, token: str, body: dict, redeploy=None) -> list[str]: + if redeploy is None: + redeploy = redeploy_scoped + dry_run_body = dict(body) + dry_run_body["dry_run"] = True + status, resp = redeploy(cp_url, token, dry_run_body) + if status != 200: + raise RuntimeError(f"dry-run redeploy-fleet returned HTTP {status}: {resp.get('error', '')}") + if resp.get("ok") is not True: + raise RuntimeError(f"dry-run redeploy-fleet reported ok={resp.get('ok')}: {resp.get('error', '')}") + slugs = slugs_from_redeploy_response(resp) + if not slugs: + raise RuntimeError("dry-run redeploy-fleet returned no rollout candidates") + return slugs + + +def redeploy_scoped(cp_url: str, token: str, body: dict) -> tuple[int, dict]: + return cp_api_json("POST", f"{cp_url}{REDEPLOY_PATH}", token, body) + + +def _raise_for_redeploy_result(status: int, body: dict, slugs: list[str]) -> None: + if status != 200 or body.get("ok") is not True: + raise RuntimeError( + "redeploy scoped call failed for " + f"{','.join(slugs)}: HTTP {status}, ok={body.get('ok')}" + ) + + +def execute_scoped_rollout( + plan: dict, + token: str, + list_slugs=plan_rollout_slugs, + redeploy=redeploy_scoped, + sleep=time.sleep, +) -> dict: + cp_url = plan["cp_url"] + base_body = plan["body"] + all_slugs = list_slugs(cp_url, token, base_body) + batch_size = int(base_body.get("batch_size") or 1) + canary_slug = str(base_body.get("canary_slug") or "").strip() + dry_run = bool(base_body.get("dry_run")) + aggregate = {"ok": True, "results": []} + + if canary_slug: + if canary_slug not in all_slugs: + raise RuntimeError(f"configured canary slug {canary_slug!r} is not a running tenant") + body = scoped_redeploy_body(base_body, [canary_slug]) + print(f"POST {cp_url}{REDEPLOY_PATH} only_slugs={','.join(body['only_slugs'])}") + status, resp = redeploy(cp_url, token, body) + 
aggregate["results"].extend(resp.get("results") or []) + try: + _raise_for_redeploy_result(status, resp, [canary_slug]) + except RuntimeError as exc: + aggregate["ok"] = False + aggregate["error"] = str(exc) + raise RolloutFailed(str(exc), aggregate) from exc + soak_seconds = int(base_body.get("soak_seconds") or 0) + if soak_seconds > 0 and not dry_run: + print(f"Canary passed; soaking locally for {soak_seconds}s") + sleep(soak_seconds) + + remaining = [slug for slug in all_slugs if slug != canary_slug] + for group in chunks(remaining, batch_size): + body = scoped_redeploy_body(base_body, group) + print(f"POST {cp_url}{REDEPLOY_PATH} only_slugs={','.join(group)}") + status, resp = redeploy(cp_url, token, body) + aggregate["results"].extend(resp.get("results") or []) + try: + _raise_for_redeploy_result(status, resp, group) + except RuntimeError as exc: + aggregate["ok"] = False + aggregate["error"] = str(exc) + raise RolloutFailed(str(exc), aggregate) from exc + + return aggregate + + +def rollout_from_plan_file(plan_path: str, response_path: str, env: dict[str, str]) -> None: + token = env.get("CP_ADMIN_API_TOKEN", "").strip() + if not token: + raise ValueError("CP_ADMIN_API_TOKEN is required for production auto-deploy") + with open(plan_path, "r", encoding="utf-8") as fh: + plan = json.load(fh) + if not plan.get("enabled"): + raise RuntimeError("production auto-deploy plan is disabled") + try: + response = execute_scoped_rollout(plan, token) + except RolloutFailed as exc: + response = exc.response + with open(response_path, "w", encoding="utf-8") as fh: + json.dump(response, fh, sort_keys=True) + fh.write("\n") + raise + with open(response_path, "w", encoding="utf-8") as fh: + json.dump(response, fh, sort_keys=True) + fh.write("\n") + + def _api_json(url: str, token: str) -> dict: req = urllib.request.Request(url, headers={"Authorization": f"token {token}"}) try: @@ -231,6 +380,9 @@ def main() -> int: sub.add_parser("plan", help="print production deploy plan as 
# Shared fixtures for the rollout tests below.
_CP_URL = "https://api.moleculesai.app"
_TARGET_TAG = "staging-abcdef1"


def _rollout_body(batch_size: int, dry_run: bool = False) -> dict:
    """Return a redeploy request body with the canary settings used by these tests."""
    return {
        "target_tag": _TARGET_TAG,
        "canary_slug": "hongming",
        "soak_seconds": 60,
        "batch_size": batch_size,
        "dry_run": dry_run,
        "confirm": True,
    }


def test_slugs_from_redeploy_response_uses_controlplane_plan_rows():
    """Rows with a blank or missing slug are ignored."""
    rows = [
        {"slug": "hongming", "phase": "canary", "ssm_status": "DryRun"},
        {"slug": "tenant-a", "phase": "batch-1", "ssm_status": "DryRun"},
        {"slug": "", "phase": "batch-1", "ssm_status": "DryRun"},
        {"phase": "batch-1", "ssm_status": "DryRun"},
    ]

    found = prod.slugs_from_redeploy_response({"results": rows})

    assert found == ["hongming", "tenant-a"]


def test_plan_rollout_slugs_asks_controlplane_for_dry_run_plan():
    """The planner re-sends the body with dry_run forced on."""
    sent_bodies = []

    def fake_redeploy(_cp_url, _token, body):
        sent_bodies.append(body)
        planned = [
            {"slug": "hongming", "phase": "canary", "ssm_status": "DryRun"},
            {"slug": "tenant-a", "phase": "batch-1", "ssm_status": "DryRun"},
        ]
        return 200, {"ok": True, "results": planned}

    slugs = prod.plan_rollout_slugs(
        _CP_URL,
        "secret",
        _rollout_body(batch_size=3),
        redeploy=fake_redeploy,
    )

    assert slugs == ["hongming", "tenant-a"]
    assert sent_bodies == [_rollout_body(batch_size=3, dry_run=True)]


def test_scoped_redeploy_body_removes_canary_and_local_soak():
    """Scoped bodies drop the canary, zero the soak and size the batch to the group."""
    scoped = prod.scoped_redeploy_body(_rollout_body(batch_size=3), ["tenant-a", "tenant-b"])

    expected = {
        "target_tag": _TARGET_TAG,
        "soak_seconds": 0,
        "batch_size": 2,
        "dry_run": False,
        "confirm": True,
        "only_slugs": ["tenant-a", "tenant-b"],
    }
    assert scoped == expected


def test_plan_scoped_rollout_preserves_canary_then_batches():
    """The canary goes first, then the remaining slugs in batch_size groups."""
    sent_bodies = []
    soaks = []

    def fake_list(_cp_url, _token, _body):
        return ["tenant-a", "hongming", "tenant-b", "tenant-c"]

    def fake_redeploy(_cp_url, _token, body):
        sent_bodies.append(body)
        rows = [{"slug": slug, "healthz_ok": True} for slug in body["only_slugs"]]
        return 200, {"ok": True, "results": rows}

    plan = {"cp_url": _CP_URL, "body": _rollout_body(batch_size=2)}
    aggregate = prod.execute_scoped_rollout(
        plan,
        token="secret",
        list_slugs=fake_list,
        redeploy=fake_redeploy,
        sleep=soaks.append,
    )

    scopes = [sent["only_slugs"] for sent in sent_bodies]
    assert scopes == [["hongming"], ["tenant-a", "tenant-b"], ["tenant-c"]]
    assert soaks == [60]
    assert aggregate["ok"] is True
    deployed = [row["slug"] for row in aggregate["results"]]
    assert deployed == ["hongming", "tenant-a", "tenant-b", "tenant-c"]


def test_scoped_rollout_halts_after_failed_canary():
    """A failing canary stops the rollout and carries its rows on the exception."""
    sent_bodies = []

    def fake_redeploy(_cp_url, _token, body):
        sent_bodies.append(body)
        failed_row = {"slug": body["only_slugs"][0], "error": "bad"}
        return 200, {"ok": False, "results": [failed_row]}

    plan = {"cp_url": _CP_URL, "body": _rollout_body(batch_size=2)}
    try:
        prod.execute_scoped_rollout(
            plan,
            token="secret",
            list_slugs=lambda _cp_url, _token, _body: ["hongming", "tenant-a"],
            redeploy=fake_redeploy,
            sleep=lambda _seconds: None,
        )
    except prod.RolloutFailed as exc:
        assert "redeploy scoped call failed" in str(exc)
        assert exc.response["ok"] is False
        assert exc.response["results"] == [{"slug": "hongming", "error": "bad"}]
    else:
        raise AssertionError("expected failed canary to halt rollout")

    assert [sent["only_slugs"] for sent in sent_bodies] == [["hongming"]]


def test_rollout_from_plan_file_writes_partial_response_on_failure(tmp_path):
    """The partial aggregate is written to the response file before re-raising."""
    plan_path = tmp_path / "plan.json"
    response_path = tmp_path / "response.json"
    plan_path.write_text(
        """
        {
          "enabled": true,
          "cp_url": "https://api.moleculesai.app",
          "body": {"target_tag": "staging-abcdef1", "confirm": true}
        }
        """,
        encoding="utf-8",
    )

    failure_message = "redeploy scoped call failed for hongming: HTTP 500, ok=false"

    def fake_execute(_plan, _token):
        raise prod.RolloutFailed(
            failure_message,
            {
                "ok": False,
                "error": failure_message,
                "results": [{"slug": "hongming", "error": "bad"}],
            },
        )

    # Swap the module attribute by hand and always restore it afterwards.
    original = prod.execute_scoped_rollout
    prod.execute_scoped_rollout = fake_execute
    try:
        try:
            prod.rollout_from_plan_file(
                str(plan_path),
                str(response_path),
                {"CP_ADMIN_API_TOKEN": "secret"},
            )
        except prod.RolloutFailed:
            pass
        else:
            raise AssertionError("expected rollout failure")
    finally:
        prod.execute_scoped_rollout = original

    written = response_path.read_text(encoding="utf-8")
    assert written.strip()
    assert '"ok": false' in written
    assert '"slug": "hongming"' in written
| "| \(.slug) | \(.phase) | \(.ssm_status // "-") | \(.ssm_exit_code) | \(.healthz_ok) | \((.error // "") != "") |"' "$HTTP_RESPONSE" || true } >> "$GITHUB_STEP_SUMMARY" - if [ "$HTTP_CODE" != "200" ]; then - echo "::error::redeploy-fleet returned HTTP $HTTP_CODE" - exit 1 - fi OK="$(jq -r '.ok' "$HTTP_RESPONSE")" if [ "$OK" != "true" ]; then echo "::error::redeploy-fleet reported ok=false; production rollout halted." exit 1 fi + if [ "$ROLLOUT_EXIT" -ne 0 ]; then + echo "::error::redeploy-fleet rollout failed with exit code $ROLLOUT_EXIT." + exit "$ROLLOUT_EXIT" + fi - name: Verify reachable tenants report this SHA if: ${{ steps.plan.outputs.enabled == 'true' }}