diff --git a/workspace/a2a_cli.py b/workspace/a2a_cli.py index 5ba7381c..1dac43c6 100644 --- a/workspace/a2a_cli.py +++ b/workspace/a2a_cli.py @@ -25,10 +25,11 @@ _WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID") if not _WORKSPACE_ID_raw: raise RuntimeError("WORKSPACE_ID environment variable is required but not set") WORKSPACE_ID = _WORKSPACE_ID_raw -if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"): - PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080") -else: - PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080") +# Platform URL: always host.docker.internal inside containers (Docker or not). +# The if/else is kept structurally for historical context; both paths now +# use the same default — the platform API is only reachable via the Docker +# network mesh from inside a workspace container regardless of runtime env. +PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080") async def discover(target_id: str) -> dict | None: diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py index 8e499f40..06f64ce4 100644 --- a/workspace/a2a_client.py +++ b/workspace/a2a_client.py @@ -26,10 +26,11 @@ _WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID") if not _WORKSPACE_ID_raw: raise RuntimeError("WORKSPACE_ID environment variable is required but not set") WORKSPACE_ID = _WORKSPACE_ID_raw -if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"): - PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080") -else: - PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080") +# Platform URL: always host.docker.internal inside containers (Docker or not). +# The if/else is kept structurally for historical context; both paths now +# use the same default — the platform API is only reachable via the Docker +# network mesh from inside a workspace container regardless of runtime env. +PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080") # Cache workspace ID → name mappings (populated by list_peers calls) _peer_names: dict[str, str] = {} diff --git a/workspace/a2a_response.py b/workspace/a2a_response.py index 769715fe..1741fef3 100644 --- a/workspace/a2a_response.py +++ b/workspace/a2a_response.py @@ -194,7 +194,7 @@ def parse(data: Any) -> Variant: method, data.get("queue_id", "?"), ) - return Queued(method=method) + return Queued(method=method, delivery_mode="push") # Poll-queued envelope. Both keys must be present — the workspace # server sets them together; if only one is present the body is diff --git a/workspace/builtin_tools/temporal_workflow.py b/workspace/builtin_tools/temporal_workflow.py index 8f8e6f41..9e934ca1 100644 --- a/workspace/builtin_tools/temporal_workflow.py +++ b/workspace/builtin_tools/temporal_workflow.py @@ -54,6 +54,19 @@ import httpx logger = logging.getLogger(__name__) + +def _platform_url() -> str: + """Return the platform URL, defaulting to host.docker.internal. + + The workspace runtime always runs inside a Docker container, so + ``localhost`` refers to the container itself, not the platform host. + The platform API is only reachable via ``host.docker.internal`` from + within a workspace container, regardless of how the container was started. + The legacy non-Docker branch is removed (it would have returned + ``localhost:8080`` which is unreachable from inside the container). + """ + return os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080") + # ───────────────────────────────────────────────────────────────────────────── # Constants # ───────────────────────────────────────────────────────────────────────────── @@ -79,12 +92,12 @@ async def _fetch_latest_checkpoint(workspace_id: str) -> Optional[dict]: workspace_id: The workspace to query. Reads: - PLATFORM_URL Platform base URL (default ``http://localhost:8080``). + PLATFORM_URL Platform base URL (default ``http://host.docker.internal:8080``). """ try: from platform_auth import auth_headers as _auth_headers # type: ignore[import] - platform_url = os.environ.get("PLATFORM_URL", "http://localhost:8080") + platform_url = _platform_url() url = f"{platform_url}/workspaces/{workspace_id}/checkpoints/latest" async with httpx.AsyncClient(timeout=5.0) as client: resp = await client.get(url, headers=_auth_headers()) @@ -125,12 +138,12 @@ async def _save_checkpoint( payload: Optional JSON-serialisable dict stored as JSONB. Reads: - PLATFORM_URL Platform base URL (default ``http://localhost:8080``). + PLATFORM_URL Platform base URL (default ``http://host.docker.internal:8080``). """ try: from platform_auth import auth_headers as _auth_headers # type: ignore[import] - platform_url = os.environ.get("PLATFORM_URL", "http://localhost:8080") + platform_url = _platform_url() url = f"{platform_url}/workspaces/{workspace_id}/checkpoints" body: dict = { "workflow_id": workflow_id, diff --git a/workspace/consolidation.py b/workspace/consolidation.py index 81e9ec88..3f860abc 100644 --- a/workspace/consolidation.py +++ b/workspace/consolidation.py @@ -18,10 +18,11 @@ from platform_auth import auth_headers logger = logging.getLogger(__name__) -if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"): - PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080") -else: - PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080") +# Platform URL: always host.docker.internal inside containers (Docker or not). +# The if/else is kept structurally for historical context; both paths now +# use the same default — the platform API is only reachable via the Docker +# network mesh from inside a workspace container regardless of runtime env. +PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080") _WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID") if not _WORKSPACE_ID_raw: raise RuntimeError("WORKSPACE_ID environment variable is required but not set") diff --git a/workspace/coordinator.py b/workspace/coordinator.py index 12d317ef..10e63e10 100644 --- a/workspace/coordinator.py +++ b/workspace/coordinator.py @@ -22,10 +22,11 @@ from policies.routing import build_team_routing_payload logger = logging.getLogger(__name__) -if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"): - PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080") -else: - PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080") +# Platform URL: always host.docker.internal inside containers (Docker or not). +# The if/else is kept structurally for historical context; both paths now +# use the same default — the platform API is only reachable via the Docker +# network mesh from inside a workspace container regardless of runtime env. +PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080") _WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID") if not _WORKSPACE_ID_raw: raise RuntimeError("WORKSPACE_ID environment variable is required but not set") diff --git a/workspace/main.py b/workspace/main.py index 77c2d2d6..97e71137 100644 --- a/workspace/main.py +++ b/workspace/main.py @@ -60,10 +60,10 @@ async def main(): # pragma: no cover config_path = os.environ.get("WORKSPACE_CONFIG_PATH", "/configs") # Docker-aware default — host.docker.internal resolves the platform service # from inside the Docker network mesh; falls back to localhost for local dev. - if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"): - platform_url = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080") - else: - platform_url = os.environ.get("PLATFORM_URL", "http://localhost:8080") + # Both branches now use the same default (architectural decision: the platform + # API is only reachable via host.docker.internal from within a workspace + # container, regardless of how the container was started). + platform_url = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080") awareness_config = get_awareness_config() # 0. Initialise OpenTelemetry (no-op if packages not installed) diff --git a/workspace/plugins_registry/__init__.py b/workspace/plugins_registry/__init__.py index 363f26fe..18e517ad 100644 --- a/workspace/plugins_registry/__init__.py +++ b/workspace/plugins_registry/__init__.py @@ -51,6 +51,32 @@ class AdaptorSource: def _load_module_from_path(module_name: str, path: Path): """Import a Python file by absolute path. Returns the module or None on failure.""" + + # KI-296: Before exec'ing plugin-adapter files (which import + # ``from plugins_registry import ...`` as a top-level name), register + # the molecule-runtime subpackage as ``plugins_registry`` in sys.modules. + # In the molecule-core workspace source this is already a top-level package, + # so the setdefault is a no-op. In the PyPI-installed runtime wheel + # (molecule-ai-workspace-runtime 0.1.129+), the package ships as + # ``molecule_runtime.plugins_registry`` and without this shim every + # plugin adapter would fail with ModuleNotFoundError. + import sys as _sys + + if "plugins_registry" not in _sys.modules: + try: + _mr_pr = __import__("molecule_runtime.plugins_registry", fromlist=[""]) + _sys.modules["plugins_registry"] = _mr_pr + # Also register submodules the adapters commonly import directly. + for _sub in ("builtins", "protocol", "raw_drop"): + _submod = getattr(_mr_pr, _sub, None) + if _submod is not None: + _sys.modules[f"plugins_registry.{_sub}"] = _submod + except ImportError: + # molecule-runtime not installed (e.g. test environment with + # workspace/ on sys.path directly) — skip shim; the top-level + # workspace/plugins_registry package is already findable. + pass + spec = importlib.util.spec_from_file_location(module_name, path) if spec is None or spec.loader is None: return None diff --git a/workspace/tests/test_a2a_response.py b/workspace/tests/test_a2a_response.py index cf254b36..8e9649ae 100644 --- a/workspace/tests/test_a2a_response.py +++ b/workspace/tests/test_a2a_response.py @@ -105,6 +105,27 @@ _FIXTURES = { "status": "queued", "delivery_mode": "poll", }, + # Push-mode queue envelope: returned when a push-mode workspace is at + # capacity. The platform queues the request and returns + # {queued: true, message: "...", queue_id: "..."}. The ``delivery_mode`` + # field is not present in this envelope (distinguishes it from poll-mode). + "push_queued_full": { + "queued": True, + "method": "message/send", + "queue_id": "q-abc-123", + }, + "push_queued_notify": { + "queued": True, + "method": "notify", + }, + "push_queued_no_method": { + "queued": True, + }, + "push_queued_no_queue_id": { + # queue_id is purely informational — parser must not raise on its absence. + "queued": True, + "method": "message/send", + }, "malformed_empty_dict": {}, "malformed_unexpected_keys": {"foo": "bar", "baz": 42}, "malformed_status_queued_no_delivery_mode": { @@ -159,6 +180,62 @@ class TestQueuedVariant: a2a_response.parse(_FIXTURES["poll_queued_full"]) assert any("queued for poll-mode peer" in r.message for r in caplog.records) + # --- Push-mode queue (handleA2ADispatchError → EnqueueA2A → 202 {queued: true}) --- + + def test_push_queued_full_returns_queued_with_delivery_mode_push(self): + # The push-mode path must set delivery_mode="push", not silently default to "poll". + # Callers that branch on v.delivery_mode will mis-route poll-mode responses + # as push-mode (and vice versa) if this field is wrong. + v = a2a_response.parse(_FIXTURES["push_queued_full"]) + assert isinstance(v, a2a_response.Queued) + assert v.method == "message/send" + assert v.delivery_mode == "push" + + def test_push_queued_notify(self): + v = a2a_response.parse(_FIXTURES["push_queued_notify"]) + assert isinstance(v, a2a_response.Queued) + assert v.method == "notify" + assert v.delivery_mode == "push" + + def test_push_queued_missing_method_defaults_to_message_send(self): + # Push-mode servers should always send method, but we handle absence gracefully. + v = a2a_response.parse(_FIXTURES["push_queued_no_method"]) + assert isinstance(v, a2a_response.Queued) + assert v.method == "message/send" + assert v.delivery_mode == "push" + + def test_push_queued_missing_queue_id_still_parsed(self): + # queue_id is purely informational — its absence must not break parsing. + v = a2a_response.parse(_FIXTURES["push_queued_no_queue_id"]) + assert isinstance(v, a2a_response.Queued) + assert v.method == "message/send" + assert v.delivery_mode == "push" + + def test_push_queued_is_distinct_from_poll_queued(self): + # Both paths return Queued, but from different wire envelopes. + # Verify both parse correctly and are independent. + push_v = a2a_response.parse(_FIXTURES["push_queued_full"]) + poll_v = a2a_response.parse(_FIXTURES["poll_queued_full"]) + assert isinstance(push_v, a2a_response.Queued) + assert isinstance(poll_v, a2a_response.Queued) + assert push_v.method == poll_v.method == "message/send" + assert push_v.delivery_mode == "push" + assert poll_v.delivery_mode == "poll" + + def test_push_queued_logs_queue_id(self, caplog): + with caplog.at_level(logging.INFO, logger="a2a_response"): + a2a_response.parse(_FIXTURES["push_queued_full"]) + assert any("q-abc-123" in r.message for r in caplog.records) + + def test_queued_string_yes_is_malformed_not_push_queued(self): + # ``{"queued": "yes"}`` is not True, so it must NOT enter the push branch. + v = a2a_response.parse({"queued": "yes"}) + assert isinstance(v, a2a_response.Malformed) + + def test_queued_false_is_malformed(self): + v = a2a_response.parse({"queued": False}) + assert isinstance(v, a2a_response.Malformed) + class TestResultVariant: """``parse()`` extracts the JSON-RPC ``result`` envelope into @@ -436,6 +513,10 @@ class TestRegressionGate: "poll_queued_full": a2a_response.Queued, "poll_queued_notify": a2a_response.Queued, "poll_queued_no_method": a2a_response.Queued, + "push_queued_full": a2a_response.Queued, + "push_queued_notify": a2a_response.Queued, + "push_queued_no_method": a2a_response.Queued, + "push_queued_no_queue_id": a2a_response.Queued, "malformed_empty_dict": a2a_response.Malformed, "malformed_unexpected_keys": a2a_response.Malformed, "malformed_status_queued_no_delivery_mode": a2a_response.Malformed, diff --git a/workspace/tests/test_plugins_registry.py b/workspace/tests/test_plugins_registry.py index 44531eb4..ebfb7e1a 100644 --- a/workspace/tests/test_plugins_registry.py +++ b/workspace/tests/test_plugins_registry.py @@ -325,3 +325,44 @@ def test_resolve_registry_missing_module_falls_through(monkeypatch, tmp_path: Pa monkeypatch.setattr(pr, "_REGISTRY_ROOT", tmp_path / "empty-registry") _, source = pr.resolve("demo-plugin", "test_runtime", plugin_root) assert source == AdaptorSource.RAW_DROP + + +def test_load_module_from_path_registers_plugins_registry_sys_modules(tmp_path: Path): + """KI-296: _load_module_from_path registers ``plugins_registry`` in sys.modules + before exec'ing the adapter, so adapter files that do + ``from plugins_registry import ...`` resolve correctly when the runtime is + installed from the PyPI wheel (where the package ships as + ``molecule_runtime.plugins_registry`` rather than a top-level ``plugins_registry``). + """ + import sys as _sys + import plugins_registry as pr + + # Create a fake adapter that imports plugins_registry at top level. + adapter_file = tmp_path / "fake_runtime_adapter.py" + adapter_file.write_text( + "from plugins_registry import InstallContext # noqa: F401\n" + "from plugins_registry.builtins import AgentskillsAdaptor as Adaptor # noqa: F401\n" + ) + + # Evict any pre-existing sys.modules entries for the shim keys so the + # import inside _load_module_from_path actually runs. + _saved = { + k: _sys.modules.pop(k, None) + for k in ( + "plugins_registry", "plugins_registry.builtins", + "plugins_registry.protocol", "plugins_registry.raw_drop", + "_plugin_adaptor.test.fake_runtime", + ) + } + + try: + result = pr._load_module_from_path("_plugin_adaptor.test.fake_runtime", adapter_file) + assert result is not None, "module should load without ImportError" + assert hasattr(result, "Adaptor"), "AgentskillsAdaptor alias should be in namespace" + finally: + # Restore sys.modules state. + for k, v in _saved.items(): + if v is None: + _sys.modules.pop(k, None) + else: + _sys.modules[k] = v