Merge pull request #2437 from Molecule-AI/staging

staging → main: auto-promote c901d52
This commit is contained in:
github-actions[bot] 2026-05-01 03:42:34 +00:00 committed by GitHub
commit 76c604fb4f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 522 additions and 4 deletions

View File

@ -150,6 +150,13 @@ def rewrite_imports(text: str, regex: re.Pattern) -> str:
`import X` `import molecule_runtime.X as X` (preserve binding) `import X` `import molecule_runtime.X as X` (preserve binding)
`from X import Y` `from molecule_runtime.X import Y` `from X import Y` `from molecule_runtime.X import Y`
`from X.sub import Y` `from molecule_runtime.X.sub import Y` `from X.sub import Y` `from molecule_runtime.X.sub import Y`
Rejects `import X as Y` because the rewrite would produce
`import molecule_runtime.X as X as Y`, a syntax error. The PR #2433
incident shipped this exact pattern past `Python Lint & Test` (which
runs against pre-rewrite source) but blew up the wheel-smoke gate.
Detecting it here turns the silent build failure into a build-time
error with a clear path: use `from X import ` or plain `import X`.
""" """
def repl(m: re.Match) -> str: def repl(m: re.Match) -> str:
indent, kw, mod, rest = m.group("indent"), m.group("kw"), m.group("mod"), m.group("rest") indent, kw, mod, rest = m.group("indent"), m.group("kw"), m.group("mod"), m.group("rest")
@ -163,6 +170,26 @@ def rewrite_imports(text: str, regex: re.Pattern) -> str:
# `import X.sub` — rewrite as `import molecule_runtime.X.sub` and # `import X.sub` — rewrite as `import molecule_runtime.X.sub` and
# leave the trailing dot pattern intact for the rest of the line. # leave the trailing dot pattern intact for the rest of the line.
return f"{indent}import molecule_runtime.{mod}{rest}" return f"{indent}import molecule_runtime.{mod}{rest}"
# Detect `import X as Y` — the regex's `rest` group captures only
# the immediate following char (whitespace, comma, or EOL), so we
# have to peek at the surrounding line context. The match start is
# at the line's `import` keyword; everything after the matched
# name on the same line is what the source author wrote.
line_start = text.rfind("\n", 0, m.start()) + 1
line_end = text.find("\n", m.end())
if line_end == -1:
line_end = len(text)
line_after = text[m.end() - len(rest):line_end]
# Strip comments from consideration so `import X # noqa` doesn't trip.
line_after_no_comment = line_after.split("#", 1)[0]
if re.search(r"^\s*as\s+\w+", line_after_no_comment):
raise ValueError(
f"rewrite_imports: cannot rewrite 'import {mod} as <alias>' on a "
f"workspace module — the regex would produce "
f"'import molecule_runtime.{mod} as {mod} as <alias>', invalid syntax. "
f"Use 'from {mod} import …' or plain 'import {mod}' instead. "
f"Offending line: {text[line_start:line_end]!r}"
)
# Plain `import X` — alias preserves the local name. # Plain `import X` — alias preserves the local name.
return f"{indent}import molecule_runtime.{mod} as {mod}{rest}" return f"{indent}import molecule_runtime.{mod} as {mod}{rest}"
return regex.sub(repl, text) return regex.sub(repl, text)

View File

@ -52,11 +52,13 @@ def smoke_imports_and_invariants() -> None:
InboxState, InboxState,
activate as inbox_activate, activate as inbox_activate,
get_state as inbox_get_state, get_state as inbox_get_state,
set_notification_callback as inbox_set_notification_callback,
start_poller_thread as inbox_start_poller_thread, start_poller_thread as inbox_start_poller_thread,
) )
assert callable(inbox_activate), "inbox.activate must be callable" assert callable(inbox_activate), "inbox.activate must be callable"
assert callable(inbox_get_state), "inbox.get_state must be callable" assert callable(inbox_get_state), "inbox.get_state must be callable"
assert callable(inbox_start_poller_thread), "inbox.start_poller_thread must be callable" assert callable(inbox_start_poller_thread), "inbox.start_poller_thread must be callable"
assert callable(inbox_set_notification_callback), "inbox.set_notification_callback must be callable"
assert a2a_client._A2A_ERROR_PREFIX, "a2a_client missing error sentinel" assert a2a_client._A2A_ERROR_PREFIX, "a2a_client missing error sentinel"
assert callable(get_adapter), "adapters.get_adapter must be callable" assert callable(get_adapter), "adapters.get_adapter must be callable"

