Merge pull request #2437 from Molecule-AI/staging
staging → main: auto-promote c901d52
This commit is contained in:
commit
76c604fb4f
@ -150,6 +150,13 @@ def rewrite_imports(text: str, regex: re.Pattern) -> str:
|
||||
`import X` → `import molecule_runtime.X as X` (preserve binding)
|
||||
`from X import Y` → `from molecule_runtime.X import Y`
|
||||
`from X.sub import Y` → `from molecule_runtime.X.sub import Y`
|
||||
|
||||
Rejects `import X as Y` because the rewrite would produce
|
||||
`import molecule_runtime.X as X as Y`, a syntax error. The PR #2433
|
||||
incident shipped this exact pattern past `Python Lint & Test` (which
|
||||
runs against pre-rewrite source) but blew up the wheel-smoke gate.
|
||||
Detecting it here turns the silent build failure into a build-time
|
||||
error with a clear path: use `from X import …` or plain `import X`.
|
||||
"""
|
||||
def repl(m: re.Match) -> str:
|
||||
indent, kw, mod, rest = m.group("indent"), m.group("kw"), m.group("mod"), m.group("rest")
|
||||
@ -163,6 +170,26 @@ def rewrite_imports(text: str, regex: re.Pattern) -> str:
|
||||
# `import X.sub` — rewrite as `import molecule_runtime.X.sub` and
|
||||
# leave the trailing dot pattern intact for the rest of the line.
|
||||
return f"{indent}import molecule_runtime.{mod}{rest}"
|
||||
# Detect `import X as Y` — the regex's `rest` group captures only
|
||||
# the immediate following char (whitespace, comma, or EOL), so we
|
||||
# have to peek at the surrounding line context. The match start is
|
||||
# at the line's `import` keyword; everything after the matched
|
||||
# name on the same line is what the source author wrote.
|
||||
line_start = text.rfind("\n", 0, m.start()) + 1
|
||||
line_end = text.find("\n", m.end())
|
||||
if line_end == -1:
|
||||
line_end = len(text)
|
||||
line_after = text[m.end() - len(rest):line_end]
|
||||
# Strip comments from consideration so `import X # noqa` doesn't trip.
|
||||
line_after_no_comment = line_after.split("#", 1)[0]
|
||||
if re.search(r"^\s*as\s+\w+", line_after_no_comment):
|
||||
raise ValueError(
|
||||
f"rewrite_imports: cannot rewrite 'import {mod} as <alias>' on a "
|
||||
f"workspace module — the regex would produce "
|
||||
f"'import molecule_runtime.{mod} as {mod} as <alias>', invalid syntax. "
|
||||
f"Use 'from {mod} import …' or plain 'import {mod}' instead. "
|
||||
f"Offending line: {text[line_start:line_end]!r}"
|
||||
)
|
||||
# Plain `import X` — alias preserves the local name.
|
||||
return f"{indent}import molecule_runtime.{mod} as {mod}{rest}"
|
||||
return regex.sub(repl, text)
|
||||
|
||||
@ -52,11 +52,13 @@ def smoke_imports_and_invariants() -> None:
|
||||
InboxState,
|
||||
activate as inbox_activate,
|
||||
get_state as inbox_get_state,
|
||||
set_notification_callback as inbox_set_notification_callback,
|
||||
start_poller_thread as inbox_start_poller_thread,
|
||||
)
|
||||
assert callable(inbox_activate), "inbox.activate must be callable"
|
||||
assert callable(inbox_get_state), "inbox.get_state must be callable"
|
||||
assert callable(inbox_start_poller_thread), "inbox.start_poller_thread must be callable"
|
||||
assert callable(inbox_set_notification_callback), "inbox.set_notification_callback must be callable"
|
||||
|
||||
assert a2a_client._A2A_ERROR_PREFIX, "a2a_client missing error sentinel"
|
||||
assert callable(get_adapter), "adapters.get_adapter must be callable"
|
||||
|
||||
@ -277,12 +277,26 @@ var openTunnelCmd = func(o eicSSHOptions) *exec.Cmd {
|
||||
// to 22; with CP provisioning today the workspace runs as a native
|
||||
// process under the ubuntu user, so landing at ubuntu's shell IS the
|
||||
// terminal experience.
|
||||
//
|
||||
// ConnectTimeout=10 is the user-experience guard — without it, ssh waits
|
||||
// indefinitely for the remote sshd's banner. When the workspace EC2's
|
||||
// sshd is unresponsive (mid-restart, SG drop, AMI without ec2-instance-
|
||||
// connect installed) the canvas's xterm shows the user's typed bytes
|
||||
// echoed back by the workspace-server's *local* PTY (cooked + echo mode
|
||||
// before ssh finishes its handshake) and then closes silently when CF's
|
||||
// idle WebSocket timer fires, with no "Connection refused" or "Permission
|
||||
// denied" output ever reaching the user. Capping at 10s makes the failure
|
||||
// surface as a real ssh error message in the terminal — caught 2026-04-30
|
||||
// when hongmingwang's hermes shell hung after the heartbeat-fix redeploy
|
||||
// and a probe at /workspaces/<id>/terminal sat for 60s with the only
|
||||
// frame being the local-PTY echo of a single 'X' typed mid-handshake.
|
||||
var sshCommandCmd = func(o eicSSHOptions) *exec.Cmd {
|
||||
return exec.Command(
|
||||
"ssh",
|
||||
"-i", o.PrivateKeyPath,
|
||||
"-o", "StrictHostKeyChecking=no",
|
||||
"-o", "UserKnownHostsFile=/dev/null",
|
||||
"-o", "ConnectTimeout=10",
|
||||
"-o", "ServerAliveInterval=30",
|
||||
"-o", "ServerAliveCountMax=3",
|
||||
"-p", fmt.Sprintf("%d", o.LocalPort),
|
||||
|
||||
@ -320,6 +320,7 @@ func TestSSHCommandCmd_BuildsArgv(t *testing.T) {
|
||||
"-i", "/tmp/k",
|
||||
"-o", "StrictHostKeyChecking=no",
|
||||
"-o", "UserKnownHostsFile=/dev/null",
|
||||
"-o", "ConnectTimeout=10",
|
||||
"-o", "ServerAliveInterval=30",
|
||||
"-o", "ServerAliveCountMax=3",
|
||||
"-p", "2222",
|
||||
@ -490,3 +491,57 @@ func TestKI005_OrgToken_SkipsValidateToken(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestSSHCommandCmd_ConnectTimeoutPresent pins the user-experience guard
|
||||
// against ssh-handshake-hang. Without ConnectTimeout, ssh waits forever
|
||||
// for the remote sshd's banner — which masquerades as a "silently dead"
|
||||
// shell to the user, because the workspace-server's local PTY is in
|
||||
// cooked + echo mode before ssh finishes its handshake, so the canvas
|
||||
// echoes the user's keystrokes back without ever reaching remote bash,
|
||||
// and Cloudflare eventually closes the WebSocket on idle (~100s) with
|
||||
// no error frame to surface what went wrong.
|
||||
//
|
||||
// Repro 2026-04-30: a 60s probe at hongmingwang's hermes /terminal
|
||||
// endpoint after the heartbeat-fix redeploy showed only the local-PTY
|
||||
// echo of a single 'X' typed mid-handshake. Workspace EC2 was up and
|
||||
// heartbeating but its sshd was unresponsive; ssh hung indefinitely.
|
||||
//
|
||||
// Behavior-based: matches the literal `-o ConnectTimeout=N` arg pair so
|
||||
// this stays pinned even if the rest of the args reorder. Does not pin
|
||||
// the exact value — operators may tune it — but does pin presence.
|
||||
func TestSSHCommandCmd_ConnectTimeoutPresent(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cmd := sshCommandCmd(eicSSHOptions{
|
||||
InstanceID: "i-test",
|
||||
OSUser: "ubuntu",
|
||||
Region: "us-east-2",
|
||||
LocalPort: 2222,
|
||||
PrivateKeyPath: "/tmp/test-key",
|
||||
})
|
||||
|
||||
args := cmd.Args
|
||||
found := false
|
||||
for i, a := range args {
|
||||
if a != "-o" {
|
||||
continue
|
||||
}
|
||||
if i+1 >= len(args) {
|
||||
continue
|
||||
}
|
||||
val := args[i+1]
|
||||
if len(val) >= len("ConnectTimeout=") &&
|
||||
val[:len("ConnectTimeout=")] == "ConnectTimeout=" {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("sshCommandCmd is missing `-o ConnectTimeout=N` — without it, "+
|
||||
"ssh hangs forever when the workspace EC2's sshd is unresponsive "+
|
||||
"and the canvas terminal silently dies on Cloudflare's idle WS "+
|
||||
"timeout with no error message reaching the user. See terminal.go "+
|
||||
"sshCommandCmd comment (2026-04-30 hongmingwang hermes). args=%v",
|
||||
args)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -17,6 +17,10 @@ import json
|
||||
import logging
|
||||
import sys
|
||||
|
||||
import inbox # noqa: F401 — bridge wiring lives in main(); the rewriter
|
||||
# produces `import molecule_runtime.inbox as inbox`
|
||||
# which preserves this binding for set_notification_callback.
|
||||
|
||||
from a2a_tools import (
|
||||
tool_check_task_status,
|
||||
tool_commit_memory,
|
||||
@ -130,6 +134,44 @@ async def handle_tool_call(name: str, arguments: dict) -> str:
|
||||
return f"Unknown tool: {name}"
|
||||
|
||||
|
||||
# --- MCP Notification bridge ---
|
||||
|
||||
# `notifications/claude/channel` matches the contract used by the
|
||||
# molecule-mcp-claude-channel bun bridge (server.ts:509). Claude Code's
|
||||
# MCP runtime treats this method as a conversation interrupt — `content`
|
||||
# becomes the agent turn, `meta` is structured metadata. Notification-
|
||||
# capable hosts (Claude Code today; any compliant client tomorrow)
|
||||
# get push UX automatically; pollers (`wait_for_message` / `inbox_peek`)
|
||||
# still work unchanged. See task #46 + the deprecation path documented
|
||||
# in workspace/inbox.py:set_notification_callback.
|
||||
_CHANNEL_NOTIFICATION_METHOD = "notifications/claude/channel"


def _build_channel_notification(msg: dict) -> dict:
    """Wrap an inbox message dict in the MCP channel-notification envelope.

    Kept free of I/O and asyncio so the wire shape can be unit-tested
    directly.

    Args:
        msg: Message dict (the ``InboxMessage.to_dict()`` form). The keys
            read are ``text``, ``kind``, ``peer_id``, ``method``,
            ``activity_id`` and ``created_at``; any that are absent are
            replaced by an empty string.

    Returns:
        A JSON-RPC 2.0 notification dict (no ``id`` member) whose
        ``params`` carry the message text as ``content`` and the routing
        fields under ``meta``.
    """
    # meta key on the wire -> key in the inbox message dict. Only the
    # timestamp is renamed (``created_at`` -> ``ts``).
    field_map = (
        ("kind", "kind"),
        ("peer_id", "peer_id"),
        ("method", "method"),
        ("activity_id", "activity_id"),
        ("ts", "created_at"),
    )
    meta = {"source": "molecule"}
    for wire_key, msg_key in field_map:
        meta[wire_key] = msg.get(msg_key, "")

    params = {"content": msg.get("text", ""), "meta": meta}
    return {
        "jsonrpc": "2.0",
        "method": _CHANNEL_NOTIFICATION_METHOD,
        "params": params,
    }
|
||||
|
||||
|
||||
# --- MCP Server (JSON-RPC over stdio) ---
|
||||
|
||||
async def main(): # pragma: no cover
|
||||
@ -148,6 +190,34 @@ async def main(): # pragma: no cover
|
||||
writer.write(data.encode())
|
||||
await writer.drain()
|
||||
|
||||
# Wire the inbox → MCP notification bridge. Inbox poller (daemon
|
||||
# thread) calls into here when a new activity row lands; we
|
||||
# schedule the notification onto the asyncio loop and best-effort
|
||||
# fire it on the same stdout the responses go to.
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
async def _emit_notification(payload: dict) -> None:
    """Write *payload* to the enclosing scope's ``writer`` as one JSON line.

    The payload is serialized with ``json.dumps``, terminated with a
    newline, and encoded before being written. Any exception raised while
    draining the writer is deliberately swallowed.
    """
    line = (json.dumps(payload) + "\n").encode()
    writer.write(line)
    try:
        await writer.drain()
    except Exception:  # noqa: BLE001
        # A closed pipe (host disconnected) must not take the inbox
        # poller down; stay quiet until the host reconnects.
        return
|
||||
|
||||
def _on_inbox_message(msg: dict) -> None:
    """Schedule an MCP channel notification for *msg* on the asyncio loop.

    Intended to be registered via ``inbox.set_notification_callback`` and
    invoked from outside the event loop's thread, hence the thread-safe
    scheduling call. Delivery is best-effort: if the loop has already
    been closed the notification is dropped.

    Args:
        msg: Inbox message dict, passed to ``_build_channel_notification``.
    """
    # Build the coroutine outside the ``try`` so only the scheduling call
    # is guarded, and so the object can be cleaned up if scheduling fails.
    coro = _emit_notification(_build_channel_notification(msg))
    try:
        asyncio.run_coroutine_threadsafe(coro, loop)
    except RuntimeError:
        # Loop closed during shutdown — best-effort, swallow. The
        # coroutine was never handed to the loop, so close it explicitly;
        # otherwise Python emits "coroutine ... was never awaited" when
        # the object is garbage-collected.
        coro.close()
|
||||
|
||||
inbox.set_notification_callback(_on_inbox_message)
|
||||
|
||||
buffer = ""
|
||||
while True:
|
||||
try:
|
||||
|
||||
@ -323,7 +323,19 @@ def load_config(config_path: Optional[str] = None) -> WorkspaceConfig:
|
||||
args=runtime_raw.get("args", []),
|
||||
required_env=runtime_raw.get("required_env", []),
|
||||
timeout=runtime_raw.get("timeout", 0),
|
||||
model=runtime_raw.get("model", ""),
|
||||
# Fall back to top-level resolved `model` (which already honors
|
||||
# MODEL_PROVIDER env override, line 277) when YAML doesn't carry
|
||||
# runtime_config.model. Without this fallback, SaaS workspaces
|
||||
# silently boot with the adapter's hard-coded default —
|
||||
# claude-code-default reads `runtime_config.model or "sonnet"`,
|
||||
# so a user who picks Opus in the canvas Config tab gets Sonnet
|
||||
# on the next CP-driven restart. Root cause: the CP user-data
|
||||
# script regenerates /configs/config.yaml at every boot with
|
||||
# only `name`, `runtime`, `a2a` keys (intentionally minimal so
|
||||
# it doesn't carry stale state), losing runtime_config.model.
|
||||
# MODEL_PROVIDER is plumbed as an env var, so picking it up via
|
||||
# the top-level resolved model keeps the selection sticky.
|
||||
model=runtime_raw.get("model") or model,
|
||||
# Deprecated fields — kept for backward compat
|
||||
auth_token_env=runtime_raw.get("auth_token_env", ""),
|
||||
auth_token_file=runtime_raw.get("auth_token_file", ""),
|
||||
|
||||
@ -53,7 +53,7 @@ import time
|
||||
from collections import deque
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from typing import Any, Callable
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -173,10 +173,14 @@ class InboxState:
|
||||
logger.warning("inbox: failed to delete cursor %s: %s", self.cursor_path, exc)
|
||||
|
||||
def record(self, message: InboxMessage) -> None:
|
||||
"""Append a message and wake any waiter.
|
||||
"""Append a message, wake any waiter, and fire the notification
|
||||
callback (if registered) for push-UX-capable hosts.
|
||||
|
||||
Skips a row whose activity_id we've already queued — defensive
|
||||
against the poller racing with the consumer + cursor save.
|
||||
against the poller racing with the consumer + cursor save. The
|
||||
dedupe short-circuits BEFORE the notification fires, so a
|
||||
notification-capable host doesn't see duplicate push events on
|
||||
backlog overlap.
|
||||
"""
|
||||
with self._lock:
|
||||
for existing in self._queue:
|
||||
@ -184,6 +188,19 @@ class InboxState:
|
||||
return
|
||||
self._queue.append(message)
|
||||
self._arrival.set()
|
||||
# Fire notification AFTER releasing the lock so the callback
|
||||
# is free to do anything (including calling back into inbox)
|
||||
# without deadlock. Best-effort: a raising callback must not
|
||||
# prevent the message from landing in the queue — observability
|
||||
# is more important than push delivery.
|
||||
cb = _NOTIFICATION_CALLBACK
|
||||
if cb is not None:
|
||||
try:
|
||||
cb(message.to_dict())
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"inbox: notification callback raised", exc_info=True
|
||||
)
|
||||
|
||||
def peek(self, limit: int = 10) -> list[InboxMessage]:
|
||||
"""Return up to ``limit`` pending messages without removing them."""
|
||||
@ -240,6 +257,35 @@ class InboxState:
|
||||
_STATE: InboxState | None = None
|
||||
|
||||
|
||||
# Notification bridge — set by the universal MCP server (a2a_mcp_server.py)
|
||||
# at startup so that new inbox arrivals can be pushed to notification-
|
||||
# capable hosts (Claude Code) as MCP `notifications/claude/channel`
|
||||
# events. Kept module-level (rather than a method on InboxState) so the
|
||||
# inbox doesn't need to know about MCP — a thin pluggable seam.
|
||||
#
|
||||
# Defaults to None: in-container runtimes that don't activate the inbox
|
||||
# also don't push notifications, and tests start clean. The wheel's
|
||||
# wiring is exercised by tests/test_a2a_mcp_server.py + the bridge
|
||||
# tests below.
|
||||
_NOTIFICATION_CALLBACK: Callable[[dict], None] | None = None
|
||||
|
||||
|
||||
def set_notification_callback(cb: Callable[[dict], None] | None) -> None:
|
||||
"""Register (or clear) the per-message notification callback.
|
||||
|
||||
The callback receives ``InboxMessage.to_dict()`` for each new
|
||||
arrival — same shape ``inbox_peek`` returns to the agent, so a
|
||||
bridge can build its MCP notification payload without re-deriving
|
||||
fields.
|
||||
|
||||
Best-effort: a raising callback does NOT prevent the message from
|
||||
landing in the queue (see ``InboxState.record``). Pass ``None`` to
|
||||
clear (used by tests + the wheel's shutdown path).
|
||||
"""
|
||||
global _NOTIFICATION_CALLBACK
|
||||
_NOTIFICATION_CALLBACK = cb
|
||||
|
||||
|
||||
def activate(state: InboxState) -> None:
|
||||
"""Register an InboxState as the singleton this module exposes.
|
||||
|
||||
|
||||
@ -138,3 +138,102 @@ def test_attachments_param_description_emphasizes_REQUIRED():
|
||||
assert forbidden in desc, (
|
||||
f"`attachments` description must call out {forbidden!r} as a wrong alternative"
|
||||
)
|
||||
|
||||
|
||||
# ============== Inbox → MCP notification bridge (2026-05-01) ==============
|
||||
# Notification-capable hosts (Claude Code) get push UX when a new inbound
|
||||
# message lands; pollers (wait_for_message/inbox_peek) keep working.
|
||||
# `_build_channel_notification` is the pure shape transformer — wire-up
|
||||
# in main() composes it with asyncio.run_coroutine_threadsafe.
|
||||
|
||||
|
||||
def test_build_channel_notification_method_matches_claude_contract():
    """The envelope's method must be exactly `notifications/claude/channel`.

    That is the string Claude Code's MCP runtime treats as a conversation
    interrupt, and the same one the bun channel bridge sends
    (server.ts:509), which makes this builder a drop-in replacement.
    """
    from a2a_mcp_server import _build_channel_notification

    inbound = dict(
        activity_id="act-1",
        text="hello",
        peer_id="",
        kind="canvas_user",
        method="message/send",
        created_at="2026-05-01T00:00:00Z",
    )
    envelope = _build_channel_notification(inbound)

    assert envelope["method"] == "notifications/claude/channel"
    assert envelope["jsonrpc"] == "2.0"
|
||||
|
||||
|
||||
def test_build_channel_notification_content_is_message_text():
    """`params.content` — the text that becomes the agent's conversation
    turn — is copied verbatim from the inbox message's `text`."""
    from a2a_mcp_server import _build_channel_notification

    body = "hello from canvas"
    inbound = dict(
        activity_id="act-1",
        text=body,
        peer_id="",
        kind="canvas_user",
        method="message/send",
        created_at="2026-05-01T00:00:00Z",
    )
    envelope = _build_channel_notification(inbound)

    assert envelope["params"]["content"] == "hello from canvas"
