PR #2756's contract — card route always mounted regardless of adapter.setup() outcome — lived inline in main.py's `# pragma: no cover` boot sequence. A future refactor that re-coupled the two would have silently bypassed PR #2756 and shipped the original "stuck booting forever" UX again, with no pytest catching it. This change extracts route assembly into workspace/boot_routes.py's build_routes(card, executor, adapter_error) and pins the contract with 6 integration tests using Starlette's TestClient: - test_card_route_serves_200_when_adapter_ready: happy path - test_card_route_serves_200_when_adapter_failed: misconfigured boot, card still 200, skill stubs survive - test_jsonrpc_returns_503_when_no_executor: full -32603 envelope with the adapter_error in error.data - test_jsonrpc_returns_503_with_generic_when_no_error_string: fallback reason for the rare case main.py reaches this branch without one - test_card_route_does_not_depend_on_executor: direct PR #2756 regression guard — both branches MUST mount the card route - test_executor_present_does_not_mount_not_configured_handler: sanity that a healthy workspace doesn't return -32603 to every request Conftest stubs extended with a2a.server.routes / request_handlers classes so the tests work under the existing a2a-mock infra (pattern matches the AgentCard/AgentSkill stubs added for PR #2765). main.py now calls build_routes; the inline if/else is gone. Same production behaviour, cleaner shape, regression-proof. Heavy a2a-sdk imports inside build_routes() are lazy (deferred to the executor-only branch) so tests that only exercise the not-configured path don't pull DefaultRequestHandler / InMemoryTaskStore. card_helpers + boot_routes registered in TOP_LEVEL_MODULES (build drift gate would have caught the missing entry on the wheel-publish smoke). All 18 related tests pass (test_boot_routes.py: 6, test_card_helpers.py: 6, test_not_configured_handler.py: 6). 
Closes #2761. Pairs with: PR #2756 (decouple agent-card from setup), PR #2765 (defensive isolation of enrichment + transcript) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
491 lines
21 KiB
Python
491 lines
21 KiB
Python
"""Shared fixtures and module mocks for workspace-template tests.
|
|
|
|
Mocks the a2a SDK modules before any test imports a2a_executor,
|
|
since the a2a SDK is a heavy external dependency.
|
|
"""
|
|
|
|
import sys
|
|
from types import ModuleType
|
|
from unittest.mock import MagicMock
|
|
|
|
|
|
def _make_a2a_mocks():
    """Create mock modules for the a2a SDK with real base classes.

    Builds just enough of the a2a-sdk surface for workspace tests:
    real (empty) base classes so inheritance/type hints work, kwarg-
    preserving data-type stubs so tests can assert on any field, and
    Starlette-backed route factories so TestClient can exercise
    boot_routes end to end. All stub modules are registered in
    ``sys.modules`` before any test imports a2a_executor.
    """
    # a2a.server.agent_execution needs a real AgentExecutor base class
    agent_execution_mod = ModuleType("a2a.server.agent_execution")

    class AgentExecutor:
        """Stub base class for LangGraphA2AExecutor."""
        pass

    class RequestContext:
        """Stub for type hints."""
        pass

    agent_execution_mod.AgentExecutor = AgentExecutor
    agent_execution_mod.RequestContext = RequestContext

    # a2a.server.events needs a real EventQueue reference
    events_mod = ModuleType("a2a.server.events")

    class EventQueue:
        """Stub for type hints."""
        pass

    events_mod.EventQueue = EventQueue

    # a2a.server.tasks needs a TaskUpdater stub whose async methods are no-ops
    # for status transitions but ROUTE the terminal message back through
    # event_queue.enqueue_event so legacy assertions on enqueue_event keep
    # working. The wrapper preserves identity (the same Message object the
    # executor passed in) so tests inspecting str(event_arg) still see the
    # response text. complete()/failed() also record their last call on the
    # event_queue itself (`_complete_calls`, `_failed_calls`) so the v1
    # contract regression test (#262 follow-on to #2558) can pin the proper
    # path was taken — raw enqueue from executor would NOT touch these.
    tasks_mod = ModuleType("a2a.server.tasks")

    class TaskUpdater:
        """Stub TaskUpdater — terminal helpers route through event_queue."""

        def __init__(self, event_queue, task_id, context_id, *args, **kwargs):
            self.event_queue = event_queue
            self.task_id = task_id
            self.context_id = context_id
            # Recording lists live on the queue (not the updater) so a test
            # that only holds the queue can still assert on them.
            if not hasattr(event_queue, "_complete_calls"):
                event_queue._complete_calls = []
            if not hasattr(event_queue, "_failed_calls"):
                event_queue._failed_calls = []

        async def start_work(self, message=None):
            # Status transition — intentionally a no-op in the stub.
            pass

        async def complete(self, message=None):
            # Record the call, then forward the SAME object through
            # enqueue_event so identity-based assertions keep working.
            self.event_queue._complete_calls.append(message)
            if message is not None:
                await self.event_queue.enqueue_event(message)

        async def failed(self, message=None):
            self.event_queue._failed_calls.append(message)
            if message is not None:
                await self.event_queue.enqueue_event(message)

        async def add_artifact(
            self, parts, artifact_id=None, name=None, metadata=None,
            append=None, last_chunk=None, extensions=None
        ):
            # Artifacts are dropped — no current test asserts on them.
            pass

    tasks_mod.TaskUpdater = TaskUpdater

    # a2a.types needs stubs for Part, Message, Role.
    # v1 Part: flat protobuf with optional text/url/filename/media_type/raw/data fields.
    # v1 Message: has message_id, role, parts, task_id, context_id, etc.
    # Stubs preserve all kwargs so tests can assert on any field.
    types_mod = ModuleType("a2a.types")

    class Part:
        """Stub for A2A Part (v1: flat protobuf with optional fields)."""
        def __init__(self, text=None, root=None, **kwargs):
            self.text = text
            # Preserve every other kwarg as an attribute so tests can
            # assert on Part(url=..., filename=..., media_type=...).
            for k, v in kwargs.items():
                setattr(self, k, v)

    class Message:
        """Stub for A2A Message (v1: protobuf with snake_case fields)."""
        def __init__(self, message_id="", role=0, parts=None, task_id="",
                     context_id="", **kwargs):
            self.message_id = message_id
            self.role = role
            self.parts = list(parts) if parts is not None else []
            self.task_id = task_id
            self.context_id = context_id
            for k, v in kwargs.items():
                setattr(self, k, v)

    class _RoleEnum:
        """Stub for A2A Role enum (v1 protobuf: ROLE_UNSPECIFIED=0, ROLE_USER=1, ROLE_AGENT=2)."""
        ROLE_UNSPECIFIED = 0
        ROLE_USER = 1
        ROLE_AGENT = 2

    types_mod.Part = Part
    types_mod.Message = Message
    types_mod.Role = _RoleEnum

    # v1 Task / TaskStatus / TaskState — used by the executor's "enqueue Task
    # before any TaskStatusUpdateEvent" guard (a2a-sdk ≥ 1.0 contract). The
    # stubs preserve every kwarg so tests can assert on Task(id=..., status=...).
    class TaskStatus:
        """Stub for A2A TaskStatus — stores state plus any extra kwargs."""
        def __init__(self, state=None, **kwargs):
            self.state = state
            for k, v in kwargs.items():
                setattr(self, k, v)

    class _TaskStateEnum:
        """Stub for the v1 TaskState enum (integer values match the protobuf)."""
        TASK_STATE_SUBMITTED = 1
        TASK_STATE_WORKING = 2
        TASK_STATE_COMPLETED = 3
        TASK_STATE_CANCELED = 4
        TASK_STATE_FAILED = 5
        TASK_STATE_REJECTED = 6

    class Task:
        """Stub for A2A Task — preserves id/context_id/status and extra kwargs."""
        def __init__(self, id="", context_id="", status=None, **kwargs):
            self.id = id
            self.context_id = context_id
            self.status = status
            for k, v in kwargs.items():
                setattr(self, k, v)

    types_mod.Task = Task
    types_mod.TaskStatus = TaskStatus
    types_mod.TaskState = _TaskStateEnum

    # v1 AgentCard / AgentSkill / AgentCapabilities / AgentInterface — used
    # by main.py's static-card construction (PR #2756) and by
    # card_helpers.enrich_card_skills's swap path. Stubs preserve kwargs so
    # tests can assert on card.skills[i].name etc., and let card.skills be
    # reassigned in place (the production code's enrichment pattern).
    class AgentSkill:
        """Stub for A2A AgentSkill — list fields default to fresh lists."""
        def __init__(self, id="", name="", description="", tags=None, examples=None, **kwargs):
            self.id = id
            self.name = name
            self.description = description
            self.tags = list(tags) if tags is not None else []
            self.examples = list(examples) if examples is not None else []
            for k, v in kwargs.items():
                setattr(self, k, v)

    class AgentCapabilities:
        """Stub for A2A AgentCapabilities — kwargs become attributes."""
        def __init__(self, **kwargs):
            for k, v in kwargs.items():
                setattr(self, k, v)

    class AgentInterface:
        """Stub for A2A AgentInterface — kwargs become attributes."""
        def __init__(self, **kwargs):
            for k, v in kwargs.items():
                setattr(self, k, v)

    class AgentCard:
        """Stub for A2A AgentCard — skills defaults to [] and is reassignable."""
        def __init__(self, **kwargs):
            self.skills = []
            for k, v in kwargs.items():
                setattr(self, k, v)

    types_mod.AgentSkill = AgentSkill
    types_mod.AgentCapabilities = AgentCapabilities
    types_mod.AgentInterface = AgentInterface
    types_mod.AgentCard = AgentCard

    # a2a.server.routes — used by boot_routes.build_routes (PR #2756 chain
    # / #2761) to mount /.well-known/agent-card.json. The real SDK builds
    # a Starlette route that serializes the card on each request; the stub
    # mirrors that behaviour with json.dumps over the card's __dict__ so
    # TestClient.get("/.well-known/agent-card.json") returns the same
    # shape canvas would see in production.
    routes_mod = ModuleType("a2a.server.routes")

    def _create_agent_card_routes(card):
        """Return a one-element list holding the GET agent-card route."""
        # Starlette imports are deferred so merely building the mock
        # modules never requires starlette to be installed.
        from starlette.responses import JSONResponse
        from starlette.routing import Route

        async def _card_handler(_request):
            # Convert the stub AgentCard into a JSON-serialisable dict.
            # Real a2a.types.AgentCard is a Pydantic model with proper
            # serialisation; the stub stores attrs raw, so we walk
            # __dict__ and serialise nested AgentSkill objects too.
            def _to_dict(obj):
                if hasattr(obj, "__dict__"):
                    return {k: _to_dict(v) for k, v in vars(obj).items()}
                if isinstance(obj, list):
                    return [_to_dict(x) for x in obj]
                if isinstance(obj, dict):
                    return {k: _to_dict(v) for k, v in obj.items()}
                return obj

            return JSONResponse(_to_dict(card))

        return [Route("/.well-known/agent-card.json", _card_handler, methods=["GET"])]

    def _create_jsonrpc_routes(request_handler=None, rpc_url="/", **_kwargs):
        """Return a one-element list holding the POST JSON-RPC route."""
        from starlette.responses import JSONResponse
        from starlette.routing import Route

        async def _jsonrpc_handler(_request):
            # Stub: real DefaultRequestHandler dispatches to the executor;
            # tests that need real behaviour will use a test-side mock.
            # This stub just returns a JSON-RPC envelope so the not-configured
            # branch's discriminator (`error.data` containing "setup() failed")
            # has something to differ from.
            return JSONResponse({"jsonrpc": "2.0", "result": "stub-jsonrpc-handler"})

        return [Route(rpc_url, _jsonrpc_handler, methods=["POST"])]

    routes_mod.create_agent_card_routes = _create_agent_card_routes
    routes_mod.create_jsonrpc_routes = _create_jsonrpc_routes
    sys.modules["a2a.server.routes"] = routes_mod

    # a2a.server.request_handlers — used by boot_routes' executor branch.
    # DefaultRequestHandler stub takes the same kwargs as the real one;
    # tests that exercise the executor path don't poke at the handler's
    # internals, only that it gets mounted at "/".
    rh_mod = ModuleType("a2a.server.request_handlers")

    class DefaultRequestHandler:
        """Stub request handler — records ctor args, dispatches nothing."""
        def __init__(self, agent_executor=None, task_store=None, agent_card=None, **_kwargs):
            self.agent_executor = agent_executor
            self.task_store = task_store
            self.agent_card = agent_card

    rh_mod.DefaultRequestHandler = DefaultRequestHandler
    sys.modules["a2a.server.request_handlers"] = rh_mod

    # InMemoryTaskStore is exposed via a2a.server.tasks (already stubbed
    # above with TaskUpdater). Add it as a no-op class.
    class _InMemoryTaskStore:
        """No-op stand-in for a2a's InMemoryTaskStore."""
        def __init__(self):
            pass

    tasks_mod.InMemoryTaskStore = _InMemoryTaskStore

    # a2a.helpers (v1: moved from a2a.utils, renamed new_agent_text_message
    # → new_text_message). Mock both names — production code only calls
    # new_text_message, but if any test still references the old name it
    # gets the same lambda for backward compat during the rename rollout.
    helpers_mod = ModuleType("a2a.helpers")
    helpers_mod.new_text_message = lambda text, **kwargs: text
    helpers_mod.new_agent_text_message = helpers_mod.new_text_message

    # Register all module paths
    a2a_mod = ModuleType("a2a")
    a2a_server_mod = ModuleType("a2a.server")

    sys.modules["a2a"] = a2a_mod
    sys.modules["a2a.server"] = a2a_server_mod
    sys.modules["a2a.server.agent_execution"] = agent_execution_mod
    sys.modules["a2a.server.events"] = events_mod
    sys.modules["a2a.server.tasks"] = tasks_mod
    sys.modules["a2a.types"] = types_mod
    sys.modules["a2a.helpers"] = helpers_mod
