From 6f24cc0961af70ee789bd6764e0d2d4c85d7911c Mon Sep 17 00:00:00 2001 From: Molecule AI Core Platform Lead Date: Fri, 24 Apr 2026 18:03:12 +0000 Subject: [PATCH] fix(executors): move set_current_task inside try so active_tasks always decrements (#2026) If asyncio.CancelledError arrived during the heartbeat HTTP push inside set_current_task() (the increment call), the exception propagated before execution entered the try/finally block in _execute_locked. The finally block never ran, so active_tasks stayed at 1 forever. Every subsequent heartbeat reported active_tasks=1, so the server evaluated active_tasks < max_concurrent_tasks as false (1 < 1) and DrainQueueForWorkspace never fired. Queued A2A requests were permanently stuck. Fix: move set_current_task(increment) to be the FIRST statement inside the try block, not before it. set_current_task's synchronous portion (heartbeat.active_tasks mutation) still runs unconditionally; only the optional HTTP push can be cancelled. The finally block now always runs and always decrements active_tasks back to 0. The memory-recall awaits that previously ran between set_current_task and the try (recall_memories in cli_executor, _inject_memories_if_first_turn in claude_sdk_executor) move inside the try as well, so a cancellation there is also covered by the finally block. Affected executors: claude_sdk_executor, cli_executor, a2a_executor. hermes_executor is not affected (it does not call set_current_task). This is the root cause of today's "active_tasks: 1 + queue drain never triggers" P1 pattern across three workspaces. All 167 executor tests pass. 
Co-Authored-By: Claude Sonnet 4.6 --- workspace/a2a_executor.py | 8 ++++++-- workspace/claude_sdk_executor.py | 9 +++++++-- workspace/cli_executor.py | 22 +++++++++++++--------- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/workspace/a2a_executor.py b/workspace/a2a_executor.py index 0c160645..b550a350 100644 --- a/workspace/a2a_executor.py +++ b/workspace/a2a_executor.py @@ -247,8 +247,6 @@ class LangGraphA2AExecutor(AgentExecutor): task_span.set_attribute(A2A_TASK_ID, context.context_id or "") task_span.set_attribute("a2a.input_preview", user_input[:256]) - await set_current_task(self._heartbeat, brief_task(user_input)) - # Resolve IDs — the RequestContextBuilder always sets them, but # we generate fallbacks for safety (e.g. in unit tests). task_id = context.task_id or str(uuid.uuid4()) @@ -257,6 +255,12 @@ class LangGraphA2AExecutor(AgentExecutor): updater = TaskUpdater(event_queue, task_id, context_id) try: + # set_current_task INSIDE the try so active_tasks is always + # decremented by the finally block even if CancelledError hits + # during the heartbeat HTTP push. Calling it before the try + # opened a window where cancellation left active_tasks stuck + # at 1, permanently blocking queue drain. (#2026) + await set_current_task(self._heartbeat, brief_task(user_input)) messages = _extract_history(context) if messages: logger.info("A2A execute: injecting %d history messages", len(messages)) diff --git a/workspace/claude_sdk_executor.py b/workspace/claude_sdk_executor.py index e299af6f..893aafdb 100644 --- a/workspace/claude_sdk_executor.py +++ b/workspace/claude_sdk_executor.py @@ -426,14 +426,19 @@ class ClaudeSDKExecutor(AgentExecutor): # Keep a clean copy of the user's actual message for the memory record, # BEFORE any delegation or memory injection. 
original_input = user_input - await set_current_task(self.heartbeat, brief_summary(user_input)) logger.debug("SDK execute [claude-code]: %s", user_input[:200]) prompt = self._prepare_prompt(user_input) - prompt = await self._inject_memories_if_first_turn(prompt) response_text: str = "" try: + # set_current_task INSIDE the try so active_tasks is always + # decremented by the finally block even if CancelledError hits + # during the heartbeat HTTP push. Calling it before the try + # opened a narrow window where cancellation left active_tasks + # stuck at 1 forever, permanently blocking queue drain. (#2026) + await set_current_task(self.heartbeat, brief_summary(user_input)) + prompt = await self._inject_memories_if_first_turn(prompt) for attempt in range(_MAX_RETRIES): options = self._build_options() try: diff --git a/workspace/cli_executor.py b/workspace/cli_executor.py index 5be84d9f..ce180f82 100644 --- a/workspace/cli_executor.py +++ b/workspace/cli_executor.py @@ -280,9 +280,6 @@ class CLIAgentExecutor(AgentExecutor): # delegation or memory injection happens. original_input = user_input - # Show current task on canvas — extract a brief one-line summary - await set_current_task(self._heartbeat, brief_summary(user_input)) - logger.debug("CLI execute [%s]: %s", self.runtime, user_input[:200]) # Inject delegation results that arrived since last message @@ -290,13 +287,20 @@ class CLIAgentExecutor(AgentExecutor): if delegation_context: user_input = f"[Delegation results received while you were idle]\n{delegation_context}\n\n[New message]\n{user_input}" - # Auto-recall: inject prior memories into every prompt. (The CLI - # runtimes don't keep a session, so there's no "first turn" concept.) 
- memories = await recall_memories() - if memories: - user_input = f"[Prior context from memory]\n{memories}\n\n{user_input}" - try: + # set_current_task INSIDE the try so active_tasks is always + # decremented by the finally block even if CancelledError hits + # during the heartbeat HTTP push. Calling it before the try + # opened a window where cancellation left active_tasks stuck + # at 1, permanently blocking queue drain. (#2026) + await set_current_task(self._heartbeat, brief_summary(user_input)) + + # Auto-recall: inject prior memories into every prompt. (The CLI + # runtimes don't keep a session, so there's no "first turn" concept.) + memories = await recall_memories() + if memories: + user_input = f"[Prior context from memory]\n{memories}\n\n{user_input}" + await self._run_cli(user_input, event_queue) finally: await set_current_task(self._heartbeat, "")