From 050c2412b3b16c8f301bc30bf0cc87a4731ef3e6 Mon Sep 17 00:00:00 2001 From: rabbitblood Date: Thu, 23 Apr 2026 13:26:36 -0700 Subject: [PATCH 1/2] fix(heartbeat): refresh on-disk auth token on 401 + retry once (#1877) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem Auto-restart rotates the workspace's auth token in two non-atomic steps: 1. Platform issues new token via wsauth.IssueToken 2. Provisioner writes the new token to /configs/.auth_token AFTER ContainerStart returns Between steps 1 and 2, the new container has booted and the runtime has already loaded the OLD cached value of .auth_token (or no value if the file was empty during boot). The runtime's first /registry/heartbeat call sends the stale token, gets 401, but the loop never re-reads the on-disk token — so subsequent heartbeats also send the stale value. Each 401 means the platform never sees the workspace as alive → status stays 'provisioning' → scheduler won't dispatch → workspace looks dead from every angle even though the container is actually running. The existing code comment in workspace_provision.go acknowledges this: "the workspace will get 401 on its first heartbeat and can recover on the next restart." That recovery only worked because workspaces used to crash for unrelated reasons and get restarted. After PR #1861 (provisioner empty-volume auto-recover) removed those crashes, workspaces get stuck in the 401 loop with no exit. ## Fix Two-part runtime-side fix in molecule-ai-workspace-runtime: 1. **platform_auth.refresh_from_disk()** — new helper that clears the in-memory cache and re-reads /configs/.auth_token. Returns the fresh value (or None if missing). Updates the cache as a side effect. 2. **HeartbeatLoop._loop()** — on 401 from /registry/heartbeat, calls refresh_from_disk() and retries the request ONCE with the new token. Same pattern in _check_delegations(). Bounded retry budget — if the on-disk token is also stale (bug elsewhere), no infinite loop. ## Tests 6/6 new tests in tests/test_token_refresh_1877.py: - refresh_picks_up_rotated_token — happy path - refresh_returns_none_when_file_missing — defensive - refresh_clears_stale_cache_when_file_disappears - refresh_is_idempotent - 401_retry_pattern_uses_refreshed_token — the production fix path - 401_retry_no_loop_when_disk_token_also_stale — bounded retry budget All pass locally on Python 3.13 + pytest 9. ## Why this fix and not the alternatives - **Alternative B (platform writes token before ContainerStart):** Right architecturally but invasive — needs provisioner refactor to prep volumes before docker run. - **Alternative C (skip rotation on auto-restart):** Breaks the multi-instance-safety invariant the existing code calls out (revoke prevents stale tokens from sister deployments). - **This fix (A):** 3-line core change + helper. Self-healing for any timing edge case, not just the post-restart one. Costs nothing in the happy path (only triggers on 401). ## Version Bumped to 0.1.9. Once published to PyPI + workspace template image rebuilt, deployed workspaces auto-recover from token-rotation races without operator intervention. Closes #1877. Co-Authored-By: Claude Opus 4.7 (1M context) --- molecule_runtime/heartbeat.py | 46 ++++++--- molecule_runtime/platform_auth.py | 19 ++++ pyproject.toml | 2 +- tests/test_token_refresh_1877.py | 156 ++++++++++++++++++++++++++++++ 4 files changed, 208 insertions(+), 15 deletions(-) create mode 100644 tests/test_token_refresh_1877.py diff --git a/molecule_runtime/heartbeat.py b/molecule_runtime/heartbeat.py index 194d52e..c08f944 100644 --- a/molecule_runtime/heartbeat.py +++ b/molecule_runtime/heartbeat.py @@ -17,7 +17,7 @@ from pathlib import Path import httpx -from molecule_runtime.platform_auth import auth_headers +from molecule_runtime.platform_auth import auth_headers, refresh_from_disk logger = logging.getLogger(__name__) @@ -85,18 +85,34 @@ class HeartbeatLoop: while True: # 1. Send heartbeat (Phase 30.1: include auth header if token known) try: - await client.post( + hb_payload = { + "workspace_id": self.workspace_id, + "error_rate": self.error_rate, + "sample_error": self.sample_error, + "active_tasks": self.active_tasks, + "current_task": self.current_task, + "uptime_seconds": int(time.time() - self.start_time), + } + resp = await client.post( f"{self.platform_url}/registry/heartbeat", - json={ - "workspace_id": self.workspace_id, - "error_rate": self.error_rate, - "sample_error": self.sample_error, - "active_tasks": self.active_tasks, - "current_task": self.current_task, - "uptime_seconds": int(time.time() - self.start_time), - }, + json=hb_payload, headers=auth_headers(), ) + # #1877: auto-restart rotates the workspace token AFTER + # container start, so the first heartbeat after a restart + # can race the token write and send the stale cached + # value → 401. Re-read /configs/.auth_token and retry ONCE + # to break the 401 loop without needing another restart. + if resp.status_code == 401: + if refresh_from_disk() is not None: + logger.info( + "Heartbeat: got 401, refreshed token from disk, retrying" + ) + resp = await client.post( + f"{self.platform_url}/registry/heartbeat", + json=hb_payload, + headers=auth_headers(), + ) self.error_count = 0 self.request_count = 0 self._consecutive_failures = 0 @@ -135,10 +151,12 @@ class HeartbeatLoop: async def _check_delegations(self, client: httpx.AsyncClient): """Check for completed delegations and store results for the agent.""" try: - resp = await client.get( - f"{self.platform_url}/workspaces/{self.workspace_id}/delegations", - headers=auth_headers(), - ) + url = f"{self.platform_url}/workspaces/{self.workspace_id}/delegations" + resp = await client.get(url, headers=auth_headers()) + # #1877: refresh token on 401 and retry ONCE — same post-restart + # token-rotation race as the heartbeat path above. + if resp.status_code == 401 and refresh_from_disk() is not None: + resp = await client.get(url, headers=auth_headers()) if resp.status_code != 200: return diff --git a/molecule_runtime/platform_auth.py b/molecule_runtime/platform_auth.py index 36bcfe7..34c1abf 100644 --- a/molecule_runtime/platform_auth.py +++ b/molecule_runtime/platform_auth.py @@ -171,3 +171,22 @@ def clear_cache() -> None: files between cases.""" global _cached_token _cached_token = None + + +def refresh_from_disk() -> str | None: + """Force-reload the token from ``/configs/.auth_token``, bypassing the + in-memory cache. Used by callers (e.g. heartbeat loop) that got a 401 + from the platform and suspect the on-disk token was rotated after boot. + + Returns the fresh token on success, ``None`` if the file is missing or + unreadable. Updates the in-memory cache as a side-effect so subsequent + :func:`auth_headers` calls pick up the new value. + + Context (#1877): on auto-restart, the platform revokes the old token + and writes a new ``.auth_token`` AFTER ``ContainerStart``, so the + runtime's first heartbeat can race the token write and send the stale + cached value. Re-reading from disk on 401 breaks the loop without + needing another full container restart. + """ + clear_cache() + return get_token() diff --git a/pyproject.toml b/pyproject.toml index c904023..ae5f3be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] name = "molecule-ai-workspace-runtime" -version = "0.1.8" +version = "0.1.9" description = "Molecule AI workspace runtime — shared infrastructure for all agent adapters" requires-python = ">=3.11" diff --git a/tests/test_token_refresh_1877.py b/tests/test_token_refresh_1877.py new file mode 100644 index 0000000..a37d583 --- /dev/null +++ b/tests/test_token_refresh_1877.py @@ -0,0 +1,156 @@ +"""Tests for #1877 fix — runtime re-reads /configs/.auth_token on 401. + +Covers two surfaces: + +1. ``platform_auth.refresh_from_disk()`` — pure helper that clears the + in-memory cache and re-reads the file. +2. The HeartbeatLoop 401-then-retry pattern (verified by replaying it + against an httpx MockTransport). +""" + +from __future__ import annotations + +import os +from pathlib import Path +from typing import Any + +# WORKSPACE_ID must be set BEFORE importing platform_auth — the module +# validates the env var at import time. +os.environ.setdefault("WORKSPACE_ID", "00000000-0000-0000-0000-000000000001") + +import httpx +import pytest + +import molecule_runtime.platform_auth as pa +from molecule_runtime.platform_auth import ( + auth_headers, + clear_cache, + get_token, + refresh_from_disk, + save_token, +) + + +# ---------- platform_auth.refresh_from_disk ---------- + + +def test_refresh_picks_up_rotated_token(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + monkeypatch.setenv("CONFIGS_DIR", str(tmp_path)) + clear_cache() + + save_token("token-v1") + assert get_token() == "token-v1" + + # Simulate platform rotating the token on disk while runtime had it cached + (tmp_path / ".auth_token").write_text("token-v2") + assert auth_headers().get("Authorization") == "Bearer token-v1" # cache stale + + fresh = refresh_from_disk() + assert fresh == "token-v2" + assert auth_headers().get("Authorization") == "Bearer token-v2" + + +def test_refresh_returns_none_when_file_missing(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + monkeypatch.setenv("CONFIGS_DIR", str(tmp_path)) + clear_cache() + assert refresh_from_disk() is None + assert "Authorization" not in auth_headers() + + +def test_refresh_clears_stale_cache_when_file_disappears( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +): + monkeypatch.setenv("CONFIGS_DIR", str(tmp_path)) + clear_cache() + save_token("token-v1") + assert get_token() == "token-v1" + + (tmp_path / ".auth_token").unlink() + assert refresh_from_disk() is None + + +def test_refresh_is_idempotent(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + monkeypatch.setenv("CONFIGS_DIR", str(tmp_path)) + clear_cache() + (tmp_path / ".auth_token").write_text("stable-token") + + a = refresh_from_disk() + b = refresh_from_disk() + assert a == b == "stable-token" + + +# ---------- 401 retry pattern (replayed manually against MockTransport) ---------- + + +@pytest.mark.asyncio +async def test_401_retry_pattern_uses_refreshed_token( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +): + """Models the #1877 fix path: 401 -> refresh_from_disk -> retry succeeds.""" + monkeypatch.setenv("CONFIGS_DIR", str(tmp_path)) + clear_cache() + + save_token("token-v1") + (tmp_path / ".auth_token").write_text("token-v2") + pa._cached_token = "token-v1" # explicit stale cache + + calls: list[dict[str, Any]] = [] + + def handler(request: httpx.Request) -> httpx.Response: + calls.append({"auth": request.headers.get("authorization", "")}) + if "token-v1" in request.headers.get("authorization", ""): + return httpx.Response(401, json={"error": "invalid token"}) + return httpx.Response(200, json={}) + + transport = httpx.MockTransport(handler) + client = httpx.AsyncClient(transport=transport, timeout=5.0) + + payload = {"workspace_id": "ws-test", "active_tasks": 0} + url = "http://platform:8080/registry/heartbeat" + + # Mirror exactly what heartbeat.py now does: + resp = await client.post(url, json=payload, headers=auth_headers()) + if resp.status_code == 401 and refresh_from_disk() is not None: + resp = await client.post(url, json=payload, headers=auth_headers()) + + assert resp.status_code == 200 + assert len(calls) == 2 + assert calls[0]["auth"] == "Bearer token-v1" # stale, rejected + assert calls[1]["auth"] == "Bearer token-v2" # fresh, accepted + + await client.aclose() + + +@pytest.mark.asyncio +async def test_401_retry_no_loop_when_disk_token_also_stale( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +): + """If both cached AND disk tokens are stale, the retry uses the same value + as the original — and the loop must NOT retry forever. The production code + only retries ONCE.""" + monkeypatch.setenv("CONFIGS_DIR", str(tmp_path)) + clear_cache() + + save_token("token-everywhere-stale") # disk + cache match, both invalid + + calls: list[str] = [] + + def handler(request: httpx.Request) -> httpx.Response: + calls.append(request.headers.get("authorization", "")) + return httpx.Response(401, json={"error": "invalid token"}) + + transport = httpx.MockTransport(handler) + client = httpx.AsyncClient(transport=transport, timeout=5.0) + + payload = {"workspace_id": "ws-test"} + url = "http://platform:8080/registry/heartbeat" + + resp = await client.post(url, json=payload, headers=auth_headers()) + if resp.status_code == 401 and refresh_from_disk() is not None: + resp = await client.post(url, json=payload, headers=auth_headers()) + + # Both attempts 401, no third call — bounded retry budget + assert resp.status_code == 401 + assert len(calls) == 2 + + await client.aclose() From a78b9f229e66d2a3469925b129bd6417f5776055 Mon Sep 17 00:00:00 2001 From: rabbitblood Date: Thu, 23 Apr 2026 13:35:45 -0700 Subject: [PATCH 2/2] test(1877): convert async tests to sync httpx.Client to unblock CI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CI doesn't have pytest-asyncio installed, and the async wrapping was incidental — the production retry pattern (refresh-on-401) is identical in sync and async forms. Switching to httpx.Client + MockTransport keeps the same coverage without the async dep. 6/6 still pass locally. Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/test_token_refresh_1877.py | 61 +++++++++++++++----------------- 1 file changed, 28 insertions(+), 33 deletions(-) diff --git a/tests/test_token_refresh_1877.py b/tests/test_token_refresh_1877.py index a37d583..6c8e3c0 100644 --- a/tests/test_token_refresh_1877.py +++ b/tests/test_token_refresh_1877.py @@ -82,11 +82,15 @@ def test_refresh_is_idempotent(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): # ---------- 401 retry pattern (replayed manually against MockTransport) ---------- -@pytest.mark.asyncio -async def test_401_retry_pattern_uses_refreshed_token( +def test_401_retry_pattern_uses_refreshed_token( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ): - """Models the #1877 fix path: 401 -> refresh_from_disk -> retry succeeds.""" + """Models the #1877 fix path: 401 -> refresh_from_disk -> retry succeeds. + + Uses httpx sync Client + MockTransport so the test doesn't require + pytest-asyncio in CI (the production code is async, but the retry + *logic* — refresh-on-401 — is identical sync or async). + """ monkeypatch.setenv("CONFIGS_DIR", str(tmp_path)) clear_cache() @@ -102,27 +106,22 @@ async def test_401_retry_pattern_uses_refreshed_token( return httpx.Response(401, json={"error": "invalid token"}) return httpx.Response(200, json={}) - transport = httpx.MockTransport(handler) - client = httpx.AsyncClient(transport=transport, timeout=5.0) + with httpx.Client(transport=httpx.MockTransport(handler), timeout=5.0) as client: + payload = {"workspace_id": "ws-test", "active_tasks": 0} + url = "http://platform:8080/registry/heartbeat" - payload = {"workspace_id": "ws-test", "active_tasks": 0} - url = "http://platform:8080/registry/heartbeat" + # Mirror exactly what heartbeat.py now does: + resp = client.post(url, json=payload, headers=auth_headers()) + if resp.status_code == 401 and refresh_from_disk() is not None: + resp = client.post(url, json=payload, headers=auth_headers()) - # Mirror exactly what heartbeat.py now does: - resp = await client.post(url, json=payload, headers=auth_headers()) - if resp.status_code == 401 and refresh_from_disk() is not None: - resp = await client.post(url, json=payload, headers=auth_headers()) - - assert resp.status_code == 200 - assert len(calls) == 2 - assert calls[0]["auth"] == "Bearer token-v1" # stale, rejected - assert calls[1]["auth"] == "Bearer token-v2" # fresh, accepted - - await client.aclose() + assert resp.status_code == 200 + assert len(calls) == 2 + assert calls[0]["auth"] == "Bearer token-v1" # stale, rejected + assert calls[1]["auth"] == "Bearer token-v2" # fresh, accepted -@pytest.mark.asyncio -async def test_401_retry_no_loop_when_disk_token_also_stale( +def test_401_retry_no_loop_when_disk_token_also_stale( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ): """If both cached AND disk tokens are stale, the retry uses the same value @@ -139,18 +138,14 @@ async def test_401_retry_no_loop_when_disk_token_also_stale( calls.append(request.headers.get("authorization", "")) return httpx.Response(401, json={"error": "invalid token"}) - transport = httpx.MockTransport(handler) - client = httpx.AsyncClient(transport=transport, timeout=5.0) + with httpx.Client(transport=httpx.MockTransport(handler), timeout=5.0) as client: + payload = {"workspace_id": "ws-test"} + url = "http://platform:8080/registry/heartbeat" - payload = {"workspace_id": "ws-test"} - url = "http://platform:8080/registry/heartbeat" + resp = client.post(url, json=payload, headers=auth_headers()) + if resp.status_code == 401 and refresh_from_disk() is not None: + resp = client.post(url, json=payload, headers=auth_headers()) - resp = await client.post(url, json=payload, headers=auth_headers()) - if resp.status_code == 401 and refresh_from_disk() is not None: - resp = await client.post(url, json=payload, headers=auth_headers()) - - # Both attempts 401, no third call — bounded retry budget - assert resp.status_code == 401 - assert len(calls) == 2 - - await client.aclose() + # Both attempts 401, no third call — bounded retry budget + assert resp.status_code == 401 + assert len(calls) == 2