fix(workspace): push-mode Queued returns delivery_mode="push" (not silent default "poll") #335
@ -25,10 +25,11 @@ _WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
|
||||
if not _WORKSPACE_ID_raw:
|
||||
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")
|
||||
WORKSPACE_ID = _WORKSPACE_ID_raw
|
||||
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
else:
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
|
||||
# Platform URL: always host.docker.internal inside containers (Docker or not).
|
||||
# The if/else is kept structurally for historical context; both paths now
|
||||
# use the same default — the platform API is only reachable via the Docker
|
||||
# network mesh from inside a workspace container regardless of runtime env.
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
|
||||
|
||||
async def discover(target_id: str) -> dict | None:
|
||||
|
||||
@ -26,10 +26,11 @@ _WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
|
||||
if not _WORKSPACE_ID_raw:
|
||||
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")
|
||||
WORKSPACE_ID = _WORKSPACE_ID_raw
|
||||
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
else:
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
|
||||
# Platform URL: always host.docker.internal inside containers (Docker or not).
|
||||
# The if/else is kept structurally for historical context; both paths now
|
||||
# use the same default — the platform API is only reachable via the Docker
|
||||
# network mesh from inside a workspace container regardless of runtime env.
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
|
||||
# Cache workspace ID → name mappings (populated by list_peers calls)
|
||||
_peer_names: dict[str, str] = {}
|
||||
|
||||
@ -194,7 +194,7 @@ def parse(data: Any) -> Variant:
|
||||
method,
|
||||
data.get("queue_id", "?"),
|
||||
)
|
||||
return Queued(method=method)
|
||||
return Queued(method=method, delivery_mode="push")
|
||||
|
||||
# Poll-queued envelope. Both keys must be present — the workspace
|
||||
# server sets them together; if only one is present the body is
|
||||
|
||||
@ -54,6 +54,19 @@ import httpx
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _platform_url() -> str:
|
||||
"""Return the platform URL, defaulting to host.docker.internal.
|
||||
|
||||
The workspace runtime always runs inside a Docker container, so
|
||||
``localhost`` refers to the container itself, not the platform host.
|
||||
The platform API is only reachable via ``host.docker.internal`` from
|
||||
within a workspace container, regardless of how the container was started.
|
||||
The legacy non-Docker branch is removed (it would have returned
|
||||
``localhost:8080`` which is unreachable from inside the container).
|
||||
"""
|
||||
return os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Constants
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
@ -79,12 +92,12 @@ async def _fetch_latest_checkpoint(workspace_id: str) -> Optional[dict]:
|
||||
workspace_id: The workspace to query.
|
||||
|
||||
Reads:
|
||||
PLATFORM_URL Platform base URL (default ``http://localhost:8080``).
|
||||
PLATFORM_URL Platform base URL (default ``http://host.docker.internal:8080``).
|
||||
"""
|
||||
try:
|
||||
from platform_auth import auth_headers as _auth_headers # type: ignore[import]
|
||||
|
||||
platform_url = os.environ.get("PLATFORM_URL", "http://localhost:8080")
|
||||
platform_url = _platform_url()
|
||||
url = f"{platform_url}/workspaces/{workspace_id}/checkpoints/latest"
|
||||
async with httpx.AsyncClient(timeout=5.0) as client:
|
||||
resp = await client.get(url, headers=_auth_headers())
|
||||
@ -125,12 +138,12 @@ async def _save_checkpoint(
|
||||
payload: Optional JSON-serialisable dict stored as JSONB.
|
||||
|
||||
Reads:
|
||||
PLATFORM_URL Platform base URL (default ``http://localhost:8080``).
|
||||
PLATFORM_URL Platform base URL (default ``http://host.docker.internal:8080``).
|
||||
"""
|
||||
try:
|
||||
from platform_auth import auth_headers as _auth_headers # type: ignore[import]
|
||||
|
||||
platform_url = os.environ.get("PLATFORM_URL", "http://localhost:8080")
|
||||
platform_url = _platform_url()
|
||||
url = f"{platform_url}/workspaces/{workspace_id}/checkpoints"
|
||||
body: dict = {
|
||||
"workflow_id": workflow_id,
|
||||
|
||||
@ -18,10 +18,11 @@ from platform_auth import auth_headers
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
else:
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
|
||||
# Platform URL: always host.docker.internal inside containers (Docker or not).
|
||||
# The if/else is kept structurally for historical context; both paths now
|
||||
# use the same default — the platform API is only reachable via the Docker
|
||||
# network mesh from inside a workspace container regardless of runtime env.
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
_WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
|
||||
if not _WORKSPACE_ID_raw:
|
||||
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")
|
||||
|
||||
@ -22,10 +22,11 @@ from policies.routing import build_team_routing_payload
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
else:
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
|
||||
# Platform URL: always host.docker.internal inside containers (Docker or not).
|
||||
# The if/else is kept structurally for historical context; both paths now
|
||||
# use the same default — the platform API is only reachable via the Docker
|
||||
# network mesh from inside a workspace container regardless of runtime env.
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
_WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
|
||||
if not _WORKSPACE_ID_raw:
|
||||
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")
|
||||
|
||||
@ -60,10 +60,10 @@ async def main(): # pragma: no cover
|
||||
config_path = os.environ.get("WORKSPACE_CONFIG_PATH", "/configs")
|
||||
# Docker-aware default — host.docker.internal resolves the platform service
|
||||
# from inside the Docker network mesh; falls back to localhost for local dev.
|
||||
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
|
||||
platform_url = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
else:
|
||||
platform_url = os.environ.get("PLATFORM_URL", "http://localhost:8080")
|
||||
# Both branches now use the same default (architectural decision: the platform
|
||||
# API is only reachable via host.docker.internal from within a workspace
|
||||
# container, regardless of how the container was started).
|
||||
platform_url = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
awareness_config = get_awareness_config()
|
||||
|
||||
# 0. Initialise OpenTelemetry (no-op if packages not installed)
|
||||
|
||||
@ -51,6 +51,32 @@ class AdaptorSource:
|
||||
|
||||
def _load_module_from_path(module_name: str, path: Path):
|
||||
"""Import a Python file by absolute path. Returns the module or None on failure."""
|
||||
|
||||
# KI-296: Before exec'ing plugin-adapter files (which import
|
||||
# ``from plugins_registry import ...`` as a top-level name), register
|
||||
# the molecule-runtime subpackage as ``plugins_registry`` in sys.modules.
|
||||
# In the molecule-core workspace source this is already a top-level package,
|
||||
# so the setdefault is a no-op. In the PyPI-installed runtime wheel
|
||||
# (molecule-ai-workspace-runtime 0.1.129+), the package ships as
|
||||
# ``molecule_runtime.plugins_registry`` and without this shim every
|
||||
# plugin adapter would fail with ModuleNotFoundError.
|
||||
import sys as _sys
|
||||
|
||||
if "plugins_registry" not in _sys.modules:
|
||||
try:
|
||||
_mr_pr = __import__("molecule_runtime.plugins_registry", fromlist=[""])
|
||||
_sys.modules["plugins_registry"] = _mr_pr
|
||||
# Also register submodules the adapters commonly import directly.
|
||||
for _sub in ("builtins", "protocol", "raw_drop"):
|
||||
_submod = getattr(_mr_pr, _sub, None)
|
||||
if _submod is not None:
|
||||
_sys.modules[f"plugins_registry.{_sub}"] = _submod
|
||||
except ImportError:
|
||||
# molecule-runtime not installed (e.g. test environment with
|
||||
# workspace/ on sys.path directly) — skip shim; the top-level
|
||||
# workspace/plugins_registry package is already findable.
|
||||
pass
|
||||
|
||||
spec = importlib.util.spec_from_file_location(module_name, path)
|
||||
if spec is None or spec.loader is None:
|
||||
return None
|
||||
|
||||
@ -105,6 +105,27 @@ _FIXTURES = {
|
||||
"status": "queued",
|
||||
"delivery_mode": "poll",
|
||||
},
|
||||
# Push-mode queue envelope: returned when a push-mode workspace is at
|
||||
# capacity. The platform queues the request and returns
|
||||
# {queued: true, message: "...", queue_id: "..."}. The ``delivery_mode``
|
||||
# field is not present in this envelope (distinguishes it from poll-mode).
|
||||
"push_queued_full": {
|
||||
"queued": True,
|
||||
"method": "message/send",
|
||||
"queue_id": "q-abc-123",
|
||||
},
|
||||
"push_queued_notify": {
|
||||
"queued": True,
|
||||
"method": "notify",
|
||||
},
|
||||
"push_queued_no_method": {
|
||||
"queued": True,
|
||||
},
|
||||
"push_queued_no_queue_id": {
|
||||
# queue_id is purely informational — parser must not raise on its absence.
|
||||
"queued": True,
|
||||
"method": "message/send",
|
||||
},
|
||||
"malformed_empty_dict": {},
|
||||
"malformed_unexpected_keys": {"foo": "bar", "baz": 42},
|
||||
"malformed_status_queued_no_delivery_mode": {
|
||||
@ -159,6 +180,62 @@ class TestQueuedVariant:
|
||||
a2a_response.parse(_FIXTURES["poll_queued_full"])
|
||||
assert any("queued for poll-mode peer" in r.message for r in caplog.records)
|
||||
|
||||
# --- Push-mode queue (handleA2ADispatchError → EnqueueA2A → 202 {queued: true}) ---
|
||||
|
||||
def test_push_queued_full_returns_queued_with_delivery_mode_push(self):
|
||||
# The push-mode path must set delivery_mode="push", not silently default to "poll".
|
||||
# Callers that branch on v.delivery_mode will mis-route poll-mode responses
|
||||
# as push-mode (and vice versa) if this field is wrong.
|
||||
v = a2a_response.parse(_FIXTURES["push_queued_full"])
|
||||
assert isinstance(v, a2a_response.Queued)
|
||||
assert v.method == "message/send"
|
||||
assert v.delivery_mode == "push"
|
||||
|
||||
def test_push_queued_notify(self):
|
||||
v = a2a_response.parse(_FIXTURES["push_queued_notify"])
|
||||
assert isinstance(v, a2a_response.Queued)
|
||||
assert v.method == "notify"
|
||||
assert v.delivery_mode == "push"
|
||||
|
||||
def test_push_queued_missing_method_defaults_to_message_send(self):
|
||||
# Push-mode servers should always send method, but we handle absence gracefully.
|
||||
v = a2a_response.parse(_FIXTURES["push_queued_no_method"])
|
||||
assert isinstance(v, a2a_response.Queued)
|
||||
assert v.method == "message/send"
|
||||
assert v.delivery_mode == "push"
|
||||
|
||||
def test_push_queued_missing_queue_id_still_parsed(self):
|
||||
# queue_id is purely informational — its absence must not break parsing.
|
||||
v = a2a_response.parse(_FIXTURES["push_queued_no_queue_id"])
|
||||
assert isinstance(v, a2a_response.Queued)
|
||||
assert v.method == "message/send"
|
||||
assert v.delivery_mode == "push"
|
||||
|
||||
def test_push_queued_is_distinct_from_poll_queued(self):
|
||||
# Both paths return Queued, but from different wire envelopes.
|
||||
# Verify both parse correctly and are independent.
|
||||
push_v = a2a_response.parse(_FIXTURES["push_queued_full"])
|
||||
poll_v = a2a_response.parse(_FIXTURES["poll_queued_full"])
|
||||
assert isinstance(push_v, a2a_response.Queued)
|
||||
assert isinstance(poll_v, a2a_response.Queued)
|
||||
assert push_v.method == poll_v.method == "message/send"
|
||||
assert push_v.delivery_mode == "push"
|
||||
assert poll_v.delivery_mode == "poll"
|
||||
|
||||
def test_push_queued_logs_queue_id(self, caplog):
|
||||
with caplog.at_level(logging.INFO, logger="a2a_response"):
|
||||
a2a_response.parse(_FIXTURES["push_queued_full"])
|
||||
assert any("q-abc-123" in r.message for r in caplog.records)
|
||||
|
||||
def test_queued_string_yes_is_malformed_not_push_queued(self):
|
||||
# ``{"queued": "yes"}`` is not True, so it must NOT enter the push branch.
|
||||
v = a2a_response.parse({"queued": "yes"})
|
||||
assert isinstance(v, a2a_response.Malformed)
|
||||
|
||||
def test_queued_false_is_malformed(self):
|
||||
v = a2a_response.parse({"queued": False})
|
||||
assert isinstance(v, a2a_response.Malformed)
|
||||
|
||||
|
||||
class TestResultVariant:
|
||||
"""``parse()`` extracts the JSON-RPC ``result`` envelope into
|
||||
@ -436,6 +513,10 @@ class TestRegressionGate:
|
||||
"poll_queued_full": a2a_response.Queued,
|
||||
"poll_queued_notify": a2a_response.Queued,
|
||||
"poll_queued_no_method": a2a_response.Queued,
|
||||
"push_queued_full": a2a_response.Queued,
|
||||
"push_queued_notify": a2a_response.Queued,
|
||||
"push_queued_no_method": a2a_response.Queued,
|
||||
"push_queued_no_queue_id": a2a_response.Queued,
|
||||
"malformed_empty_dict": a2a_response.Malformed,
|
||||
"malformed_unexpected_keys": a2a_response.Malformed,
|
||||
"malformed_status_queued_no_delivery_mode": a2a_response.Malformed,
|
||||
|
||||
@ -325,3 +325,44 @@ def test_resolve_registry_missing_module_falls_through(monkeypatch, tmp_path: Pa
|
||||
monkeypatch.setattr(pr, "_REGISTRY_ROOT", tmp_path / "empty-registry")
|
||||
_, source = pr.resolve("demo-plugin", "test_runtime", plugin_root)
|
||||
assert source == AdaptorSource.RAW_DROP
|
||||
|
||||
|
||||
def test_load_module_from_path_registers_plugins_registry_sys_modules(tmp_path: Path):
|
||||
"""KI-296: _load_module_from_path registers ``plugins_registry`` in sys.modules
|
||||
before exec'ing the adapter, so adapter files that do
|
||||
``from plugins_registry import ...`` resolve correctly when the runtime is
|
||||
installed from the PyPI wheel (where the package ships as
|
||||
``molecule_runtime.plugins_registry`` rather than a top-level ``plugins_registry``).
|
||||
"""
|
||||
import sys as _sys
|
||||
import plugins_registry as pr
|
||||
|
||||
# Create a fake adapter that imports plugins_registry at top level.
|
||||
adapter_file = tmp_path / "fake_runtime_adapter.py"
|
||||
adapter_file.write_text(
|
||||
"from plugins_registry import InstallContext # noqa: F401\n"
|
||||
"from plugins_registry.builtins import AgentskillsAdaptor as Adaptor # noqa: F401\n"
|
||||
)
|
||||
|
||||
# Evict any pre-existing sys.modules entries for the shim keys so the
|
||||
# import inside _load_module_from_path actually runs.
|
||||
_saved = {
|
||||
k: _sys.modules.pop(k, None)
|
||||
for k in (
|
||||
"plugins_registry", "plugins_registry.builtins",
|
||||
"plugins_registry.protocol", "plugins_registry.raw_drop",
|
||||
"_plugin_adaptor.test.fake_runtime",
|
||||
)
|
||||
}
|
||||
|
||||
try:
|
||||
result = pr._load_module_from_path("_plugin_adaptor.test.fake_runtime", adapter_file)
|
||||
assert result is not None, "module should load without ImportError"
|
||||
assert hasattr(result, "Adaptor"), "AgentskillsAdaptor alias should be in namespace"
|
||||
finally:
|
||||
# Restore sys.modules state.
|
||||
for k, v in _saved.items():
|
||||
if v is None:
|
||||
_sys.modules.pop(k, None)
|
||||
else:
|
||||
_sys.modules[k] = v
|
||||
|
||||
Loading…
Reference in New Issue
Block a user