From 4a7e1bd9888f7820047ae9c9674b4b45c13572ae Mon Sep 17 00:00:00 2001 From: Molecule AI Infra-Runtime-BE Date: Mon, 11 May 2026 10:07:42 +0000 Subject: [PATCH 1/2] refactor(workspace): extract idle-loop pending-check guard for direct unit-testing Follows up on #432 (merged). Extracts _check_delegation_results_pending() from the inline guard in _run_idle_loop() so tests can call the real production function directly via patch(builtins.open, ...). Fixes #401: the previous test used a mirror copy of the guard logic, which risks drifting from the production implementation over time. Co-Authored-By: Claude Opus 4.7 --- workspace/main.py | 44 ++++++--- .../tests/test_idle_loop_pending_check.py | 95 ++++++++++--------- 2 files changed, 80 insertions(+), 59 deletions(-) diff --git a/workspace/main.py b/workspace/main.py index 8c569309..04285815 100644 --- a/workspace/main.py +++ b/workspace/main.py @@ -48,6 +48,27 @@ def get_machine_ip() -> str: # pragma: no cover return "127.0.0.1" +def _check_delegation_results_pending() -> bool: + """Check if there are unconsumed delegation results waiting. + + Reads ``DELEGATION_RESULTS_FILE``. Returns ``True`` if the file + exists and contains non-whitespace content (after stripping) — meaning + the idle loop should skip this tick. Returns ``False`` if the file is + absent, empty, or contains only whitespace. + + The extracted form lets unit tests call this directly rather than mirroring + the logic (anti-pattern flagged as #401). + """ + from heartbeat import DELEGATION_RESULTS_FILE + + try: + with open(DELEGATION_RESULTS_FILE) as rf: + rf.seek(0) + return bool(rf.read().strip()) + except FileNotFoundError: + return False + + # Re-exported from transcript_auth for the inline /transcript handler. # Separate module keeps the security-critical gate import-light + unit-testable. from transcript_auth import transcript_authorized as _transcript_authorized @@ -678,20 +699,15 @@ async def main(): # pragma: no cover # heartbeat's own self-message wake the agent after results are # written. The agent then sees the results in _prepare_prompt() # and processes them before composing. - from heartbeat import DELEGATION_RESULTS_FILE as _DRF - try: - with open(_DRF) as _rf: - _rf.seek(0) - _content = _rf.read().strip() - if _content: - print( - f"Idle loop: skipping — {len(_content)} bytes of unconsumed " - f"delegation results pending (heartbeat will notify agent)", - flush=True, - ) - continue - except FileNotFoundError: - pass # No results file — normal, proceed with idle prompt + # Guard logic extracted to _check_delegation_results_pending() for + # direct unit-testing (#401 follow-up). + if _check_delegation_results_pending(): + print( + "Idle loop: skipping — unconsumed delegation results pending " + "(heartbeat will notify agent)", + flush=True, + ) + continue # Self-post the idle prompt via the platform A2A proxy (same # path as initial_prompt). The agent's own concurrency control diff --git a/workspace/tests/test_idle_loop_pending_check.py b/workspace/tests/test_idle_loop_pending_check.py index 6699bf8f..f3a043a8 100644 --- a/workspace/tests/test_idle_loop_pending_check.py +++ b/workspace/tests/test_idle_loop_pending_check.py @@ -4,77 +4,82 @@ The idle loop skips sending the idle prompt when DELEGATION_RESULTS_FILE contains unconsumed results, preventing the agent from composing a stale tick before processing pending delegation notifications from the heartbeat. -Source: workspace/main.py:_run_idle_loop() pending-results guard. +Source: ``workspace/main.py:_check_delegation_results_pending()`` (extracted from +``_run_idle_loop()`` guard; see PR #432 follow-up). + +The guard is extracted into a module-level function so unit tests call the +real production logic directly — not a mirror copy. This avoids the +test-mirror anti-pattern (issue #401) where a copied implementation +drifts from the production code it is supposed to test. """ from __future__ import annotations +import io import json +from unittest.mock import patch -import pytest - - -def check_results_pending(file_path: str) -> bool: - """Mirror the guard logic from workspace/main.py:_run_idle_loop(). - - Returns True if the results file exists and is non-empty, - meaning the idle loop should skip this tick. - """ - try: - with open(file_path) as rf: - rf.seek(0) - content = rf.read().strip() - return bool(content) - except FileNotFoundError: - return False +from main import _check_delegation_results_pending class TestIdleLoopPendingCheck: - """Tests for the idle-loop pending-delegation-results guard.""" + """Tests for the idle-loop pending-delegation-results guard. - def test_no_file_means_proceed(self, tmp_path): + Each test patches ``builtins.open`` so ``_check_delegation_results_pending`` + reads the controlled payload instead of the real DELEGATION_RESULTS_FILE. + No filesystem side-effects. + """ + + def _patch_open(self, payload: str | None): + """Patch builtins.open for _check_delegation_results_pending. + + Args: + payload: file contents to return. None → FileNotFoundError. + """ + if payload is None: + return patch("builtins.open", side_effect=FileNotFoundError) + else: + fake_file = io.StringIO(payload) + return patch("builtins.open", return_value=fake_file) + + def test_no_file_means_proceed(self): """No delegation results file → idle loop fires normally.""" - results_file = tmp_path / "delegation_results.jsonl" - assert not check_results_pending(str(results_file)) + with self._patch_open(None): + assert _check_delegation_results_pending() is False - def test_empty_file_means_proceed(self, tmp_path): + def test_empty_file_means_proceed(self): """Empty file → no pending results → idle loop fires.""" - results_file = tmp_path / "delegation_results.jsonl" - results_file.write_text("", encoding="utf-8") - assert not check_results_pending(str(results_file)) + with self._patch_open(""): + assert _check_delegation_results_pending() is False - def test_whitespace_only_file_means_proceed(self, tmp_path): + def test_whitespace_only_file_means_proceed(self): """File with only whitespace → treated as empty → idle loop fires.""" - results_file = tmp_path / "delegation_results.jsonl" - results_file.write_text(" \n ", encoding="utf-8") - assert not check_results_pending(str(results_file)) + with self._patch_open(" \n "): + assert _check_delegation_results_pending() is False - def test_single_result_means_skip(self, tmp_path): + def test_single_result_means_skip(self): """File with one delegation result → skip idle tick.""" - results_file = tmp_path / "delegation_results.jsonl" - results_file.write_text( + payload = ( json.dumps({ "status": "completed", "delegation_id": "del-abc", "summary": "Done", - }) + "\n", - encoding="utf-8", + }) + "\n" ) - assert check_results_pending(str(results_file)) + with self._patch_open(payload): + assert _check_delegation_results_pending() is True - def test_multiple_results_means_skip(self, tmp_path): + def test_multiple_results_means_skip(self): """File with multiple delegation results → skip idle tick.""" - results_file = tmp_path / "delegation_results.jsonl" - results_file.write_text( + payload = ( json.dumps({"status": "completed", "delegation_id": "del-1", "summary": "A"}) + "\n" + json.dumps({"status": "failed", "delegation_id": "del-2", "summary": "B"}) - + "\n", - encoding="utf-8", + + "\n" ) - assert check_results_pending(str(results_file)) + with self._patch_open(payload): + assert _check_delegation_results_pending() is True - def test_file_with_only_newline_means_proceed(self, tmp_path): + def test_file_with_only_newline_means_proceed(self): """File with only a newline character → stripped to empty → fires.""" - results_file = tmp_path / "delegation_results.jsonl" - results_file.write_text("\n", encoding="utf-8") - assert not check_results_pending(str(results_file)) + with self._patch_open("\n"): + assert _check_delegation_results_pending() is False -- 2.45.2 From df2e69b32fe33d5574c04af901bef89f96c46f74 Mon Sep 17 00:00:00 2001 From: Molecule AI Infra-Runtime-BE Date: Mon, 11 May 2026 10:32:24 +0000 Subject: [PATCH 2/2] ci: re-trigger Gitea Actions status reporting (infra-runtime-be-agent) -- 2.45.2