fix(cli): surface self-improvement review summaries from bg thread

When the self-improvement background review fires after a turn, it runs
in a bg thread and emits a '  💾 <summary>' line to announce what it
saved to memory or skills. Two problems made this invisible to users
even when the review successfully modified a skill:

1. The print went through `_cprint` (prompt_toolkit's print_formatted_text)
   on a bg thread while the CLI's PromptSession was live. Direct
   print_formatted_text races with the input-area redraw and the line
   can land behind/above the prompt, scrolled off without the user
   seeing it.

2. The message said only '💾 Skill created.' / '💾 Memory updated'
   with no indication that the self-improvement loop was the one doing
   this. Users who did catch the line couldn't tell the background
   review from some other agent action.

Fixes:

- `_cprint` now detects when it's called from a non-app thread with a
  running prompt_toolkit Application, and routes through
  `run_in_terminal` via `loop.call_soon_threadsafe`. That pauses the
  input, prints the line above the prompt, and redraws — the normal
  prompt_toolkit contract for bg-thread output. Direct-print fallback
  preserved for the no-app / same-thread / import-error paths. Affects
  every bg-thread emission, not just the review summary (curator
  summaries and auxiliary failure prints benefit too).

- The summary now reads '  💾 Self-improvement review: <summary>' in
  both the CLI and the gateway `background_review_callback` path, so
  the origin is unambiguous.

Tests:
- New `tests/cli/test_cprint_bg_thread.py` covers all five routing
  branches (no app, app-not-running, cross-thread schedule, same-thread
  direct, app-loop-attribute-error, import-error).
- New case in `tests/run_agent/test_background_review.py` asserts the
  attributed prefix shows up in both `_safe_print` and
  `background_review_callback`.

Live E2E: exercised _cprint from a bg thread inside a real Application
event loop; confirmed get_app_or_none() sees the app, call_soon_threadsafe
schedules run_in_terminal, and the inner _pt_print runs.
This commit is contained in:
Teknium 2026-04-30 13:01:17 -07:00
parent c868425467
commit 80a676658c
4 changed files with 341 additions and 3 deletions

67
cli.py
View File

@ -1240,8 +1240,73 @@ def _cprint(text: str):
Raw ANSI escapes written via print() are swallowed by patch_stdout's
StdoutProxy. Routing through print_formatted_text(ANSI(...)) lets
prompt_toolkit parse the escapes and render real colors.
When called from a background thread while a prompt_toolkit
``Application`` is running (the common case for the self-improvement
background review's ``💾 …`` summary, curator summaries, and other
bg-thread emissions), a direct ``_pt_print`` races with the input
area's redraw and the line can end up visually buried behind the
prompt. Route those cases through ``run_in_terminal`` via
``loop.call_soon_threadsafe``, which pauses the input area, prints
the line above it, and redraws the prompt cleanly.
"""
_pt_print(_PT_ANSI(text))
try:
from prompt_toolkit.application import get_app_or_none, run_in_terminal
except Exception:
_pt_print(_PT_ANSI(text))
return
app = None
try:
app = get_app_or_none()
except Exception:
app = None
# No active app, or we're already on the app's main thread: the
# direct prompt_toolkit print is safe and matches existing behavior
# (spinner frames, streamed tokens, tool activity prefixes, …).
if app is None or not getattr(app, "_is_running", False):
_pt_print(_PT_ANSI(text))
return
try:
loop = app.loop # type: ignore[attr-defined]
except Exception:
loop = None
if loop is None:
_pt_print(_PT_ANSI(text))
return
import asyncio as _asyncio
try:
current_loop = _asyncio.get_event_loop_policy().get_event_loop()
except Exception:
current_loop = None
# Same thread as the app's loop → safe to print directly.
if current_loop is loop and loop.is_running():
_pt_print(_PT_ANSI(text))
return
# Cross-thread emission: ask the app's event loop to schedule a
# ``run_in_terminal`` that wraps ``_pt_print``. This hides the
# prompt, prints, and redraws. Fire-and-forget — if scheduling
# fails we fall back to a direct print so the line isn't lost.
def _schedule():
try:
run_in_terminal(lambda: _pt_print(_PT_ANSI(text)))
except Exception:
try:
_pt_print(_PT_ANSI(text))
except Exception:
pass
try:
loop.call_soon_threadsafe(_schedule)
except Exception:
try:
_pt_print(_PT_ANSI(text))
except Exception:
pass
# ---------------------------------------------------------------------------

View File

@ -3593,11 +3593,15 @@ class AIAgent:
if actions:
summary = " · ".join(dict.fromkeys(actions))
self._safe_print(f" 💾 {summary}")
self._safe_print(
f" 💾 Self-improvement review: {summary}"
)
_bg_cb = self.background_review_callback
if _bg_cb:
try:
_bg_cb(f"💾 {summary}")
_bg_cb(
f"💾 Self-improvement review: {summary}"
)
except Exception:
pass

View File

@ -0,0 +1,206 @@
"""Tests for cli._cprint's bg-thread cooperation with prompt_toolkit.
Background: when a prompt_toolkit Application is running, a bg thread that
calls ``_pt_print`` directly can race with the input-area redraw and the
printed line can end up visually buried behind the prompt. ``_cprint`` now
routes cross-thread prints through ``run_in_terminal`` via
``loop.call_soon_threadsafe`` so the self-improvement background review's
``💾 Self-improvement review: `` summary actually surfaces to the user.
These tests verify the routing logic without spinning up a real PT app.
"""
from __future__ import annotations
import sys
import types
from types import SimpleNamespace
import cli
def test_cprint_no_app_direct_print(monkeypatch):
"""No active app → direct _pt_print, no run_in_terminal involvement."""
calls = []
monkeypatch.setattr(cli, "_pt_print", lambda x: calls.append(("pt_print", x)))
monkeypatch.setattr(cli, "_PT_ANSI", lambda t: ("ANSI", t))
# Patch the prompt_toolkit import the function performs internally.
fake_pt_app = types.ModuleType("prompt_toolkit.application")
fake_pt_app.get_app_or_none = lambda: None
fake_pt_app.run_in_terminal = lambda *a, **kw: calls.append(("run_in_terminal",))
monkeypatch.setitem(sys.modules, "prompt_toolkit.application", fake_pt_app)
cli._cprint("hello")
assert calls == [("pt_print", ("ANSI", "hello"))]
def test_cprint_app_not_running_direct_print(monkeypatch):
"""App exists but not running (e.g. teardown) → direct print."""
calls = []
monkeypatch.setattr(cli, "_pt_print", lambda x: calls.append(("pt_print", x)))
monkeypatch.setattr(cli, "_PT_ANSI", lambda t: t)
fake_app = SimpleNamespace(_is_running=False, loop=None)
fake_pt_app = types.ModuleType("prompt_toolkit.application")
fake_pt_app.get_app_or_none = lambda: fake_app
fake_pt_app.run_in_terminal = lambda *a, **kw: calls.append(("run_in_terminal",))
monkeypatch.setitem(sys.modules, "prompt_toolkit.application", fake_pt_app)
cli._cprint("x")
assert calls == [("pt_print", "x")]
def test_cprint_bg_thread_schedules_on_app_loop(monkeypatch):
"""App running + different thread → schedules via call_soon_threadsafe."""
scheduled = []
direct_prints = []
monkeypatch.setattr(cli, "_pt_print", lambda x: direct_prints.append(x))
monkeypatch.setattr(cli, "_PT_ANSI", lambda t: t)
class FakeLoop:
def is_running(self):
return True
def call_soon_threadsafe(self, cb, *args):
scheduled.append(cb)
fake_loop = FakeLoop()
# Install a fake "current loop" that is NOT the app's loop, so the
# cross-thread branch is taken.
fake_current_loop = SimpleNamespace(is_running=lambda: True)
fake_asyncio = types.ModuleType("asyncio")
class _Policy:
def get_event_loop(self):
return fake_current_loop
fake_asyncio.get_event_loop_policy = lambda: _Policy()
monkeypatch.setitem(sys.modules, "asyncio", fake_asyncio)
fake_app = SimpleNamespace(_is_running=True, loop=fake_loop)
fake_pt_app = types.ModuleType("prompt_toolkit.application")
fake_pt_app.get_app_or_none = lambda: fake_app
run_in_terminal_calls = []
def _fake_run_in_terminal(func, **kw):
run_in_terminal_calls.append(func)
# Simulate run_in_terminal actually calling func (as the real PT
# impl would once the app loop tick picks it up).
func()
return None
fake_pt_app.run_in_terminal = _fake_run_in_terminal
monkeypatch.setitem(sys.modules, "prompt_toolkit.application", fake_pt_app)
cli._cprint("💾 Self-improvement review: Skill updated")
# call_soon_threadsafe must have been called with a scheduling cb.
assert len(scheduled) == 1
# Invoking the scheduled callback should hit run_in_terminal.
scheduled[0]()
assert len(run_in_terminal_calls) == 1
# And run_in_terminal's inner func should have emitted a pt_print.
assert direct_prints == ["💾 Self-improvement review: Skill updated"]
def test_cprint_same_thread_as_app_loop_direct_print(monkeypatch):
"""App running on same thread → direct print (no scheduling)."""
direct_prints = []
monkeypatch.setattr(cli, "_pt_print", lambda x: direct_prints.append(x))
monkeypatch.setattr(cli, "_PT_ANSI", lambda t: t)
class FakeLoop:
def is_running(self):
return True
def call_soon_threadsafe(self, cb, *args):
raise AssertionError(
"call_soon_threadsafe must not be used on the app's own thread"
)
fake_loop = FakeLoop()
fake_asyncio = types.ModuleType("asyncio")
class _Policy:
def get_event_loop(self):
return fake_loop # same as app loop
fake_asyncio.get_event_loop_policy = lambda: _Policy()
monkeypatch.setitem(sys.modules, "asyncio", fake_asyncio)
fake_app = SimpleNamespace(_is_running=True, loop=fake_loop)
fake_pt_app = types.ModuleType("prompt_toolkit.application")
fake_pt_app.get_app_or_none = lambda: fake_app
fake_pt_app.run_in_terminal = lambda *a, **kw: None
monkeypatch.setitem(sys.modules, "prompt_toolkit.application", fake_pt_app)
cli._cprint("x")
assert direct_prints == ["x"]
def test_cprint_swallows_app_loop_attr_error(monkeypatch):
"""Loop missing on app → fall back to direct print, no crash."""
direct_prints = []
monkeypatch.setattr(cli, "_pt_print", lambda x: direct_prints.append(x))
monkeypatch.setattr(cli, "_PT_ANSI", lambda t: t)
class WeirdApp:
_is_running = True
@property
def loop(self):
raise RuntimeError("no loop for you")
fake_pt_app = types.ModuleType("prompt_toolkit.application")
fake_pt_app.get_app_or_none = lambda: WeirdApp()
fake_pt_app.run_in_terminal = lambda *a, **kw: None
monkeypatch.setitem(sys.modules, "prompt_toolkit.application", fake_pt_app)
cli._cprint("fallback")
assert direct_prints == ["fallback"]
def test_cprint_swallows_prompt_toolkit_import_error(monkeypatch):
"""If prompt_toolkit.application itself fails to import, fall back."""
direct_prints = []
monkeypatch.setattr(cli, "_pt_print", lambda x: direct_prints.append(x))
monkeypatch.setattr(cli, "_PT_ANSI", lambda t: t)
# Drop cached prompt_toolkit.application AND install a meta-path finder
# that raises ImportError on re-import.
monkeypatch.delitem(sys.modules, "prompt_toolkit.application", raising=False)
class _BlockFinder:
def find_module(self, name, path=None):
if name == "prompt_toolkit.application":
return self
return None
def load_module(self, name):
raise ImportError("blocked for test")
def find_spec(self, name, path=None, target=None):
if name == "prompt_toolkit.application":
# Returning a bogus spec that will fail on load works too,
# but raising here keeps the test simple.
raise ImportError("blocked for test")
return None
blocker = _BlockFinder()
sys.meta_path.insert(0, blocker)
try:
cli._cprint("fallback2")
finally:
sys.meta_path.remove(blocker)
assert direct_prints == ["fallback2"]

View File

@ -127,3 +127,66 @@ def test_background_review_installs_auto_deny_approval_callback(monkeypatch):
"Background review leaked its approval callback into the worker "
"thread's TLS slot; a recycled thread-id could reuse it."
)
def test_background_review_summary_is_attributed_to_self_improvement_loop(monkeypatch):
"""The CLI/gateway emission must identify the self-improvement loop.
Users who miss the line in their terminal have no way to tell that the
background review was what modified their skill/memory stores. The
summary prefix ``💾 Self-improvement review: `` makes the origin
explicit so both the CLI and gateway deliveries are unambiguous.
"""
import json
captured_prints: list = []
captured_bg_callback: list = []
class FakeReviewAgent:
def __init__(self, **kwargs):
# Simulate a review that successfully updated memory so
# _summarize_background_review_actions returns a real action.
self._session_messages = [
{
"role": "tool",
"tool_call_id": "call_bg",
"content": json.dumps(
{"success": True, "message": "Entry added", "target": "memory"}
),
}
]
def run_conversation(self, **kwargs):
pass
def shutdown_memory_provider(self):
pass
def close(self):
pass
monkeypatch.setattr(run_agent_module, "AIAgent", FakeReviewAgent)
monkeypatch.setattr(run_agent_module.threading, "Thread", ImmediateThread)
agent = _bare_agent()
agent._safe_print = lambda *a, **kw: captured_prints.append(" ".join(str(x) for x in a))
agent.background_review_callback = lambda msg: captured_bg_callback.append(msg)
AIAgent._spawn_background_review(
agent,
messages_snapshot=[{"role": "user", "content": "hi"}],
review_memory=True,
)
# Exactly one summary should have been emitted, and it must identify
# the self-improvement review explicitly.
assert len(captured_prints) == 1, captured_prints
printed = captured_prints[0]
assert "Self-improvement review" in printed, printed
assert "Memory updated" in printed, printed
# Gateway path gets the same prefix.
assert len(captured_bg_callback) == 1
assert captured_bg_callback[0].startswith("💾 Self-improvement review:"), (
captured_bg_callback[0]
)