|
|
|
|
|
|
def _make_langchain_mocks():
|
|
"""Create mock modules for langchain_core so coordinator.py can be imported."""
|
|
langchain_core_mod = ModuleType("langchain_core")
|
|
langchain_core_tools_mod = ModuleType("langchain_core.tools")
|
|
# Make @tool a no-op decorator
|
|
langchain_core_tools_mod.tool = lambda f: f
|
|
|
|
sys.modules["langchain_core"] = langchain_core_mod
|
|
sys.modules["langchain_core.tools"] = langchain_core_tools_mod
|
|
|
|
|
|
def _make_tools_mocks():
    """Create mock modules for tools.* so adapters can be imported in tests.

    Every tool is a MagicMock; the ``.name`` attribute is assigned AFTER
    construction because ``MagicMock(name=...)`` configures the mock's
    repr, not a ``.name`` attribute — and tests assert on ``.name``.
    ``builtin_tools.security`` is loaded as the REAL module from disk.
    """
    tools_mod = ModuleType("builtin_tools")
    tools_mod.__path__ = []  # Make it a proper package

    tools_delegation_mod = ModuleType("builtin_tools.delegation")
    tools_delegation_mod.delegate_task = MagicMock()
    tools_delegation_mod.delegate_task.name = "delegate_task"
    tools_delegation_mod.delegate_task_async = MagicMock()
    tools_delegation_mod.delegate_task_async.name = "delegate_task_async"
    tools_delegation_mod.check_task_status = MagicMock()
    tools_delegation_mod.check_task_status.name = "check_task_status"

    tools_approval_mod = ModuleType("builtin_tools.approval")
    tools_approval_mod.request_approval = MagicMock()
    tools_approval_mod.request_approval.name = "request_approval"

    tools_memory_mod = ModuleType("builtin_tools.memory")
    tools_memory_mod.commit_memory = MagicMock()
    tools_memory_mod.commit_memory.name = "commit_memory"
    tools_memory_mod.recall_memory = MagicMock()
    tools_memory_mod.recall_memory.name = "recall_memory"

    tools_sandbox_mod = ModuleType("builtin_tools.sandbox")
    tools_sandbox_mod.run_code = MagicMock()
    tools_sandbox_mod.run_code.name = "run_code"

    tools_a2a_mod = ModuleType("builtin_tools.a2a_tools")
    tools_a2a_mod.delegate_task = MagicMock()
    tools_a2a_mod.list_peers = MagicMock()
    tools_a2a_mod.get_peers_summary = MagicMock()

    tools_awareness_mod = ModuleType("builtin_tools.awareness_client")
    tools_awareness_mod.get_awareness_config = MagicMock(return_value=None)

    # tools.telemetry — provide constants and no-op callables used by a2a_executor
    from contextvars import ContextVar
    tools_telemetry_mod = ModuleType("builtin_tools.telemetry")
    tools_telemetry_mod.GEN_AI_SYSTEM = "gen_ai.system"
    tools_telemetry_mod.GEN_AI_REQUEST_MODEL = "gen_ai.request.model"
    tools_telemetry_mod.GEN_AI_OPERATION_NAME = "gen_ai.operation.name"
    tools_telemetry_mod.GEN_AI_USAGE_INPUT_TOKENS = "gen_ai.usage.input_tokens"
    tools_telemetry_mod.GEN_AI_USAGE_OUTPUT_TOKENS = "gen_ai.usage.output_tokens"
    tools_telemetry_mod.GEN_AI_RESPONSE_FINISH_REASONS = "gen_ai.response.finish_reasons"
    tools_telemetry_mod.WORKSPACE_ID_ATTR = "workspace.id"
    tools_telemetry_mod.A2A_TASK_ID = "a2a.task_id"
    tools_telemetry_mod.A2A_SOURCE_WORKSPACE = "a2a.source_workspace_id"
    tools_telemetry_mod.A2A_TARGET_WORKSPACE = "a2a.target_workspace_id"
    tools_telemetry_mod.MEMORY_SCOPE = "memory.scope"
    tools_telemetry_mod.MEMORY_QUERY = "memory.query"
    tools_telemetry_mod._incoming_trace_context = ContextVar("otel_incoming_trace_context", default=None)
    tools_telemetry_mod.get_tracer = MagicMock(return_value=MagicMock())
    tools_telemetry_mod.setup_telemetry = MagicMock()
    # Middleware/header helpers are identity functions so wrapped apps and
    # header dicts pass through unchanged.
    tools_telemetry_mod.make_trace_middleware = MagicMock(side_effect=lambda app: app)
    tools_telemetry_mod.inject_trace_headers = MagicMock(side_effect=lambda h: h)
    tools_telemetry_mod.extract_trace_context = MagicMock(return_value=None)
    tools_telemetry_mod.get_current_traceparent = MagicMock(return_value=None)
    tools_telemetry_mod.gen_ai_system_from_model = lambda m: m.split(":")[0] if ":" in m else "unknown"
    tools_telemetry_mod.record_llm_token_usage = MagicMock()

    # tools.audit — provide RBAC helpers and log_event as no-ops
    tools_audit_mod = ModuleType("builtin_tools.audit")
    tools_audit_mod.log_event = MagicMock(return_value="mock-trace-id")
    tools_audit_mod.check_permission = MagicMock(return_value=True)
    tools_audit_mod.get_workspace_roles = MagicMock(return_value=(["operator"], {}))
    tools_audit_mod.ROLE_PERMISSIONS = {
        "admin": {"delegate", "approve", "memory.read", "memory.write"},
        "operator": {"delegate", "approve", "memory.read", "memory.write"},
        "read-only": {"memory.read"},
    }

    # tools.hitl — lightweight stubs for the HITL tools
    tools_hitl_mod = ModuleType("builtin_tools.hitl")
    tools_hitl_mod.pause_task = MagicMock()
    tools_hitl_mod.pause_task.name = "pause_task"
    tools_hitl_mod.resume_task = MagicMock()
    tools_hitl_mod.resume_task.name = "resume_task"
    tools_hitl_mod.list_paused_tasks = MagicMock()
    tools_hitl_mod.list_paused_tasks.name = "list_paused_tasks"
    # requires_approval is a decorator factory — stub returns a pass-through
    # decorator regardless of arguments.
    tools_hitl_mod.requires_approval = MagicMock(side_effect=lambda *a, **kw: (lambda f: f))
    tools_hitl_mod.pause_registry = MagicMock()

    # builtin_tools.security — load the real module so _redact_secrets is
    # available to executor_helpers, a2a_tools, and any other module that
    # imports from it. The module is pure-Python with no external deps.
    import importlib.util as _ilu
    import os as _os
    # conftest lives in workspace/tests/, so two dirname() hops reach the
    # workspace root that contains builtin_tools/security.py.
    _sec_path = _os.path.join(
        _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__))),
        "builtin_tools", "security.py",
    )
    _sec_spec = _ilu.spec_from_file_location("builtin_tools.security", _sec_path)
    _sec_mod = _ilu.module_from_spec(_sec_spec)
    _sec_spec.loader.exec_module(_sec_mod)

    sys.modules["builtin_tools"] = tools_mod
    sys.modules["builtin_tools.delegation"] = tools_delegation_mod
    sys.modules["builtin_tools.approval"] = tools_approval_mod
    sys.modules["builtin_tools.memory"] = tools_memory_mod
    sys.modules["builtin_tools.sandbox"] = tools_sandbox_mod
    sys.modules["builtin_tools.a2a_tools"] = tools_a2a_mod
    sys.modules["builtin_tools.awareness_client"] = tools_awareness_mod
    sys.modules["builtin_tools.telemetry"] = tools_telemetry_mod
    sys.modules["builtin_tools.audit"] = tools_audit_mod
    sys.modules["builtin_tools.hitl"] = tools_hitl_mod
    sys.modules["builtin_tools.security"] = _sec_mod
