diff --git a/workspace/heartbeat.py b/workspace/heartbeat.py index d345d5a7..d418f127 100644 --- a/workspace/heartbeat.py +++ b/workspace/heartbeat.py @@ -139,6 +139,14 @@ SELF_MESSAGE_COOLDOWN = 60 # seconds — minimum between self-messages to preve # same file via executor_helpers.read_delegation_results so heartbeat- # delivered async delegation results land in the next agent turn. DELEGATION_RESULTS_FILE = os.environ.get("DELEGATION_RESULTS_FILE", "/tmp/delegation_results.jsonl") +# Cursor file for tracking activity_log IDs processed from the a2a_receive path +# (delegations fired via tool_delegate_task → POST /workspaces/:id/a2a proxy, not +# POST /workspaces/:id/delegate). Persisted to disk so heartbeat restarts +# don't re-process the same rows. +_ACTIVITY_DELEGATION_CURSOR_FILE = os.environ.get( + "DELEGATION_ACTIVITY_CURSOR_FILE", + "/tmp/delegation_activity_cursor", +) class HeartbeatLoop: @@ -169,6 +177,10 @@ class HeartbeatLoop: self._seen_delegation_ids: set[str] = set() self._last_self_message_time = 0.0 self._parent_name: str | None = None # Cached after first lookup + # Seen activity IDs for a2a_receive polling (delegations via POST /a2a proxy path). + # Loaded lazily from cursor file on first poll to avoid blocking startup. + self._seen_activity_ids: set[str] = set() + self._activity_cursor_loaded = False @property def error_rate(self) -> float: @@ -293,6 +305,15 @@ class HeartbeatLoop: except Exception as e: logger.debug("Delegation check failed: %s", e) + # 3. Check activity_logs for delegation results that arrived via + # the POST /a2a proxy path (tool_delegate_task → send_a2a_message). + # These are NOT written to the delegations table, so + # _check_delegations misses them. See issue #354. 
async def _check_activity_delegations(self, client: httpx.AsyncClient) -> None:
    """Surface peer delegation results that exist only in activity_logs.

    Results that arrive through the POST /workspaces/:id/a2a proxy path
    (tool_delegate_task -> send_a2a_message) are logged to activity_logs but
    not to the delegations table, so _check_delegations never sees them and
    the agent is never woken to consume them (issue #354). The heartbeat loop
    calls this method as its third step on every cycle, inside its own
    try/except that logs failures at debug level.

    On each call the method:

    1. Loads the seen-ID cursor from _ACTIVITY_DELEGATION_CURSOR_FILE (first
       call only, so startup is not blocked by disk I/O).
    2. GETs /workspaces/:id/activity?type=a2a_receive and keeps unseen rows
       whose source_id is non-empty and is not this workspace (peer agent
       messages only; canvas-user messages and self-notifications are skipped).
    3. Appends the new results to DELEGATION_RESULTS_FILE and only then marks
       their IDs as seen and persists the cursor, so a failed write is
       retried on the next cycle instead of silently dropping the results.
    4. Sends a self-message to wake the agent. If SELF_MESSAGE_COOLDOWN is
       still active, the summary lines are carried over on the instance and
       sent with the next wake-up that is allowed.
    5. Posts one canvas notification per new result.

    Args:
        client: Shared async HTTP client used for every platform request.

    Returns:
        None. Every failure is logged and swallowed so the heartbeat loop
        keeps running (task cancellation still propagates because
        asyncio.CancelledError is not an Exception subclass).
    """
    max_cursor_chars = 102_400  # Upper bound for the comma-joined cursor file.
    max_wake_lines = 50  # Cap on summary lines carried across cooldowns.

    try:
        # ---- 1. Lazy cursor load -------------------------------------
        if not self._activity_cursor_loaded:
            self._activity_cursor_loaded = True
            try:
                with open(_ACTIVITY_DELEGATION_CURSOR_FILE, encoding="utf-8") as f:
                    raw_cursor = f.read().strip()
            except FileNotFoundError:
                raw_cursor = ""  # First run: nothing processed yet.
            except (OSError, UnicodeDecodeError) as e:
                logger.debug("Activity cursor unreadable, starting fresh: %s", e)
                raw_cursor = ""
            self._seen_activity_ids.update(
                part for part in raw_cursor.split(",") if part
            )

        # ---- 2. Poll the activity log --------------------------------
        resp = await client.get(
            f"{self.platform_url}/workspaces/{self.workspace_id}/activity",
            params={"type": "a2a_receive"},
            headers=auth_headers(),
        )
        if resp.status_code != 200:
            return
        rows = resp.json()
        if not isinstance(rows, list):
            return

        new_results: list[dict] = []
        fresh_ids: set[str] = set()  # IDs accepted in this poll, not yet committed.
        live_ids: set[str] = set()  # Every ID present in this response.

        # The activity API returns newest-first; walk it in reverse so
        # results are recorded oldest -> newest.
        for row in reversed(rows):
            if not isinstance(row, dict):
                continue
            raw_id = row.get("id")
            if raw_id is None or raw_id == "":
                continue
            activity_id = str(raw_id)
            live_ids.add(activity_id)
            if activity_id in self._seen_activity_ids or activity_id in fresh_ids:
                continue

            # Peer agent messages only: a non-empty source_id that is not
            # this workspace.
            source_id = str(row.get("source_id") or "")
            if not source_id or source_id == self.workspace_id:
                continue

            summary = row.get("summary") or ""
            if not isinstance(summary, str):
                summary = str(summary)

            # Prefer the message text from request_body.params.message.parts
            # when present; otherwise fall back to the row summary.
            response_text = summary
            request_body = row.get("request_body")
            params_obj = (
                request_body.get("params") if isinstance(request_body, dict) else None
            )
            message = (
                params_obj.get("message") if isinstance(params_obj, dict) else None
            )
            parts = message.get("parts") if isinstance(message, dict) else None
            if isinstance(parts, list):
                # The parentheses matter: without them a non-dict part reaches
                # p.get(...) and raises AttributeError, aborting the whole poll.
                texts = [
                    p["text"]
                    for p in parts
                    if isinstance(p, dict)
                    and (p.get("kind") == "text" or p.get("type") == "text")
                    and isinstance(p.get("text"), str)
                    and p["text"]
                ]
                if texts:
                    response_text = " ".join(texts)

            fresh_ids.add(activity_id)
            new_results.append({
                "delegation_id": activity_id,  # Activity ID doubles as a pseudo-delegation ID.
                "target_id": source_id,
                "source_id": self.workspace_id,
                "status": "completed",
                "summary": summary,
                "response_preview": response_text[:4096],
                "error": "",
                "timestamp": time.time(),
            })

        # ---- 3. Record results, then commit the cursor ---------------
        if new_results:
            # Write the results first. If this raises, nothing has been marked
            # as seen and the same rows are picked up again next cycle.
            with open(DELEGATION_RESULTS_FILE, "a", encoding="utf-8") as f:
                f.writelines(json.dumps(r) + "\n" for r in new_results)
            logger.info(
                "Heartbeat: %d new a2a_receive delegation results from activity_logs — "
                "triggering self-message",
                len(new_results),
            )

            self._seen_activity_ids.update(fresh_ids)
            cursor_str = ",".join(sorted(self._seen_activity_ids))
            if len(cursor_str) > max_cursor_chars:
                # IDs are opaque strings, so sorting them says nothing about
                # age. Keep the IDs the API is still returning, since those
                # are the ones needed for de-duplication.
                # NOTE(review): assumes rows that dropped out of the response
                # are not returned again later — confirm against the API.
                retained = self._seen_activity_ids & live_ids
                if not retained or len(",".join(retained)) > max_cursor_chars:
                    # Last resort: drop an arbitrary half to bound the file.
                    ordered = sorted(self._seen_activity_ids)
                    retained = set(ordered[len(ordered) // 2:])
                self._seen_activity_ids = retained
                cursor_str = ",".join(sorted(retained))

            # Write to a temp file and rename so a crash mid-write cannot
            # leave a truncated cursor behind.
            tmp_path = f"{_ACTIVITY_DELEGATION_CURSOR_FILE}.tmp"
            try:
                with open(tmp_path, "w", encoding="utf-8") as f:
                    f.write(cursor_str)
                os.replace(tmp_path, _ACTIVITY_DELEGATION_CURSOR_FILE)
            except OSError as e:
                # Non-fatal: the in-memory set still de-duplicates, and the
                # next successful commit rewrites the file.
                logger.debug("Failed to persist activity cursor: %s", e)

        # ---- 4. Wake the agent (respecting the cooldown) -------------
        # Lines deferred by an earlier cooldown are stored on the instance.
        # getattr is used because the attribute is created lazily here.
        wake_lines = list(getattr(self, "_pending_activity_wake_lines", []))
        wake_lines.extend(
            f"- [completed] Peer response from {r['target_id'][:8]}: "
            f"{r['summary'][:80] or '(no summary)'}"
            for r in new_results
        )
        wake_lines = wake_lines[-max_wake_lines:]

        if wake_lines:
            now = time.time()
            if now - self._last_self_message_time < SELF_MESSAGE_COOLDOWN:
                # Keep the lines so the wake-up really is retried next cycle.
                self._pending_activity_wake_lines = wake_lines
                logger.debug(
                    "Heartbeat: self-message cooldown active; "
                    "a2a_receive results will be retried next cycle"
                )
            else:
                # One attempt per batch: a failed POST is logged, not retried,
                # to avoid duplicate wake-ups after a slow/timed-out delivery.
                self._pending_activity_wake_lines = []
                self._last_self_message_time = now

                # Look up the parent name once; _check_delegations may have
                # cached it already. An empty string caches "no parent".
                if self._parent_name is None:
                    try:
                        parent_resp = await client.get(
                            f"{self.platform_url}/workspaces/{self.workspace_id}",
                            headers=auth_headers(),
                        )
                        if parent_resp.status_code == 200:
                            parent_id = parent_resp.json().get("parent_id", "")
                            if parent_id:
                                parent_info = await client.get(
                                    f"{self.platform_url}/workspaces/{parent_id}",
                                    headers=auth_headers(),
                                )
                                if parent_info.status_code == 200:
                                    self._parent_name = parent_info.json().get("name", "")
                    except Exception as e:
                        logger.debug("Parent lookup failed: %s", e)
                    if self._parent_name is None:
                        self._parent_name = ""
                parent_name = self._parent_name or ""

                if parent_name:
                    report_instruction = (
                        f"\n\nIMPORTANT: Delegate a summary of these results to your parent "
                        f"'{parent_name}' using delegate_task. Also use send_message_to_user "
                        f"to notify the user."
                    )
                else:
                    report_instruction = (
                        "\n\nReport results using send_message_to_user to notify the user."
                    )

                trigger_msg = (
                    "Delegation results are ready (from a2a_receive via activity_logs). "
                    "Review them and take appropriate action:\n"
                    + "\n".join(wake_lines)
                    + report_instruction
                )

                try:
                    wake_resp = await client.post(
                        f"{self.platform_url}/workspaces/{self.workspace_id}/a2a",
                        json={
                            "method": "message/send",
                            "params": {
                                "message": {
                                    "role": "user",
                                    "parts": [{"type": "text", "text": trigger_msg}],
                                },
                            },
                        },
                        headers=self_source_headers(self.workspace_id),
                        timeout=120.0,
                    )
                    if wake_resp.status_code >= 400:
                        logger.warning(
                            "Heartbeat: a2a_receive self-message rejected with HTTP %s",
                            wake_resp.status_code,
                        )
                    else:
                        logger.info("Heartbeat: a2a_receive self-message sent")
                except Exception as e:
                    logger.warning("Heartbeat: failed to send a2a_receive self-message: %s", e)

        # ---- 5. Notify the user via canvas ---------------------------
        # Sent regardless of the cooldown; each failure is isolated so one
        # bad notification does not block the rest.
        for r in new_results:
            try:
                msg = f"Delegation completed: {r['summary'][:100] or '(no summary)'}"
                preview = r.get("response_preview", "")
                if preview:
                    msg += f"\nResult: {preview[:200]}"
                await client.post(
                    f"{self.platform_url}/workspaces/{self.workspace_id}/notify",
                    json={"message": msg, "type": "delegation_result"},
                    headers=auth_headers(),
                )
            except Exception as e:
                logger.debug("Canvas notify for activity result failed: %s", e)

    except Exception as e:
        logger.debug("Activity delegation check error: %s", e)