|
||||
|
||||
|
||||
def test_build_channel_notification_meta_carries_routing_fields():
    """`params.meta` exposes source, kind, peer_id, method, activity_id, ts.

    The agent or downstream tooling needs these to route a reply
    (canvas_user → /notify, peer_agent → /a2a) and to acknowledge the
    message via inbox_pop.
    """
    from a2a_mcp_server import _build_channel_notification

    inbound = dict(
        activity_id="act-7",
        text="ping",
        peer_id="ws-peer-uuid",
        kind="peer_agent",
        method="message/send",
        created_at="2026-05-01T01:23:45Z",
    )
    meta = _build_channel_notification(inbound)["params"]["meta"]

    # Checked key by key (not as a whole-dict comparison) so extra meta
    # fields added later do not break this test.
    expected = {
        "source": "molecule",
        "kind": "peer_agent",
        "peer_id": "ws-peer-uuid",
        "method": "message/send",
        "activity_id": "act-7",
        "ts": "2026-05-01T01:23:45Z",
    }
    for key, want in expected.items():
        assert meta[key] == want
|
||||
|
||||
|
||||
def test_build_channel_notification_no_id_field():
    """The envelope must not contain a JSON-RPC `id` member.

    The absence of `id` is what marks a message as a notification
    instead of a request; with one present, a client would treat it as a
    request and wait for a response that never arrives.
    """
    from a2a_mcp_server import _build_channel_notification

    envelope = _build_channel_notification({"text": "x"})
    has_id = "id" in envelope

    assert not has_id, (
        "notifications must omit `id` per JSON-RPC 2.0 spec — "
        "presence would make MCP clients await a phantom response"
    )
