refactor(workspace): extract idle-loop pending-check guard for direct unit-testing
Some checks failed
sop-tier-check / tier-check (pull_request) bypass
Harness Replays / detect-changes (pull_request) bypass
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 19s
Harness Replays / Harness Replays (pull_request) Has been skipped
CI / Detect changes (pull_request) Successful in 1m0s
E2E API Smoke Test / detect-changes (pull_request) Successful in 56s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 53s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 52s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 53s
CI / Platform (Go) (pull_request) Successful in 10s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 7s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 12s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 9s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 3m32s
CI / Python Lint & Test (pull_request) Failing after 7m50s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 8m29s
Secret scan / Scan diff for credential-shaped strings (pull_request) bypass
CI / Canvas (Next.js) (pull_request) Failing after 11m24s
CI / Canvas Deploy Reminder (pull_request) Has been skipped

Follows up on #432 (merged). Extracts _check_delegation_results_pending()
from the inline guard in _run_idle_loop() so tests can call the real
production function directly via patch(builtins.open, ...).

Fixes #401: the previous test used a mirror copy of the guard logic,
which risks drifting from the production implementation over time.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Molecule AI · infra-runtime-be 2026-05-11 10:07:42 +00:00
parent 896d5e70f0
commit 27b82ce786
2 changed files with 80 additions and 59 deletions

View File

@ -48,6 +48,27 @@ def get_machine_ip() -> str: # pragma: no cover
return "127.0.0.1"
def _check_delegation_results_pending() -> bool:
"""Check if there are unconsumed delegation results waiting.
Reads ``DELEGATION_RESULTS_FILE``. Returns ``True`` if the file
exists and contains non-whitespace content (after stripping) meaning
the idle loop should skip this tick. Returns ``False`` if the file is
absent, empty, or contains only whitespace.
The extracted form lets unit tests call this directly rather than mirroring
the logic (anti-pattern flagged as #401).
"""
from heartbeat import DELEGATION_RESULTS_FILE
try:
with open(DELEGATION_RESULTS_FILE) as rf:
rf.seek(0)
return bool(rf.read().strip())
except FileNotFoundError:
return False
# Re-exported from transcript_auth for the inline /transcript handler.
# Separate module keeps the security-critical gate import-light + unit-testable.
from transcript_auth import transcript_authorized as _transcript_authorized
@ -678,20 +699,15 @@ async def main(): # pragma: no cover
# heartbeat's own self-message wake the agent after results are
# written. The agent then sees the results in _prepare_prompt()
# and processes them before composing.
from heartbeat import DELEGATION_RESULTS_FILE as _DRF
try:
with open(_DRF) as _rf:
_rf.seek(0)
_content = _rf.read().strip()
if _content:
print(
f"Idle loop: skipping — {len(_content)} bytes of unconsumed "
f"delegation results pending (heartbeat will notify agent)",
flush=True,
)
continue
except FileNotFoundError:
pass # No results file — normal, proceed with idle prompt
# Guard logic extracted to _check_delegation_results_pending() for
# direct unit-testing (#401 follow-up).
if _check_delegation_results_pending():
print(
"Idle loop: skipping — unconsumed delegation results pending "
"(heartbeat will notify agent)",
flush=True,
)
continue
# Self-post the idle prompt via the platform A2A proxy (same
# path as initial_prompt). The agent's own concurrency control

View File