|
|
|
|
|
|
# Install mocks before any test collection imports a2a_executor
if "a2a" not in sys.modules:
    _make_a2a_mocks()

# Note: the claude_agent_sdk stub was removed alongside
# workspace/claude_sdk_executor.py (#87 Phase 2). The executor + its
# tests now live in the claude-code template repo, where the real SDK
# IS installed via Dockerfile, so no stub is needed.

if "langchain_core" not in sys.modules:
    _make_langchain_mocks()

# sys.modules.get(...) yields None for a missing entry, and None has no
# __path__, so this single hasattr covers both "not imported yet" and
# "imported but not a real package".
if not hasattr(sys.modules.get("builtin_tools"), "__path__"):
    _make_tools_mocks()
|
|
|
|
# Mock additional modules needed by _common_setup in base.py
if "plugins" not in sys.modules:
    plugins_mod = ModuleType("plugins")
    plugins_mod.load_plugins = MagicMock()
    sys.modules["plugins"] = plugins_mod

if "skill_loader" not in sys.modules:
    # Add workspace-template to path so the real skill loader can be imported.
    import importlib.util
    import os as _os

    # conftest.py lives at workspace/tests/conftest.py, so two dirname()
    # hops give the workspace-template root that contains skill_loader/.
    # (The previous MagicMock.__module__ "trick" computed an empty string
    # and was immediately overwritten — removed as dead code.)
    _ws_root = _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__)))
    if _ws_root not in sys.path:
        sys.path.insert(0, _ws_root)
    # Register a bare package shell, then load the REAL loader module from
    # disk so LoadedSkill/SkillMetadata are the genuine classes, not mocks.
    skills_mod = ModuleType("skill_loader")
    skills_mod.__path__ = [_os.path.join(_ws_root, "skill_loader")]
    sys.modules["skill_loader"] = skills_mod
    _spec = importlib.util.spec_from_file_location(
        "skill_loader.loader",
        _os.path.join(_ws_root, "skill_loader", "loader.py"),
    )
    _loader_mod = importlib.util.module_from_spec(_spec)
    # Register before exec_module so any self-referential imports inside
    # loader.py resolve while it executes.
    sys.modules["skill_loader.loader"] = _loader_mod
    _spec.loader.exec_module(_loader_mod)
