refactor(workspace): extract idle-loop pending-check guard for direct unit-testing
Some checks failed
sop-tier-check / tier-check (pull_request) bypass
Harness Replays / detect-changes (pull_request) bypass
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 19s
Harness Replays / Harness Replays (pull_request) Has been skipped
CI / Detect changes (pull_request) Successful in 1m0s
E2E API Smoke Test / detect-changes (pull_request) Successful in 56s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 53s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 52s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 53s
CI / Platform (Go) (pull_request) Successful in 10s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 7s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 12s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 9s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 3m32s
CI / Python Lint & Test (pull_request) Failing after 7m50s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 8m29s
Secret scan / Scan diff for credential-shaped strings (pull_request) bypass
CI / Canvas (Next.js) (pull_request) Failing after 11m24s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
Some checks failed
sop-tier-check / tier-check (pull_request) bypass
Harness Replays / detect-changes (pull_request) bypass
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 19s
Harness Replays / Harness Replays (pull_request) Has been skipped
CI / Detect changes (pull_request) Successful in 1m0s
E2E API Smoke Test / detect-changes (pull_request) Successful in 56s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 53s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 52s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 53s
CI / Platform (Go) (pull_request) Successful in 10s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 7s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 12s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 9s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 3m32s
CI / Python Lint & Test (pull_request) Failing after 7m50s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 8m29s
Secret scan / Scan diff for credential-shaped strings (pull_request) bypass
CI / Canvas (Next.js) (pull_request) Failing after 11m24s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
Follows up on #432 (merged). Extracts _check_delegation_results_pending() from the inline guard in _run_idle_loop() so tests can call the real production function directly via patch(builtins.open, ...). Fixes #401: the previous test used a mirror copy of the guard logic, which risks drifting from the production implementation over time. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
896d5e70f0
commit
27b82ce786
@ -48,6 +48,27 @@ def get_machine_ip() -> str: # pragma: no cover
|
||||
return "127.0.0.1"
|
||||
|
||||
|
||||
def _check_delegation_results_pending() -> bool:
|
||||
"""Check if there are unconsumed delegation results waiting.
|
||||
|
||||
Reads ``DELEGATION_RESULTS_FILE``. Returns ``True`` if the file
|
||||
exists and contains non-whitespace content (after stripping) — meaning
|
||||
the idle loop should skip this tick. Returns ``False`` if the file is
|
||||
absent, empty, or contains only whitespace.
|
||||
|
||||
The extracted form lets unit tests call this directly rather than mirroring
|
||||
the logic (anti-pattern flagged as #401).
|
||||
"""
|
||||
from heartbeat import DELEGATION_RESULTS_FILE
|
||||
|
||||
try:
|
||||
with open(DELEGATION_RESULTS_FILE) as rf:
|
||||
rf.seek(0)
|
||||
return bool(rf.read().strip())
|
||||
except FileNotFoundError:
|
||||
return False
|
||||
|
||||
|
||||
# Re-exported from transcript_auth for the inline /transcript handler.
|
||||
# Separate module keeps the security-critical gate import-light + unit-testable.
|
||||
from transcript_auth import transcript_authorized as _transcript_authorized
|
||||
@ -678,20 +699,15 @@ async def main(): # pragma: no cover
|
||||
# heartbeat's own self-message wake the agent after results are
|
||||
# written. The agent then sees the results in _prepare_prompt()
|
||||
# and processes them before composing.
|
||||
from heartbeat import DELEGATION_RESULTS_FILE as _DRF
|
||||
try:
|
||||
with open(_DRF) as _rf:
|
||||
_rf.seek(0)
|
||||
_content = _rf.read().strip()
|
||||
if _content:
|
||||
print(
|
||||
f"Idle loop: skipping — {len(_content)} bytes of unconsumed "
|
||||
f"delegation results pending (heartbeat will notify agent)",
|
||||
flush=True,
|
||||
)
|
||||
continue
|
||||
except FileNotFoundError:
|
||||
pass # No results file — normal, proceed with idle prompt
|
||||
# Guard logic extracted to _check_delegation_results_pending() for
|
||||
# direct unit-testing (#401 follow-up).
|
||||
if _check_delegation_results_pending():
|
||||
print(
|
||||
"Idle loop: skipping — unconsumed delegation results pending "
|
||||
"(heartbeat will notify agent)",
|
||||
flush=True,
|
||||
)
|
||||
continue
|
||||
|
||||
# Self-post the idle prompt via the platform A2A proxy (same
|
||||
# path as initial_prompt). The agent's own concurrency control
|
||||
|
||||
@ -4,77 +4,82 @@ The idle loop skips sending the idle prompt when DELEGATION_RESULTS_FILE
|
||||
contains unconsumed results, preventing the agent from composing a stale tick
|
||||
before processing pending delegation notifications from the heartbeat.
|
||||
|
||||
Source: workspace/main.py:_run_idle_loop() pending-results guard.
|
||||
Source: ``workspace/main.py:_check_delegation_results_pending()`` (extracted from
|
||||
``_run_idle_loop()`` guard; see PR #432 follow-up).
|
||||
|
||||
The guard is extracted into a module-level function so unit tests call the
|
||||
real production logic directly — not a mirror copy. This avoids the
|
||||
test-mirror anti-pattern (issue #401) where a copied implementation
|
||||
drifts from the production code it is supposed to test.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import json
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def check_results_pending(file_path: str) -> bool:
|
||||
"""Mirror the guard logic from workspace/main.py:_run_idle_loop().
|
||||
|
||||
Returns True if the results file exists and is non-empty,
|
||||
meaning the idle loop should skip this tick.
|
||||
"""
|
||||
try:
|
||||
with open(file_path) as rf:
|
||||
rf.seek(0)
|
||||
content = rf.read().strip()
|
||||
return bool(content)
|
||||
except FileNotFoundError:
|
||||
return False
|
||||
from main import _check_delegation_results_pending
|
||||
|
||||
|
||||
class TestIdleLoopPendingCheck:
|
||||
"""Tests for the idle-loop pending-delegation-results guard."""
|
||||
"""Tests for the idle-loop pending-delegation-results guard.
|
||||
|
||||
def test_no_file_means_proceed(self, tmp_path):
|
||||
Each test patches ``builtins.open`` so ``_check_delegation_results_pending``
|
||||
reads the controlled payload instead of the real DELEGATION_RESULTS_FILE.
|
||||
No filesystem side-effects.
|
||||
"""
|
||||
|
||||
def _patch_open(self, payload: str | None):
|
||||
"""Patch builtins.open for _check_delegation_results_pending.
|
||||
|
||||
Args:
|
||||
payload: file contents to return. None → FileNotFoundError.
|
||||
"""
|
||||
if payload is None:
|
||||
return patch("builtins.open", side_effect=FileNotFoundError)
|
||||
else:
|
||||
fake_file = io.StringIO(payload)
|
||||
return patch("builtins.open", return_value=fake_file)
|
||||
|
||||
def test_no_file_means_proceed(self):
|
||||
"""No delegation results file → idle loop fires normally."""
|
||||
results_file = tmp_path / "delegation_results.jsonl"
|
||||
assert not check_results_pending(str(results_file))
|
||||
with self._patch_open(None):
|
||||
assert _check_delegation_results_pending() is False
|
||||
|
||||
def test_empty_file_means_proceed(self, tmp_path):
|
||||
def test_empty_file_means_proceed(self):
|
||||
"""Empty file → no pending results → idle loop fires."""
|
||||
results_file = tmp_path / "delegation_results.jsonl"
|
||||
results_file.write_text("", encoding="utf-8")
|
||||
assert not check_results_pending(str(results_file))
|
||||
with self._patch_open(""):
|
||||
assert _check_delegation_results_pending() is False
|
||||
|
||||
def test_whitespace_only_file_means_proceed(self, tmp_path):
|
||||
def test_whitespace_only_file_means_proceed(self):
|
||||
"""File with only whitespace → treated as empty → idle loop fires."""
|
||||
results_file = tmp_path / "delegation_results.jsonl"
|
||||
results_file.write_text(" \n ", encoding="utf-8")
|
||||
assert not check_results_pending(str(results_file))
|
||||
with self._patch_open(" \n "):
|
||||
assert _check_delegation_results_pending() is False
|
||||
|
||||
def test_single_result_means_skip(self, tmp_path):
|
||||
def test_single_result_means_skip(self):
|
||||
"""File with one delegation result → skip idle tick."""
|
||||
results_file = tmp_path / "delegation_results.jsonl"
|
||||
results_file.write_text(
|
||||
payload = (
|
||||
json.dumps({
|
||||
"status": "completed",
|
||||
"delegation_id": "del-abc",
|
||||
"summary": "Done",
|
||||
}) + "\n",
|
||||
encoding="utf-8",
|
||||
}) + "\n"
|
||||
)
|
||||
assert check_results_pending(str(results_file))
|
||||
with self._patch_open(payload):
|
||||
assert _check_delegation_results_pending() is True
|
||||
|
||||
def test_multiple_results_means_skip(self, tmp_path):
|
||||
def test_multiple_results_means_skip(self):
|
||||
"""File with multiple delegation results → skip idle tick."""
|
||||
results_file = tmp_path / "delegation_results.jsonl"
|
||||
results_file.write_text(
|
||||
payload = (
|
||||
json.dumps({"status": "completed", "delegation_id": "del-1", "summary": "A"})
|
||||
+ "\n"
|
||||
+ json.dumps({"status": "failed", "delegation_id": "del-2", "summary": "B"})
|
||||
+ "\n",
|
||||
encoding="utf-8",
|
||||
+ "\n"
|
||||
)
|
||||
assert check_results_pending(str(results_file))
|
||||
with self._patch_open(payload):
|
||||
assert _check_delegation_results_pending() is True
|
||||
|
||||
def test_file_with_only_newline_means_proceed(self, tmp_path):
|
||||
def test_file_with_only_newline_means_proceed(self):
|
||||
"""File with only a newline character → stripped to empty → fires."""
|
||||
results_file = tmp_path / "delegation_results.jsonl"
|
||||
results_file.write_text("\n", encoding="utf-8")
|
||||
assert not check_results_pending(str(results_file))
|
||||
with self._patch_open("\n"):
|
||||
assert _check_delegation_results_pending() is False
|
||||
|
||||
Loading…
Reference in New Issue
Block a user