@ -4,77 +4,82 @@ The idle loop skips sending the idle prompt when DELEGATION_RESULTS_FILE
contains unconsumed results, preventing the agent from composing a stale tick
before processing pending delegation notifications from the heartbeat.
Source: workspace/main.py:_run_idle_loop() pending-results guard.
Source: ``workspace/main.py:_check_delegation_results_pending()`` (extracted from
``_run_idle_loop()`` guard; see PR #432 follow-up).
The guard is extracted into a module-level function so unit tests call the
real production logic directly not a mirror copy. This avoids the
test-mirror anti-pattern (issue #401) where a copied implementation
drifts from the production code it is supposed to test.
"""
from __future__ import annotations
import io
import json
from unittest.mock import patch
import pytest
def check_results_pending(file_path: str) -> bool:
"""Mirror the guard logic from workspace/main.py:_run_idle_loop().
Returns True if the results file exists and is non-empty,
meaning the idle loop should skip this tick.
"""
try:
with open(file_path) as rf:
rf.seek(0)
content = rf.read().strip()
return bool(content)
except FileNotFoundError:
return False
from main import _check_delegation_results_pending
class TestIdleLoopPendingCheck:
"""Tests for the idle-loop pending-delegation-results guard."""
"""Tests for the idle-loop pending-delegation-results guard.
def test_no_file_means_proceed(self, tmp_path):
Each test patches ``builtins.open`` so ``_check_delegation_results_pending``
reads the controlled payload instead of the real DELEGATION_RESULTS_FILE.
No filesystem side-effects.
"""
def _patch_open(self, payload: str | None):
"""Patch builtins.open for _check_delegation_results_pending.
Args:
payload: file contents to return. None FileNotFoundError.
"""
if payload is None:
return patch("builtins.open", side_effect=FileNotFoundError)
else:
fake_file = io.StringIO(payload)
return patch("builtins.open", return_value=fake_file)
def test_no_file_means_proceed(self):
"""No delegation results file → idle loop fires normally."""
results_file = tmp_path / "delegation_results.jsonl"
assert not check_results_pending(str(results_file))
with self._patch_open(None):
assert _check_delegation_results_pending() is False
def test_empty_file_means_proceed(self, tmp_path):
def test_empty_file_means_proceed(self):
"""Empty file → no pending results → idle loop fires."""
results_file = tmp_path / "delegation_results.jsonl"
results_file.write_text("", encoding="utf-8")
assert not check_results_pending(str(results_file))
with self._patch_open(""):
assert _check_delegation_results_pending() is False
def test_whitespace_only_file_means_proceed(self, tmp_path):
def test_whitespace_only_file_means_proceed(self):
"""File with only whitespace → treated as empty → idle loop fires."""
results_file = tmp_path / "delegation_results.jsonl"
results_file.write_text(" \n ", encoding="utf-8")
assert not check_results_pending(str(results_file))
with self._patch_open(" \n "):
assert _check_delegation_results_pending() is False
def test_single_result_means_skip(self, tmp_path):
def test_single_result_means_skip(self):
"""File with one delegation result → skip idle tick."""
results_file = tmp_path / "delegation_results.jsonl"
results_file.write_text(
payload = (
json.dumps({
"status": "completed",
"delegation_id": "del-abc",
"summary": "Done",
}) + "\n",
encoding="utf-8",
}) + "\n"
)
assert check_results_pending(str(results_file))
with self._patch_open(payload):
assert _check_delegation_results_pending() is True
def test_multiple_results_means_skip(self, tmp_path):
def test_multiple_results_means_skip(self):
"""File with multiple delegation results → skip idle tick."""
results_file = tmp_path / "delegation_results.jsonl"
results_file.write_text(
payload = (
json.dumps({"status": "completed", "delegation_id": "del-1", "summary": "A"})
+ "\n"
+ json.dumps({"status": "failed", "delegation_id": "del-2", "summary": "B"})
+ "\n",
encoding="utf-8",
+ "\n"
)
assert check_results_pending(str(results_file))
with self._patch_open(payload):
assert _check_delegation_results_pending() is True
def test_file_with_only_newline_means_proceed(self, tmp_path):
def test_file_with_only_newline_means_proceed(self):
"""File with only a newline character → stripped to empty → fires."""
results_file = tmp_path / "delegation_results.jsonl"
results_file.write_text("\n", encoding="utf-8")
assert not check_results_pending(str(results_file))
with self._patch_open("\n"):
assert _check_delegation_results_pending() is False