fix(terminal): three-layer defense against watch_patterns notification spam (#15642)

* fix(terminal): three-layer defense against watch_patterns notification spam

Background processes that stack notify_on_complete=True with watch_patterns
can flood the user with duplicate, delayed notifications — matches deliver
asynchronously via the completion queue and continue arriving minutes after
the process has exited. The docstring warning against this (PR #12113) has
proven insufficient; agents still misuse the combination.

Three layered defenses, each sufficient on its own:

1. Mutual exclusion (terminal_tool.py): When both flags are set on a
   background process, drop watch_patterns with a warning. notify_on_complete
   wins because 'let me know when it's done' is the more useful signal and
   fires exactly once. Extracted as _resolve_notification_flag_conflict() so
   the rule is testable in isolation.

2. Suppress-after-exit (process_registry.py): _check_watch_patterns() now
   bails the moment session.exited is True. Post-exit chunks (buffered reads
   draining after the process is gone) no longer produce notifications. This
   is the fix flagged as future work in session 20260418_020302_79881c.

3. Global circuit breaker (process_registry.py): Per-session rate limits don't
   catch the sibling-flood case — N concurrent processes can each stay under
   8/10s and still collectively spam. New WATCH_GLOBAL_MAX_PER_WINDOW=15 cap
   trips a 30-second cooldown across ALL sessions, emits a single
   watch_overflow_tripped event, silently counts dropped events, and emits a
   watch_overflow_released summary when the cooldown ends.
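The breaker's admit decision, distilled (a standalone sketch with hypothetical names; the real state lives as `_global_watch_*` fields on ProcessRegistry, and the real code also queues the tripped/released summary events):

```python
class GlobalWatchBreaker:
    """Illustrative model of the cross-session cap; not the shipped class."""

    def __init__(self, max_per_window=15, window=10.0, cooldown=30.0):
        self.max_per_window = max_per_window
        self.window = window
        self.cooldown = cooldown
        self.window_start = 0.0   # start of the rolling count window
        self.hits = 0             # events admitted in the current window
        self.tripped_until = 0.0  # nonzero while the breaker is open
        self.suppressed = 0       # events silently dropped so far

    def admit(self, now: float) -> bool:
        """Return True if a watch_match event may be delivered."""
        if now < self.tripped_until:
            self.suppressed += 1          # in cooldown: drop silently
            return False
        if now - self.window_start >= self.window:
            self.window_start, self.hits = now, 0  # slide the window
        if self.hits >= self.max_per_window:
            self.tripped_until = now + self.cooldown  # trip the breaker
            self.suppressed += 1
            return False
        self.hits += 1
        return True
```

Fifteen admits in one window, then everything drops until the 30s cooldown lapses, regardless of which session the match came from.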

Also updates the tool schema + docstring to document the new behavior.

Tests: 8 new tests covering all three fixes (suppress-after-exit x2,
mutual-exclusion resolver x4, global breaker trip/cooldown/release x2).
All 60 tests across test_watch_patterns.py, test_notify_on_complete.py,
test_terminal_tool.py pass.

Real-world trigger: self-inflicted in session 20260425_051924 — three
concurrent hermes-sweeper review subprocesses each set watch_patterns=
['failed validation', 'errored'] AND notify_on_complete=True, then iterated
over multiple items, producing enough matches per process to defeat the
per-session cap while staying under the global cap that didn't yet exist.

* fix(terminal): aggressive 1-per-15s watch_patterns rate limit + strike-3 promotion

Per Teknium's direction, the watch_patterns rate limit is now much more
aggressive and self-healing.

## New rule — per session

- HARD cap: 1 watch-match notification per 15 seconds per process.
- Any match arriving inside the cooldown window is dropped and counts as
  ONE strike for that window (many drops in the same window still = 1 strike).
- After 3 consecutive strike windows, watch_patterns is permanently disabled
  for the session and the session is auto-promoted to notify_on_complete
  semantics — exactly one notification when the process actually exits.
- A cooldown window that expires with zero drops resets the consecutive
  strike counter — healthy cadence is forgiven.
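The rule above can be modeled as a small state machine (a sketch with hypothetical names; the real fields are `_watch_cooldown_until` / `_watch_strike_candidate` / `_watch_consecutive_strikes` on ProcessSession, and the real code also emits the watch_disabled summary and flips notify_on_complete on):

```python
class WatchRateLimiter:
    """Illustrative model of the 1-per-15s + 3-strike rule."""

    MIN_INTERVAL = 15.0  # one notification per 15 seconds
    STRIKE_LIMIT = 3     # consecutive strike windows before disabling

    def __init__(self):
        self.cooldown_until = 0.0
        self.strike_candidate = False  # True once a drop happened this window
        self.strikes = 0
        self.disabled = False

    def on_match(self, now: float) -> bool:
        """Return True if this match may be delivered as a notification."""
        if self.disabled:
            return False
        if self.cooldown_until and now < self.cooldown_until:
            if not self.strike_candidate:
                self.strike_candidate = True   # first drop: one strike per window
                self.strikes += 1
                if self.strikes >= self.STRIKE_LIMIT:
                    self.disabled = True       # promotion to notify_on_complete here
            return False                       # drop the match
        if self.cooldown_until and not self.strike_candidate:
            self.strikes = 0                   # clean window: forgive prior strikes
        self.strike_candidate = False
        self.cooldown_until = now + self.MIN_INTERVAL
        return True                            # deliver and arm a new cooldown
```

Three emit-then-drop cycles in a row disable the limiter; a window that ends with zero drops resets the counter.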

## Schema + docstring rewritten

The tool schema description now gives the model explicit guidance:
- notify_on_complete is 'the right choice for almost every long-running task'
- watch_patterns is for RARE one-shot signals on LONG-LIVED processes
- Do NOT use watch_patterns with loops/batch jobs — error patterns fire every
  iteration and will hit the strike limit fast
- Mutual exclusion is stated on both parameter descriptions
- 1/15s cooldown and 3-strike promotion are stated in the watch_patterns
  description so the model sees the contract every turn

## Removed

- WATCH_MAX_PER_WINDOW (8/10s) and WATCH_OVERLOAD_KILL_SECONDS (45) — the
  new 1/15s limit subsumes both; keeping them would double-count.
- _watch_window_hits / _watch_window_start / _watch_overload_since fields
  on ProcessSession. Replaced by _watch_last_emit_at / _watch_cooldown_until
  / _watch_strike_candidate / _watch_consecutive_strikes.

## Kept

- Global circuit breaker across all sessions (15/10s → 30s cooldown) as a
  secondary safety net for concurrent siblings. Still valuable when 20
  short-lived processes each fire once — none individually violates the
  per-session limit.
- Suppress-after-exit guard.
- Mutual exclusion resolver at the tool entry point.
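The resolver rule itself is tiny; roughly (a sketch under a hypothetical name — the real `_resolve_notification_flag_conflict` returns the same shape but may phrase its note differently):

```python
from typing import List, Optional, Tuple

def resolve_notification_flags(
    *,
    notify_on_complete: bool,
    watch_patterns: Optional[List[str]],
    background: bool,
) -> Tuple[Optional[List[str]], str]:
    """notify_on_complete wins: drop watch_patterns when both are set
    on a background process, and return a note explaining why."""
    if background and notify_on_complete and watch_patterns:
        return None, (
            "watch_patterns ignored because notify_on_complete=True; "
            "combining them produces duplicate notifications"
        )
    return watch_patterns, ""
```

Foreground calls pass through untouched, so the rule only ever fires where the spam is possible.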

## Tests

- 6 new tests in TestPerSessionRateLimit covering: first match delivers,
  second in cooldown suppressed, multi-drop = single strike, 3 strikes
  disables + promotes, clean window resets counter, suppressed count
  carried to next emit.
- Global circuit breaker tests rewritten to use fresh sessions instead of
  hacking removed per-window fields.
- 50/50 watch_patterns + notify_on_complete tests pass.
- 60/60 including test_terminal_tool.py pass.
Teknium 2026-04-25 06:41:58 -07:00 committed by GitHub
parent 6e561ffa6d
commit 97d54f0e4d
3 changed files with 492 additions and 131 deletions

File: test_watch_patterns.py

@@ -19,9 +19,11 @@ from unittest.mock import patch
from tools.process_registry import (
ProcessRegistry,
ProcessSession,
WATCH_MAX_PER_WINDOW,
WATCH_WINDOW_SECONDS,
WATCH_OVERLOAD_KILL_SECONDS,
WATCH_MIN_INTERVAL_SECONDS,
WATCH_STRIKE_LIMIT,
WATCH_GLOBAL_MAX_PER_WINDOW,
WATCH_GLOBAL_WINDOW_SECONDS,
WATCH_GLOBAL_COOLDOWN_SECONDS,
)
@@ -129,10 +131,15 @@ class TestCheckWatchPatterns:
assert registry.completion_queue.empty()
def test_hit_counter_increments(self, registry):
"""Each delivered notification increments _watch_hits."""
"""Each delivered notification increments _watch_hits.
With 1/15s rate limit, we need to reset cooldown between calls.
"""
session = _make_session(watch_patterns=["X"])
registry._check_watch_patterns(session, "X\n")
assert session._watch_hits == 1
# Reset cooldown so the second match gets delivered.
session._watch_cooldown_until = 0.0
registry._check_watch_patterns(session, "X\n")
assert session._watch_hits == 2
@@ -148,100 +155,114 @@ class TestCheckWatchPatterns:
# =========================================================================
# Rate limiting
# Per-session rate limiting: 1 notification per 15s, 3 strikes → disable
# =========================================================================
class TestRateLimiting:
def test_within_window_limit(self, registry):
"""Notifications within the rate limit all get delivered."""
class TestPerSessionRateLimit:
def test_first_match_delivers(self, registry):
"""A fresh session with no prior cooldown delivers the first match."""
session = _make_session(watch_patterns=["E"])
for i in range(WATCH_MAX_PER_WINDOW):
registry._check_watch_patterns(session, f"E {i}\n")
assert registry.completion_queue.qsize() == WATCH_MAX_PER_WINDOW
registry._check_watch_patterns(session, "E first\n")
assert registry.completion_queue.qsize() == 1
evt = registry.completion_queue.get_nowait()
assert evt["type"] == "watch_match"
assert session._watch_hits == 1
# Cooldown is now armed.
assert session._watch_cooldown_until > 0
def test_exceeds_window_limit(self, registry):
"""Notifications beyond the rate limit are suppressed."""
def test_second_match_within_cooldown_is_suppressed(self, registry):
"""A second match inside the 15s cooldown is dropped and counted."""
session = _make_session(watch_patterns=["E"])
for i in range(WATCH_MAX_PER_WINDOW + 5):
registry._check_watch_patterns(session, f"E {i}\n")
# Only WATCH_MAX_PER_WINDOW should be in the queue
assert registry.completion_queue.qsize() == WATCH_MAX_PER_WINDOW
assert session._watch_suppressed == 5
def test_window_resets(self, registry):
"""After the window expires, notifications can flow again."""
session = _make_session(watch_patterns=["E"])
# Fill the window
for i in range(WATCH_MAX_PER_WINDOW):
registry._check_watch_patterns(session, f"E {i}\n")
# One more should be suppressed
registry._check_watch_patterns(session, "E extra\n")
registry._check_watch_patterns(session, "E first\n")
assert registry.completion_queue.qsize() == 1
# Immediately trigger another match — well inside cooldown.
registry._check_watch_patterns(session, "E second\n")
# Still only one notification.
assert registry.completion_queue.qsize() == 1
assert session._watch_suppressed == 1
assert session._watch_consecutive_strikes == 1
# Fast-forward past window
session._watch_window_start = time.time() - WATCH_WINDOW_SECONDS - 1
registry._check_watch_patterns(session, "E after reset\n")
# Should deliver now (window reset)
assert registry.completion_queue.qsize() == WATCH_MAX_PER_WINDOW + 1
def test_suppressed_count_in_next_delivery(self, registry):
"""Suppressed count is reported in the next successful delivery."""
def test_many_drops_inside_window_count_as_ONE_strike(self, registry):
"""Multiple suppressions inside the same cooldown window = 1 strike."""
session = _make_session(watch_patterns=["E"])
for i in range(WATCH_MAX_PER_WINDOW):
registry._check_watch_patterns(session, f"E {i}\n")
# Suppress 3 more
for i in range(3):
registry._check_watch_patterns(session, f"E suppressed {i}\n")
assert session._watch_suppressed == 3
registry._check_watch_patterns(session, "E\n")
for _ in range(10):
registry._check_watch_patterns(session, "E\n")
assert session._watch_consecutive_strikes == 1
assert session._watch_suppressed == 10
# Fast-forward past window to allow delivery
session._watch_window_start = time.time() - WATCH_WINDOW_SECONDS - 1
registry._check_watch_patterns(session, "E back\n")
# Drain to the last event
last_evt = None
while not registry.completion_queue.empty():
last_evt = registry.completion_queue.get_nowait()
assert last_evt["suppressed"] == 3
assert session._watch_suppressed == 0 # reset after delivery
# =========================================================================
# Overload kill switch
# =========================================================================
class TestOverloadKillSwitch:
def test_sustained_overload_disables(self, registry):
"""Sustained overload beyond threshold permanently disables watching."""
def test_three_strikes_disables_watch_and_promotes_to_notify(self, registry):
"""Three consecutive strike windows → watch_disabled + notify_on_complete."""
session = _make_session(watch_patterns=["E"])
# Fill the window to trigger rate limit
for i in range(WATCH_MAX_PER_WINDOW):
registry._check_watch_patterns(session, f"E {i}\n")
session.notify_on_complete = False
# Simulate sustained overload: set overload_since to past threshold
session._watch_overload_since = time.time() - WATCH_OVERLOAD_KILL_SECONDS - 1
# Force another suppressed hit
registry._check_watch_patterns(session, "E overload\n")
registry._check_watch_patterns(session, "E overload2\n")
for strike in range(WATCH_STRIKE_LIMIT):
# Emit → arms cooldown.
registry._check_watch_patterns(session, f"E emit {strike}\n")
# Attempt while inside cooldown → one strike, dropped.
registry._check_watch_patterns(session, f"E drop {strike}\n")
# Fast-forward past the cooldown for the NEXT iteration, BUT leave
# the strike candidate set so the cooldown-expiry branch sees
# "this was a strike window" and doesn't reset the counter.
session._watch_cooldown_until = time.time() - 0.01
# After WATCH_STRIKE_LIMIT strikes, the next attempt should find
# the session disabled.
assert session._watch_disabled is True
# Should have a watch_disabled event in the queue
assert session.notify_on_complete is True
# One watch_disabled summary event should be in the queue.
disabled_evts = []
matches = 0
while not registry.completion_queue.empty():
evt = registry.completion_queue.get_nowait()
if evt.get("type") == "watch_disabled":
disabled_evts.append(evt)
elif evt.get("type") == "watch_match":
matches += 1
assert len(disabled_evts) == 1
assert "too many matches" in disabled_evts[0]["message"]
assert "notify_on_complete" in disabled_evts[0]["message"]
# We should have had exactly WATCH_STRIKE_LIMIT emissions before disable.
assert matches == WATCH_STRIKE_LIMIT
def test_overload_resets_on_delivery(self, registry):
"""Overload timer resets when a notification gets through."""
def test_clean_window_resets_strike_counter(self, registry):
"""A cooldown that expires with zero drops resets the consecutive counter."""
session = _make_session(watch_patterns=["E"])
# Start overload tracking
session._watch_overload_since = time.time() - 10
# But window allows delivery → overload should reset
registry._check_watch_patterns(session, "E ok\n")
assert session._watch_overload_since == 0.0
assert session._watch_disabled is False
# Emit + drop inside window → 1 strike.
registry._check_watch_patterns(session, "E emit\n")
registry._check_watch_patterns(session, "E drop\n")
assert session._watch_consecutive_strikes == 1
# Fast-forward past cooldown. No match arrived during the window —
# strike_candidate stays False from the prior window's reset, but
# it was True during that window. On the NEXT emission, the
# cooldown-expiry branch checks strike_candidate. Since we emitted
# at the start of this new window and no drop has happened, the
# reset branch should fire.
session._watch_cooldown_until = time.time() - 0.01
# Clear strike candidate to simulate "this cooldown had no drops".
session._watch_strike_candidate = False
registry._check_watch_patterns(session, "E clean\n")
assert session._watch_consecutive_strikes == 0
def test_suppressed_count_in_next_delivery(self, registry):
"""Suppressed count from a strike window is reported in the next emit."""
session = _make_session(watch_patterns=["E"])
registry._check_watch_patterns(session, "E emit\n")
for _ in range(4):
registry._check_watch_patterns(session, "E drop\n")
assert session._watch_suppressed == 4
# Fast-forward past cooldown.
session._watch_cooldown_until = time.time() - 0.01
# Drain the queue so we can inspect the next emission.
while not registry.completion_queue.empty():
registry.completion_queue.get_nowait()
registry._check_watch_patterns(session, "E back\n")
evt = registry.completion_queue.get_nowait()
assert evt["type"] == "watch_match"
assert evt["suppressed"] == 4
assert session._watch_suppressed == 0 # reset after delivery
# =========================================================================
@@ -321,3 +342,150 @@ class TestCodeExecutionBlocked:
def test_watch_patterns_blocked(self):
from tools.code_execution_tool import _TERMINAL_BLOCKED_PARAMS
assert "watch_patterns" in _TERMINAL_BLOCKED_PARAMS
# =========================================================================
# Suppress-after-exit (anti-spam fix)
# =========================================================================
class TestSuppressAfterExit:
def test_match_dropped_once_session_exited(self, registry):
"""watch_patterns notifications stop the moment session.exited is set."""
session = _make_session(watch_patterns=["ERROR"])
# Mark the process as exited BEFORE the late chunk arrives.
session.exited = True
registry._check_watch_patterns(session, "ERROR: late buffer\n")
assert registry.completion_queue.empty()
assert session._watch_hits == 0
def test_match_still_delivered_while_session_running(self, registry):
"""Sanity: while the process is still running, matches still deliver."""
session = _make_session(watch_patterns=["ERROR"])
session.exited = False
registry._check_watch_patterns(session, "ERROR: oh no\n")
assert not registry.completion_queue.empty()
evt = registry.completion_queue.get_nowait()
assert evt["type"] == "watch_match"
# =========================================================================
# Mutual exclusion: notify_on_complete wins over watch_patterns
# =========================================================================
class TestMutualExclusion:
def test_resolver_drops_watch_when_notify_set(self):
"""Both flags set → watch_patterns dropped with a note."""
from tools.terminal_tool import _resolve_notification_flag_conflict
resolved, note = _resolve_notification_flag_conflict(
notify_on_complete=True,
watch_patterns=["ERROR", "DONE"],
background=True,
)
assert resolved is None
assert "notify_on_complete" in note
assert "duplicate notifications" in note
def test_resolver_keeps_watch_when_notify_off(self):
"""notify_on_complete=False → watch_patterns kept intact."""
from tools.terminal_tool import _resolve_notification_flag_conflict
resolved, note = _resolve_notification_flag_conflict(
notify_on_complete=False,
watch_patterns=["ERROR"],
background=True,
)
assert resolved == ["ERROR"]
assert note == ""
def test_resolver_keeps_notify_when_no_watch(self):
"""Only notify_on_complete set → no conflict."""
from tools.terminal_tool import _resolve_notification_flag_conflict
resolved, note = _resolve_notification_flag_conflict(
notify_on_complete=True,
watch_patterns=None,
background=True,
)
assert resolved is None
assert note == ""
def test_resolver_inert_when_not_background(self):
"""Without background=True, the whole thing is a no-op."""
from tools.terminal_tool import _resolve_notification_flag_conflict
resolved, note = _resolve_notification_flag_conflict(
notify_on_complete=True,
watch_patterns=["ERROR"],
background=False,
)
assert resolved == ["ERROR"]
assert note == ""
# =========================================================================
# Global circuit breaker (cross-session overflow blocker)
# =========================================================================
class TestGlobalCircuitBreaker:
def test_trips_after_global_threshold(self, registry):
"""When >N matches fire across sessions in the window, breaker trips."""
sessions = [
_make_session(sid=f"proc_s{i}", watch_patterns=["E"])
for i in range(WATCH_GLOBAL_MAX_PER_WINDOW + 3)
]
# Each session fires exactly one match — individually well under the
# per-session cap. But collectively they should trip the global cap.
for s in sessions:
registry._check_watch_patterns(s, "E hit\n")
# Drain the queue and count event types.
watch_matches = 0
overflow_tripped = 0
while not registry.completion_queue.empty():
evt = registry.completion_queue.get_nowait()
if evt.get("type") == "watch_match":
watch_matches += 1
elif evt.get("type") == "watch_overflow_tripped":
overflow_tripped += 1
assert watch_matches == WATCH_GLOBAL_MAX_PER_WINDOW
assert overflow_tripped == 1
assert registry._global_watch_tripped_until > 0
def test_cooldown_suppresses_and_then_releases(self, registry):
"""After trip, further events are suppressed; cooldown expiry emits release."""
# Spawn enough fresh sessions to trip the global breaker.
sessions = [
_make_session(sid=f"proc_t{i}", watch_patterns=["E"])
for i in range(WATCH_GLOBAL_MAX_PER_WINDOW + 1)
]
for s in sessions:
registry._check_watch_patterns(s, "E hit\n")
assert registry._global_watch_tripped_until > 0
# Further matches from BRAND-NEW sessions during cooldown are dropped.
q_size_before = registry.completion_queue.qsize()
extra1 = _make_session(sid="proc_extra1", watch_patterns=["E"])
extra2 = _make_session(sid="proc_extra2", watch_patterns=["E"])
registry._check_watch_patterns(extra1, "E hit\n")
registry._check_watch_patterns(extra2, "E hit\n")
assert registry.completion_queue.qsize() == q_size_before # no new events
assert registry._global_watch_suppressed_during_trip >= 2
# Simulate cooldown expiry.
registry._global_watch_tripped_until = time.time() - 1
# Next call admits AND emits the release summary.
released_session = _make_session(sid="proc_after", watch_patterns=["E"])
registry._check_watch_patterns(released_session, "E hit\n")
released = False
admitted = False
while not registry.completion_queue.empty():
evt = registry.completion_queue.get_nowait()
if evt.get("type") == "watch_overflow_released":
released = True
assert evt["suppressed"] >= 2
elif evt.get("type") == "watch_match":
admitted = True
assert released
assert admitted

File: tools/process_registry.py

@@ -58,10 +58,20 @@ MAX_OUTPUT_CHARS = 200_000 # 200KB rolling output buffer
FINISHED_TTL_SECONDS = 1800 # Keep finished processes for 30 minutes
MAX_PROCESSES = 64 # Max concurrent tracked processes (LRU pruning)
# Watch pattern rate limiting
WATCH_MAX_PER_WINDOW = 8 # Max notifications delivered per window
WATCH_WINDOW_SECONDS = 10 # Rolling window length
WATCH_OVERLOAD_KILL_SECONDS = 45 # Sustained overload duration before disabling watch
# Watch pattern rate limiting — PER SESSION.
# Hard rule: at most ONE watch-match notification every WATCH_MIN_INTERVAL_SECONDS.
# Any match arriving inside that cooldown window is dropped and counted as a strike.
# After WATCH_STRIKE_LIMIT consecutive strike windows, watch_patterns for that
# session is permanently disabled and the session falls back to notify_on_complete
# semantics (one notification when the process actually exits).
WATCH_MIN_INTERVAL_SECONDS = 15 # Minimum spacing between consecutive watch matches
WATCH_STRIKE_LIMIT = 3 # Strikes in a row → disable watch + promote to notify_on_complete
# Global circuit breaker — across all sessions. Secondary safety net so concurrent
# siblings can't collectively flood the user even when each is under its own cap.
WATCH_GLOBAL_MAX_PER_WINDOW = 15
WATCH_GLOBAL_WINDOW_SECONDS = 10
WATCH_GLOBAL_COOLDOWN_SECONDS = 30
def format_uptime_short(seconds: int) -> str:
@@ -105,10 +115,18 @@ class ProcessSession:
watch_patterns: List[str] = field(default_factory=list)
_watch_hits: int = field(default=0, repr=False) # total matches delivered
_watch_suppressed: int = field(default=0, repr=False) # matches dropped by rate limit
_watch_overload_since: float = field(default=0.0, repr=False) # when sustained overload began
_watch_disabled: bool = field(default=False, repr=False) # permanently killed by overload
_watch_window_hits: int = field(default=0, repr=False) # hits in current rate window
_watch_window_start: float = field(default=0.0, repr=False)
_watch_disabled: bool = field(default=False, repr=False) # permanently killed after strike limit
# Per-session rate limit state: at most one match every WATCH_MIN_INTERVAL_SECONDS.
# When an emission happens, _watch_cooldown_until is set to now + interval and
# _watch_strike_candidate becomes True. The next match to arrive before that
# deadline counts as one strike (regardless of how many matches were dropped in
# between — a strike is a window, not a match). After WATCH_STRIKE_LIMIT strikes
# in a row, watch_patterns is disabled and the session promotes to
# notify_on_complete.
_watch_last_emit_at: float = field(default=0.0, repr=False)
_watch_cooldown_until: float = field(default=0.0, repr=False)
_watch_strike_candidate: bool = field(default=False, repr=False)
_watch_consecutive_strikes: int = field(default=0, repr=False)
_lock: threading.Lock = field(default_factory=threading.Lock)
_reader_thread: Optional[threading.Thread] = field(default=None, repr=False)
_pty: Any = field(default=None, repr=False) # ptyprocess handle (when use_pty=True)
@@ -151,6 +169,15 @@ class ProcessRegistry:
# via wait/poll/log. Drain loops skip notifications for these.
self._completion_consumed: set = set()
# Global watch-match circuit breaker — across all sessions.
# Prevents sibling processes from collectively flooding the user even
# when each stays under its own per-session cap.
self._global_watch_lock = threading.Lock()
self._global_watch_window_start: float = 0.0
self._global_watch_window_hits: int = 0
self._global_watch_tripped_until: float = 0.0
self._global_watch_suppressed_during_trip: int = 0
@staticmethod
def _clean_shell_noise(text: str) -> str:
"""Strip shell startup warnings from the beginning of output."""
@@ -163,12 +190,23 @@ class ProcessRegistry:
"""Scan new output for watch patterns and queue notifications.
Called from reader threads with new_text being the freshly-read chunk.
Rate-limited: max WATCH_MAX_PER_WINDOW notifications per WATCH_WINDOW_SECONDS.
If sustained overload exceeds WATCH_OVERLOAD_KILL_SECONDS, watching is
disabled permanently for this process.
Per-session rate limit: at most ONE watch-match notification per
WATCH_MIN_INTERVAL_SECONDS. Any match arriving inside the cooldown
window is dropped and counts as ONE strike for that window. After
WATCH_STRIKE_LIMIT consecutive strike windows, watch_patterns is
disabled for this session and the session is promoted to
notify_on_complete semantics: one notification when the process
actually exits, no more mid-process spam.
"""
if not session.watch_patterns or session._watch_disabled:
return
# Suppress-after-exit: once the reader loop has declared the process
# exited, any late chunk we still see is post-exit noise. Dropping these
# prevents the "stale notifications delivered minutes after the process
# ended" spam when completion_queue consumers run async.
if session.exited:
return
# Scan new text line-by-line for pattern matches
matched_lines = []
@@ -185,55 +223,80 @@ class ProcessRegistry:
return
now = time.time()
should_disable = False
with session._lock:
# Reset window if it's expired
if now - session._watch_window_start >= WATCH_WINDOW_SECONDS:
session._watch_window_hits = 0
session._watch_window_start = now
# Check rate limit
if session._watch_window_hits >= WATCH_MAX_PER_WINDOW:
# Case 1: still inside the cooldown from the last emission.
# Count this as a strike for the current window (only once per window)
# and drop the event. If we've hit the strike limit, disable watch
# and promote to notify_on_complete.
if session._watch_cooldown_until and now < session._watch_cooldown_until:
session._watch_suppressed += len(matched_lines)
if not session._watch_strike_candidate:
# First drop in this window — count one strike.
session._watch_strike_candidate = True
session._watch_consecutive_strikes += 1
if session._watch_consecutive_strikes >= WATCH_STRIKE_LIMIT:
session._watch_disabled = True
# Promote to notify_on_complete so the agent still gets
# exactly one notification when the process actually ends.
session.notify_on_complete = True
should_disable = True
return_early = True
else:
# Case 2: cooldown has expired.
# Decide whether this window was a "clean" one (no drops) or a
# strike window. If no strike candidate was set during the prior
# cooldown, reset the consecutive-strike counter — we're back to
# healthy emission cadence.
if (
session._watch_cooldown_until
and not session._watch_strike_candidate
):
session._watch_consecutive_strikes = 0
session._watch_strike_candidate = False
# Track sustained overload for kill switch
if session._watch_overload_since == 0.0:
session._watch_overload_since = now
elif now - session._watch_overload_since > WATCH_OVERLOAD_KILL_SECONDS:
session._watch_disabled = True
self.completion_queue.put({
"session_id": session.id,
"session_key": session.session_key,
"command": session.command,
"type": "watch_disabled",
"suppressed": session._watch_suppressed,
"platform": session.watcher_platform,
"chat_id": session.watcher_chat_id,
"user_id": session.watcher_user_id,
"user_name": session.watcher_user_name,
"thread_id": session.watcher_thread_id,
"message": (
f"Watch patterns disabled for process {session.id}: "
f"too many matches ({session._watch_suppressed} suppressed). "
f"Use process(action='poll') to check output manually."
),
})
return
# Emit the notification and start a new cooldown window.
session._watch_last_emit_at = now
session._watch_cooldown_until = now + WATCH_MIN_INTERVAL_SECONDS
session._watch_hits += 1
suppressed = session._watch_suppressed
session._watch_suppressed = 0
return_early = False
# Under the rate limit — deliver notification
session._watch_window_hits += 1
session._watch_hits += 1
# Clear overload tracker since we got a delivery through
session._watch_overload_since = 0.0
# Include suppressed count if any events were dropped
suppressed = session._watch_suppressed
session._watch_suppressed = 0
if return_early:
if should_disable:
# Emit exactly one "watch disabled, falling back to notify_on_complete"
# summary event so the agent/user sees why things went quiet.
self.completion_queue.put({
"session_id": session.id,
"session_key": session.session_key,
"command": session.command,
"type": "watch_disabled",
"suppressed": session._watch_suppressed,
"platform": session.watcher_platform,
"chat_id": session.watcher_chat_id,
"user_id": session.watcher_user_id,
"user_name": session.watcher_user_name,
"thread_id": session.watcher_thread_id,
"message": (
f"Watch patterns disabled for process {session.id}: too many matches "
f"({WATCH_STRIKE_LIMIT} consecutive rate-limit windows triggered, "
f"min spacing {WATCH_MIN_INTERVAL_SECONDS}s). "
f"Falling back to notify_on_complete semantics; you'll get "
f"exactly one notification when the process exits."
),
})
return
# Trim matched output to a reasonable size
output = "\n".join(matched_lines[:20])
if len(output) > 2000:
output = output[:2000] + "\n...(truncated)"
# Global circuit breaker — across all sessions (secondary safety net).
if not self._global_watch_admit(now):
return
self.completion_queue.put({
"session_id": session.id,
"session_key": session.session_key,
@@ -249,6 +312,93 @@ class ProcessRegistry:
"thread_id": session.watcher_thread_id,
})
def _global_watch_admit(self, now: float) -> bool:
"""Return True if this watch_match event is allowed through the global breaker.
Semantics:
- If we're currently in a cooldown period, drop the event and count it.
- Otherwise, slide the rolling window and check the global cap.
- If the cap is exceeded, trip the breaker for WATCH_GLOBAL_COOLDOWN_SECONDS
and emit ONE summary event so the agent/user sees "N notifications were
suppressed" instead of getting them individually.
- When the cooldown ends, emit a release summary and reset counters.
"""
with self._global_watch_lock:
# Handle cooldown expiry first so we can emit the release summary.
if self._global_watch_tripped_until and now >= self._global_watch_tripped_until:
suppressed = self._global_watch_suppressed_during_trip
self._global_watch_tripped_until = 0.0
self._global_watch_suppressed_during_trip = 0
self._global_watch_window_start = now
self._global_watch_window_hits = 0
if suppressed > 0:
# Queue a summary event outside the lock (below).
release_msg = {
"session_id": "",
"session_key": "",
"command": "",
"type": "watch_overflow_released",
"suppressed": suppressed,
"message": (
f"Watch-pattern notifications resumed. "
f"{suppressed} match event(s) were suppressed during the flood."
),
"platform": "",
"chat_id": "",
"user_id": "",
"user_name": "",
"thread_id": "",
}
else:
release_msg = None
else:
release_msg = None
# Still in cooldown — drop and count.
if self._global_watch_tripped_until and now < self._global_watch_tripped_until:
self._global_watch_suppressed_during_trip += 1
admit = False
trip_now = None
else:
# Slide the window.
if now - self._global_watch_window_start >= WATCH_GLOBAL_WINDOW_SECONDS:
self._global_watch_window_start = now
self._global_watch_window_hits = 0
if self._global_watch_window_hits >= WATCH_GLOBAL_MAX_PER_WINDOW:
# Trip the breaker.
self._global_watch_tripped_until = now + WATCH_GLOBAL_COOLDOWN_SECONDS
self._global_watch_suppressed_during_trip += 1
trip_now = now
admit = False
else:
self._global_watch_window_hits += 1
trip_now = None
admit = True
# Queue summary events outside the lock.
if release_msg is not None:
self.completion_queue.put(release_msg)
if trip_now is not None:
self.completion_queue.put({
"session_id": "",
"session_key": "",
"command": "",
"type": "watch_overflow_tripped",
"message": (
f"Watch-pattern overflow: >{WATCH_GLOBAL_MAX_PER_WINDOW} "
f"notifications in {WATCH_GLOBAL_WINDOW_SECONDS}s across all processes. "
f"Suppressing further watch_match events for "
f"{WATCH_GLOBAL_COOLDOWN_SECONDS}s."
),
"platform": "",
"chat_id": "",
"user_id": "",
"user_name": "",
"thread_id": "",
})
return admit
@staticmethod
def _is_host_pid_alive(pid: Optional[int]) -> bool:
"""Best-effort liveness check for host-visible PIDs."""

File: tools/terminal_tool.py

@@ -1388,6 +1388,33 @@ def _foreground_background_guidance(command: str) -> str | None:
return None
def _resolve_notification_flag_conflict(
*,
notify_on_complete: bool,
watch_patterns,
background: bool,
) -> tuple:
"""Decide what to do when both notify_on_complete and watch_patterns are set.
These flags produce duplicate, delayed notifications when combined: one
notification per watch-pattern match AND one on process exit, with async
delivery that can spam the user long after the process ends. When both are
set, we drop watch_patterns in favor of notify_on_complete (the more useful
"let me know when it's done" signal) and return a human-readable note.
Returns:
(watch_patterns_to_use, conflict_note). conflict_note is "" when there
is no conflict.
"""
if background and notify_on_complete and watch_patterns:
note = (
"watch_patterns ignored because notify_on_complete=True; "
"these two flags produce duplicate notifications when combined"
)
return None, note
return watch_patterns, ""
def terminal_tool(
command: str,
background: bool = False,
@@ -1410,8 +1437,8 @@ def terminal_tool(
force: If True, skip dangerous command check (use after user confirms)
workdir: Working directory for this command (optional, uses session cwd if not set)
pty: If True, use pseudo-terminal for interactive CLI tools (local backend only)
-notify_on_complete: If True and background=True, auto-notify the agent when the process exits
-watch_patterns: List of strings to watch for in background output; fires a notification on first match per pattern. Use ONLY for mid-process signals (errors, readiness markers) that appear before exit. For end-of-run markers use notify_on_complete instead; stacking both produces duplicate, delayed notifications.
+notify_on_complete: If True and background=True, you'll be notified exactly once when the process exits. The right choice for almost every long task. MUTUALLY EXCLUSIVE with watch_patterns.
+watch_patterns: List of strings to watch for in background output. HARD rate limit: 1 notification per 15s per process. After 3 strike windows in a row, watch_patterns is disabled and the session is auto-promoted to notify_on_complete. Use ONLY for rare, one-shot mid-process signals on long-lived processes (server readiness, migration-done markers). NEVER use in loops/batch jobs: error patterns there will hit the strike limit and get disabled. MUTUALLY EXCLUSIVE with notify_on_complete; set one, not both.
Returns:
str: JSON string with output, exit_code, and error fields
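The docstring above describes a hard per-process limit (1 notification per 15s) with a three-strike auto-promotion to notify_on_complete behavior. A simplified sketch of that policy (hypothetical names; it counts consecutive dropped matches rather than distinct strike windows, which is a simplifying assumption):

```python
import time

class PerProcessWatchLimiter:
    """At most one notification per `window` seconds; after `max_strikes`
    consecutive drops, disable watch_patterns for this process and fall
    back to a single on-exit notification (sketch, not the PR's code)."""

    def __init__(self, window=15.0, max_strikes=3):
        self.window = window
        self.max_strikes = max_strikes
        self.last_fired = None
        self.strikes = 0
        self.disabled = False  # True => promoted to notify_on_complete behavior

    def on_match(self, now=None):
        """Return True if a notification may fire for this match."""
        now = time.monotonic() if now is None else now
        if self.disabled:
            return False
        if self.last_fired is None or now - self.last_fired >= self.window:
            self.last_fired = now
            self.strikes = 0  # a clean fire resets the streak
            return True
        self.strikes += 1     # match dropped inside the cooldown window
        if self.strikes >= self.max_strikes:
            self.disabled = True  # auto-promote: one exit notification only
        return False
```

The promotion path is what makes the limiter safe for misuse in loops: a batch job that matches on every iteration burns its three strikes quickly, goes quiet, and still produces exactly one notification at exit.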
@@ -1701,6 +1728,22 @@ def terminal_tool(
proc_session.watcher_user_name = _gw_user_name
proc_session.watcher_thread_id = _gw_thread_id
# Mutual exclusion: if both notify_on_complete and watch_patterns
# are set, drop watch_patterns. The combination produces duplicate
# notifications (one per match + one on exit) that deliver
# asynchronously and can spam the user long after the process ends.
# notify_on_complete is the more useful signal for "let me know
# when the task finishes"; watch_patterns should be reserved for
# standalone mid-process signals on long-lived processes.
watch_patterns, conflict_note = _resolve_notification_flag_conflict(
notify_on_complete=bool(notify_on_complete),
watch_patterns=watch_patterns,
background=bool(background),
)
if conflict_note:
logger.warning("background proc %s: %s", proc_session.id, conflict_note)
result_data["watch_patterns_ignored"] = conflict_note
# Mark for agent notification on completion
if notify_on_complete and background:
proc_session.notify_on_complete = True
@@ -2039,13 +2082,13 @@ TERMINAL_SCHEMA = {
},
"notify_on_complete": {
"type": "boolean",
-"description": "When true (and background=true), you'll be automatically notified when the process finishes — no polling needed. Use this for tasks that take a while (tests, builds, deployments) so you can keep working on other things in the meantime.",
+"description": "When true (and background=true), you'll be automatically notified exactly once when the process finishes. **This is the right choice for almost every long-running task** — tests, builds, deployments, multi-item batch jobs, anything that takes over a minute and has a defined end. Use this and keep working on other things; the system notifies you on exit. MUTUALLY EXCLUSIVE with watch_patterns — when both are set, watch_patterns is dropped.",
"default": False
},
"watch_patterns": {
"type": "array",
"items": {"type": "string"},
-"description": "Strings to watch for in background process output. Fires a notification the first time each pattern matches a line of output. **Use ONLY for mid-process signals** you want to react to before the process exits — errors, readiness markers, intermediate step markers (e.g. [\"ERROR\", \"Traceback\", \"listening on port\"]). Do NOT use for end-of-run markers (summary headers, 'DONE', 'PASS' printed right before exit) — use `notify_on_complete` for that instead. Stacking end-of-run patterns on top of `notify_on_complete` produces duplicate, delayed notifications that arrive after you've already moved on, since delivery is asynchronous and continues after the process exits."
+"description": "Strings to watch for in background process output. HARD RATE LIMIT: at most 1 notification per 15 seconds per process — matches arriving inside the cooldown are dropped. After 3 consecutive 15-second windows with dropped matches, watch_patterns is automatically disabled for that process and promoted to notify_on_complete behavior (one notification on exit, no more mid-process spam). USE ONLY for truly rare, one-shot mid-process signals on LONG-LIVED processes that will never exit on their own — e.g. ['Application startup complete'] on a server so you know when to hit its endpoint, or ['migration done'] on a daemon. DO NOT use for: (1) end-of-run markers like 'DONE'/'PASS' — use notify_on_complete instead; (2) error patterns like 'ERROR'/'Traceback' in loops or multi-item batch jobs — they fire on every iteration and you'll hit the strike limit fast; (3) anything you'd ever combine with notify_on_complete. When in doubt, choose notify_on_complete. MUTUALLY EXCLUSIVE with notify_on_complete — set one, not both."
}
},
"required": ["command"]