diff --git a/scripts/build_runtime_package.py b/scripts/build_runtime_package.py index 366b86c2..910ea691 100755 --- a/scripts/build_runtime_package.py +++ b/scripts/build_runtime_package.py @@ -150,6 +150,13 @@ def rewrite_imports(text: str, regex: re.Pattern) -> str: `import X` → `import molecule_runtime.X as X` (preserve binding) `from X import Y` → `from molecule_runtime.X import Y` `from X.sub import Y` → `from molecule_runtime.X.sub import Y` + + Rejects `import X as Y` because the rewrite would produce + `import molecule_runtime.X as X as Y`, a syntax error. The PR #2433 + incident shipped this exact pattern past `Python Lint & Test` (which + runs against pre-rewrite source) but blew up the wheel-smoke gate. + Detecting it here turns the silent build failure into a build-time + error with a clear path: use `from X import …` or plain `import X`. """ def repl(m: re.Match) -> str: indent, kw, mod, rest = m.group("indent"), m.group("kw"), m.group("mod"), m.group("rest") @@ -163,6 +170,26 @@ def rewrite_imports(text: str, regex: re.Pattern) -> str: # `import X.sub` — rewrite as `import molecule_runtime.X.sub` and # leave the trailing dot pattern intact for the rest of the line. return f"{indent}import molecule_runtime.{mod}{rest}" + # Detect `import X as Y` — the regex's `rest` group captures only + # the immediate following char (whitespace, comma, or EOL), so we + # have to peek at the surrounding line context. The match start is + # at the line's `import` keyword; everything after the matched + # name on the same line is what the source author wrote. + line_start = text.rfind("\n", 0, m.start()) + 1 + line_end = text.find("\n", m.end()) + if line_end == -1: + line_end = len(text) + line_after = text[m.end() - len(rest):line_end] + # Strip comments from consideration so `import X # noqa` doesn't trip. + line_after_no_comment = line_after.split("#", 1)[0] + if re.search(r"^\s*as\s+\w+", line_after_no_comment): + raise ValueError( + f"rewrite_imports: cannot rewrite 'import {mod} as ' on a " + f"workspace module — the regex would produce " + f"'import molecule_runtime.{mod} as {mod} as ', invalid syntax. " + f"Use 'from {mod} import …' or plain 'import {mod}' instead. " + f"Offending line: {text[line_start:line_end]!r}" + ) # Plain `import X` — alias preserves the local name. return f"{indent}import molecule_runtime.{mod} as {mod}{rest}" return regex.sub(repl, text) diff --git a/scripts/wheel_smoke.py b/scripts/wheel_smoke.py index 04d5235c..e32e4a77 100644 --- a/scripts/wheel_smoke.py +++ b/scripts/wheel_smoke.py @@ -52,11 +52,13 @@ def smoke_imports_and_invariants() -> None: InboxState, activate as inbox_activate, get_state as inbox_get_state, + set_notification_callback as inbox_set_notification_callback, start_poller_thread as inbox_start_poller_thread, ) assert callable(inbox_activate), "inbox.activate must be callable" assert callable(inbox_get_state), "inbox.get_state must be callable" assert callable(inbox_start_poller_thread), "inbox.start_poller_thread must be callable" + assert callable(inbox_set_notification_callback), "inbox.set_notification_callback must be callable" assert a2a_client._A2A_ERROR_PREFIX, "a2a_client missing error sentinel" assert callable(get_adapter), "adapters.get_adapter must be callable" diff --git a/workspace-server/internal/handlers/terminal.go b/workspace-server/internal/handlers/terminal.go index 62fe74b4..434ae1f0 100644 --- a/workspace-server/internal/handlers/terminal.go +++ b/workspace-server/internal/handlers/terminal.go @@ -277,12 +277,26 @@ var openTunnelCmd = func(o eicSSHOptions) *exec.Cmd { // to 22; with CP provisioning today the workspace runs as a native // process under the ubuntu user, so landing at ubuntu's shell IS the // terminal experience. +// +// ConnectTimeout=10 is the user-experience guard — without it, ssh waits +// indefinitely for the remote sshd's banner. When the workspace EC2's +// sshd is unresponsive (mid-restart, SG drop, AMI without ec2-instance- +// connect installed) the canvas's xterm shows the user's typed bytes +// echoed back by the workspace-server's *local* PTY (cooked + echo mode +// before ssh finishes its handshake) and then closes silently when CF's +// idle WebSocket timer fires, with no "Connection refused" or "Permission +// denied" output ever reaching the user. Capping at 10s makes the failure +// surface as a real ssh error message in the terminal — caught 2026-04-30 +// when hongmingwang's hermes shell hung after the heartbeat-fix redeploy +// and a probe at /workspaces//terminal sat for 60s with the only +// frame being the local-PTY echo of a single 'X' typed mid-handshake. var sshCommandCmd = func(o eicSSHOptions) *exec.Cmd { return exec.Command( "ssh", "-i", o.PrivateKeyPath, "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", + "-o", "ConnectTimeout=10", "-o", "ServerAliveInterval=30", "-o", "ServerAliveCountMax=3", "-p", fmt.Sprintf("%d", o.LocalPort), diff --git a/workspace-server/internal/handlers/terminal_test.go b/workspace-server/internal/handlers/terminal_test.go index 4a3f29fd..34bc76d3 100644 --- a/workspace-server/internal/handlers/terminal_test.go +++ b/workspace-server/internal/handlers/terminal_test.go @@ -320,6 +320,7 @@ func TestSSHCommandCmd_BuildsArgv(t *testing.T) { "-i", "/tmp/k", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", + "-o", "ConnectTimeout=10", "-o", "ServerAliveInterval=30", "-o", "ServerAliveCountMax=3", "-p", "2222", @@ -490,3 +491,57 @@ func TestKI005_OrgToken_SkipsValidateToken(t *testing.T) { } } +// TestSSHCommandCmd_ConnectTimeoutPresent pins the user-experience guard +// against ssh-handshake-hang. Without ConnectTimeout, ssh waits forever +// for the remote sshd's banner — which masquerades as a "silently dead" +// shell to the user, because the workspace-server's local PTY is in +// cooked + echo mode before ssh finishes its handshake, so the canvas +// echoes the user's keystrokes back without ever reaching remote bash, +// and Cloudflare eventually closes the WebSocket on idle (~100s) with +// no error frame to surface what went wrong. +// +// Repro 2026-04-30: a 60s probe at hongmingwang's hermes /terminal +// endpoint after the heartbeat-fix redeploy showed only the local-PTY +// echo of a single 'X' typed mid-handshake. Workspace EC2 was up and +// heartbeating but its sshd was unresponsive; ssh hung indefinitely. +// +// Behavior-based: matches the literal `-o ConnectTimeout=N` arg pair so +// this stays pinned even if the rest of the args reorder. Does not pin +// the exact value — operators may tune it — but does pin presence. +func TestSSHCommandCmd_ConnectTimeoutPresent(t *testing.T) { + t.Parallel() + + cmd := sshCommandCmd(eicSSHOptions{ + InstanceID: "i-test", + OSUser: "ubuntu", + Region: "us-east-2", + LocalPort: 2222, + PrivateKeyPath: "/tmp/test-key", + }) + + args := cmd.Args + found := false + for i, a := range args { + if a != "-o" { + continue + } + if i+1 >= len(args) { + continue + } + val := args[i+1] + if len(val) >= len("ConnectTimeout=") && + val[:len("ConnectTimeout=")] == "ConnectTimeout=" { + found = true + break + } + } + if !found { + t.Errorf("sshCommandCmd is missing `-o ConnectTimeout=N` — without it, "+ + "ssh hangs forever when the workspace EC2's sshd is unresponsive "+ + "and the canvas terminal silently dies on Cloudflare's idle WS "+ + "timeout with no error message reaching the user. See terminal.go "+ + "sshCommandCmd comment (2026-04-30 hongmingwang hermes). args=%v", + args) + } +} + diff --git a/workspace/a2a_mcp_server.py b/workspace/a2a_mcp_server.py index 9e488f42..09512f26 100644 --- a/workspace/a2a_mcp_server.py +++ b/workspace/a2a_mcp_server.py @@ -17,6 +17,10 @@ import json import logging import sys +import inbox # noqa: F401 — bridge wiring lives in main(); the rewriter +# produces `import molecule_runtime.inbox as inbox` +# which preserves this binding for set_notification_callback. + from a2a_tools import ( tool_check_task_status, tool_commit_memory, @@ -130,6 +134,44 @@ async def handle_tool_call(name: str, arguments: dict) -> str: return f"Unknown tool: {name}" +# --- MCP Notification bridge --- + +# `notifications/claude/channel` matches the contract used by the +# molecule-mcp-claude-channel bun bridge (server.ts:509). Claude Code's +# MCP runtime treats this method as a conversation interrupt — `content` +# becomes the agent turn, `meta` is structured metadata. Notification- +# capable hosts (Claude Code today; any compliant client tomorrow) +# get push UX automatically; pollers (`wait_for_message` / `inbox_peek`) +# still work unchanged. See task #46 + the deprecation path documented +# in workspace/inbox.py:set_notification_callback. +_CHANNEL_NOTIFICATION_METHOD = "notifications/claude/channel" + + +def _build_channel_notification(msg: dict) -> dict: + """Transform an ``InboxMessage.to_dict()`` into the MCP notification + envelope expected by Claude Code's channel-bridge contract. + + Pure function so the wire shape is unit-testable without spinning + up an asyncio loop. The wire-up in ``main()`` just composes this + with ``asyncio.run_coroutine_threadsafe``. + """ + return { + "jsonrpc": "2.0", + "method": _CHANNEL_NOTIFICATION_METHOD, + "params": { + "content": msg.get("text", ""), + "meta": { + "source": "molecule", + "kind": msg.get("kind", ""), + "peer_id": msg.get("peer_id", ""), + "method": msg.get("method", ""), + "activity_id": msg.get("activity_id", ""), + "ts": msg.get("created_at", ""), + }, + }, + } + + # --- MCP Server (JSON-RPC over stdio) --- async def main(): # pragma: no cover @@ -148,6 +190,34 @@ async def main(): # pragma: no cover writer.write(data.encode()) await writer.drain() + # Wire the inbox → MCP notification bridge. Inbox poller (daemon + # thread) calls into here when a new activity row lands; we + # schedule the notification onto the asyncio loop and best-effort + # fire it on the same stdout the responses go to. + loop = asyncio.get_running_loop() + + async def _emit_notification(payload: dict) -> None: + data = json.dumps(payload) + "\n" + writer.write(data.encode()) + try: + await writer.drain() + except Exception: # noqa: BLE001 + # Closed pipe (host disconnected) shouldn't crash the + # inbox poller; let it sit until the host reconnects. + pass + + def _on_inbox_message(msg: dict) -> None: + try: + asyncio.run_coroutine_threadsafe( + _emit_notification(_build_channel_notification(msg)), + loop, + ) + except RuntimeError: + # Loop closed during shutdown — best-effort, swallow. + pass + + inbox.set_notification_callback(_on_inbox_message) + buffer = "" while True: try: diff --git a/workspace/config.py b/workspace/config.py index 0032ac85..370ada11 100644 --- a/workspace/config.py +++ b/workspace/config.py @@ -323,7 +323,19 @@ def load_config(config_path: Optional[str] = None) -> WorkspaceConfig: args=runtime_raw.get("args", []), required_env=runtime_raw.get("required_env", []), timeout=runtime_raw.get("timeout", 0), - model=runtime_raw.get("model", ""), + # Fall back to top-level resolved `model` (which already honors + # MODEL_PROVIDER env override, line 277) when YAML doesn't carry + # runtime_config.model. Without this fallback, SaaS workspaces + # silently boot with the adapter's hard-coded default — + # claude-code-default reads `runtime_config.model or "sonnet"`, + # so a user who picks Opus in the canvas Config tab gets Sonnet + # on the next CP-driven restart. Root cause: the CP user-data + # script regenerates /configs/config.yaml at every boot with + # only `name`, `runtime`, `a2a` keys (intentionally minimal so + # it doesn't carry stale state), losing runtime_config.model. + # MODEL_PROVIDER is plumbed as an env var, so picking it up via + # the top-level resolved model keeps the selection sticky. + model=runtime_raw.get("model") or model, # Deprecated fields — kept for backward compat auth_token_env=runtime_raw.get("auth_token_env", ""), auth_token_file=runtime_raw.get("auth_token_file", ""), diff --git a/workspace/inbox.py b/workspace/inbox.py index 3674a714..524c1eaa 100644 --- a/workspace/inbox.py +++ b/workspace/inbox.py @@ -53,7 +53,7 @@ import time from collections import deque from dataclasses import dataclass, field from pathlib import Path -from typing import Any +from typing import Any, Callable logger = logging.getLogger(__name__) @@ -173,10 +173,14 @@ class InboxState: logger.warning("inbox: failed to delete cursor %s: %s", self.cursor_path, exc) def record(self, message: InboxMessage) -> None: - """Append a message and wake any waiter. + """Append a message, wake any waiter, and fire the notification + callback (if registered) for push-UX-capable hosts. Skips a row whose activity_id we've already queued — defensive - against the poller racing with the consumer + cursor save. + against the poller racing with the consumer + cursor save. The + dedupe short-circuits BEFORE the notification fires, so a + notification-capable host doesn't see duplicate push events on + backlog overlap. """ with self._lock: for existing in self._queue: @@ -184,6 +188,19 @@ class InboxState: return self._queue.append(message) self._arrival.set() + # Fire notification AFTER releasing the lock so the callback + # is free to do anything (including calling back into inbox) + # without deadlock. Best-effort: a raising callback must not + # prevent the message from landing in the queue — observability + # is more important than push delivery. + cb = _NOTIFICATION_CALLBACK + if cb is not None: + try: + cb(message.to_dict()) + except Exception: + logger.warning( + "inbox: notification callback raised", exc_info=True + ) def peek(self, limit: int = 10) -> list[InboxMessage]: """Return up to ``limit`` pending messages without removing them.""" @@ -240,6 +257,35 @@ class InboxState: _STATE: InboxState | None = None +# Notification bridge — set by the universal MCP server (a2a_mcp_server.py) +# at startup so that new inbox arrivals can be pushed to notification- +# capable hosts (Claude Code) as MCP `notifications/claude/channel` +# events. Kept module-level (rather than a method on InboxState) so the +# inbox doesn't need to know about MCP — a thin pluggable seam. +# +# Defaults to None: in-container runtimes that don't activate the inbox +# also don't push notifications, and tests start clean. The wheel's +# wiring is exercised by tests/test_a2a_mcp_server.py + the bridge +# tests below. +_NOTIFICATION_CALLBACK: Callable[[dict], None] | None = None + + +def set_notification_callback(cb: Callable[[dict], None] | None) -> None: + """Register (or clear) the per-message notification callback. + + The callback receives ``InboxMessage.to_dict()`` for each new + arrival — same shape ``inbox_peek`` returns to the agent, so a + bridge can build its MCP notification payload without re-deriving + fields. + + Best-effort: a raising callback does NOT prevent the message from + landing in the queue (see ``InboxState.record``). Pass ``None`` to + clear (used by tests + the wheel's shutdown path). + """ + global _NOTIFICATION_CALLBACK + _NOTIFICATION_CALLBACK = cb + + def activate(state: InboxState) -> None: """Register an InboxState as the singleton this module exposes. diff --git a/workspace/tests/test_a2a_mcp_server.py b/workspace/tests/test_a2a_mcp_server.py index 8969abcb..b08dd3a8 100644 --- a/workspace/tests/test_a2a_mcp_server.py +++ b/workspace/tests/test_a2a_mcp_server.py @@ -138,3 +138,102 @@ def test_attachments_param_description_emphasizes_REQUIRED(): assert forbidden in desc, ( f"`attachments` description must call out {forbidden!r} as a wrong alternative" ) + + +# ============== Inbox → MCP notification bridge (2026-05-01) ============== +# Notification-capable hosts (Claude Code) get push UX when a new inbound +# message lands; pollers (wait_for_message/inbox_peek) keep working. +# `_build_channel_notification` is the pure shape transformer — wire-up +# in main() composes it with asyncio.run_coroutine_threadsafe. + + +def test_build_channel_notification_method_matches_claude_contract(): + """Method MUST be `notifications/claude/channel` exactly — that's + what Claude Code's MCP runtime listens for as a conversation + interrupt. Same string as the bun channel bridge sends + (server.ts:509) so this is a drop-in replacement.""" + from a2a_mcp_server import _build_channel_notification + + payload = _build_channel_notification({ + "activity_id": "act-1", + "text": "hello", + "peer_id": "", + "kind": "canvas_user", + "method": "message/send", + "created_at": "2026-05-01T00:00:00Z", + }) + + assert payload["method"] == "notifications/claude/channel" + assert payload["jsonrpc"] == "2.0" + + +def test_build_channel_notification_content_is_message_text(): + """`content` is what becomes the agent conversation turn — + pulled directly from the inbox message text.""" + from a2a_mcp_server import _build_channel_notification + + payload = _build_channel_notification({ + "activity_id": "act-1", + "text": "hello from canvas", + "peer_id": "", + "kind": "canvas_user", + "method": "message/send", + "created_at": "2026-05-01T00:00:00Z", + }) + + assert payload["params"]["content"] == "hello from canvas" + + +def test_build_channel_notification_meta_carries_routing_fields(): + """Meta must include kind, peer_id, method, activity_id, ts — + fields the agent or downstream tooling needs to route a reply + (canvas_user → /notify, peer_agent → /a2a) and to acknowledge + via inbox_pop.""" + from a2a_mcp_server import _build_channel_notification + + payload = _build_channel_notification({ + "activity_id": "act-7", + "text": "ping", + "peer_id": "ws-peer-uuid", + "kind": "peer_agent", + "method": "message/send", + "created_at": "2026-05-01T01:23:45Z", + }) + meta = payload["params"]["meta"] + + assert meta["source"] == "molecule" + assert meta["kind"] == "peer_agent" + assert meta["peer_id"] == "ws-peer-uuid" + assert meta["method"] == "message/send" + assert meta["activity_id"] == "act-7" + assert meta["ts"] == "2026-05-01T01:23:45Z" + + +def test_build_channel_notification_no_id_field(): + """Notifications MUST NOT carry a JSON-RPC `id` field — that's + what distinguishes them from requests. A notification with `id` + would be mis-interpreted as a request and clients would wait + for a response that never comes.""" + from a2a_mcp_server import _build_channel_notification + + payload = _build_channel_notification({"text": "x"}) + + assert "id" not in payload, ( + "notifications must omit `id` per JSON-RPC 2.0 spec — " + "presence would make MCP clients await a phantom response" + ) + + +def test_build_channel_notification_handles_missing_fields_gracefully(): + """Some fields may be absent on edge-case messages (e.g. cursor + bootstrapping with no created_at yet). Default to empty strings + so the wire shape stays valid JSON instead of crashing.""" + from a2a_mcp_server import _build_channel_notification + + payload = _build_channel_notification({}) + + assert payload["params"]["content"] == "" + meta = payload["params"]["meta"] + assert meta["activity_id"] == "" + assert meta["peer_id"] == "" + assert meta["kind"] == "" diff --git a/workspace/tests/test_config.py b/workspace/tests/test_config.py index 2a805b3f..c87198ba 100644 --- a/workspace/tests/test_config.py +++ b/workspace/tests/test_config.py @@ -81,6 +81,89 @@ def test_load_config_model_no_env(tmp_path, monkeypatch): assert cfg.model == "openai:gpt-4o" +def test_runtime_config_model_falls_back_to_top_level(tmp_path, monkeypatch): + """When YAML omits runtime_config.model, fall back to the top-level + resolved model. + + Without this fallback, SaaS workspaces silently boot with the + adapter's hard-coded default — claude-code-default reads + ``runtime_config.model or "sonnet"``, so even a user who picks Opus + in the canvas Config tab gets Sonnet on the next restart. Root + cause: the CP user-data script regenerates /configs/config.yaml + at every boot with only ``name``, ``runtime``, ``a2a`` keys + (intentionally minimal so it doesn't carry stale state), losing + runtime_config.model. MODEL_PROVIDER is plumbed as an env var, so + picking it up via the top-level resolved ``model`` keeps the + selection sticky across restarts. + """ + monkeypatch.delenv("MODEL_PROVIDER", raising=False) + config_yaml = tmp_path / "config.yaml" + # Top-level model set, runtime_config.model NOT set — exactly the + # shape the CP user-data writes after restart. + config_yaml.write_text(yaml.dump({"model": "anthropic:claude-opus-4-7"})) + + cfg = load_config(str(tmp_path)) + assert cfg.runtime_config.model == "anthropic:claude-opus-4-7" + + +def test_runtime_config_model_yaml_wins_over_top_level(tmp_path, monkeypatch): + """When YAML explicitly sets runtime_config.model, it takes precedence + over the top-level model. Tests the fallback is only a fallback — + not a clobber that would break workspaces with intentionally + different runtime_config.model vs top-level model values. + """ + monkeypatch.delenv("MODEL_PROVIDER", raising=False) + config_yaml = tmp_path / "config.yaml" + config_yaml.write_text( + yaml.dump( + { + "model": "anthropic:claude-opus-4-7", + "runtime_config": {"model": "openai:gpt-4o"}, + } + ) + ) + + cfg = load_config(str(tmp_path)) + # Top-level still resolves to its own value. + assert cfg.model == "anthropic:claude-opus-4-7" + # runtime_config.model wins — fallback only fires when YAML is empty. + assert cfg.runtime_config.model == "openai:gpt-4o" + + +def test_runtime_config_model_picks_up_env_via_top_level(tmp_path, monkeypatch): + """End-to-end path the canvas Save+Restart relies on: user picks + a model → workspace_secrets.MODEL_PROVIDER updated → CP user-data + re-renders /configs/config.yaml WITHOUT runtime_config.model → + workspace boots with MODEL_PROVIDER env var. The top-level model + resolves from MODEL_PROVIDER (line 277), then runtime_config.model + falls back to that. Adapter sees the user's selection. + + This is the regression test for the canvas-side feedback + "Provisioner doesn't read model from config.yaml and doesn't set + MODEL env var. Without MODEL, the adapter defaults to sonnet and + bypasses the mimo routing." (2026-04-30). + """ + monkeypatch.setenv("MODEL_PROVIDER", "minimax/abab7-chat-preview") + config_yaml = tmp_path / "config.yaml" + # CP-shaped minimal config.yaml: only name + runtime + a2a, NO + # top-level model, NO runtime_config.model. + config_yaml.write_text( + yaml.dump( + { + "name": "Test Agent", + "runtime": "claude-code", + "a2a": {"port": 8000, "streaming": True}, + } + ) + ) + + cfg = load_config(str(tmp_path)) + assert cfg.model == "minimax/abab7-chat-preview" + # The adapter (claude-code-default reads runtime_config.model or "sonnet") + # now sees the user's selected model instead of "sonnet". + assert cfg.runtime_config.model == "minimax/abab7-chat-preview" + + def test_delegation_config_defaults(tmp_path): """DelegationConfig nested defaults are applied.""" config_yaml = tmp_path / "config.yaml" diff --git a/workspace/tests/test_inbox.py b/workspace/tests/test_inbox.py index 03bcf8a4..a63297ae 100644 --- a/workspace/tests/test_inbox.py +++ b/workspace/tests/test_inbox.py @@ -442,3 +442,113 @@ def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path): def test_default_cursor_path_falls_back_to_default(monkeypatch): monkeypatch.delenv("CONFIGS_DIR", raising=False) assert inbox.default_cursor_path() == Path("/configs") / ".mcp_inbox_cursor" + + +# --------------------------------------------------------------------------- +# Notification callback bridge — push UX for notification-capable hosts +# --------------------------------------------------------------------------- +# +# `record()` is called from the poller daemon thread when a new activity +# row arrives. Notification-capable MCP hosts (Claude Code) want to be +# pushed a notification — the universal wheel registers a callback via +# `set_notification_callback()` that fires the MCP notification. Pollers +# (`wait_for_message`/`inbox_peek`) keep working unchanged. + + +@pytest.fixture(autouse=True) +def _reset_notification_callback(): + """Each test starts with no callback registered. Notification + state must not leak across tests — same pattern as _reset_singleton.""" + inbox.set_notification_callback(None) + yield + inbox.set_notification_callback(None) + + +def test_record_fires_notification_callback_with_message_dict(state: inbox.InboxState): + """When a callback is registered, record() invokes it with the + canonical to_dict() shape — same shape inbox_peek returns to the + agent. Callers can build MCP notification payloads from this + without re-deriving fields.""" + received: list[dict] = [] + inbox.set_notification_callback(received.append) + + state.record(_msg("act-1", peer_id="ws-peer", text="hello")) + + assert len(received) == 1 + payload = received[0] + assert payload["activity_id"] == "act-1" + assert payload["text"] == "hello" + assert payload["peer_id"] == "ws-peer" + assert payload["kind"] == "peer_agent" # to_dict derives this + assert payload["method"] == "message/send" + + +def test_record_dedupe_does_not_refire_callback(state: inbox.InboxState): + """The activity_id dedupe path must short-circuit BEFORE invoking + the callback — otherwise a notification-capable host would see + duplicate push events on poller backlog overlap.""" + received: list[dict] = [] + inbox.set_notification_callback(received.append) + + state.record(_msg("act-1")) + state.record(_msg("act-1")) # dedupe — same id + + assert len(received) == 1, ( + f"expected 1 callback (dedupe), got {len(received)} — " + f"would cause duplicate Claude conversation interrupts" + ) + + +def test_record_callback_exception_does_not_break_inbox(state: inbox.InboxState): + """A raising callback (e.g. asyncio loop closed mid-shutdown, + serialization error on an exotic message) must NOT prevent the + message from landing in the queue. Notification delivery is + best-effort; inbox correctness is not negotiable.""" + + def boom(_payload): + raise RuntimeError("simulated callback failure") + + inbox.set_notification_callback(boom) + + # Must not raise, must still queue the message. + state.record(_msg("act-1")) + + queued = state.peek(10) + assert len(queued) == 1 + assert queued[0].activity_id == "act-1" + + +def test_record_no_callback_registered_is_no_op(state: inbox.InboxState): + """When no callback is set (in-container path, or before + activation), record() proceeds normally — no None-call crash.""" + # No set_notification_callback() in this test — autouse fixture + # cleared any previous registration. + state.record(_msg("act-1")) + assert len(state.peek(10)) == 1 + + +def test_set_notification_callback_replaces_previous(state: inbox.InboxState): + """Re-registering the callback replaces the previous — only the + latest callback fires. Test ensures the universal wheel can update + the bridge if its asyncio loop is replaced (e.g. graceful restart).""" + first: list[dict] = [] + second: list[dict] = [] + inbox.set_notification_callback(first.append) + inbox.set_notification_callback(second.append) + + state.record(_msg("act-1")) + + assert len(first) == 0, "first callback should be unregistered" + assert len(second) == 1, "second callback should receive the event" + + +def test_set_notification_callback_none_clears(state: inbox.InboxState): + """Setting None clears the callback — used by tests + the wheel's + shutdown path.""" + received: list[dict] = [] + inbox.set_notification_callback(received.append) + inbox.set_notification_callback(None) + + state.record(_msg("act-1")) + + assert received == []