|
||||
|
||||
|
||||
def test_build_channel_notification_handles_missing_fields_gracefully():
    """An empty message dict still yields a complete envelope.

    Some fields may be absent on edge-case messages (e.g. cursor
    bootstrapping with no created_at yet). Every message-derived field
    must default to an empty string so the wire shape stays valid
    instead of crashing — including `method` and `ts`, which are easy to
    overlook because `ts` is sourced from a differently named key
    (`created_at`).
    """
    from a2a_mcp_server import _build_channel_notification

    payload = _build_channel_notification({})

    assert payload["params"]["content"] == ""
    meta = payload["params"]["meta"]
    assert meta["activity_id"] == ""
    assert meta["peer_id"] == ""
    assert meta["kind"] == ""
    assert meta["method"] == ""
    assert meta["ts"] == ""
|
||||
|
||||
@ -81,6 +81,89 @@ def test_load_config_model_no_env(tmp_path, monkeypatch):
|
||||
assert cfg.model == "openai:gpt-4o"
|
||||
|
||||
|
||||
def test_runtime_config_model_falls_back_to_top_level(tmp_path, monkeypatch):
    """runtime_config.model inherits the top-level model when YAML omits it.

    Why it matters: without the fallback, SaaS workspaces silently boot
    with the adapter's hard-coded default. claude-code-default reads
    ``runtime_config.model or "sonnet"``, so a user who picked Opus in
    the canvas Config tab would get Sonnet after the next restart. The
    root cause is that the CP user-data script regenerates
    /configs/config.yaml at every boot with only ``name``, ``runtime``
    and ``a2a`` keys (intentionally minimal so it doesn't carry stale
    state), which drops runtime_config.model. MODEL_PROVIDER is plumbed
    as an env var, so resolving through the top-level ``model`` keeps
    the selection sticky across restarts.
    """
    monkeypatch.delenv("MODEL_PROVIDER", raising=False)

    # Only the top-level model is present, no runtime_config.model —
    # the shape the CP user-data writes after a restart.
    top_level_only = {"model": "anthropic:claude-opus-4-7"}
    (tmp_path / "config.yaml").write_text(yaml.dump(top_level_only))

    loaded = load_config(str(tmp_path))

    assert loaded.runtime_config.model == "anthropic:claude-opus-4-7"
