diff --git a/workspace/a2a_executor.py b/workspace/a2a_executor.py index 0c160645..b550a350 100644 --- a/workspace/a2a_executor.py +++ b/workspace/a2a_executor.py @@ -247,8 +247,6 @@ class LangGraphA2AExecutor(AgentExecutor): task_span.set_attribute(A2A_TASK_ID, context.context_id or "") task_span.set_attribute("a2a.input_preview", user_input[:256]) - await set_current_task(self._heartbeat, brief_task(user_input)) - # Resolve IDs — the RequestContextBuilder always sets them, but # we generate fallbacks for safety (e.g. in unit tests). task_id = context.task_id or str(uuid.uuid4()) @@ -257,6 +255,12 @@ class LangGraphA2AExecutor(AgentExecutor): updater = TaskUpdater(event_queue, task_id, context_id) try: + # set_current_task INSIDE the try so active_tasks is always + # decremented by the finally block even if CancelledError hits + # during the heartbeat HTTP push. Moving it outside the try + # created a window where cancellation left active_tasks stuck + # at 1, permanently blocking queue drain. (#2026) + await set_current_task(self._heartbeat, brief_task(user_input)) messages = _extract_history(context) if messages: logger.info("A2A execute: injecting %d history messages", len(messages)) diff --git a/workspace/claude_sdk_executor.py b/workspace/claude_sdk_executor.py index e299af6f..893aafdb 100644 --- a/workspace/claude_sdk_executor.py +++ b/workspace/claude_sdk_executor.py @@ -426,14 +426,19 @@ class ClaudeSDKExecutor(AgentExecutor): # Keep a clean copy of the user's actual message for the memory record, # BEFORE any delegation or memory injection. 
original_input = user_input - await set_current_task(self.heartbeat, brief_summary(user_input)) logger.debug("SDK execute [claude-code]: %s", user_input[:200]) prompt = self._prepare_prompt(user_input) - prompt = await self._inject_memories_if_first_turn(prompt) response_text: str = "" try: + # set_current_task INSIDE the try so active_tasks is always + # decremented by the finally block even if CancelledError hits + # during the heartbeat HTTP push. Moving it outside the try + # created a narrow window where cancellation left active_tasks + # stuck at 1 forever, permanently blocking queue drain. (#2026) + await set_current_task(self.heartbeat, brief_summary(user_input)) + prompt = await self._inject_memories_if_first_turn(prompt) for attempt in range(_MAX_RETRIES): options = self._build_options() try: diff --git a/workspace/cli_executor.py b/workspace/cli_executor.py index 5be84d9f..ce180f82 100644 --- a/workspace/cli_executor.py +++ b/workspace/cli_executor.py @@ -280,9 +280,6 @@ class CLIAgentExecutor(AgentExecutor): # delegation or memory injection happens. original_input = user_input - # Show current task on canvas — extract a brief one-line summary - await set_current_task(self._heartbeat, brief_summary(user_input)) - logger.debug("CLI execute [%s]: %s", self.runtime, user_input[:200]) # Inject delegation results that arrived since last message @@ -290,13 +287,20 @@ class CLIAgentExecutor(AgentExecutor): if delegation_context: user_input = f"[Delegation results received while you were idle]\n{delegation_context}\n\n[New message]\n{user_input}" - # Auto-recall: inject prior memories into every prompt. (The CLI - # runtimes don't keep a session, so there's no "first turn" concept.) 
- memories = await recall_memories() - if memories: - user_input = f"[Prior context from memory]\n{memories}\n\n{user_input}" - try: + # set_current_task INSIDE the try so the finally block always + # decrements active_tasks, even if CancelledError hits during the + # heartbeat HTTP push; outside the try it could stay stuck at 1 and + # block queue drain (#2026). Summarize original_input: user_input may + # already carry the delegation-results prefix added above. + await set_current_task(self._heartbeat, brief_summary(original_input)) + + # Auto-recall: inject prior memories into every prompt. (The CLI + # runtimes don't keep a session, so there's no "first turn" concept.) + memories = await recall_memories() + if memories: + user_input = f"[Prior context from memory]\n{memories}\n\n{user_input}" + await self._run_cli(user_input, event_queue) finally: await set_current_task(self._heartbeat, "")