From 0c54166e223cdde7b9b53fed096c112c331d202e Mon Sep 17 00:00:00 2001 From: Molecule AI Core-BE Date: Mon, 11 May 2026 02:35:39 +0000 Subject: [PATCH] [core-be-agent] fix(#354): wire delegation-results consumer into a2a executor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Close the A2A delegation auto-resume gap. Root cause: heartbeat.py's _check_delegations already writes completed delegation rows to DELEGATION_RESULTS_FILE and sends a self-message to wake the agent. executor_helpers.read_delegation_results() was defined to atomically consume that file, but a2a_executor._core_execute() never called it — so delegation results were written but the agent never saw them. Fix: call read_delegation_results() at the top of _core_execute() and prepend the results to the user input context so the agent can act on them without an explicit check_task_status call. The Temporal durable workflow path is also covered because it calls _core_execute() directly. Test: two new cases — delegation results injected when file exists; user input passed through unchanged when file is empty. Closes molecule-core#354. --- workspace/a2a_executor.py | 12 ++++ workspace/tests/test_a2a_executor.py | 91 ++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+) diff --git a/workspace/a2a_executor.py b/workspace/a2a_executor.py index 9b4d9464..ddcc8ea0 100644 --- a/workspace/a2a_executor.py +++ b/workspace/a2a_executor.py @@ -51,6 +51,7 @@ from shared_runtime import ( from executor_helpers import ( collect_outbound_files, extract_attached_files, + read_delegation_results, ) from builtin_tools.telemetry import ( A2A_TASK_ID, @@ -215,6 +216,17 @@ class LangGraphA2AExecutor(AgentExecutor): 3. Message(final_text) — terminal event """ user_input = extract_message_text(context) + # Inject delegation results from prior turns. Heartbeat writes + # completed delegation rows to DELEGATION_RESULTS_FILE and sends + # a self-message to wake the agent; this consumes the file and + # surfaces the results as context so the agent can act on them + # without needing an explicit check_task_status call. + # Results are prepended so they are visible even when the + # self-message text is overwritten by a subsequent user message. + pending_results = read_delegation_results() + if pending_results: + logger.info("A2A execute: injecting %d delegation result(s)", pending_results.count("\n") + 1) + user_input = f"[Delegation results available]\n{pending_results}\n\n{user_input}" # Pull attached files from A2A message parts (kind: "file") and # append a manifest to the prompt so the agent knows they exist. # LangGraph tools (filesystem, bash, skills) can then open the diff --git a/workspace/tests/test_a2a_executor.py b/workspace/tests/test_a2a_executor.py index a61ed0a7..24b8fd68 100644 --- a/workspace/tests/test_a2a_executor.py +++ b/workspace/tests/test_a2a_executor.py @@ -1205,3 +1205,94 @@ async def test_terminal_error_routes_via_updater_failed(): assert not eq._complete_calls, ( "complete() should not fire when execute() raises" ) + + +# --------------------------------------------------------------------------- +# Issue #354 — delegation results auto-resume gap +# --------------------------------------------------------------------------- +# heartbeat.py's _check_delegations writes completed delegation rows to +# DELEGATION_RESULTS_FILE and sends a self-message to wake the agent. +# read_delegation_results() in executor_helpers.py atomically reads+consumes +# that file. The fix wires this consumer into _core_execute so the agent +# receives delegation results as context in the next turn — closing the gap +# where parallel delegate_task calls return after the SDK turn ends and the +# agent has no way to discover the results. + +@pytest.mark.asyncio +async def test_delegation_results_injected_into_user_input(monkeypatch): + """When delegation results exist, they are prepended to the user input + passed to the agent so the agent can act on them without an explicit + check_task_status call.""" + import a2a_executor + from unittest.mock import patch + + pending_results = ( + "- [completed] Delegation abc123: Checked 3 issues\n" + " Response: 3 open, 0 critical\n" + "- [failed] Delegation def456: Scan PR #352\n" + " Error: peer workspace offline" + ) + + # Patch read_delegation_results at the module level where a2a_executor + # imported it so the _core_execute call picks it up. + with patch.object(a2a_executor, "read_delegation_results", return_value=pending_results): + agent = MagicMock() + agent.astream_events = MagicMock(return_value=_stream(_text_chunk("Got it"))) + executor = LangGraphA2AExecutor(agent) + + part = MagicMock() + part.text = "What's the status?" + context = _make_context([part], "ctx-deleg", task_id="task-deleg") + eq = _make_event_queue() + eq._complete_calls = [] + eq._failed_calls = [] + + await executor.execute(context, eq) + + # Verify the agent received the injected context + agent.astream_events.assert_called_once() + call_args = agent.astream_events.call_args + messages = call_args[0][0]["messages"] + + # The last message should be a human turn with the injected context + human_turn = messages[-1] + assert human_turn[0] == "human" + # Must contain the delegation results marker + assert "[Delegation results available]" in human_turn[1] + # Must contain the completed delegation + assert "abc123" in human_turn[1] + assert "3 open" in human_turn[1] + # Must contain the failed delegation + assert "def456" in human_turn[1] + # Must contain the original user message + assert "What's the status?" in human_turn[1] + + +@pytest.mark.asyncio +async def test_no_delegation_results_no_injection(monkeypatch): + """When no delegation results exist, user input is passed through unchanged.""" + import a2a_executor + from unittest.mock import patch + + with patch.object(a2a_executor, "read_delegation_results", return_value=""): + agent = MagicMock() + agent.astream_events = MagicMock(return_value=_stream(_text_chunk("ok"))) + executor = LangGraphA2AExecutor(agent) + + part = MagicMock() + part.text = "Hello" + context = _make_context([part], "ctx-clean", task_id="task-clean") + eq = _make_event_queue() + eq._complete_calls = [] + eq._failed_calls = [] + + await executor.execute(context, eq) + + agent.astream_events.assert_called_once() + call_args = agent.astream_events.call_args + messages = call_args[0][0]["messages"] + human_turn = messages[-1] + assert human_turn[0] == "human" + # Must NOT contain the injection marker + assert "[Delegation results available]" not in human_turn[1] + assert human_turn[1] == "Hello"