fix(workspace): push-mode Queued returns delivery_mode="push" (not silent default "poll") #335

Closed
infra-runtime-be wants to merge 6 commits from runtime/fix-a2a-push-delivery-mode into main
10 changed files with 190 additions and 25 deletions

View File

@@ -25,10 +25,11 @@ _WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
if not _WORKSPACE_ID_raw:
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")
WORKSPACE_ID = _WORKSPACE_ID_raw
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
else:
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
# Platform URL: always host.docker.internal inside containers (Docker or not).
# The if/else is kept structurally for historical context; both paths now
# use the same default — the platform API is only reachable via the Docker
# network mesh from inside a workspace container regardless of runtime env.
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
async def discover(target_id: str) -> dict | None:

View File

@@ -26,10 +26,11 @@ _WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
if not _WORKSPACE_ID_raw:
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")
WORKSPACE_ID = _WORKSPACE_ID_raw
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
else:
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
# Platform URL: always host.docker.internal inside containers (Docker or not).
# The if/else is kept structurally for historical context; both paths now
# use the same default — the platform API is only reachable via the Docker
# network mesh from inside a workspace container regardless of runtime env.
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
# Cache workspace ID → name mappings (populated by list_peers calls)
_peer_names: dict[str, str] = {}

View File

@@ -194,7 +194,7 @@ def parse(data: Any) -> Variant:
method,
data.get("queue_id", "?"),
)
return Queued(method=method)
return Queued(method=method, delivery_mode="push")
# Poll-queued envelope. Both keys must be present — the workspace
# server sets them together; if only one is present the body is

View File