View File

@ -277,12 +277,26 @@ var openTunnelCmd = func(o eicSSHOptions) *exec.Cmd {
// to 22; with CP provisioning today the workspace runs as a native // to 22; with CP provisioning today the workspace runs as a native
// process under the ubuntu user, so landing at ubuntu's shell IS the // process under the ubuntu user, so landing at ubuntu's shell IS the
// terminal experience. // terminal experience.
//
// ConnectTimeout=10 is the user-experience guard — without it, ssh waits
// indefinitely for the remote sshd's banner. When the workspace EC2's
// sshd is unresponsive (mid-restart, SG drop, AMI without ec2-instance-
// connect installed) the canvas's xterm shows the user's typed bytes
// echoed back by the workspace-server's *local* PTY (cooked + echo mode
// before ssh finishes its handshake) and then closes silently when CF's
// idle WebSocket timer fires, with no "Connection refused" or "Permission
// denied" output ever reaching the user. Capping at 10s makes the failure
// surface as a real ssh error message in the terminal — caught 2026-04-30
// when hongmingwang's hermes shell hung after the heartbeat-fix redeploy
// and a probe at /workspaces/<id>/terminal sat for 60s with the only
// frame being the local-PTY echo of a single 'X' typed mid-handshake.
var sshCommandCmd = func(o eicSSHOptions) *exec.Cmd { var sshCommandCmd = func(o eicSSHOptions) *exec.Cmd {
return exec.Command( return exec.Command(
"ssh", "ssh",
"-i", o.PrivateKeyPath, "-i", o.PrivateKeyPath,
"-o", "StrictHostKeyChecking=no", "-o", "StrictHostKeyChecking=no",
"-o", "UserKnownHostsFile=/dev/null", "-o", "UserKnownHostsFile=/dev/null",
"-o", "ConnectTimeout=10",
"-o", "ServerAliveInterval=30", "-o", "ServerAliveInterval=30",
"-o", "ServerAliveCountMax=3", "-o", "ServerAliveCountMax=3",
"-p", fmt.Sprintf("%d", o.LocalPort), "-p", fmt.Sprintf("%d", o.LocalPort),

View File

@ -320,6 +320,7 @@ func TestSSHCommandCmd_BuildsArgv(t *testing.T) {
"-i", "/tmp/k", "-i", "/tmp/k",
"-o", "StrictHostKeyChecking=no", "-o", "StrictHostKeyChecking=no",
"-o", "UserKnownHostsFile=/dev/null", "-o", "UserKnownHostsFile=/dev/null",
"-o", "ConnectTimeout=10",
"-o", "ServerAliveInterval=30", "-o", "ServerAliveInterval=30",
"-o", "ServerAliveCountMax=3", "-o", "ServerAliveCountMax=3",
"-p", "2222", "-p", "2222",
@ -490,3 +491,57 @@ func TestKI005_OrgToken_SkipsValidateToken(t *testing.T) {
} }
} }
// TestSSHCommandCmd_ConnectTimeoutPresent pins the user-experience guard
// against ssh-handshake-hang. Without ConnectTimeout, ssh waits forever
// for the remote sshd's banner — which masquerades as a "silently dead"
// shell to the user, because the workspace-server's local PTY is in
// cooked + echo mode before ssh finishes its handshake, so the canvas
// echoes the user's keystrokes back without ever reaching remote bash,
// and Cloudflare eventually closes the WebSocket on idle (~100s) with
// no error frame to surface what went wrong.
//
// Repro 2026-04-30: a 60s probe at hongmingwang's hermes /terminal
// endpoint after the heartbeat-fix redeploy showed only the local-PTY
// echo of a single 'X' typed mid-handshake. Workspace EC2 was up and
// heartbeating but its sshd was unresponsive; ssh hung indefinitely.
//
// Behavior-based: matches the literal `-o ConnectTimeout=N` arg pair so
// this stays pinned even if the rest of the args reorder. Does not pin
// the exact value — operators may tune it — but does pin presence.
func TestSSHCommandCmd_ConnectTimeoutPresent(t *testing.T) {
	t.Parallel()
	cmd := sshCommandCmd(eicSSHOptions{
		InstanceID:     "i-test",
		OSUser:         "ubuntu",
		Region:         "us-east-2",
		LocalPort:      2222,
		PrivateKeyPath: "/tmp/test-key",
	})
	argv := cmd.Args
	// Scan for any `-o` flag whose value starts with "ConnectTimeout=".
	// Value is deliberately unpinned — operators may tune it.
	const wantPrefix = "ConnectTimeout="
	hasTimeout := false
	for idx := 1; idx < len(argv); idx++ {
		if argv[idx-1] != "-o" {
			continue
		}
		opt := argv[idx]
		if len(opt) >= len(wantPrefix) && opt[:len(wantPrefix)] == wantPrefix {
			hasTimeout = true
			break
		}
	}
	if !hasTimeout {
		t.Errorf("sshCommandCmd is missing `-o ConnectTimeout=N` — without it, "+
			"ssh hangs forever when the workspace EC2's sshd is unresponsive "+
			"and the canvas terminal silently dies on Cloudflare's idle WS "+
			"timeout with no error message reaching the user. See terminal.go "+
			"sshCommandCmd comment (2026-04-30 hongmingwang hermes). args=%v",
			argv)
	}
}

View File

@ -17,6 +17,10 @@ import json
import logging import logging
import sys import sys
import inbox # noqa: F401 — bridge wiring lives in main(); the rewriter
# produces `import molecule_runtime.inbox as inbox`
# which preserves this binding for set_notification_callback.
from a2a_tools import ( from a2a_tools import (
tool_check_task_status, tool_check_task_status,
tool_commit_memory, tool_commit_memory,
@ -130,6 +134,44 @@ async def handle_tool_call(name: str, arguments: dict) -> str:
return f"Unknown tool: {name}" return f"Unknown tool: {name}"
# --- MCP Notification bridge ---
# `notifications/claude/channel` matches the contract used by the
# molecule-mcp-claude-channel bun bridge (server.ts:509). Claude Code's
# MCP runtime treats this method as a conversation interrupt — `content`
# becomes the agent turn, `meta` is structured metadata. Notification-
# capable hosts (Claude Code today; any compliant client tomorrow)
# get push UX automatically; pollers (`wait_for_message` / `inbox_peek`)
# still work unchanged. See task #46 + the deprecation path documented
# in workspace/inbox.py:set_notification_callback.
_CHANNEL_NOTIFICATION_METHOD = "notifications/claude/channel"
def _build_channel_notification(msg: dict) -> dict:
"""Transform an ``InboxMessage.to_dict()`` into the MCP notification
envelope expected by Claude Code's channel-bridge contract.
Pure function so the wire shape is unit-testable without spinning
up an asyncio loop. The wire-up in ``main()`` just composes this
with ``asyncio.run_coroutine_threadsafe``.
"""
return {
"jsonrpc": "2.0",
"method": _CHANNEL_NOTIFICATION_METHOD,
"params": {
"content": msg.get("text", ""),
"meta": {
"source": "molecule",
"kind": msg.get("kind", ""),
"peer_id": msg.get("peer_id", ""),
"method": msg.get("method", ""),
"activity_id": msg.get("activity_id", ""),
"ts": msg.get("created_at", ""),
},
},
}
# --- MCP Server (JSON-RPC over stdio) --- # --- MCP Server (JSON-RPC over stdio) ---
async def main(): # pragma: no cover async def main(): # pragma: no cover
@ -148,6 +190,34 @@ async def main(): # pragma: no cover
writer.write(data.encode()) writer.write(data.encode())
await writer.drain() await writer.drain()
# Wire the inbox → MCP notification bridge. Inbox poller (daemon
# thread) calls into here when a new activity row lands; we
# schedule the notification onto the asyncio loop and best-effort
# fire it on the same stdout the responses go to.
loop = asyncio.get_running_loop()
async def _emit_notification(payload: dict) -> None:
data = json.dumps(payload) + "\n"
writer.write(data.encode())
try:
await writer.drain()
except Exception: # noqa: BLE001
# Closed pipe (host disconnected) shouldn't crash the
# inbox poller; let it sit until the host reconnects.
pass
def _on_inbox_message(msg: dict) -> None:
try:
asyncio.run_coroutine_threadsafe(
_emit_notification(_build_channel_notification(msg)),
loop,
)
except RuntimeError:
# Loop closed during shutdown — best-effort, swallow.
pass
inbox.set_notification_callback(_on_inbox_message)
buffer = "" buffer = ""
while True: while True:
try: try:

View File

@ -323,7 +323,19 @@ def load_config(config_path: Optional[str] = None) -> WorkspaceConfig:
args=runtime_raw.get("args", []), args=runtime_raw.get("args", []),
required_env=runtime_raw.get("required_env", []), required_env=runtime_raw.get("required_env", []),
timeout=runtime_raw.get("timeout", 0), timeout=runtime_raw.get("timeout", 0),
model=runtime_raw.get("model", ""), # Fall back to top-level resolved `model` (which already honors
# MODEL_PROVIDER env override, line 277) when YAML doesn't carry
# runtime_config.model. Without this fallback, SaaS workspaces
# silently boot with the adapter's hard-coded default —
# claude-code-default reads `runtime_config.model or "sonnet"`,
# so a user who picks Opus in the canvas Config tab gets Sonnet
# on the next CP-driven restart. Root cause: the CP user-data
# script regenerates /configs/config.yaml at every boot with
# only `name`, `runtime`, `a2a` keys (intentionally minimal so
# it doesn't carry stale state), losing runtime_config.model.
# MODEL_PROVIDER is plumbed as an env var, so picking it up via
# the top-level resolved model keeps the selection sticky.
model=runtime_raw.get("model") or model,
# Deprecated fields — kept for backward compat # Deprecated fields — kept for backward compat
auth_token_env=runtime_raw.get("auth_token_env", ""), auth_token_env=runtime_raw.get("auth_token_env", ""),
auth_token_file=runtime_raw.get("auth_token_file", ""), auth_token_file=runtime_raw.get("auth_token_file", ""),

View File

@ -53,7 +53,7 @@ import time
from collections import deque from collections import deque
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any, Callable
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -173,10 +173,14 @@ class InboxState:
logger.warning("inbox: failed to delete cursor %s: %s", self.cursor_path, exc) logger.warning("inbox: failed to delete cursor %s: %s", self.cursor_path, exc)
def record(self, message: InboxMessage) -> None: def record(self, message: InboxMessage) -> None:
"""Append a message and wake any waiter. """Append a message, wake any waiter, and fire the notification
callback (if registered) for push-UX-capable hosts.
Skips a row whose activity_id we've already queued — defensive Skips a row whose activity_id we've already queued — defensive
against the poller racing with the consumer + cursor save. against the poller racing with the consumer + cursor save. The
dedupe short-circuits BEFORE the notification fires, so a
notification-capable host doesn't see duplicate push events on
backlog overlap.
""" """
with self._lock: with self._lock:
for existing in self._queue: for existing in self._queue:
@ -184,6 +188,19 @@ class InboxState:
return return
self._queue.append(message) self._queue.append(message)
self._arrival.set() self._arrival.set()
# Fire notification AFTER releasing the lock so the callback
# is free to do anything (including calling back into inbox)
# without deadlock. Best-effort: a raising callback must not
# prevent the message from landing in the queue — observability
# is more important than push delivery.
cb = _NOTIFICATION_CALLBACK
if cb is not None:
try:
cb(message.to_dict())
except Exception:
logger.warning(
"inbox: notification callback raised", exc_info=True
)
def peek(self, limit: int = 10) -> list[InboxMessage]: def peek(self, limit: int = 10) -> list[InboxMessage]:
"""Return up to ``limit`` pending messages without removing them.""" """Return up to ``limit`` pending messages without removing them."""
@ -240,6 +257,35 @@ class InboxState:
_STATE: InboxState | None = None _STATE: InboxState | None = None
# Notification bridge — set by the universal MCP server (a2a_mcp_server.py)
# at startup so that new inbox arrivals can be pushed to notification-
# capable hosts (Claude Code) as MCP `notifications/claude/channel`
# events. Kept module-level (rather than a method on InboxState) so the
# inbox doesn't need to know about MCP — a thin pluggable seam.
#
# Defaults to None: in-container runtimes that don't activate the inbox
# also don't push notifications, and tests start clean. The wheel's
# wiring is exercised by tests/test_a2a_mcp_server.py + the bridge
# tests below.
_NOTIFICATION_CALLBACK: Callable[[dict], None] | None = None
def set_notification_callback(cb: Callable[[dict], None] | None) -> None:
"""Register (or clear) the per-message notification callback.
The callback receives ``InboxMessage.to_dict()`` for each new
arrival same shape ``inbox_peek`` returns to the agent, so a
bridge can build its MCP notification payload without re-deriving
fields.
Best-effort: a raising callback does NOT prevent the message from
landing in the queue (see ``InboxState.record``). Pass ``None`` to
clear (used by tests + the wheel's shutdown path).
"""
global _NOTIFICATION_CALLBACK
_NOTIFICATION_CALLBACK = cb
def activate(state: InboxState) -> None: def activate(state: InboxState) -> None:
"""Register an InboxState as the singleton this module exposes. """Register an InboxState as the singleton this module exposes.

View File

@ -138,3 +138,102 @@ def test_attachments_param_description_emphasizes_REQUIRED():
assert forbidden in desc, ( assert forbidden in desc, (
f"`attachments` description must call out {forbidden!r} as a wrong alternative" f"`attachments` description must call out {forbidden!r} as a wrong alternative"
) )
# ============== Inbox → MCP notification bridge (2026-05-01) ==============
# Notification-capable hosts (Claude Code) get push UX when a new inbound
# message lands; pollers (wait_for_message/inbox_peek) keep working.
# `_build_channel_notification` is the pure shape transformer — wire-up
# in main() composes it with asyncio.run_coroutine_threadsafe.
def test_build_channel_notification_method_matches_claude_contract():
    """The method string MUST be exactly `notifications/claude/channel`
    — what Claude Code's MCP runtime listens for as a conversation
    interrupt, and the same string the bun channel bridge sends
    (server.ts:509), making this a drop-in replacement."""
    from a2a_mcp_server import _build_channel_notification

    envelope = _build_channel_notification({
        "activity_id": "act-1",
        "text": "hello",
        "peer_id": "",
        "kind": "canvas_user",
        "method": "message/send",
        "created_at": "2026-05-01T00:00:00Z",
    })
    assert envelope["jsonrpc"] == "2.0"
    assert envelope["method"] == "notifications/claude/channel"
def test_build_channel_notification_content_is_message_text():
    """`content` becomes the agent conversation turn, so it must be
    lifted verbatim from the inbox message's text field."""
    from a2a_mcp_server import _build_channel_notification

    msg = {
        "activity_id": "act-1",
        "text": "hello from canvas",
        "peer_id": "",
        "kind": "canvas_user",
        "method": "message/send",
        "created_at": "2026-05-01T00:00:00Z",
    }
    envelope = _build_channel_notification(msg)
    assert envelope["params"]["content"] == "hello from canvas"
def test_build_channel_notification_meta_carries_routing_fields():
    """Meta must carry kind, peer_id, method, activity_id and ts — the
    fields the agent or downstream tooling needs to route a reply
    (canvas_user → /notify, peer_agent → /a2a) and to acknowledge via
    inbox_pop."""
    from a2a_mcp_server import _build_channel_notification

    envelope = _build_channel_notification({
        "activity_id": "act-7",
        "text": "ping",
        "peer_id": "ws-peer-uuid",
        "kind": "peer_agent",
        "method": "message/send",
        "created_at": "2026-05-01T01:23:45Z",
    })
    meta = envelope["params"]["meta"]
    expected = {
        "source": "molecule",
        "kind": "peer_agent",
        "peer_id": "ws-peer-uuid",
        "method": "message/send",
        "activity_id": "act-7",
        "ts": "2026-05-01T01:23:45Z",
    }
    for key, want in expected.items():
        assert meta[key] == want
def test_build_channel_notification_no_id_field():
    """A JSON-RPC 2.0 notification is distinguished from a request by
    the absence of `id`. Were `id` present, clients would treat the
    envelope as a request and wait for a response that never comes."""
    from a2a_mcp_server import _build_channel_notification

    envelope = _build_channel_notification({"text": "x"})
    assert "id" not in envelope, (
        "notifications must omit `id` per JSON-RPC 2.0 spec — "
        "presence would make MCP clients await a phantom response"
    )
def test_build_channel_notification_handles_missing_fields_gracefully():
    """Edge-case messages may lack fields (e.g. cursor bootstrapping
    with no created_at yet). Every field defaults to an empty string so
    the wire shape stays valid JSON instead of crashing."""
    from a2a_mcp_server import _build_channel_notification

    envelope = _build_channel_notification({})
    assert envelope["params"]["content"] == ""
    meta = envelope["params"]["meta"]
    for key in ("activity_id", "peer_id", "kind"):
        assert meta[key] == ""

View File

@ -81,6 +81,89 @@ def test_load_config_model_no_env(tmp_path, monkeypatch):
assert cfg.model == "openai:gpt-4o" assert cfg.model == "openai:gpt-4o"
def test_runtime_config_model_falls_back_to_top_level(tmp_path, monkeypatch):
    """When YAML omits runtime_config.model, fall back to the top-level
    resolved model.

    Without the fallback, SaaS workspaces silently boot with the
    adapter's hard-coded default — claude-code-default reads
    ``runtime_config.model or "sonnet"`` — so even a user who picks
    Opus in the canvas Config tab gets Sonnet on the next restart. Root
    cause: the CP user-data script regenerates /configs/config.yaml at
    every boot with only ``name``, ``runtime``, ``a2a`` keys
    (intentionally minimal so it doesn't carry stale state), losing
    runtime_config.model. MODEL_PROVIDER is plumbed as an env var, so
    picking it up via the top-level resolved ``model`` keeps the
    selection sticky across restarts.
    """
    monkeypatch.delenv("MODEL_PROVIDER", raising=False)
    # Top-level model set, runtime_config.model NOT set — exactly the
    # shape the CP user-data writes after restart.
    (tmp_path / "config.yaml").write_text(
        yaml.dump({"model": "anthropic:claude-opus-4-7"})
    )
    cfg = load_config(str(tmp_path))
    assert cfg.runtime_config.model == "anthropic:claude-opus-4-7"
def test_runtime_config_model_yaml_wins_over_top_level(tmp_path, monkeypatch):
    """An explicit runtime_config.model in YAML takes precedence over
    the top-level model — the fallback is only a fallback, not a
    clobber that would break workspaces intentionally running different
    runtime_config.model vs top-level model values.
    """
    monkeypatch.delenv("MODEL_PROVIDER", raising=False)
    doc = {
        "model": "anthropic:claude-opus-4-7",
        "runtime_config": {"model": "openai:gpt-4o"},
    }
    (tmp_path / "config.yaml").write_text(yaml.dump(doc))
    cfg = load_config(str(tmp_path))
    # Top-level still resolves to its own value.
    assert cfg.model == "anthropic:claude-opus-4-7"
    # runtime_config.model wins — fallback only fires when YAML is empty.
    assert cfg.runtime_config.model == "openai:gpt-4o"
def test_runtime_config_model_picks_up_env_via_top_level(tmp_path, monkeypatch):
    """End-to-end path the canvas Save+Restart relies on: user picks a
    model → workspace_secrets.MODEL_PROVIDER updated → CP user-data
    re-renders /configs/config.yaml WITHOUT runtime_config.model →
    workspace boots with the MODEL_PROVIDER env var. The top-level
    model resolves from MODEL_PROVIDER (line 277), then
    runtime_config.model falls back to that, so the adapter sees the
    user's selection.

    Regression test for the canvas-side feedback "Provisioner doesn't
    read model from config.yaml and doesn't set MODEL env var. Without
    MODEL, the adapter defaults to sonnet and bypasses the mimo
    routing." (2026-04-30).
    """
    monkeypatch.setenv("MODEL_PROVIDER", "minimax/abab7-chat-preview")
    # CP-shaped minimal config.yaml: only name + runtime + a2a, NO
    # top-level model, NO runtime_config.model.
    minimal = {
        "name": "Test Agent",
        "runtime": "claude-code",
        "a2a": {"port": 8000, "streaming": True},
    }
    (tmp_path / "config.yaml").write_text(yaml.dump(minimal))
    cfg = load_config(str(tmp_path))
    assert cfg.model == "minimax/abab7-chat-preview"
    # The adapter (claude-code-default reads runtime_config.model or
    # "sonnet") now sees the user's selected model instead of "sonnet".
    assert cfg.runtime_config.model == "minimax/abab7-chat-preview"
def test_delegation_config_defaults(tmp_path): def test_delegation_config_defaults(tmp_path):
"""DelegationConfig nested defaults are applied.""" """DelegationConfig nested defaults are applied."""
config_yaml = tmp_path / "config.yaml" config_yaml = tmp_path / "config.yaml"

View File

@ -442,3 +442,113 @@ def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path):
def test_default_cursor_path_falls_back_to_default(monkeypatch): def test_default_cursor_path_falls_back_to_default(monkeypatch):
monkeypatch.delenv("CONFIGS_DIR", raising=False) monkeypatch.delenv("CONFIGS_DIR", raising=False)
assert inbox.default_cursor_path() == Path("/configs") / ".mcp_inbox_cursor" assert inbox.default_cursor_path() == Path("/configs") / ".mcp_inbox_cursor"
# ---------------------------------------------------------------------------
# Notification callback bridge — push UX for notification-capable hosts
# ---------------------------------------------------------------------------
#
# `record()` is called from the poller daemon thread when a new activity
# row arrives. Notification-capable MCP hosts (Claude Code) want to be
# pushed a notification — the universal wheel registers a callback via
# `set_notification_callback()` that fires the MCP notification. Pollers
# (`wait_for_message`/`inbox_peek`) keep working unchanged.
@pytest.fixture(autouse=True)
def _reset_notification_callback():
    """Every test starts and ends with no callback registered —
    notification state must not leak across tests (same pattern as
    _reset_singleton)."""
    inbox.set_notification_callback(None)
    yield
    inbox.set_notification_callback(None)
def test_record_fires_notification_callback_with_message_dict(state: inbox.InboxState):
    """With a callback registered, record() invokes it with the
    canonical to_dict() shape — identical to what inbox_peek returns to
    the agent — so callers can build MCP notification payloads without
    re-deriving fields."""
    seen: list[dict] = []
    inbox.set_notification_callback(seen.append)

    state.record(_msg("act-1", peer_id="ws-peer", text="hello"))

    assert len(seen) == 1
    (payload,) = seen
    assert payload["activity_id"] == "act-1"
    assert payload["text"] == "hello"
    assert payload["peer_id"] == "ws-peer"
    assert payload["kind"] == "peer_agent"  # to_dict derives this
    assert payload["method"] == "message/send"
def test_record_dedupe_does_not_refire_callback(state: inbox.InboxState):
    """The activity_id dedupe path must short-circuit BEFORE invoking
    the callback — otherwise a notification-capable host would see
    duplicate push events on poller backlog overlap."""
    received: list[dict] = []
    inbox.set_notification_callback(received.append)
    state.record(_msg("act-1"))
    state.record(_msg("act-1"))  # dedupe — same id
    # Bug fix: the two adjacent f-strings previously concatenated with
    # no separator, rendering the failure as "...got 2would cause
    # duplicate..."; an explicit " — " keeps the message readable.
    assert len(received) == 1, (
        f"expected 1 callback (dedupe), got {len(received)} — "
        f"would cause duplicate Claude conversation interrupts"
    )
def test_record_callback_exception_does_not_break_inbox(state: inbox.InboxState):
    """A raising callback (e.g. asyncio loop closed mid-shutdown,
    serialization error on an exotic message) must NOT keep the message
    out of the queue. Notification delivery is best-effort; inbox
    correctness is not negotiable."""

    def boom(_payload):
        raise RuntimeError("simulated callback failure")

    inbox.set_notification_callback(boom)
    # Must not raise, must still queue the message.
    state.record(_msg("act-1"))
    pending = state.peek(10)
    assert len(pending) == 1
    assert pending[0].activity_id == "act-1"
def test_record_no_callback_registered_is_no_op(state: inbox.InboxState):
    """With no callback set (in-container path, or before activation),
    record() proceeds normally — no None-call crash."""
    # Deliberately no set_notification_callback() here — the autouse
    # fixture already cleared any previous registration.
    state.record(_msg("act-1"))
    assert len(state.peek(10)) == 1
def test_set_notification_callback_replaces_previous(state: inbox.InboxState):
    """Re-registering replaces the previous callback — only the latest
    one fires. Ensures the universal wheel can rewire the bridge if its
    asyncio loop is replaced (e.g. graceful restart)."""
    stale: list[dict] = []
    fresh: list[dict] = []
    inbox.set_notification_callback(stale.append)
    inbox.set_notification_callback(fresh.append)
    state.record(_msg("act-1"))
    assert len(stale) == 0, "first callback should be unregistered"
    assert len(fresh) == 1, "second callback should receive the event"
def test_set_notification_callback_none_clears(state: inbox.InboxState):
    """Passing None clears the callback — used by tests and the wheel's
    shutdown path."""
    sink: list[dict] = []
    inbox.set_notification_callback(sink.append)
    inbox.set_notification_callback(None)
    state.record(_msg("act-1"))
    assert sink == []