fix(shared_runtime): push heartbeat on CLEAR in set_current_task() (#37)
Fixes #1372 — phantom busy: canvas showed workspace as active for up to 30s after task completion because set_current_task("") returned early without posting the updated heartbeat. Before: clearing only updated the heartbeat object; the next 30s scheduled heartbeat cycle propagated the clear. Quick tasks would leave a phantom-busy indicator. After: both SET and CLEAR push immediately to /registry/heartbeat. active_tasks=0 on clear, active_tasks=1 on set. Heartbeat object update and HTTP post are now unconditional. Tests: 5 new cases covering SET/CLEAR HTTP body, error resilience, None heartbeat, and missing env vars. Co-authored-by: Molecule AI Infra-Runtime-BE <infra-runtime-be@agents.moleculesai.app>
This commit is contained in:
parent
1e545ed6ba
commit
dcb6edd1a1
136
tests/test_shared_runtime.py
Normal file
136
tests/test_shared_runtime.py
Normal file
@ -0,0 +1,136 @@
|
||||
"""Regression tests for set_current_task() phantom-busy fix (issue #1372)."""
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class MockHeartbeat:
|
||||
"""Minimal heartbeat object matching the shape used by adapters."""
|
||||
|
||||
def __init__(self):
|
||||
self.current_task = ""
|
||||
self.active_tasks = 0
|
||||
|
||||
|
||||
def _run(coro):
|
||||
"""Run an async coroutine synchronously (no pytest-asyncio available)."""
|
||||
loop = asyncio.new_event_loop()
|
||||
try:
|
||||
return loop.run_until_complete(coro)
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
|
||||
class TestSetCurrentTask:
|
||||
"""set_current_task() must push heartbeat on both SET and CLEAR."""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _env(self, monkeypatch):
|
||||
monkeypatch.setenv("WORKSPACE_ID", "test-workspace-001")
|
||||
monkeypatch.setenv("PLATFORM_URL", "http://test.platform:8080")
|
||||
|
||||
@pytest.fixture
|
||||
def heartbeat(self):
|
||||
return MockHeartbeat()
|
||||
|
||||
def test_set_pushes_with_active_tasks_1(self, heartbeat):
|
||||
"""Setting a task posts active_tasks=1 immediately."""
|
||||
from molecule_runtime.adapters.shared_runtime import set_current_task
|
||||
|
||||
with patch("httpx.AsyncClient") as mock_client_cls:
|
||||
mock_client = AsyncMock()
|
||||
mock_client_cls.return_value.__aenter__.return_value = mock_client
|
||||
mock_response = AsyncMock()
|
||||
mock_response.status_code = 200
|
||||
mock_client.post.return_value = mock_response
|
||||
|
||||
_run(set_current_task(heartbeat, "Summarising docs"))
|
||||
|
||||
mock_client.post.assert_called_once()
|
||||
call_args = mock_client.post.call_args
|
||||
assert call_args.kwargs["json"]["active_tasks"] == 1
|
||||
assert call_args.kwargs["json"]["current_task"] == "Summarising docs"
|
||||
|
||||
assert heartbeat.active_tasks == 1
|
||||
assert heartbeat.current_task == "Summarising docs"
|
||||
|
||||
def test_clear_pushes_with_active_tasks_0(self, heartbeat):
|
||||
"""Clearing a task posts active_tasks=0 immediately (phantom-busy fix)."""
|
||||
from molecule_runtime.adapters.shared_runtime import set_current_task
|
||||
|
||||
heartbeat.current_task = "Previous task"
|
||||
heartbeat.active_tasks = 1
|
||||
|
||||
with patch("httpx.AsyncClient") as mock_client_cls:
|
||||
mock_client = AsyncMock()
|
||||
mock_client_cls.return_value.__aenter__.return_value = mock_client
|
||||
mock_response = AsyncMock()
|
||||
mock_response.status_code = 200
|
||||
mock_client.post.return_value = mock_response
|
||||
|
||||
_run(set_current_task(heartbeat, ""))
|
||||
|
||||
mock_client.post.assert_called_once()
|
||||
call_args = mock_client.post.call_args
|
||||
assert call_args.kwargs["json"]["active_tasks"] == 0
|
||||
assert call_args.kwargs["json"]["current_task"] == ""
|
||||
|
||||
assert heartbeat.active_tasks == 0
|
||||
assert heartbeat.current_task == ""
|
||||
|
||||
def test_clear_updates_heartbeat_object_even_if_post_fails(self, heartbeat):
|
||||
"""Heartbeat object is updated even when the HTTP POST raises."""
|
||||
from molecule_runtime.adapters.shared_runtime import set_current_task
|
||||
|
||||
heartbeat.current_task = "Long running task"
|
||||
heartbeat.active_tasks = 1
|
||||
|
||||
with patch("httpx.AsyncClient") as mock_client_cls:
|
||||
mock_client = AsyncMock()
|
||||
mock_client_cls.return_value.__aenter__.return_value = mock_client
|
||||
mock_client.post.side_effect = Exception("network error")
|
||||
|
||||
_run(set_current_task(heartbeat, ""))
|
||||
|
||||
# Heartbeat object must still be updated even if post fails
|
||||
assert heartbeat.active_tasks == 0
|
||||
assert heartbeat.current_task == ""
|
||||
|
||||
def test_no_env_vars_skips_post(self, monkeypatch):
|
||||
"""When WORKSPACE_ID or PLATFORM_URL is absent, post is skipped."""
|
||||
from molecule_runtime.adapters.shared_runtime import set_current_task
|
||||
|
||||
heartbeat = MockHeartbeat()
|
||||
monkeypatch.delenv("WORKSPACE_ID", raising=False)
|
||||
monkeypatch.setenv("PLATFORM_URL", "http://test.platform:8080")
|
||||
|
||||
with patch("httpx.AsyncClient") as mock_client_cls:
|
||||
mock_client = AsyncMock()
|
||||
mock_client_cls.return_value.__aenter__.return_value = mock_client
|
||||
|
||||
_run(set_current_task(heartbeat, "Any task"))
|
||||
|
||||
mock_client.post.assert_not_called()
|
||||
|
||||
def test_none_heartbeat_skips_post(self, monkeypatch):
|
||||
"""Passing None as heartbeat object skips post (no-op, no crash).
|
||||
|
||||
When heartbeat is None the function must not raise even if env vars
|
||||
are present — None is valid when heartbeat isn't wired yet.
|
||||
"""
|
||||
from molecule_runtime.adapters.shared_runtime import set_current_task
|
||||
|
||||
# Ensure no env vars so httpx is definitely not called
|
||||
monkeypatch.delenv("WORKSPACE_ID", raising=False)
|
||||
monkeypatch.delenv("PLATFORM_URL", raising=False)
|
||||
|
||||
with patch("httpx.AsyncClient") as mock_client_cls:
|
||||
mock_client = AsyncMock()
|
||||
mock_client_cls.return_value.__aenter__.return_value = mock_client
|
||||
|
||||
# Must not raise — None is valid when heartbeat isn't wired yet
|
||||
_run(set_current_task(None, "Task"))
|
||||
|
||||
mock_client.post.assert_not_called()
|
||||
Loading…
Reference in New Issue
Block a user