Heartbeat lies. The asyncio task that POSTs /registry/heartbeat lives in its own process slot, so a workspace whose claude_agent_sdk has wedged on `Control request timeout: initialize` keeps reporting "online" — every chat send hangs the full 5-min platform deadline even though the runtime is dead in the water. This commit teaches the workspace to admit it's wedged and the platform to honor that admission by flipping status → degraded. Five layers, all in one commit because they share a contract: 1. Migration 043 — convert workspaces.status from free-form TEXT to a real `workspace_status` Postgres ENUM with the 6 values production code actually writes (provisioning, online, offline, degraded, failed, removed). Locks the value set; future typo writes error at the DB instead of silently storing rogue strings. Down migration reverts to TEXT and drops the type. 2. workspace-server/internal/models — `HeartbeatPayload` gains a `runtime_state string` field. Empty = healthy. Currently the only non-empty value the handler honors is "wedged"; future symptoms can extend without another migration. 3. workspace-server/internal/handlers/registry.go — `evaluateStatus` gains a wedge branch BEFORE the existing error_rate >= 0.5 path: if `RuntimeState=="wedged"` and currently online, flip to degraded and broadcast WORKSPACE_DEGRADED with the wedge sample error. Recovery (`degraded → online`) now requires BOTH error_rate < 0.1 AND runtime_state cleared, so a workspace still reporting wedged stays degraded even when its error count happens to be 0 (the wedge captures a runtime state, not an error count). 4. workspace/claude_sdk_executor.py — module-level `_sdk_wedged_reason` flag set when execute()'s catch block sees an error matching `_WEDGE_ERROR_PATTERNS` (currently just "control request timeout"). Sticky for the process lifetime; the SDK's internal client-process state is corrupted on this error and only a workspace restart (= new Python process = fresh module state) clears it. Helpers `is_wedged()` / `wedge_reason()` / `_reset_sdk_wedge_for_test()` exposed. 5. workspace/heartbeat.py — heartbeat body now layers on `_runtime_state_payload()` for both the happy path and the 401-retry path. Lazy-imports claude_sdk_executor so non-Claude runtimes (where the module may not even be importable) keep working unchanged. Canvas required no changes — `STATUS_CONFIG.degraded` was already defined in design-tokens.ts (amber dot, "Degraded" label) and WorkspaceNode.tsx already renders `lastSampleError` underneath the status pill when status === "degraded". The existing wiring just never fired because nothing was writing degraded in this code path. Tests: - 3 Go handler tests for the new transitions (online → degraded on wedged, degraded stays put while still wedged, degraded → online after wedge clears) - 5 Python wedge-detector tests (default clean, mark sets flag, sticky-first-wins, execute() flips on Control request timeout, execute() does NOT flip on unrelated errors) - Migration smoke-tested against the local dev DB (3 existing rows, all enum-compatible; migration applied cleanly, post-state has the column as workspace_status type and the index preserved) Verified: 79 Python tests pass; full Go test suite passes; migration applies clean on a real DB; reverse migration restores the column to TEXT. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
357 lines
16 KiB
Python
357 lines
16 KiB
Python
"""Heartbeat loop — alive signal + delegation status checker.
|
|
|
|
Every 30 seconds:
|
|
1. Send heartbeat to platform (alive signal with current_task, error_rate)
|
|
2. Check pending delegations — any results back?
|
|
3. Store completed delegation results for the agent to pick up
|
|
|
|
Resilient: recreates HTTP client on failure, auto-restarts on crash.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
from platform_auth import auth_headers, refresh_cache, self_source_headers
|
|
|
|
|
|
def _runtime_state_payload() -> dict:
|
|
"""Build the {runtime_state, sample_error} portion of the heartbeat
|
|
body when the Claude SDK has hit a wedge. Returns an empty dict
|
|
when the runtime is healthy so the heartbeat payload doesn't grow
|
|
fields the platform doesn't need.
|
|
|
|
Imported lazily so workspaces running non-Claude runtimes (where
|
|
`claude_sdk_executor` may not be importable at all) keep working —
|
|
a missing import means "no Claude wedge possible here, healthy."
|
|
"""
|
|
try:
|
|
from claude_sdk_executor import is_wedged, wedge_reason
|
|
except Exception:
|
|
return {}
|
|
if not is_wedged():
|
|
return {}
|
|
return {
|
|
"runtime_state": "wedged",
|
|
# sample_error doubles as the human-readable banner text on the
|
|
# canvas's degraded card — keep it short and actionable.
|
|
"sample_error": wedge_reason(),
|
|
}
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
HEARTBEAT_INTERVAL = 30 # seconds
|
|
MAX_CONSECUTIVE_FAILURES = 10
|
|
MAX_SEEN_DELEGATION_IDS = 200
|
|
SELF_MESSAGE_COOLDOWN = 60 # seconds — minimum between self-messages to prevent loops
|
|
# Shared path — also used by cli_executor._read_delegation_results()
|
|
DELEGATION_RESULTS_FILE = os.environ.get("DELEGATION_RESULTS_FILE", "/tmp/delegation_results.jsonl")
|
|
|
|
|
|
class HeartbeatLoop:
|
|
def __init__(self, platform_url: str, workspace_id: str):
|
|
self.platform_url = platform_url
|
|
self.workspace_id = workspace_id
|
|
self.start_time = time.time()
|
|
self.error_count = 0
|
|
self.request_count = 0
|
|
self.active_tasks = 0
|
|
self.current_task = ""
|
|
self.sample_error = ""
|
|
self._task = None
|
|
self._consecutive_failures = 0
|
|
self._seen_delegation_ids: set[str] = set()
|
|
self._last_self_message_time = 0.0
|
|
self._parent_name: str | None = None # Cached after first lookup
|
|
|
|
@property
|
|
def error_rate(self) -> float:
|
|
if self.request_count == 0:
|
|
return 0.0
|
|
return self.error_count / self.request_count
|
|
|
|
def record_error(self, error: str):
|
|
self.error_count += 1
|
|
self.request_count += 1
|
|
self.sample_error = error
|
|
|
|
def record_success(self):
|
|
self.request_count += 1
|
|
|
|
def start(self):
|
|
self._task = asyncio.create_task(self._loop())
|
|
self._task.add_done_callback(self._on_done)
|
|
|
|
def _on_done(self, task):
|
|
if not task.cancelled() and task.exception():
|
|
logger.error("Heartbeat loop died: %s — restarting", task.exception())
|
|
self._task = asyncio.create_task(self._loop())
|
|
self._task.add_done_callback(self._on_done)
|
|
|
|
async def stop(self):
|
|
if self._task:
|
|
self._task.cancel()
|
|
try:
|
|
await self._task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
async def _loop(self):
|
|
while True:
|
|
client = None
|
|
try:
|
|
client = httpx.AsyncClient(timeout=10.0)
|
|
while True:
|
|
# 1. Send heartbeat (Phase 30.1: include auth header if token known)
|
|
try:
|
|
body = {
|
|
"workspace_id": self.workspace_id,
|
|
"error_rate": self.error_rate,
|
|
"sample_error": self.sample_error,
|
|
"active_tasks": self.active_tasks,
|
|
"current_task": self.current_task,
|
|
"uptime_seconds": int(time.time() - self.start_time),
|
|
}
|
|
# Layer the runtime-wedge fields on top so a
|
|
# non-empty sample_error from the wedge wins
|
|
# over the (typically empty) heartbeat
|
|
# sample_error field. The platform reads
|
|
# runtime_state to flip status → degraded.
|
|
body.update(_runtime_state_payload())
|
|
await client.post(
|
|
f"{self.platform_url}/registry/heartbeat",
|
|
json=body,
|
|
headers=auth_headers(),
|
|
)
|
|
self.error_count = 0
|
|
self.request_count = 0
|
|
self._consecutive_failures = 0
|
|
except Exception as e:
|
|
self._consecutive_failures += 1
|
|
# Issue #1877: if heartbeat 401'd, re-read the token from disk
|
|
# and retry once. This handles the platform's token-rotation race
|
|
# where WriteFilesToContainer hasn't finished writing the new
|
|
# token before the runtime boots and caches the old value.
|
|
is_401 = False
|
|
if isinstance(e, httpx.HTTPStatusError) and e.response.status_code == 401:
|
|
is_401 = True
|
|
if is_401:
|
|
logger.warning("Heartbeat 401 for %s — refreshing token cache and retrying once", self.workspace_id)
|
|
refresh_cache()
|
|
try:
|
|
retry_body = {
|
|
"workspace_id": self.workspace_id,
|
|
"error_rate": self.error_rate,
|
|
"sample_error": self.sample_error,
|
|
"active_tasks": self.active_tasks,
|
|
"current_task": self.current_task,
|
|
"uptime_seconds": int(time.time() - self.start_time),
|
|
}
|
|
retry_body.update(_runtime_state_payload())
|
|
await client.post(
|
|
f"{self.platform_url}/registry/heartbeat",
|
|
json=retry_body,
|
|
headers=auth_headers(),
|
|
)
|
|
self._consecutive_failures = 0
|
|
self.request_count += 1
|
|
except Exception:
|
|
# Retry also failed — fall through to the normal
|
|
# failure tracking below.
|
|
pass
|
|
if self._consecutive_failures <= 3 or self._consecutive_failures % MAX_CONSECUTIVE_FAILURES == 0:
|
|
logger.warning("Heartbeat failed (%d consecutive): %s", self._consecutive_failures, e)
|
|
if self._consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
|
|
logger.info("Heartbeat: recreating HTTP client after %d failures", self._consecutive_failures)
|
|
try:
|
|
await client.aclose()
|
|
except Exception:
|
|
pass
|
|
break
|
|
|
|
# 2. Check delegation status
|
|
try:
|
|
await self._check_delegations(client)
|
|
except Exception as e:
|
|
logger.debug("Delegation check failed: %s", e)
|
|
|
|
await asyncio.sleep(HEARTBEAT_INTERVAL)
|
|
|
|
except asyncio.CancelledError:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("Heartbeat loop error: %s — retrying in 30s", e)
|
|
await asyncio.sleep(HEARTBEAT_INTERVAL)
|
|
finally:
|
|
if client:
|
|
try:
|
|
await client.aclose()
|
|
except Exception:
|
|
pass
|
|
|
|
async def _check_delegations(self, client: httpx.AsyncClient):
|
|
"""Check for completed delegations and store results for the agent."""
|
|
try:
|
|
resp = await client.get(
|
|
f"{self.platform_url}/workspaces/{self.workspace_id}/delegations",
|
|
headers=auth_headers(),
|
|
)
|
|
if resp.status_code != 200:
|
|
return
|
|
|
|
delegations = resp.json()
|
|
if not isinstance(delegations, list):
|
|
return
|
|
|
|
new_results = []
|
|
for d in delegations:
|
|
did = d.get("delegation_id", "")
|
|
status = d.get("status", "")
|
|
|
|
if not did or did in self._seen_delegation_ids:
|
|
continue
|
|
|
|
if status in ("completed", "failed"):
|
|
# Fix B (Cycle 5): validate source_id before accepting delegation
|
|
# results. Only process delegations that THIS workspace created
|
|
# (source_id == self.workspace_id). Attacker-crafted delegation
|
|
# records with a foreign source_id cannot inject instructions.
|
|
source_id = d.get("source_id", "")
|
|
if source_id != self.workspace_id:
|
|
logger.warning(
|
|
"Heartbeat: skipping delegation %s — source_id %r does not "
|
|
"match this workspace %r; possible injection attempt",
|
|
did, source_id, self.workspace_id,
|
|
)
|
|
self._seen_delegation_ids.add(did) # mark seen so we don't warn again
|
|
continue
|
|
|
|
self._seen_delegation_ids.add(did)
|
|
new_results.append({
|
|
"delegation_id": did,
|
|
"target_id": d.get("target_id", ""),
|
|
"source_id": source_id,
|
|
"status": status,
|
|
"summary": d.get("summary", ""),
|
|
"response_preview": d.get("response_preview", ""),
|
|
"error": d.get("error", ""),
|
|
"timestamp": time.time(),
|
|
})
|
|
|
|
# Evict old seen IDs if over limit
|
|
if len(self._seen_delegation_ids) > MAX_SEEN_DELEGATION_IDS:
|
|
# Keep most recent half
|
|
self._seen_delegation_ids = set(list(self._seen_delegation_ids)[MAX_SEEN_DELEGATION_IDS // 2:])
|
|
|
|
if new_results:
|
|
# Append to results file for context injection on next message
|
|
with open(DELEGATION_RESULTS_FILE, "a") as f:
|
|
for r in new_results:
|
|
f.write(json.dumps(r) + "\n")
|
|
logger.info("Heartbeat: %d new delegation results — triggering self-message", len(new_results))
|
|
|
|
# Build a summary message for the agent.
|
|
# Fix B (Cycle 5): do NOT embed raw response_preview text in
|
|
# user-role A2A messages — that is the prompt-injection vector.
|
|
# Instead reference only the delegation ID and status; the agent
|
|
# reads full content from DELEGATION_RESULTS_FILE which was
|
|
# written above from trusted platform data.
|
|
summary_lines = []
|
|
for r in new_results:
|
|
line = f"- [{r['status']}] Delegation {r['delegation_id'][:8]}: {r['summary'][:80]}"
|
|
if r.get("error"):
|
|
line += f"\n Error: {r['error'][:100]}"
|
|
summary_lines.append(line)
|
|
|
|
# Look up parent workspace (cached after first call)
|
|
if self._parent_name is None:
|
|
try:
|
|
parent_resp = await client.get(
|
|
f"{self.platform_url}/workspaces/{self.workspace_id}",
|
|
headers=auth_headers(),
|
|
)
|
|
if parent_resp.status_code == 200:
|
|
parent_id = parent_resp.json().get("parent_id", "")
|
|
if parent_id:
|
|
parent_info = await client.get(
|
|
f"{self.platform_url}/workspaces/{parent_id}",
|
|
headers=auth_headers(),
|
|
)
|
|
if parent_info.status_code == 200:
|
|
self._parent_name = parent_info.json().get("name", "")
|
|
if self._parent_name is None:
|
|
self._parent_name = "" # No parent — cache empty
|
|
except Exception:
|
|
pass # Will retry next cycle
|
|
parent_name = self._parent_name or ""
|
|
|
|
report_instruction = ""
|
|
if parent_name:
|
|
report_instruction = (
|
|
f"\n\nIMPORTANT: Report these results back to your parent '{parent_name}' "
|
|
f"by delegating a summary to them. Use delegate_task or delegate_task_async "
|
|
f"with a concise status report. Also use send_message_to_user to notify the user."
|
|
)
|
|
else:
|
|
report_instruction = (
|
|
"\n\nReport results using send_message_to_user to notify the user."
|
|
)
|
|
|
|
trigger_msg = (
|
|
"Delegation results are ready. Review them and take appropriate action:\n"
|
|
+ "\n".join(summary_lines)
|
|
+ report_instruction
|
|
)
|
|
|
|
# Send A2A self-message to wake the agent.
|
|
# Minimum 60s between self-messages to avoid spam, but always send
|
|
# when there are genuinely NEW results to process.
|
|
now = time.time()
|
|
if now - self._last_self_message_time < SELF_MESSAGE_COOLDOWN:
|
|
logger.debug("Heartbeat: self-message cooldown (60s), will retry next cycle")
|
|
else:
|
|
self._last_self_message_time = now
|
|
try:
|
|
# self_source_headers() adds X-Workspace-ID so the
|
|
# platform tags this row source=agent, not canvas
|
|
# — see platform_auth.py for the full rationale.
|
|
await client.post(
|
|
f"{self.platform_url}/workspaces/{self.workspace_id}/a2a",
|
|
json={
|
|
"method": "message/send",
|
|
"params": {
|
|
"message": {
|
|
"role": "user",
|
|
"parts": [{"type": "text", "text": trigger_msg}],
|
|
},
|
|
},
|
|
},
|
|
headers=self_source_headers(self.workspace_id),
|
|
timeout=120.0,
|
|
)
|
|
logger.info("Heartbeat: self-message sent to process delegation results")
|
|
except Exception as e:
|
|
logger.warning("Heartbeat: failed to send self-message: %s", e)
|
|
|
|
# Also push notification to user via canvas
|
|
for r in new_results:
|
|
try:
|
|
msg = f"Delegation {r['status']}: {r['summary'][:100]}"
|
|
if r.get("response_preview"):
|
|
msg += f"\nResult: {r['response_preview'][:200]}"
|
|
await client.post(
|
|
f"{self.platform_url}/workspaces/{self.workspace_id}/notify",
|
|
json={"message": msg, "type": "delegation_result"},
|
|
headers=auth_headers(),
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
except Exception as e:
|
|
logger.debug("Delegation check error: %s", e)
|