Merge pull request #2437 from Molecule-AI/staging

staging → main: auto-promote c901d52
This commit is contained in:
github-actions[bot] 2026-05-01 03:42:34 +00:00 committed by GitHub
commit 76c604fb4f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 522 additions and 4 deletions

View File

@ -150,6 +150,13 @@ def rewrite_imports(text: str, regex: re.Pattern) -> str:
`import X` `import molecule_runtime.X as X` (preserve binding)
`from X import Y` `from molecule_runtime.X import Y`
`from X.sub import Y` `from molecule_runtime.X.sub import Y`
Rejects `import X as Y` because the rewrite would produce
`import molecule_runtime.X as X as Y`, a syntax error. The PR #2433
incident shipped this exact pattern past `Python Lint & Test` (which
runs against pre-rewrite source) but blew up the wheel-smoke gate.
Detecting it here turns the silent build failure into a build-time
error with a clear path: use `from X import ` or plain `import X`.
"""
def repl(m: re.Match) -> str:
indent, kw, mod, rest = m.group("indent"), m.group("kw"), m.group("mod"), m.group("rest")
@ -163,6 +170,26 @@ def rewrite_imports(text: str, regex: re.Pattern) -> str:
# `import X.sub` — rewrite as `import molecule_runtime.X.sub` and
# leave the trailing dot pattern intact for the rest of the line.
return f"{indent}import molecule_runtime.{mod}{rest}"
# Detect `import X as Y` — the regex's `rest` group captures only
# the immediate following char (whitespace, comma, or EOL), so we
# have to peek at the surrounding line context. The match start is
# at the line's `import` keyword; everything after the matched
# name on the same line is what the source author wrote.
line_start = text.rfind("\n", 0, m.start()) + 1
line_end = text.find("\n", m.end())
if line_end == -1:
line_end = len(text)
line_after = text[m.end() - len(rest):line_end]
# Strip comments from consideration so `import X # noqa` doesn't trip.
line_after_no_comment = line_after.split("#", 1)[0]
if re.search(r"^\s*as\s+\w+", line_after_no_comment):
raise ValueError(
f"rewrite_imports: cannot rewrite 'import {mod} as <alias>' on a "
f"workspace module — the regex would produce "
f"'import molecule_runtime.{mod} as {mod} as <alias>', invalid syntax. "
f"Use 'from {mod} import …' or plain 'import {mod}' instead. "
f"Offending line: {text[line_start:line_end]!r}"
)
# Plain `import X` — alias preserves the local name.
return f"{indent}import molecule_runtime.{mod} as {mod}{rest}"
return regex.sub(repl, text)

View File

@ -52,11 +52,13 @@ def smoke_imports_and_invariants() -> None:
InboxState,
activate as inbox_activate,
get_state as inbox_get_state,
set_notification_callback as inbox_set_notification_callback,
start_poller_thread as inbox_start_poller_thread,
)
assert callable(inbox_activate), "inbox.activate must be callable"
assert callable(inbox_get_state), "inbox.get_state must be callable"
assert callable(inbox_start_poller_thread), "inbox.start_poller_thread must be callable"
assert callable(inbox_set_notification_callback), "inbox.set_notification_callback must be callable"
assert a2a_client._A2A_ERROR_PREFIX, "a2a_client missing error sentinel"
assert callable(get_adapter), "adapters.get_adapter must be callable"

View File

@ -277,12 +277,26 @@ var openTunnelCmd = func(o eicSSHOptions) *exec.Cmd {
// to 22; with CP provisioning today the workspace runs as a native
// process under the ubuntu user, so landing at ubuntu's shell IS the
// terminal experience.
//
// ConnectTimeout=10 is the user-experience guard — without it, ssh waits
// indefinitely for the remote sshd's banner. When the workspace EC2's
// sshd is unresponsive (mid-restart, SG drop, AMI without ec2-instance-
// connect installed) the canvas's xterm shows the user's typed bytes
// echoed back by the workspace-server's *local* PTY (cooked + echo mode
// before ssh finishes its handshake) and then closes silently when CF's
// idle WebSocket timer fires, with no "Connection refused" or "Permission
// denied" output ever reaching the user. Capping at 10s makes the failure
// surface as a real ssh error message in the terminal — caught 2026-04-30
// when hongmingwang's hermes shell hung after the heartbeat-fix redeploy
// and a probe at /workspaces/<id>/terminal sat for 60s with the only
// frame being the local-PTY echo of a single 'X' typed mid-handshake.
var sshCommandCmd = func(o eicSSHOptions) *exec.Cmd {
return exec.Command(
"ssh",
"-i", o.PrivateKeyPath,
"-o", "StrictHostKeyChecking=no",
"-o", "UserKnownHostsFile=/dev/null",
"-o", "ConnectTimeout=10",
"-o", "ServerAliveInterval=30",
"-o", "ServerAliveCountMax=3",
"-p", fmt.Sprintf("%d", o.LocalPort),

View File

@ -320,6 +320,7 @@ func TestSSHCommandCmd_BuildsArgv(t *testing.T) {
"-i", "/tmp/k",
"-o", "StrictHostKeyChecking=no",
"-o", "UserKnownHostsFile=/dev/null",
"-o", "ConnectTimeout=10",
"-o", "ServerAliveInterval=30",
"-o", "ServerAliveCountMax=3",
"-p", "2222",
@ -490,3 +491,57 @@ func TestKI005_OrgToken_SkipsValidateToken(t *testing.T) {
}
}
// TestSSHCommandCmd_ConnectTimeoutPresent pins the user-experience guard
// against ssh-handshake-hang. Without ConnectTimeout, ssh waits forever
// for the remote sshd's banner — which masquerades as a "silently dead"
// shell to the user, because the workspace-server's local PTY is in
// cooked + echo mode before ssh finishes its handshake, so the canvas
// echoes the user's keystrokes back without ever reaching remote bash,
// and Cloudflare eventually closes the WebSocket on idle (~100s) with
// no error frame to surface what went wrong.
//
// Repro 2026-04-30: a 60s probe at hongmingwang's hermes /terminal
// endpoint after the heartbeat-fix redeploy showed only the local-PTY
// echo of a single 'X' typed mid-handshake. Workspace EC2 was up and
// heartbeating but its sshd was unresponsive; ssh hung indefinitely.
//
// Behavior-based: matches the literal `-o ConnectTimeout=N` arg pair so
// this stays pinned even if the rest of the args reorder. Does not pin
// the exact value — operators may tune it — but does pin presence.
func TestSSHCommandCmd_ConnectTimeoutPresent(t *testing.T) {
t.Parallel()
cmd := sshCommandCmd(eicSSHOptions{
InstanceID: "i-test",
OSUser: "ubuntu",
Region: "us-east-2",
LocalPort: 2222,
PrivateKeyPath: "/tmp/test-key",
})
args := cmd.Args
found := false
for i, a := range args {
if a != "-o" {
continue
}
if i+1 >= len(args) {
continue
}
val := args[i+1]
if len(val) >= len("ConnectTimeout=") &&
val[:len("ConnectTimeout=")] == "ConnectTimeout=" {
found = true
break
}
}
if !found {
t.Errorf("sshCommandCmd is missing `-o ConnectTimeout=N` — without it, "+
"ssh hangs forever when the workspace EC2's sshd is unresponsive "+
"and the canvas terminal silently dies on Cloudflare's idle WS "+
"timeout with no error message reaching the user. See terminal.go "+
"sshCommandCmd comment (2026-04-30 hongmingwang hermes). args=%v",
args)
}
}

View File

@ -17,6 +17,10 @@ import json
import logging
import sys
import inbox # noqa: F401 — bridge wiring lives in main(); the rewriter
# produces `import molecule_runtime.inbox as inbox`
# which preserves this binding for set_notification_callback.
from a2a_tools import (
tool_check_task_status,
tool_commit_memory,
@ -130,6 +134,44 @@ async def handle_tool_call(name: str, arguments: dict) -> str:
return f"Unknown tool: {name}"
# --- MCP Notification bridge ---
# `notifications/claude/channel` matches the contract used by the
# molecule-mcp-claude-channel bun bridge (server.ts:509). Claude Code's
# MCP runtime treats this method as a conversation interrupt — `content`
# becomes the agent turn, `meta` is structured metadata. Notification-
# capable hosts (Claude Code today; any compliant client tomorrow)
# get push UX automatically; pollers (`wait_for_message` / `inbox_peek`)
# still work unchanged. See task #46 + the deprecation path documented
# in workspace/inbox.py:set_notification_callback.
_CHANNEL_NOTIFICATION_METHOD = "notifications/claude/channel"
def _build_channel_notification(msg: dict) -> dict:
"""Transform an ``InboxMessage.to_dict()`` into the MCP notification
envelope expected by Claude Code's channel-bridge contract.
Pure function so the wire shape is unit-testable without spinning
up an asyncio loop. The wire-up in ``main()`` just composes this
with ``asyncio.run_coroutine_threadsafe``.
"""
return {
"jsonrpc": "2.0",
"method": _CHANNEL_NOTIFICATION_METHOD,
"params": {
"content": msg.get("text", ""),
"meta": {
"source": "molecule",
"kind": msg.get("kind", ""),
"peer_id": msg.get("peer_id", ""),
"method": msg.get("method", ""),
"activity_id": msg.get("activity_id", ""),
"ts": msg.get("created_at", ""),
},
},
}
# --- MCP Server (JSON-RPC over stdio) ---
async def main(): # pragma: no cover
@ -148,6 +190,34 @@ async def main(): # pragma: no cover
writer.write(data.encode())
await writer.drain()
# Wire the inbox → MCP notification bridge. Inbox poller (daemon
# thread) calls into here when a new activity row lands; we
# schedule the notification onto the asyncio loop and best-effort
# fire it on the same stdout the responses go to.
loop = asyncio.get_running_loop()
async def _emit_notification(payload: dict) -> None:
data = json.dumps(payload) + "\n"
writer.write(data.encode())
try:
await writer.drain()
except Exception: # noqa: BLE001
# Closed pipe (host disconnected) shouldn't crash the
# inbox poller; let it sit until the host reconnects.
pass
def _on_inbox_message(msg: dict) -> None:
try:
asyncio.run_coroutine_threadsafe(
_emit_notification(_build_channel_notification(msg)),
loop,
)
except RuntimeError:
# Loop closed during shutdown — best-effort, swallow.
pass
inbox.set_notification_callback(_on_inbox_message)
buffer = ""
while True:
try:

View File

@ -323,7 +323,19 @@ def load_config(config_path: Optional[str] = None) -> WorkspaceConfig:
args=runtime_raw.get("args", []),
required_env=runtime_raw.get("required_env", []),
timeout=runtime_raw.get("timeout", 0),
model=runtime_raw.get("model", ""),
# Fall back to top-level resolved `model` (which already honors
# MODEL_PROVIDER env override, line 277) when YAML doesn't carry
# runtime_config.model. Without this fallback, SaaS workspaces
# silently boot with the adapter's hard-coded default —
# claude-code-default reads `runtime_config.model or "sonnet"`,
# so a user who picks Opus in the canvas Config tab gets Sonnet
# on the next CP-driven restart. Root cause: the CP user-data
# script regenerates /configs/config.yaml at every boot with
# only `name`, `runtime`, `a2a` keys (intentionally minimal so
# it doesn't carry stale state), losing runtime_config.model.
# MODEL_PROVIDER is plumbed as an env var, so picking it up via
# the top-level resolved model keeps the selection sticky.
model=runtime_raw.get("model") or model,
# Deprecated fields — kept for backward compat
auth_token_env=runtime_raw.get("auth_token_env", ""),
auth_token_file=runtime_raw.get("auth_token_file", ""),

View File

@ -53,7 +53,7 @@ import time
from collections import deque
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from typing import Any, Callable
logger = logging.getLogger(__name__)
@ -173,10 +173,14 @@ class InboxState:
logger.warning("inbox: failed to delete cursor %s: %s", self.cursor_path, exc)
def record(self, message: InboxMessage) -> None:
"""Append a message and wake any waiter.
"""Append a message, wake any waiter, and fire the notification
callback (if registered) for push-UX-capable hosts.
Skips a row whose activity_id we've already queued — defensive
against the poller racing with the consumer + cursor save.
against the poller racing with the consumer + cursor save. The
dedupe short-circuits BEFORE the notification fires, so a
notification-capable host doesn't see duplicate push events on
backlog overlap.
"""
with self._lock:
for existing in self._queue:
@ -184,6 +188,19 @@ class InboxState:
return
self._queue.append(message)
self._arrival.set()
# Fire notification AFTER releasing the lock so the callback
# is free to do anything (including calling back into inbox)
# without deadlock. Best-effort: a raising callback must not
# prevent the message from landing in the queue — observability
# is more important than push delivery.
cb = _NOTIFICATION_CALLBACK
if cb is not None:
try:
cb(message.to_dict())
except Exception:
logger.warning(
"inbox: notification callback raised", exc_info=True
)
def peek(self, limit: int = 10) -> list[InboxMessage]:
"""Return up to ``limit`` pending messages without removing them."""
@ -240,6 +257,35 @@ class InboxState:
_STATE: InboxState | None = None
# Notification bridge — set by the universal MCP server (a2a_mcp_server.py)
# at startup so that new inbox arrivals can be pushed to notification-
# capable hosts (Claude Code) as MCP `notifications/claude/channel`
# events. Kept module-level (rather than a method on InboxState) so the
# inbox doesn't need to know about MCP — a thin pluggable seam.
#
# Defaults to None: in-container runtimes that don't activate the inbox
# also don't push notifications, and tests start clean. The wheel's
# wiring is exercised by tests/test_a2a_mcp_server.py + the bridge
# tests below.
_NOTIFICATION_CALLBACK: Callable[[dict], None] | None = None
def set_notification_callback(cb: Callable[[dict], None] | None) -> None:
"""Register (or clear) the per-message notification callback.
The callback receives ``InboxMessage.to_dict()`` for each new
arrival same shape ``inbox_peek`` returns to the agent, so a
bridge can build its MCP notification payload without re-deriving
fields.
Best-effort: a raising callback does NOT prevent the message from
landing in the queue (see ``InboxState.record``). Pass ``None`` to
clear (used by tests + the wheel's shutdown path).
"""
global _NOTIFICATION_CALLBACK
_NOTIFICATION_CALLBACK = cb
def activate(state: InboxState) -> None:
"""Register an InboxState as the singleton this module exposes.

View File

@ -138,3 +138,102 @@ def test_attachments_param_description_emphasizes_REQUIRED():
assert forbidden in desc, (
f"`attachments` description must call out {forbidden!r} as a wrong alternative"
)
# ============== Inbox → MCP notification bridge (2026-05-01) ==============
# Notification-capable hosts (Claude Code) get push UX when a new inbound
# message lands; pollers (wait_for_message/inbox_peek) keep working.
# `_build_channel_notification` is the pure shape transformer — wire-up
# in main() composes it with asyncio.run_coroutine_threadsafe.
def test_build_channel_notification_method_matches_claude_contract():
"""Method MUST be `notifications/claude/channel` exactly — that's
what Claude Code's MCP runtime listens for as a conversation
interrupt. Same string as the bun channel bridge sends
(server.ts:509) so this is a drop-in replacement."""
from a2a_mcp_server import _build_channel_notification
payload = _build_channel_notification({
"activity_id": "act-1",
"text": "hello",
"peer_id": "",
"kind": "canvas_user",
"method": "message/send",
"created_at": "2026-05-01T00:00:00Z",
})
assert payload["method"] == "notifications/claude/channel"
assert payload["jsonrpc"] == "2.0"
def test_build_channel_notification_content_is_message_text():
"""`content` is what becomes the agent conversation turn —
pulled directly from the inbox message text."""
from a2a_mcp_server import _build_channel_notification
payload = _build_channel_notification({
"activity_id": "act-1",
"text": "hello from canvas",
"peer_id": "",
"kind": "canvas_user",
"method": "message/send",
"created_at": "2026-05-01T00:00:00Z",
})
assert payload["params"]["content"] == "hello from canvas"
def test_build_channel_notification_meta_carries_routing_fields():
"""Meta must include kind, peer_id, method, activity_id, ts —
fields the agent or downstream tooling needs to route a reply
(canvas_user /notify, peer_agent /a2a) and to acknowledge
via inbox_pop."""
from a2a_mcp_server import _build_channel_notification
payload = _build_channel_notification({
"activity_id": "act-7",
"text": "ping",
"peer_id": "ws-peer-uuid",
"kind": "peer_agent",
"method": "message/send",
"created_at": "2026-05-01T01:23:45Z",
})
meta = payload["params"]["meta"]
assert meta["source"] == "molecule"
assert meta["kind"] == "peer_agent"
assert meta["peer_id"] == "ws-peer-uuid"
assert meta["method"] == "message/send"
assert meta["activity_id"] == "act-7"
assert meta["ts"] == "2026-05-01T01:23:45Z"
def test_build_channel_notification_no_id_field():
"""Notifications MUST NOT carry a JSON-RPC `id` field — that's
what distinguishes them from requests. A notification with `id`
would be mis-interpreted as a request and clients would wait
for a response that never comes."""
from a2a_mcp_server import _build_channel_notification
payload = _build_channel_notification({"text": "x"})
assert "id" not in payload, (
"notifications must omit `id` per JSON-RPC 2.0 spec — "
"presence would make MCP clients await a phantom response"
)
def test_build_channel_notification_handles_missing_fields_gracefully():
"""Some fields may be absent on edge-case messages (e.g. cursor
bootstrapping with no created_at yet). Default to empty strings
so the wire shape stays valid JSON instead of crashing."""
from a2a_mcp_server import _build_channel_notification
payload = _build_channel_notification({})
assert payload["params"]["content"] == ""
meta = payload["params"]["meta"]
assert meta["activity_id"] == ""
assert meta["peer_id"] == ""
assert meta["kind"] == ""

View File

@ -81,6 +81,89 @@ def test_load_config_model_no_env(tmp_path, monkeypatch):
assert cfg.model == "openai:gpt-4o"
def test_runtime_config_model_falls_back_to_top_level(tmp_path, monkeypatch):
"""When YAML omits runtime_config.model, fall back to the top-level
resolved model.
Without this fallback, SaaS workspaces silently boot with the
adapter's hard-coded default — claude-code-default reads
``runtime_config.model or "sonnet"``, so even a user who picks Opus
in the canvas Config tab gets Sonnet on the next restart. Root
cause: the CP user-data script regenerates /configs/config.yaml
at every boot with only ``name``, ``runtime``, ``a2a`` keys
(intentionally minimal so it doesn't carry stale state), losing
runtime_config.model. MODEL_PROVIDER is plumbed as an env var, so
picking it up via the top-level resolved ``model`` keeps the
selection sticky across restarts.
"""
monkeypatch.delenv("MODEL_PROVIDER", raising=False)
config_yaml = tmp_path / "config.yaml"
# Top-level model set, runtime_config.model NOT set — exactly the
# shape the CP user-data writes after restart.
config_yaml.write_text(yaml.dump({"model": "anthropic:claude-opus-4-7"}))
cfg = load_config(str(tmp_path))
assert cfg.runtime_config.model == "anthropic:claude-opus-4-7"
def test_runtime_config_model_yaml_wins_over_top_level(tmp_path, monkeypatch):
"""When YAML explicitly sets runtime_config.model, it takes precedence
over the top-level model. Tests the fallback is only a fallback
not a clobber that would break workspaces with intentionally
different runtime_config.model vs top-level model values.
"""
monkeypatch.delenv("MODEL_PROVIDER", raising=False)
config_yaml = tmp_path / "config.yaml"
config_yaml.write_text(
yaml.dump(
{
"model": "anthropic:claude-opus-4-7",
"runtime_config": {"model": "openai:gpt-4o"},
}
)
)
cfg = load_config(str(tmp_path))
# Top-level still resolves to its own value.
assert cfg.model == "anthropic:claude-opus-4-7"
# runtime_config.model wins — fallback only fires when YAML is empty.
assert cfg.runtime_config.model == "openai:gpt-4o"
def test_runtime_config_model_picks_up_env_via_top_level(tmp_path, monkeypatch):
"""End-to-end path the canvas Save+Restart relies on: user picks
a model workspace_secrets.MODEL_PROVIDER updated CP user-data
re-renders /configs/config.yaml WITHOUT runtime_config.model
workspace boots with MODEL_PROVIDER env var. The top-level model
resolves from MODEL_PROVIDER (line 277), then runtime_config.model
falls back to that. Adapter sees the user's selection.
This is the regression test for the canvas-side feedback
"Provisioner doesn't read model from config.yaml and doesn't set
MODEL env var. Without MODEL, the adapter defaults to sonnet and
bypasses the mimo routing." (2026-04-30).
"""
monkeypatch.setenv("MODEL_PROVIDER", "minimax/abab7-chat-preview")
config_yaml = tmp_path / "config.yaml"
# CP-shaped minimal config.yaml: only name + runtime + a2a, NO
# top-level model, NO runtime_config.model.
config_yaml.write_text(
yaml.dump(
{
"name": "Test Agent",
"runtime": "claude-code",
"a2a": {"port": 8000, "streaming": True},
}
)
)
cfg = load_config(str(tmp_path))
assert cfg.model == "minimax/abab7-chat-preview"
# The adapter (claude-code-default reads runtime_config.model or "sonnet")
# now sees the user's selected model instead of "sonnet".
assert cfg.runtime_config.model == "minimax/abab7-chat-preview"
def test_delegation_config_defaults(tmp_path):
"""DelegationConfig nested defaults are applied."""
config_yaml = tmp_path / "config.yaml"

View File

@ -442,3 +442,113 @@ def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path):
def test_default_cursor_path_falls_back_to_default(monkeypatch):
monkeypatch.delenv("CONFIGS_DIR", raising=False)
assert inbox.default_cursor_path() == Path("/configs") / ".mcp_inbox_cursor"
# ---------------------------------------------------------------------------
# Notification callback bridge — push UX for notification-capable hosts
# ---------------------------------------------------------------------------
#
# `record()` is called from the poller daemon thread when a new activity
# row arrives. Notification-capable MCP hosts (Claude Code) want to be
# pushed a notification — the universal wheel registers a callback via
# `set_notification_callback()` that fires the MCP notification. Pollers
# (`wait_for_message`/`inbox_peek`) keep working unchanged.
@pytest.fixture(autouse=True)
def _reset_notification_callback():
"""Each test starts with no callback registered. Notification
state must not leak across tests same pattern as _reset_singleton."""
inbox.set_notification_callback(None)
yield
inbox.set_notification_callback(None)
def test_record_fires_notification_callback_with_message_dict(state: inbox.InboxState):
"""When a callback is registered, record() invokes it with the
canonical to_dict() shape same shape inbox_peek returns to the
agent. Callers can build MCP notification payloads from this
without re-deriving fields."""
received: list[dict] = []
inbox.set_notification_callback(received.append)
state.record(_msg("act-1", peer_id="ws-peer", text="hello"))
assert len(received) == 1
payload = received[0]
assert payload["activity_id"] == "act-1"
assert payload["text"] == "hello"
assert payload["peer_id"] == "ws-peer"
assert payload["kind"] == "peer_agent" # to_dict derives this
assert payload["method"] == "message/send"
def test_record_dedupe_does_not_refire_callback(state: inbox.InboxState):
"""The activity_id dedupe path must short-circuit BEFORE invoking
the callback otherwise a notification-capable host would see
duplicate push events on poller backlog overlap."""
received: list[dict] = []
inbox.set_notification_callback(received.append)
state.record(_msg("act-1"))
state.record(_msg("act-1")) # dedupe — same id
assert len(received) == 1, (
f"expected 1 callback (dedupe), got {len(received)}"
f"would cause duplicate Claude conversation interrupts"
)
def test_record_callback_exception_does_not_break_inbox(state: inbox.InboxState):
"""A raising callback (e.g. asyncio loop closed mid-shutdown,
serialization error on an exotic message) must NOT prevent the
message from landing in the queue. Notification delivery is
best-effort; inbox correctness is not negotiable."""
def boom(_payload):
raise RuntimeError("simulated callback failure")
inbox.set_notification_callback(boom)
# Must not raise, must still queue the message.
state.record(_msg("act-1"))
queued = state.peek(10)
assert len(queued) == 1
assert queued[0].activity_id == "act-1"
def test_record_no_callback_registered_is_no_op(state: inbox.InboxState):
"""When no callback is set (in-container path, or before
activation), record() proceeds normally no None-call crash."""
# No set_notification_callback() in this test — autouse fixture
# cleared any previous registration.
state.record(_msg("act-1"))
assert len(state.peek(10)) == 1
def test_set_notification_callback_replaces_previous(state: inbox.InboxState):
"""Re-registering the callback replaces the previous — only the
latest callback fires. Test ensures the universal wheel can update
the bridge if its asyncio loop is replaced (e.g. graceful restart)."""
first: list[dict] = []
second: list[dict] = []
inbox.set_notification_callback(first.append)
inbox.set_notification_callback(second.append)
state.record(_msg("act-1"))
assert len(first) == 0, "first callback should be unregistered"
assert len(second) == 1, "second callback should receive the event"
def test_set_notification_callback_none_clears(state: inbox.InboxState):
"""Setting None clears the callback — used by tests + the wheel's
shutdown path."""
received: list[dict] = []
inbox.set_notification_callback(received.append)
inbox.set_notification_callback(None)
state.record(_msg("act-1"))
assert received == []