molecule-ai-workspace-runtime/molecule_runtime/adapters/shared_runtime.py
rabbitblood d3235cc564 fix(heartbeat): increment/decrement active_tasks + push on clear (#1372, #1408)
Both set_current_task() implementations (shared_runtime.py + executor_helpers.py):
- Increment active_tasks on task start, decrement on completion (was binary 0/1)
- Push heartbeat immediately on BOTH increment AND decrement
- Only clear current_task when active_tasks reaches 0 (preserves description
  for still-running tasks)

Fixes phantom-busy: the old code returned early on clear, leaving
active_tasks=1 in the platform DB until the next 30s heartbeat cycle.
If a new cron fired before the heartbeat, the workspace appeared
permanently busy — required manual DB reset every 30 min.

Bump: 0.1.2 → 0.1.3

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-21 06:37:12 -07:00

191 lines
6.6 KiB
Python

"""Shared runtime helpers for A2A-backed workspace executors."""
from __future__ import annotations
from typing import Any
from a2a.server.agent_execution import RequestContext
def _extract_part_text(part) -> str:
"""Extract text from a message part, handling dicts and A2A objects."""
if isinstance(part, dict):
text = part.get("text", "")
if text:
return text
root = part.get("root")
if isinstance(root, dict):
return root.get("text", "")
return ""
if hasattr(part, "text") and part.text:
return part.text
if hasattr(part, "root") and hasattr(part.root, "text") and part.root.text:
return part.root.text
return ""
def extract_message_text(context_or_parts) -> str:
"""Extract concatenated plain text from A2A message parts."""
parts = getattr(getattr(context_or_parts, "message", None), "parts", None)
if parts is None:
parts = context_or_parts
return " ".join(
text for part in (parts or []) if (text := _extract_part_text(part))
).strip()
def extract_history(context: RequestContext) -> list[tuple[str, str]]:
"""Extract conversation history from A2A request metadata."""
messages: list[tuple[str, str]] = []
request = getattr(context, "request", None)
metadata = getattr(request, "metadata", None) if request else None
if not isinstance(metadata, dict):
metadata = getattr(context, "metadata", None) or {}
history = metadata.get("history", []) if isinstance(metadata, dict) else []
if not isinstance(history, list):
return messages
for entry in history:
if not isinstance(entry, dict):
continue
role = entry.get("role", "user")
parts = entry.get("parts", [])
text = " ".join(
text for part in (parts or []) if (text := _extract_part_text(part))
).strip()
if text:
mapped_role = "human" if role == "user" else "ai"
messages.append((mapped_role, text))
return messages
def format_conversation_history(history: list[tuple[str, str]]) -> str:
"""Render `(role, text)` history into a stable human-readable transcript."""
return "\n".join(
f"{'User' if role == 'human' else 'Agent'}: {text}" for role, text in history
)
def build_task_text(user_message: str, history: list[tuple[str, str]]) -> str:
"""Build a single task/request string with optional prepended conversation history."""
if not history:
return user_message
transcript = format_conversation_history(history)
return f"Conversation so far:\n{transcript}\n\nCurrent request: {user_message}"
def append_peer_guidance(
base_text: str | None,
peers_info: str,
*,
default_text: str,
tool_name: str,
) -> str:
"""Append peer guidance text when peers are available."""
text = (base_text or default_text).strip()
if peers_info:
text += f"\n\n## Peers\n{peers_info}\nUse {tool_name} to communicate with them."
return text
def summarize_peer_cards(peers: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Return compact peer metadata for prompt rendering."""
summaries: list[dict[str, Any]] = []
for peer in peers:
agent_card = peer.get("agent_card")
if not agent_card:
continue
if isinstance(agent_card, str):
try:
import json
agent_card = json.loads(agent_card)
except Exception:
continue
if not isinstance(agent_card, dict):
continue
skills = agent_card.get("skills", [])
summaries.append(
{
"id": peer.get("id", "unknown"),
"name": agent_card.get("name", peer.get("name", "Unknown")),
"status": peer.get("status", "unknown"),
"skills": [
s.get("name", s.get("id", ""))
for s in skills
if isinstance(s, dict)
],
}
)
return summaries
def build_peer_section(
peers: list[dict[str, Any]],
*,
heading: str = "## Your Peers (workspaces you can delegate to)",
instruction: str = (
"Use the `delegate_to_workspace` tool to send tasks to peers. "
"Only delegate to peers listed above."
),
) -> str:
"""Render a stable peer section for system prompts."""
summaries = summarize_peer_cards(peers)
if not summaries:
return ""
parts = [heading, ""]
for peer in summaries:
parts.append(f"- **{peer['name']}** (id: `{peer['id']}`, status: {peer['status']})")
if peer["skills"]:
parts.append(f" Skills: {', '.join(peer['skills'])}")
parts.append("")
parts.append(instruction)
return "\n".join(parts)
def brief_task(text: str, limit: int = 60) -> str:
"""Create a short human-readable task label for the heartbeat banner."""
return text[:limit] + ("..." if len(text) > limit else "")
async def set_current_task(heartbeat: Any, task: str) -> None:
"""Update current task on heartbeat and push immediately to platform.
Uses increment/decrement instead of binary 0/1 so agents can track
multiple concurrent tasks (#1408). Pushes immediately on both
increment and decrement to avoid phantom-busy (#1372).
"""
if heartbeat:
if task:
heartbeat.active_tasks = getattr(heartbeat, "active_tasks", 0) + 1
heartbeat.current_task = task
else:
heartbeat.active_tasks = max(0, getattr(heartbeat, "active_tasks", 0) - 1)
if heartbeat.active_tasks == 0:
heartbeat.current_task = ""
import os
workspace_id = os.environ.get("WORKSPACE_ID", "")
platform_url = os.environ.get("PLATFORM_URL", "")
if workspace_id and platform_url:
try:
import httpx
active = getattr(heartbeat, "active_tasks", 0) if heartbeat else (1 if task else 0)
cur_task = getattr(heartbeat, "current_task", task or "") if heartbeat else (task or "")
async with httpx.AsyncClient(timeout=3.0) as client:
await client.post(
f"{platform_url}/registry/heartbeat",
json={
"workspace_id": workspace_id,
"current_task": cur_task,
"active_tasks": active,
"error_rate": 0,
"sample_error": "",
"uptime_seconds": 0,
},
)
except Exception:
pass # Best-effort