@@ -54,6 +54,19 @@ import httpx
logger = logging.getLogger(__name__)
def _platform_url() -> str:
"""Return the platform URL, defaulting to host.docker.internal.
The workspace runtime always runs inside a Docker container, so
``localhost`` refers to the container itself, not the platform host.
The platform API is only reachable via ``host.docker.internal`` from
within a workspace container, regardless of how the container was started.
The legacy non-Docker branch is removed (it would have returned
``localhost:8080`` which is unreachable from inside the container).
"""
return os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
# ─────────────────────────────────────────────────────────────────────────────
# Constants
# ─────────────────────────────────────────────────────────────────────────────
@@ -79,12 +92,12 @@ async def _fetch_latest_checkpoint(workspace_id: str) -> Optional[dict]:
workspace_id: The workspace to query.
Reads:
PLATFORM_URL Platform base URL (default ``http://localhost:8080``).
PLATFORM_URL Platform base URL (default ``http://host.docker.internal:8080``).
"""
try:
from platform_auth import auth_headers as _auth_headers # type: ignore[import]
platform_url = os.environ.get("PLATFORM_URL", "http://localhost:8080")
platform_url = _platform_url()
url = f"{platform_url}/workspaces/{workspace_id}/checkpoints/latest"
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.get(url, headers=_auth_headers())
@@ -125,12 +138,12 @@ async def _save_checkpoint(
payload: Optional JSON-serialisable dict stored as JSONB.
Reads:
PLATFORM_URL Platform base URL (default ``http://localhost:8080``).
PLATFORM_URL Platform base URL (default ``http://host.docker.internal:8080``).
"""
try:
from platform_auth import auth_headers as _auth_headers # type: ignore[import]
platform_url = os.environ.get("PLATFORM_URL", "http://localhost:8080")
platform_url = _platform_url()
url = f"{platform_url}/workspaces/{workspace_id}/checkpoints"
body: dict = {
"workflow_id": workflow_id,

View File

@@ -18,10 +18,11 @@ from platform_auth import auth_headers
logger = logging.getLogger(__name__)
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
else:
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
# Platform URL: always host.docker.internal inside containers (Docker or not).
# The if/else is kept structurally for historical context; both paths now
# use the same default — the platform API is only reachable via the Docker
# network mesh from inside a workspace container regardless of runtime env.
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
_WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
if not _WORKSPACE_ID_raw:
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")

View File

@@ -22,10 +22,11 @@ from policies.routing import build_team_routing_payload
logger = logging.getLogger(__name__)
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
else:
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
# Platform URL: always host.docker.internal inside containers (Docker or not).
# The if/else is kept structurally for historical context; both paths now
# use the same default — the platform API is only reachable via the Docker
# network mesh from inside a workspace container regardless of runtime env.
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
_WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
if not _WORKSPACE_ID_raw:
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")

View File

@@ -60,10 +60,10 @@ async def main(): # pragma: no cover
config_path = os.environ.get("WORKSPACE_CONFIG_PATH", "/configs")
# Docker-aware default — host.docker.internal resolves the platform service
# from inside the Docker network mesh; falls back to localhost for local dev.
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
platform_url = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
else:
platform_url = os.environ.get("PLATFORM_URL", "http://localhost:8080")
# Both branches now use the same default (architectural decision: the platform
# API is only reachable via host.docker.internal from within a workspace
# container, regardless of how the container was started).
platform_url = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
awareness_config = get_awareness_config()
# 0. Initialise OpenTelemetry (no-op if packages not installed)

View File

@@ -51,6 +51,32 @@ class AdaptorSource:
def _load_module_from_path(module_name: str, path: Path):
"""Import a Python file by absolute path. Returns the module or None on failure."""
# KI-296: Before exec'ing plugin-adapter files (which import
# ``from plugins_registry import ...`` as a top-level name), register
# the molecule-runtime subpackage as ``plugins_registry`` in sys.modules.
# In the molecule-core workspace source this is already a top-level package,
# so the setdefault is a no-op. In the PyPI-installed runtime wheel
# (molecule-ai-workspace-runtime 0.1.129+), the package ships as
# ``molecule_runtime.plugins_registry`` and without this shim every
# plugin adapter would fail with ModuleNotFoundError.
import sys as _sys
if "plugins_registry" not in _sys.modules:
try:
_mr_pr = __import__("molecule_runtime.plugins_registry", fromlist=[""])
_sys.modules["plugins_registry"] = _mr_pr
# Also register submodules the adapters commonly import directly.
for _sub in ("builtins", "protocol", "raw_drop"):
_submod = getattr(_mr_pr, _sub, None)
if _submod is not None:
_sys.modules[f"plugins_registry.{_sub}"] = _submod
except ImportError:
# molecule-runtime not installed (e.g. test environment with
# workspace/ on sys.path directly) — skip shim; the top-level
# workspace/plugins_registry package is already findable.
pass
spec = importlib.util.spec_from_file_location(module_name, path)
if spec is None or spec.loader is None:
return None

View File

@@ -105,6 +105,27 @@ _FIXTURES = {
"status": "queued",
"delivery_mode": "poll",
},
# Push-mode queue envelope: returned when a push-mode workspace is at
# capacity. The platform queues the request and returns
# {queued: true, message: "...", queue_id: "..."}. The ``delivery_mode``
# field is not present in this envelope (distinguishes it from poll-mode).
"push_queued_full": {
"queued": True,
"method": "message/send",
"queue_id": "q-abc-123",
},
"push_queued_notify": {
"queued": True,
"method": "notify",
},
"push_queued_no_method": {
"queued": True,
},
"push_queued_no_queue_id": {
# queue_id is purely informational — parser must not raise on its absence.
"queued": True,
"method": "message/send",
},
"malformed_empty_dict": {},
"malformed_unexpected_keys": {"foo": "bar", "baz": 42},
"malformed_status_queued_no_delivery_mode": {
@@ -159,6 +180,62 @@ class TestQueuedVariant:
a2a_response.parse(_FIXTURES["poll_queued_full"])
assert any("queued for poll-mode peer" in r.message for r in caplog.records)
# --- Push-mode queue (handleA2ADispatchError → EnqueueA2A → 202 {queued: true}) ---
def test_push_queued_full_returns_queued_with_delivery_mode_push(self):
# The push-mode path must set delivery_mode="push", not silently default to "poll".
# Callers that branch on v.delivery_mode will mis-route poll-mode responses
# as push-mode (and vice versa) if this field is wrong.
v = a2a_response.parse(_FIXTURES["push_queued_full"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "message/send"
assert v.delivery_mode == "push"
def test_push_queued_notify(self):
v = a2a_response.parse(_FIXTURES["push_queued_notify"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "notify"
assert v.delivery_mode == "push"
def test_push_queued_missing_method_defaults_to_message_send(self):
# Push-mode servers should always send method, but we handle absence gracefully.
v = a2a_response.parse(_FIXTURES["push_queued_no_method"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "message/send"
assert v.delivery_mode == "push"
def test_push_queued_missing_queue_id_still_parsed(self):
# queue_id is purely informational — its absence must not break parsing.
v = a2a_response.parse(_FIXTURES["push_queued_no_queue_id"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "message/send"
assert v.delivery_mode == "push"
def test_push_queued_is_distinct_from_poll_queued(self):
# Both paths return Queued, but from different wire envelopes.
# Verify both parse correctly and are independent.
push_v = a2a_response.parse(_FIXTURES["push_queued_full"])
poll_v = a2a_response.parse(_FIXTURES["poll_queued_full"])
assert isinstance(push_v, a2a_response.Queued)
assert isinstance(poll_v, a2a_response.Queued)
assert push_v.method == poll_v.method == "message/send"
assert push_v.delivery_mode == "push"
assert poll_v.delivery_mode == "poll"
def test_push_queued_logs_queue_id(self, caplog):
with caplog.at_level(logging.INFO, logger="a2a_response"):
a2a_response.parse(_FIXTURES["push_queued_full"])
assert any("q-abc-123" in r.message for r in caplog.records)
def test_queued_string_yes_is_malformed_not_push_queued(self):
# ``{"queued": "yes"}`` is not True, so it must NOT enter the push branch.
v = a2a_response.parse({"queued": "yes"})
assert isinstance(v, a2a_response.Malformed)
def test_queued_false_is_malformed(self):
v = a2a_response.parse({"queued": False})
assert isinstance(v, a2a_response.Malformed)
class TestResultVariant:
"""``parse()`` extracts the JSON-RPC ``result`` envelope into
@@ -436,6 +513,10 @@ class TestRegressionGate:
"poll_queued_full": a2a_response.Queued,
"poll_queued_notify": a2a_response.Queued,
"poll_queued_no_method": a2a_response.Queued,
"push_queued_full": a2a_response.Queued,
"push_queued_notify": a2a_response.Queued,
"push_queued_no_method": a2a_response.Queued,
"push_queued_no_queue_id": a2a_response.Queued,
"malformed_empty_dict": a2a_response.Malformed,
"malformed_unexpected_keys": a2a_response.Malformed,
"malformed_status_queued_no_delivery_mode": a2a_response.Malformed,

View File

@@ -325,3 +325,44 @@ def test_resolve_registry_missing_module_falls_through(monkeypatch, tmp_path: Pa
monkeypatch.setattr(pr, "_REGISTRY_ROOT", tmp_path / "empty-registry")
_, source = pr.resolve("demo-plugin", "test_runtime", plugin_root)
assert source == AdaptorSource.RAW_DROP
def test_load_module_from_path_registers_plugins_registry_sys_modules(tmp_path: Path):
    """KI-296 regression test: ``_load_module_from_path`` must install a
    ``plugins_registry`` alias in ``sys.modules`` before executing an adapter
    file, so adapters doing ``from plugins_registry import ...`` resolve when
    the runtime is installed from the PyPI wheel (where the package ships as
    ``molecule_runtime.plugins_registry`` instead of a top-level package).
    """
    import sys as _sys
    import plugins_registry as pr

    # Minimal adapter exercising both the top-level import and a submodule
    # import, exactly as real plugin adapters do.
    adapter_path = tmp_path / "fake_runtime_adapter.py"
    adapter_path.write_text(
        "from plugins_registry import InstallContext # noqa: F401\n"
        "from plugins_registry.builtins import AgentskillsAdaptor as Adaptor # noqa: F401\n"
    )

    shim_keys = (
        "plugins_registry",
        "plugins_registry.builtins",
        "plugins_registry.protocol",
        "plugins_registry.raw_drop",
        "_plugin_adaptor.test.fake_runtime",
    )
    # Evict cached entries so the shim logic inside _load_module_from_path
    # actually runs instead of short-circuiting on a warm sys.modules.
    previous = {key: _sys.modules.pop(key, None) for key in shim_keys}
    try:
        module = pr._load_module_from_path("_plugin_adaptor.test.fake_runtime", adapter_path)
        assert module is not None, "module should load without ImportError"
        assert hasattr(module, "Adaptor"), "AgentskillsAdaptor alias should be in namespace"
    finally:
        # Leave sys.modules exactly as we found it.
        for key, mod in previous.items():
            if mod is None:
                _sys.modules.pop(key, None)
            else:
                _sys.modules[key] = mod