|
|
|
|
if "coordinator" not in sys.modules:
    # Prefer the real coordinator module when it is importable.
    try:
        import coordinator as _coord  # noqa: F401
    except (ImportError, RuntimeError):
        # Real module unavailable — install a MagicMock-backed stand-in
        # exposing the attributes the adapters reach for.
        _stub = ModuleType("coordinator")
        for _attr in (
            "get_children",
            "get_parent_context",
            "build_children_description",
            "route_task_to_team",
        ):
            setattr(_stub, _attr, MagicMock())
        # LangChain-style tools expose .name; tests assert on it, so set it
        # explicitly (MagicMock(name=...) would only configure the repr).
        _stub.route_task_to_team.name = "route_task_to_team"
        sys.modules["coordinator"] = _stub

# Don't mock prompt or coordinator if they can be imported from the workspace-template dir
# test_prompt.py and test_coordinator.py need the real modules
|
|
|
|
# ─── runtime_wedge cross-test isolation ─────────────────────────────────
|
|
#
|
|
# `runtime_wedge` carries module-scope state via the `_DEFAULT` instance
|
|
# (workspace/runtime_wedge.py). Any test that calls `mark_wedged` and
|
|
# doesn't clean up leaks a sticky wedge into every later test in the
|
|
# same pytest process. Smoke tests (test_smoke_mode.py) that read
|
|
# `is_wedged()` would then fail-via-leak instead of assessing the code
|
|
# under test.
|
|
#
|
|
# Autouse fixture is scoped to the workspace/tests/ tree (this conftest
|
|
# is at workspace/tests/conftest.py), so it runs for every test that
|
|
# touches the runtime — without each test having to opt in. The
|
|
# import is deferred to fixture-call time so the fixture also works
|
|
# in environments where runtime_wedge isn't yet importable (matches
|
|
# the fail-open posture that smoke_mode + heartbeat take at the
|
|
# consumer side).
|
|
import pytest as _pytest # alias to avoid colliding with any existing `pytest` name
|
|
|
|
|
|
@_pytest.fixture(autouse=True)
def _reset_runtime_wedge_between_tests():
    """Reset the universal runtime_wedge flag around every workspace test.

    `runtime_wedge` keeps module-scope, sticky-first-write-wins state: a
    test that calls `mark_wedged` without cleanup would contaminate every
    later `is_wedged()` read in the same pytest process, and the leaked
    reason could not even be overwritten. Resetting on BOTH sides of the
    yield means an early failure doesn't poison the rest of the run.
    """
    try:
        from runtime_wedge import reset_for_test
    except ImportError:  # ModuleNotFoundError is a subclass of ImportError
        # runtime_wedge isn't importable in this environment — nothing to
        # reset, so just run the test (fail-open, matching smoke_mode).
        yield
        return
    reset_for_test()  # pre-test: clear any wedge leaked by an earlier test
    yield
    reset_for_test()  # post-test: don't let THIS test leak a wedge either
|