|
||||
|
||||
|
||||
def test_runtime_config_model_yaml_wins_over_top_level(tmp_path, monkeypatch):
    """An explicit runtime_config.model beats the top-level model.

    Guards that the fallback is only a fallback and never clobbers
    workspaces that intentionally set runtime_config.model to something
    different from the top-level model.
    """
    monkeypatch.delenv("MODEL_PROVIDER", raising=False)

    raw_config = {
        "model": "anthropic:claude-opus-4-7",
        "runtime_config": {"model": "openai:gpt-4o"},
    }
    (tmp_path / "config.yaml").write_text(yaml.dump(raw_config))

    loaded = load_config(str(tmp_path))

    # The top-level value is untouched by the nested one...
    assert loaded.model == "anthropic:claude-opus-4-7"
    # ...and the nested value wins; the fallback only applies when the
    # YAML leaves runtime_config.model empty.
    assert loaded.runtime_config.model == "openai:gpt-4o"
|
||||
|
||||
|
||||
def test_runtime_config_model_picks_up_env_via_top_level(tmp_path, monkeypatch):
    """MODEL_PROVIDER reaches runtime_config.model through the top-level model.

    This is the end-to-end path canvas Save+Restart relies on: the user
    picks a model → workspace_secrets.MODEL_PROVIDER is updated → CP
    user-data re-renders /configs/config.yaml WITHOUT
    runtime_config.model → the workspace boots with the MODEL_PROVIDER
    env var set. The top-level model resolves from MODEL_PROVIDER, and
    runtime_config.model then falls back to it, so the adapter sees the
    user's selection.

    Regression test for the canvas-side feedback
    "Provisioner doesn't read model from config.yaml and doesn't set
    MODEL env var. Without MODEL, the adapter defaults to sonnet and
    bypasses the mimo routing." (2026-04-30).
    """
    monkeypatch.setenv("MODEL_PROVIDER", "minimax/abab7-chat-preview")

    # Minimal CP-shaped config.yaml: name + runtime + a2a only, with
    # neither a top-level model nor a runtime_config.model.
    cp_minimal = {
        "name": "Test Agent",
        "runtime": "claude-code",
        "a2a": {"port": 8000, "streaming": True},
    }
    (tmp_path / "config.yaml").write_text(yaml.dump(cp_minimal))

    loaded = load_config(str(tmp_path))

    assert loaded.model == "minimax/abab7-chat-preview"
    # The adapter (claude-code-default reads runtime_config.model or
    # "sonnet") therefore gets the selected model, not "sonnet".
    assert loaded.runtime_config.model == "minimax/abab7-chat-preview"
