Merge pull request #1921 from Molecule-AI/fix/1877-token-rotation-race
fix(#1877): close token-rotation race on restart — Option A+Option B
This commit is contained in:
commit
1265bcbec6
@ -402,6 +402,17 @@ func (h *WorkspaceHandler) issueAndInjectToken(ctx context.Context, workspaceID
|
||||
cfg.ConfigFiles = make(map[string][]byte)
|
||||
}
|
||||
cfg.ConfigFiles[".auth_token"] = []byte(token)
|
||||
// Option B (issue #1877): write token to volume BEFORE ContainerStart.
|
||||
// Pre-write eliminates the race window where a restarted container could
|
||||
// read a stale /configs/.auth_token before WriteFilesToContainer runs.
|
||||
// This call is best-effort — if it fails (or provisioner is nil in tests)
|
||||
// we still log and fall through; the runtime's heartbeat.py will retry
|
||||
// on 401 if needed.
|
||||
if h.provisioner != nil {
|
||||
if writeErr := h.provisioner.WriteAuthTokenToVolume(ctx, workspaceID, token); writeErr != nil {
|
||||
log.Printf("Provisioner: warning — pre-write token to volume failed for %s: %v (token still injected via WriteFilesToContainer after start)", workspaceID, writeErr)
|
||||
}
|
||||
}
|
||||
log.Printf("Provisioner: injected fresh auth token for workspace %s into config volume", workspaceID)
|
||||
}
|
||||
|
||||
|
||||
@ -749,6 +749,41 @@ func (p *Provisioner) ReadFromVolume(ctx context.Context, volumeName, filePath s
|
||||
return clean, nil
|
||||
}
|
||||
|
||||
// WriteAuthTokenToVolume writes the workspace auth token into the config volume
|
||||
// BEFORE the container starts, eliminating the token-injection race window where
|
||||
// a restarted container could read a stale token from /configs/.auth_token before
|
||||
// WriteFilesToContainer writes the new one. Issue #1877.
|
||||
//
|
||||
// Uses a throwaway alpine container to write directly to the named volume,
|
||||
// bypassing the container lifecycle entirely.
|
||||
func (p *Provisioner) WriteAuthTokenToVolume(ctx context.Context, workspaceID, token string) error {
|
||||
volName := ConfigVolumeName(workspaceID)
|
||||
resp, err := p.cli.ContainerCreate(ctx, &container.Config{
|
||||
Image: "alpine",
|
||||
Cmd: []string{"sh", "-c", "mkdir -p /vol && printf '%s' $TOKEN > /vol/.auth_token && chmod 0600 /vol/.auth_token"},
|
||||
Env: []string{"TOKEN=" + token},
|
||||
}, &container.HostConfig{
|
||||
Binds: []string{volName + ":/vol"},
|
||||
}, nil, nil, "")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create token-write container: %w", err)
|
||||
}
|
||||
defer p.cli.ContainerRemove(ctx, resp.ID, container.RemoveOptions{Force: true})
|
||||
if err := p.cli.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil {
|
||||
return fmt.Errorf("failed to start token-write container: %w", err)
|
||||
}
|
||||
waitCh, errCh := p.cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)
|
||||
select {
|
||||
case <-waitCh:
|
||||
case writeErr := <-errCh:
|
||||
if writeErr != nil {
|
||||
return fmt.Errorf("token-write container exited with error: %w", writeErr)
|
||||
}
|
||||
}
|
||||
log.Printf("Provisioner: wrote auth token to volume %s/.auth_token", volName)
|
||||
return nil
|
||||
}
|
||||
|
||||
// execInContainer runs a command inside a running container as root.
|
||||
// Best-effort: logs errors but does not fail the caller.
|
||||
func (p *Provisioner) execInContainer(ctx context.Context, containerID string, cmd []string) {
|
||||
|
||||
@ -17,7 +17,7 @@ from pathlib import Path
|
||||
|
||||
import httpx
|
||||
|
||||
from platform_auth import auth_headers
|
||||
from platform_auth import auth_headers, refresh_cache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -102,6 +102,35 @@ class HeartbeatLoop:
|
||||
self._consecutive_failures = 0
|
||||
except Exception as e:
|
||||
self._consecutive_failures += 1
|
||||
# Issue #1877: if heartbeat 401'd, re-read the token from disk
|
||||
# and retry once. This handles the platform's token-rotation race
|
||||
# where WriteFilesToContainer hasn't finished writing the new
|
||||
# token before the runtime boots and caches the old value.
|
||||
is_401 = False
|
||||
if isinstance(e, httpx.HTTPStatusError) and e.response.status_code == 401:
|
||||
is_401 = True
|
||||
if is_401:
|
||||
logger.warning("Heartbeat 401 for %s — refreshing token cache and retrying once", self.workspace_id)
|
||||
refresh_cache()
|
||||
try:
|
||||
await client.post(
|
||||
f"{self.platform_url}/registry/heartbeat",
|
||||
json={
|
||||
"workspace_id": self.workspace_id,
|
||||
"error_rate": self.error_rate,
|
||||
"sample_error": self.sample_error,
|
||||
"active_tasks": self.active_tasks,
|
||||
"current_task": self.current_task,
|
||||
"uptime_seconds": int(time.time() - self.start_time),
|
||||
},
|
||||
headers=auth_headers(),
|
||||
)
|
||||
self._consecutive_failures = 0
|
||||
self.request_count += 1
|
||||
except Exception:
|
||||
# Retry also failed — fall through to the normal
|
||||
# failure tracking below.
|
||||
pass
|
||||
if self._consecutive_failures <= 3 or self._consecutive_failures % MAX_CONSECUTIVE_FAILURES == 0:
|
||||
logger.warning("Heartbeat failed (%d consecutive): %s", self._consecutive_failures, e)
|
||||
if self._consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
|
||||
|
||||
@ -103,3 +103,14 @@ def clear_cache() -> None:
|
||||
files between cases."""
|
||||
global _cached_token
|
||||
_cached_token = None
|
||||
|
||||
|
||||
def refresh_cache() -> str | None:
    """Drop the in-process token cache and re-read the token from disk.

    Intended for the 401-retry path: after the platform rotates tokens
    during a restart (issue #1877) the cached value may be stale, so we
    invalidate it and let ``get_token()`` pick up whatever is currently
    on disk.

    Returns:
        The freshly read token, or ``None`` if it could not be found/read.
    """
    global _cached_token
    # Invalidate first so the subsequent lookup cannot reuse the old value.
    _cached_token = None
    return get_token()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user