Merge pull request #2048 from Molecule-AI/fix/active-tasks-cancellation-stuck-2026

fix(executors): active_tasks stuck at 1 under CancelledError — queue drain blocked (#2026)
This commit is contained in:
molecule-ai[bot] 2026-04-24 18:17:03 +00:00 committed by GitHub
commit f5d44eba8c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 26 additions and 13 deletions

View File

@ -247,8 +247,6 @@ class LangGraphA2AExecutor(AgentExecutor):
task_span.set_attribute(A2A_TASK_ID, context.context_id or "")
task_span.set_attribute("a2a.input_preview", user_input[:256])
await set_current_task(self._heartbeat, brief_task(user_input))
# Resolve IDs — the RequestContextBuilder always sets them, but
# we generate fallbacks for safety (e.g. in unit tests).
task_id = context.task_id or str(uuid.uuid4())
@ -257,6 +255,12 @@ class LangGraphA2AExecutor(AgentExecutor):
updater = TaskUpdater(event_queue, task_id, context_id)
try:
# set_current_task INSIDE the try so active_tasks is always
# decremented by the finally block even if CancelledError hits
# during the heartbeat HTTP push. Moving it outside the try
# created a window where cancellation left active_tasks stuck
# at 1, permanently blocking queue drain. (#2026)
await set_current_task(self._heartbeat, brief_task(user_input))
messages = _extract_history(context)
if messages:
logger.info("A2A execute: injecting %d history messages", len(messages))

View File

@ -426,14 +426,19 @@ class ClaudeSDKExecutor(AgentExecutor):
# Keep a clean copy of the user's actual message for the memory record,
# BEFORE any delegation or memory injection.
original_input = user_input
await set_current_task(self.heartbeat, brief_summary(user_input))
logger.debug("SDK execute [claude-code]: %s", user_input[:200])
prompt = self._prepare_prompt(user_input)
prompt = await self._inject_memories_if_first_turn(prompt)
response_text: str = ""
try:
# set_current_task INSIDE the try so active_tasks is always
# decremented by the finally block even if CancelledError hits
# during the heartbeat HTTP push. Moving it outside the try
# created a narrow window where cancellation left active_tasks
# stuck at 1 forever, permanently blocking queue drain. (#2026)
await set_current_task(self.heartbeat, brief_summary(user_input))
prompt = await self._inject_memories_if_first_turn(prompt)
for attempt in range(_MAX_RETRIES):
options = self._build_options()
try:

View File

@ -280,9 +280,6 @@ class CLIAgentExecutor(AgentExecutor):
# delegation or memory injection happens.
original_input = user_input
# Show current task on canvas — extract a brief one-line summary
await set_current_task(self._heartbeat, brief_summary(user_input))
logger.debug("CLI execute [%s]: %s", self.runtime, user_input[:200])
# Inject delegation results that arrived since last message
@ -290,13 +287,20 @@ class CLIAgentExecutor(AgentExecutor):
if delegation_context:
user_input = f"[Delegation results received while you were idle]\n{delegation_context}\n\n[New message]\n{user_input}"
# Auto-recall: inject prior memories into every prompt. (The CLI
# runtimes don't keep a session, so there's no "first turn" concept.)
memories = await recall_memories()
if memories:
user_input = f"[Prior context from memory]\n{memories}\n\n{user_input}"
try:
# set_current_task INSIDE the try so active_tasks is always
# decremented by the finally block even if CancelledError hits
# during the heartbeat HTTP push. Moving it outside the try
# created a window where cancellation left active_tasks stuck
# at 1, permanently blocking queue drain. (#2026)
await set_current_task(self._heartbeat, brief_summary(user_input))
# Auto-recall: inject prior memories into every prompt. (The CLI
# runtimes don't keep a session, so there's no "first turn" concept.)
memories = await recall_memories()
if memories:
user_input = f"[Prior context from memory]\n{memories}\n\n{user_input}"
await self._run_cli(user_input, event_queue)
finally:
await set_current_task(self._heartbeat, "")