|
||||
|
||||
|
||||
def test_delegation_config_defaults(tmp_path):
|
||||
"""DelegationConfig nested defaults are applied."""
|
||||
config_yaml = tmp_path / "config.yaml"
|
||||
|
||||
@ -442,3 +442,113 @@ def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path):
|
||||
def test_default_cursor_path_falls_back_to_default(monkeypatch):
    """With CONFIGS_DIR unset, the cursor file path is under /configs."""
    monkeypatch.delenv("CONFIGS_DIR", raising=False)
    fallback = Path("/configs") / ".mcp_inbox_cursor"
    assert inbox.default_cursor_path() == fallback
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Notification callback bridge — push UX for notification-capable hosts
|
||||
# ---------------------------------------------------------------------------
|
||||
#
|
||||
# `record()` is called from the poller daemon thread when a new activity
|
||||
# row arrives. Notification-capable MCP hosts (Claude Code) want to be
|
||||
# pushed a notification — the universal wheel registers a callback via
|
||||
# `set_notification_callback()` that fires the MCP notification. Pollers
|
||||
# (`wait_for_message`/`inbox_peek`) keep working unchanged.
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def _reset_notification_callback():
    """Clear the module-level notification callback around every test.

    The callback is cleared both before and after the test body so a
    registration made by one test can never leak into another.
    """
    inbox.set_notification_callback(None)  # start clean
    yield
    inbox.set_notification_callback(None)  # leave clean
