forked from molecule-ai/molecule-core
fix(#1877): close token-rotation race on restart — Option A+Option B combined
Platform side (Option B): - provisioner.go: add WriteAuthTokenToVolume() — writes .auth_token to the Docker named volume BEFORE ContainerStart using a throwaway alpine container, eliminating the race window where a restarted container could read a stale token before WriteFilesToContainer writes the new one. - workspace_provision.go: call WriteAuthTokenToVolume() in issueAndInjectToken as a best-effort pre-write before the container starts. Runtime side (Option A): - heartbeat.py: on HTTPStatusError 401 from /registry/heartbeat, call refresh_cache() to force re-read of /configs/.auth_token from disk, then retry the heartbeat once. Fall through to normal failure tracking if the retry also fails. - platform_auth.py: add refresh_cache() which discards the in-process _cached_token and calls get_token() to re-read from disk. Together these eliminate the >1 consecutive 401 window described in issue #1877. Pre-write (B) is the primary fix; runtime retry (A) is the self-healing fallback for any residual race. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
307b5b5408
commit
b5e2142c46
@ -402,6 +402,14 @@ func (h *WorkspaceHandler) issueAndInjectToken(ctx context.Context, workspaceID
|
||||
cfg.ConfigFiles = make(map[string][]byte)
|
||||
}
|
||||
cfg.ConfigFiles[".auth_token"] = []byte(token)
|
||||
// Option B (issue #1877): write token to volume BEFORE ContainerStart.
|
||||
// Pre-write eliminates the race window where a restarted container could
|
||||
// read a stale /configs/.auth_token before WriteFilesToContainer runs.
|
||||
// This call is best-effort — if it fails we still log and fall through;
|
||||
// the runtime's heartbeat.py will retry on 401 if needed.
|
||||
if writeErr := h.provisioner.WriteAuthTokenToVolume(ctx, workspaceID, token); writeErr != nil {
|
||||
log.Printf("Provisioner: warning — pre-write token to volume failed for %s: %v (token still injected via WriteFilesToContainer after start)", workspaceID, writeErr)
|
||||
}
|
||||
log.Printf("Provisioner: injected fresh auth token for workspace %s into config volume", workspaceID)
|
||||
}
|
||||
|
||||
|
||||
@ -749,6 +749,41 @@ func (p *Provisioner) ReadFromVolume(ctx context.Context, volumeName, filePath s
|
||||
return clean, nil
|
||||
}
|
||||
|
||||
// WriteAuthTokenToVolume writes the workspace auth token into the config volume
|
||||
// BEFORE the container starts, eliminating the token-injection race window where
|
||||
// a restarted container could read a stale token from /configs/.auth_token before
|
||||
// WriteFilesToContainer writes the new one. Issue #1877.
|
||||
//
|
||||
// Uses a throwaway alpine container to write directly to the named volume,
|
||||
// bypassing the container lifecycle entirely.
|
||||
func (p *Provisioner) WriteAuthTokenToVolume(ctx context.Context, workspaceID, token string) error {
|
||||
volName := ConfigVolumeName(workspaceID)
|
||||
resp, err := p.cli.ContainerCreate(ctx, &container.Config{
|
||||
Image: "alpine",
|
||||
Cmd: []string{"sh", "-c", "mkdir -p /vol && printf '%s' $TOKEN > /vol/.auth_token && chmod 0600 /vol/.auth_token"},
|
||||
Env: []string{"TOKEN=" + token},
|
||||
}, &container.HostConfig{
|
||||
Binds: []string{volName + ":/vol"},
|
||||
}, nil, nil, "")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create token-write container: %w", err)
|
||||
}
|
||||
defer p.cli.ContainerRemove(ctx, resp.ID, container.RemoveOptions{Force: true})
|
||||
if err := p.cli.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil {
|
||||
return fmt.Errorf("failed to start token-write container: %w", err)
|
||||
}
|
||||
waitCh, errCh := p.cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)
|
||||
select {
|
||||
case <-waitCh:
|
||||
case writeErr := <-errCh:
|
||||
if writeErr != nil {
|
||||
return fmt.Errorf("token-write container exited with error: %w", writeErr)
|
||||
}
|
||||
}
|
||||
log.Printf("Provisioner: wrote auth token to volume %s/.auth_token", volName)
|
||||
return nil
|
||||
}
|
||||
|
||||
// execInContainer runs a command inside a running container as root.
|
||||
// Best-effort: logs errors but does not fail the caller.
|
||||
func (p *Provisioner) execInContainer(ctx context.Context, containerID string, cmd []string) {
|
||||
|
||||
@ -17,7 +17,7 @@ from pathlib import Path
|
||||
|
||||
import httpx
|
||||
|
||||
from platform_auth import auth_headers
|
||||
from platform_auth import auth_headers, refresh_cache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -102,6 +102,35 @@ class HeartbeatLoop:
|
||||
self._consecutive_failures = 0
|
||||
except Exception as e:
|
||||
self._consecutive_failures += 1
|
||||
# Issue #1877: if heartbeat 401'd, re-read the token from disk
|
||||
# and retry once. This handles the platform's token-rotation race
|
||||
# where WriteFilesToContainer hasn't finished writing the new
|
||||
# token before the runtime boots and caches the old value.
|
||||
is_401 = False
|
||||
if isinstance(e, httpx.HTTPStatusError) and e.response.status_code == 401:
|
||||
is_401 = True
|
||||
if is_401:
|
||||
logger.warning("Heartbeat 401 for %s — refreshing token cache and retrying once", self.workspace_id)
|
||||
refresh_cache()
|
||||
try:
|
||||
await client.post(
|
||||
f"{self.platform_url}/registry/heartbeat",
|
||||
json={
|
||||
"workspace_id": self.workspace_id,
|
||||
"error_rate": self.error_rate,
|
||||
"sample_error": self.sample_error,
|
||||
"active_tasks": self.active_tasks,
|
||||
"current_task": self.current_task,
|
||||
"uptime_seconds": int(time.time() - self.start_time),
|
||||
},
|
||||
headers=auth_headers(),
|
||||
)
|
||||
self._consecutive_failures = 0
|
||||
self.request_count += 1
|
||||
except Exception:
|
||||
# Retry also failed — fall through to the normal
|
||||
# failure tracking below.
|
||||
pass
|
||||
if self._consecutive_failures <= 3 or self._consecutive_failures % MAX_CONSECUTIVE_FAILURES == 0:
|
||||
logger.warning("Heartbeat failed (%d consecutive): %s", self._consecutive_failures, e)
|
||||
if self._consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
|
||||
|
||||
@ -103,3 +103,14 @@ def clear_cache() -> None:
|
||||
files between cases."""
|
||||
global _cached_token
|
||||
_cached_token = None
|
||||
|
||||
|
||||
def refresh_cache() -> str | None:
    """Drop the in-process token cache and load the token from disk again.

    Use this when a 401 response suggests the cached token is stale —
    e.g. after the platform rotates tokens during a restart (issue #1877).

    Returns:
        The freshly read token value, or None if it was not found or an
        error occurred while reading it.
    """
    global _cached_token
    # Invalidate first so the subsequent lookup is forced to hit disk.
    _cached_token = None
    return get_token()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user