[core-be-agent] fix(#354): wire delegation-results consumer into a2a executor

Close the A2A delegation auto-resume gap.

Root cause: heartbeat.py's _check_delegations already writes completed
delegation rows to DELEGATION_RESULTS_FILE and sends a self-message to
wake the agent. executor_helpers.read_delegation_results() was defined to
atomically consume that file, but a2a_executor._core_execute() never
called it — so delegation results were written but the agent never saw
them.

Fix: call read_delegation_results() at the top of _core_execute() and
prepend the results to the user input context so the agent can act on
them without an explicit check_task_status call. The Temporal durable
workflow path is also covered because it calls _core_execute() directly.

Test: two new cases — delegation results injected when file exists;
user input passed through unchanged when file is empty.

Closes molecule-core#354.
This commit is contained in:
Molecule AI · core-be 2026-05-11 02:35:39 +00:00 committed by Molecule AI App-FE
parent dba6fca02b
commit 0c54166e22
2 changed files with 103 additions and 0 deletions

View File

@ -51,6 +51,7 @@ from shared_runtime import (
from executor_helpers import (
collect_outbound_files,
extract_attached_files,
read_delegation_results,
)
from builtin_tools.telemetry import (
A2A_TASK_ID,
@ -215,6 +216,17 @@ class LangGraphA2AExecutor(AgentExecutor):
3. Message(final_text) terminal event
"""
user_input = extract_message_text(context)
# Inject delegation results from prior turns. Heartbeat writes
# completed delegation rows to DELEGATION_RESULTS_FILE and sends
# a self-message to wake the agent; this consumes the file and
# surfaces the results as context so the agent can act on them
# without needing an explicit check_task_status call.
# Results are prepended so they are visible even when the
# self-message text is overwritten by a subsequent user message.
pending_results = read_delegation_results()
if pending_results:
logger.info("A2A execute: injecting %d delegation result(s)", pending_results.count("\n") + 1)
user_input = f"[Delegation results available]\n{pending_results}\n\n{user_input}"
# Pull attached files from A2A message parts (kind: "file") and
# append a manifest to the prompt so the agent knows they exist.
# LangGraph tools (filesystem, bash, skills) can then open the

View File

@ -1205,3 +1205,94 @@ async def test_terminal_error_routes_via_updater_failed():
assert not eq._complete_calls, (
"complete() should not fire when execute() raises"
)
# ---------------------------------------------------------------------------
# Issue #354 — delegation results auto-resume gap
# ---------------------------------------------------------------------------
# heartbeat.py's _check_delegations writes completed delegation rows to
# DELEGATION_RESULTS_FILE and sends a self-message to wake the agent.
# read_delegation_results() in executor_helpers.py atomically reads+consumes
# that file. The fix wires this consumer into _core_execute so the agent
# receives delegation results as context in the next turn — closing the gap
# where parallel delegate_task calls return after the SDK turn ends and the
# agent has no way to discover the results.
@pytest.mark.asyncio
async def test_delegation_results_injected_into_user_input(monkeypatch):
"""When delegation results exist, they are prepended to the user input
passed to the agent so the agent can act on them without an explicit
check_task_status call."""
import a2a_executor
from unittest.mock import patch
pending_results = (
"- [completed] Delegation abc123: Checked 3 issues\n"
" Response: 3 open, 0 critical\n"
"- [failed] Delegation def456: Scan PR #352\n"
" Error: peer workspace offline"
)
# Patch read_delegation_results at the module level where a2a_executor
# imported it so the _core_execute call picks it up.
with patch.object(a2a_executor, "read_delegation_results", return_value=pending_results):
agent = MagicMock()
agent.astream_events = MagicMock(return_value=_stream(_text_chunk("Got it")))
executor = LangGraphA2AExecutor(agent)
part = MagicMock()
part.text = "What's the status?"
context = _make_context([part], "ctx-deleg", task_id="task-deleg")
eq = _make_event_queue()
eq._complete_calls = []
eq._failed_calls = []
await executor.execute(context, eq)
# Verify the agent received the injected context
agent.astream_events.assert_called_once()
call_args = agent.astream_events.call_args
messages = call_args[0][0]["messages"]
# The last message should be a human turn with the injected context
human_turn = messages[-1]
assert human_turn[0] == "human"
# Must contain the delegation results marker
assert "[Delegation results available]" in human_turn[1]
# Must contain the completed delegation
assert "abc123" in human_turn[1]
assert "3 open" in human_turn[1]
# Must contain the failed delegation
assert "def456" in human_turn[1]
# Must contain the original user message
assert "What's the status?" in human_turn[1]
@pytest.mark.asyncio
async def test_no_delegation_results_no_injection(monkeypatch):
"""When no delegation results exist, user input is passed through unchanged."""
import a2a_executor
from unittest.mock import patch
with patch.object(a2a_executor, "read_delegation_results", return_value=""):
agent = MagicMock()
agent.astream_events = MagicMock(return_value=_stream(_text_chunk("ok")))
executor = LangGraphA2AExecutor(agent)
part = MagicMock()
part.text = "Hello"
context = _make_context([part], "ctx-clean", task_id="task-clean")
eq = _make_event_queue()
eq._complete_calls = []
eq._failed_calls = []
await executor.execute(context, eq)
agent.astream_events.assert_called_once()
call_args = agent.astream_events.call_args
messages = call_args[0][0]["messages"]
human_turn = messages[-1]
assert human_turn[0] == "human"
# Must NOT contain the injection marker
assert "[Delegation results available]" not in human_turn[1]
assert human_turn[1] == "Hello"