|
||||
|
||||
|
||||
def test_record_fires_notification_callback_with_message_dict(state: inbox.InboxState):
    """record() hands the registered callback the message's to_dict() form.

    A bridge can therefore build its MCP notification payload straight
    from the callback argument without re-deriving any field.
    """
    seen: list[dict] = []
    inbox.set_notification_callback(seen.append)

    state.record(_msg("act-1", peer_id="ws-peer", text="hello"))

    assert len(seen) == 1
    delivered = seen[0]
    expected = {
        "activity_id": "act-1",
        "text": "hello",
        "peer_id": "ws-peer",
        "kind": "peer_agent",  # derived by to_dict, not passed to _msg
        "method": "message/send",
    }
    for key, want in expected.items():
        assert delivered[key] == want
|
||||
|
||||
|
||||
def test_record_dedupe_does_not_refire_callback(state: inbox.InboxState):
    """A duplicate activity_id is dropped before the callback runs.

    If the dedupe check came after the callback, a notification-capable
    host would receive duplicate push events whenever the poller's
    backlog overlaps.
    """
    seen: list[dict] = []
    inbox.set_notification_callback(seen.append)

    # Same activity_id twice: the second record() must be a no-op.
    for _ in range(2):
        state.record(_msg("act-1"))

    fired = len(seen)
    assert fired == 1, (
        f"expected 1 callback (dedupe), got {fired} — "
        f"would cause duplicate Claude conversation interrupts"
    )
