fix(workspace): poll activity_logs for a2a_proxy delegation results (closes #354) (#501)
Some checks failed
CI / Python Lint & Test (push) Has been cancelled
Block internal-flavored paths / Block forbidden paths (push) Successful in 7s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 6s
CI / Detect changes (push) Successful in 19s
E2E API Smoke Test / detect-changes (push) Successful in 21s
E2E Staging Canvas (Playwright) / detect-changes (push) Successful in 21s
Handlers Postgres Integration / detect-changes (push) Successful in 20s
Runtime PR-Built Compatibility / detect-changes (push) Successful in 20s
CI / Shellcheck (E2E scripts) (push) Successful in 4s
CI / Platform (Go) (push) Successful in 5s
CI / Canvas (Next.js) (push) Successful in 5s
CI / Canvas Deploy Reminder (push) Has been skipped
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 7s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Successful in 7s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 5s
publish-runtime-autobump / autobump-and-tag (push) Failing after 41s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Successful in 1m47s

Co-authored-by: Molecule AI · core-be <core-be@agents.moleculesai.app>
Co-committed-by: Molecule AI · core-be <core-be@agents.moleculesai.app>
This commit is contained in:
Molecule AI · core-be 2026-05-11 15:53:05 +00:00 committed by Molecule AI · core-lead
parent 9025e86cc7
commit 7b783aa2ed

View File

@ -139,6 +139,14 @@ SELF_MESSAGE_COOLDOWN = 60 # seconds — minimum between self-messages to preve
# same file via executor_helpers.read_delegation_results so heartbeat-
# delivered async delegation results land in the next agent turn.
DELEGATION_RESULTS_FILE = os.environ.get("DELEGATION_RESULTS_FILE", "/tmp/delegation_results.jsonl")
# Cursor file for tracking activity_log IDs processed from the a2a_receive path
# (delegations fired via tool_delegate_task → POST /workspaces/:id/a2a proxy, not
# POST /workspaces/:id/delegate). Persisted to disk so heartbeat restarts
# don't re-process the same rows.
_ACTIVITY_DELEGATION_CURSOR_FILE = os.environ.get(
"DELEGATION_ACTIVITY_CURSOR_FILE",
"/tmp/delegation_activity_cursor",
)
class HeartbeatLoop:
@ -169,6 +177,10 @@ class HeartbeatLoop:
self._seen_delegation_ids: set[str] = set()
self._last_self_message_time = 0.0
self._parent_name: str | None = None # Cached after first lookup
# Seen activity IDs for a2a_receive polling (delegations via POST /a2a proxy path).
# Loaded lazily from cursor file on first poll to avoid blocking startup.
self._seen_activity_ids: set[str] = set()
self._activity_cursor_loaded = False
@property
def error_rate(self) -> float:
@ -293,6 +305,15 @@ class HeartbeatLoop:
except Exception as e:
logger.debug("Delegation check failed: %s", e)
# 3. Check activity_logs for delegation results that arrived via
# the POST /a2a proxy path (tool_delegate_task → send_a2a_message).
# These are NOT written to the delegations table, so
# _check_delegations misses them. See issue #354.
try:
await self._check_activity_delegations(client)
except Exception as e:
logger.debug("Activity delegation check failed: %s", e)
await asyncio.sleep(self._interval_seconds)
except asyncio.CancelledError:
@ -469,3 +490,217 @@ class HeartbeatLoop:
except Exception as e:
logger.debug("Delegation check error: %s", e)
async def _check_activity_delegations(self, client: httpx.AsyncClient):
"""Poll activity_logs for delegation results that arrived via the POST /a2a proxy path.
tool_delegate_task send_a2a_message POST /workspaces/:id/a2a (proxy)
logs to activity_logs but NOT the delegations table. _check_delegations
only checks the delegations table, so these results are invisible to the
heartbeat the agent never wakes up to consume them (issue #354).
This method closes that gap: polls GET /workspaces/:id/activity?type=a2a_receive,
filters for rows from peer workspaces (source_id != "" and != self.workspace_id),
tracks seen IDs with a cursor file, and sends a self-message to wake the agent.
"""
try:
# Load cursor lazily on first call so startup is not blocked by disk I/O.
if not self._activity_cursor_loaded:
self._activity_cursor_loaded = True
try:
if os.path.exists(_ACTIVITY_DELEGATION_CURSOR_FILE):
cursor = open(_ACTIVITY_DELEGATION_CURSOR_FILE).read().strip()
if cursor:
self._seen_activity_ids = set(cursor.split(","))
except Exception:
pass # Corrupt cursor — start fresh
params: dict[str, str] = {"type": "a2a_receive"}
resp = await client.get(
f"{self.platform_url}/workspaces/{self.workspace_id}/activity",
params=params,
headers=auth_headers(),
)
if resp.status_code != 200:
return
rows = resp.json()
if not isinstance(rows, list):
return
# Activity API returns newest-first; process in reverse order so
# we advance the cursor monotonically (oldest → newest).
rows = list(reversed(rows))
new_results: list[dict] = []
last_id: str | None = None
for row in rows:
if not isinstance(row, dict):
continue
activity_id = str(row.get("id", ""))
if not activity_id:
continue
last_id = activity_id
if activity_id in self._seen_activity_ids:
continue
# Filter: must have a non-empty source_id that is NOT this workspace
# (peer agent messages only; skip canvas-user messages and self-notify).
source_id = row.get("source_id") or ""
if not source_id or source_id == self.workspace_id:
continue
self._seen_activity_ids.add(activity_id)
summary = row.get("summary") or ""
# Extract response text from request_body if available.
# Shape mirrors inbox._extract_text: walk parts for "text" field.
response_text = summary
request_body = row.get("request_body")
if isinstance(request_body, dict):
params_obj = request_body.get("params")
if isinstance(params_obj, dict):
msg = params_obj.get("message")
if isinstance(msg, dict):
parts = msg.get("parts") or []
texts = []
for p in (parts if isinstance(parts, list) else []):
if isinstance(p, dict) and p.get("kind") == "text" or p.get("type") == "text":
t = p.get("text", "")
if t:
texts.append(t)
if texts:
response_text = " ".join(texts)
new_results.append({
"delegation_id": activity_id, # Use activity ID as pseudo-delegation ID
"target_id": source_id,
"source_id": self.workspace_id,
"status": "completed",
"summary": summary,
"response_preview": response_text[:4096],
"error": "",
"timestamp": time.time(),
})
if not new_results:
return
# Persist cursor so restarts don't re-process these rows.
if last_id:
try:
with open(_ACTIVITY_DELEGATION_CURSOR_FILE, "w") as f:
# Keep cursor as comma-joined IDs; truncate if over 100KB.
cursor_str = ",".join(sorted(self._seen_activity_ids))
if len(cursor_str) > 102_400:
# Evict oldest half when cursor file grows too large.
sorted_ids = sorted(self._seen_activity_ids)
self._seen_activity_ids = set(sorted_ids[len(sorted_ids) // 2:])
cursor_str = ",".join(sorted(self._seen_activity_ids))
f.write(cursor_str)
except Exception:
pass # Non-fatal; next cycle will retry
# Append to results file and trigger self-message (mirrors _check_delegations).
with open(DELEGATION_RESULTS_FILE, "a") as f:
for r in new_results:
f.write(json.dumps(r) + "\n")
logger.info(
"Heartbeat: %d new a2a_receive delegation results from activity_logs — "
"triggering self-message",
len(new_results),
)
# Build and send self-message to wake the agent.
summary_lines = []
for r in new_results:
line = f"- [completed] Peer response from {r['target_id'][:8]}: {r['summary'][:80] or '(no summary)'}"
if r.get("error"):
line += f"\n Error: {r['error'][:100]}"
summary_lines.append(line)
# Look up parent name (reuse cached value from _check_delegations if set).
if self._parent_name is None:
try:
parent_resp = await client.get(
f"{self.platform_url}/workspaces/{self.workspace_id}",
headers=auth_headers(),
)
if parent_resp.status_code == 200:
parent_id = parent_resp.json().get("parent_id", "")
if parent_id:
parent_info = await client.get(
f"{self.platform_url}/workspaces/{parent_id}",
headers=auth_headers(),
)
if parent_info.status_code == 200:
self._parent_name = parent_info.json().get("name", "")
if self._parent_name is None:
self._parent_name = ""
except Exception:
self._parent_name = ""
parent_name = self._parent_name or ""
report_instruction = ""
if parent_name:
report_instruction = (
f"\n\nIMPORTANT: Delegate a summary of these results to your parent "
f"'{parent_name}' using delegate_task. Also use send_message_to_user "
f"to notify the user."
)
else:
report_instruction = (
"\n\nReport results using send_message_to_user to notify the user."
)
trigger_msg = (
"Delegation results are ready (from a2a_receive via activity_logs). "
"Review them and take appropriate action:\n"
+ "\n".join(summary_lines)
+ report_instruction
)
now = time.time()
if now - self._last_self_message_time < SELF_MESSAGE_COOLDOWN:
logger.debug(
"Heartbeat: self-message cooldown active; "
"a2a_receive results will be retried next cycle"
)
else:
self._last_self_message_time = now
try:
await client.post(
f"{self.platform_url}/workspaces/{self.workspace_id}/a2a",
json={
"method": "message/send",
"params": {
"message": {
"role": "user",
"parts": [{"type": "text", "text": trigger_msg}],
},
},
},
headers=self_source_headers(self.workspace_id),
timeout=120.0,
)
logger.info("Heartbeat: a2a_receive self-message sent")
except Exception as e:
logger.warning("Heartbeat: failed to send a2a_receive self-message: %s", e)
# Also notify the user via canvas.
for r in new_results:
try:
msg = f"Delegation completed: {r['summary'][:100] or '(no summary)'}"
preview = r.get("response_preview", "")
if preview:
msg += f"\nResult: {preview[:200]}"
await client.post(
f"{self.platform_url}/workspaces/{self.workspace_id}/notify",
json={"message": msg, "type": "delegation_result"},
headers=auth_headers(),
)
except Exception:
pass
except Exception as e:
logger.debug("Activity delegation check error: %s", e)