fix(heartbeat): increment/decrement active_tasks + push on clear (#1372, #1408)

Both set_current_task() implementations (shared_runtime.py + executor_helpers.py):
- Increment active_tasks on task start, decrement on completion (was binary 0/1)
- Push heartbeat immediately on BOTH increment AND decrement
- Only clear current_task when active_tasks reaches 0 (preserves description
  for still-running tasks)

Fixes phantom-busy: the old code returned early on clear, leaving
active_tasks=1 in the platform DB until the next 30s heartbeat cycle.
If a new cron fired before the heartbeat, the workspace appeared
permanently busy — required manual DB reset every 30 min.

Bump: 0.1.2 → 0.1.3

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
rabbitblood 2026-04-21 06:37:12 -07:00
parent 2da6f2d1cd
commit d3235cc564
3 changed files with 32 additions and 20 deletions

View File

@ -153,20 +153,18 @@ def brief_task(text: str, limit: int = 60) -> str:
async def set_current_task(heartbeat: Any, task: str) -> None:
"""Update current task on heartbeat and push immediately to platform.
The heartbeat loop only fires every 30s, so quick tasks would finish
before the canvas ever sees them. Setting a task pushes immediately.
Clearing a task only updates the heartbeat object the next heartbeat
cycle will broadcast the clear, keeping the task visible longer.
Uses increment/decrement instead of binary 0/1 so agents can track
multiple concurrent tasks (#1408). Pushes immediately on both
increment and decrement to avoid phantom-busy (#1372).
"""
if heartbeat:
heartbeat.current_task = task
heartbeat.active_tasks = 1 if task else 0
# Only push immediately when SETTING a task (not clearing)
# Clearing is handled by the next heartbeat cycle, which keeps
# the task visible on the canvas for quick A2A responses
if not task:
return
if task:
heartbeat.active_tasks = getattr(heartbeat, "active_tasks", 0) + 1
heartbeat.current_task = task
else:
heartbeat.active_tasks = max(0, getattr(heartbeat, "active_tasks", 0) - 1)
if heartbeat.active_tasks == 0:
heartbeat.current_task = ""
import os
workspace_id = os.environ.get("WORKSPACE_ID", "")
@ -174,13 +172,15 @@ async def set_current_task(heartbeat: Any, task: str) -> None:
if workspace_id and platform_url:
try:
import httpx
active = getattr(heartbeat, "active_tasks", 0) if heartbeat else (1 if task else 0)
cur_task = getattr(heartbeat, "current_task", task or "") if heartbeat else (task or "")
async with httpx.AsyncClient(timeout=3.0) as client:
await client.post(
f"{platform_url}/registry/heartbeat",
json={
"workspace_id": workspace_id,
"current_task": task,
"active_tasks": 1,
"current_task": cur_task,
"active_tasks": active,
"error_rate": 0,
"sample_error": "",
"uptime_seconds": 0,

View File

@ -223,14 +223,26 @@ def read_delegation_results() -> str:
# ========================================================================
async def set_current_task(heartbeat: "HeartbeatLoop | None", task: str) -> None:
"""Update current task on heartbeat and push immediately via platform API."""
"""Update current task on heartbeat and push immediately via platform API.
Uses increment/decrement instead of binary 0/1 so agents can track
multiple concurrent tasks (#1408). Pushes immediately on both
increment and decrement to avoid phantom-busy (#1372).
"""
if heartbeat is not None:
heartbeat.current_task = task
heartbeat.active_tasks = 1 if task else 0
if task:
heartbeat.active_tasks = getattr(heartbeat, "active_tasks", 0) + 1
heartbeat.current_task = task
else:
heartbeat.active_tasks = max(0, getattr(heartbeat, "active_tasks", 0) - 1)
if heartbeat.active_tasks == 0:
heartbeat.current_task = ""
workspace_id = os.environ.get("WORKSPACE_ID", "")
platform_url = os.environ.get("PLATFORM_URL", "")
if not (workspace_id and platform_url):
return
active = getattr(heartbeat, "active_tasks", 0) if heartbeat is not None else (1 if task else 0)
cur_task = getattr(heartbeat, "current_task", task or "") if heartbeat is not None else (task or "")
try:
try:
from platform_auth import auth_headers as _auth
@ -241,8 +253,8 @@ async def set_current_task(heartbeat: "HeartbeatLoop | None", task: str) -> None
f"{platform_url}/registry/heartbeat",
json={
"workspace_id": workspace_id,
"current_task": task,
"active_tasks": 1 if task else 0,
"current_task": cur_task,
"active_tasks": active,
"error_rate": 0,
"sample_error": "",
"uptime_seconds": 0,

View File

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "molecule-ai-workspace-runtime"
version = "0.1.2"
version = "0.1.3"
description = "Molecule AI workspace runtime — shared infrastructure for all agent adapters"
requires-python = ">=3.11"
license = {text = "BSL-1.1"}