|
||||
|
||||
|
||||
def test_record_callback_exception_does_not_break_inbox(state: inbox.InboxState):
    """A callback that raises must not stop the message being queued.

    Failures such as an asyncio loop closed mid-shutdown or a
    serialization error on an exotic message only affect push delivery,
    which is best-effort; the inbox itself must stay correct.
    """

    def always_fails(_payload):
        raise RuntimeError("simulated callback failure")

    inbox.set_notification_callback(always_fails)

    # record() has to absorb the error and still enqueue the message.
    state.record(_msg("act-1"))

    pending = state.peek(10)
    assert len(pending) == 1
    assert pending[0].activity_id == "act-1"
|
||||
|
||||
|
||||
def test_record_no_callback_registered_is_no_op(state: inbox.InboxState):
    """With no callback installed, record() simply queues the message.

    Covers the in-container path and the window before activation: there
    must be no attempt to call a ``None`` callback.
    """
    # Deliberately no set_notification_callback() call here; the autouse
    # fixture has already cleared any earlier registration.
    state.record(_msg("act-1"))

    pending = state.peek(10)
    assert len(pending) == 1
|
||||
|
||||
|
||||
def test_set_notification_callback_replaces_previous(state: inbox.InboxState):
    """Registering a second callback supersedes the first.

    Only the most recent registration fires, which lets the universal
    wheel swap the bridge when its asyncio loop is replaced (e.g. on a
    graceful restart).
    """
    old_sink: list[dict] = []
    new_sink: list[dict] = []
    for sink in (old_sink, new_sink):
        inbox.set_notification_callback(sink.append)

    state.record(_msg("act-1"))

    assert len(old_sink) == 0, "first callback should be unregistered"
    assert len(new_sink) == 1, "second callback should receive the event"
|
||||
|
||||
|
||||
def test_set_notification_callback_none_clears(state: inbox.InboxState):
    """Passing ``None`` removes a previously registered callback.

    This is the mechanism tests and the wheel's shutdown path use to
    switch notifications off.
    """
    seen: list[dict] = []
    inbox.set_notification_callback(seen.append)
    inbox.set_notification_callback(None)  # unregister again

    state.record(_msg("act-1"))

    assert seen == []
|
||||
|
||||
Loading…
Reference in New Issue
Block a user