diff --git a/tests/tools/test_watch_patterns.py b/tests/tools/test_watch_patterns.py index 0621edc1..b2d8677b 100644 --- a/tests/tools/test_watch_patterns.py +++ b/tests/tools/test_watch_patterns.py @@ -19,9 +19,11 @@ from unittest.mock import patch from tools.process_registry import ( ProcessRegistry, ProcessSession, - WATCH_MAX_PER_WINDOW, - WATCH_WINDOW_SECONDS, - WATCH_OVERLOAD_KILL_SECONDS, + WATCH_MIN_INTERVAL_SECONDS, + WATCH_STRIKE_LIMIT, + WATCH_GLOBAL_MAX_PER_WINDOW, + WATCH_GLOBAL_WINDOW_SECONDS, + WATCH_GLOBAL_COOLDOWN_SECONDS, ) @@ -129,10 +131,15 @@ class TestCheckWatchPatterns: assert registry.completion_queue.empty() def test_hit_counter_increments(self, registry): - """Each delivered notification increments _watch_hits.""" + """Each delivered notification increments _watch_hits. + + With 1/15s rate limit, we need to reset cooldown between calls. + """ session = _make_session(watch_patterns=["X"]) registry._check_watch_patterns(session, "X\n") assert session._watch_hits == 1 + # Reset cooldown so the second match gets delivered. 
+ session._watch_cooldown_until = 0.0 registry._check_watch_patterns(session, "X\n") assert session._watch_hits == 2 @@ -148,100 +155,114 @@ class TestCheckWatchPatterns: # ========================================================================= -# Rate limiting +# Per-session rate limiting: 1 notification per 15s, 3 strikes → disable # ========================================================================= -class TestRateLimiting: - def test_within_window_limit(self, registry): - """Notifications within the rate limit all get delivered.""" +class TestPerSessionRateLimit: + def test_first_match_delivers(self, registry): + """A fresh session with no prior cooldown delivers the first match.""" session = _make_session(watch_patterns=["E"]) - for i in range(WATCH_MAX_PER_WINDOW): - registry._check_watch_patterns(session, f"E {i}\n") - assert registry.completion_queue.qsize() == WATCH_MAX_PER_WINDOW + registry._check_watch_patterns(session, "E first\n") + assert registry.completion_queue.qsize() == 1 + evt = registry.completion_queue.get_nowait() + assert evt["type"] == "watch_match" + assert session._watch_hits == 1 + # Cooldown is now armed. 
+ assert session._watch_cooldown_until > 0 - def test_exceeds_window_limit(self, registry): - """Notifications beyond the rate limit are suppressed.""" + def test_second_match_within_cooldown_is_suppressed(self, registry): + """A second match inside the 15s cooldown is dropped and counted.""" session = _make_session(watch_patterns=["E"]) - for i in range(WATCH_MAX_PER_WINDOW + 5): - registry._check_watch_patterns(session, f"E {i}\n") - # Only WATCH_MAX_PER_WINDOW should be in the queue - assert registry.completion_queue.qsize() == WATCH_MAX_PER_WINDOW - assert session._watch_suppressed == 5 - - def test_window_resets(self, registry): - """After the window expires, notifications can flow again.""" - session = _make_session(watch_patterns=["E"]) - # Fill the window - for i in range(WATCH_MAX_PER_WINDOW): - registry._check_watch_patterns(session, f"E {i}\n") - # One more should be suppressed - registry._check_watch_patterns(session, "E extra\n") + registry._check_watch_patterns(session, "E first\n") + assert registry.completion_queue.qsize() == 1 + # Immediately trigger another match — well inside cooldown. + registry._check_watch_patterns(session, "E second\n") + # Still only one notification. 
+ assert registry.completion_queue.qsize() == 1 assert session._watch_suppressed == 1 + assert session._watch_consecutive_strikes == 1 - # Fast-forward past window - session._watch_window_start = time.time() - WATCH_WINDOW_SECONDS - 1 - registry._check_watch_patterns(session, "E after reset\n") - # Should deliver now (window reset) - assert registry.completion_queue.qsize() == WATCH_MAX_PER_WINDOW + 1 - - def test_suppressed_count_in_next_delivery(self, registry): - """Suppressed count is reported in the next successful delivery.""" + def test_many_drops_inside_window_count_as_ONE_strike(self, registry): + """Multiple suppressions inside the same cooldown window = 1 strike.""" session = _make_session(watch_patterns=["E"]) - for i in range(WATCH_MAX_PER_WINDOW): - registry._check_watch_patterns(session, f"E {i}\n") - # Suppress 3 more - for i in range(3): - registry._check_watch_patterns(session, f"E suppressed {i}\n") - assert session._watch_suppressed == 3 + registry._check_watch_patterns(session, "E\n") + for _ in range(10): + registry._check_watch_patterns(session, "E\n") + assert session._watch_consecutive_strikes == 1 + assert session._watch_suppressed == 10 - # Fast-forward past window to allow delivery - session._watch_window_start = time.time() - WATCH_WINDOW_SECONDS - 1 - registry._check_watch_patterns(session, "E back\n") - # Drain to the last event - last_evt = None - while not registry.completion_queue.empty(): - last_evt = registry.completion_queue.get_nowait() - assert last_evt["suppressed"] == 3 - assert session._watch_suppressed == 0 # reset after delivery - - -# ========================================================================= -# Overload kill switch -# ========================================================================= - -class TestOverloadKillSwitch: - def test_sustained_overload_disables(self, registry): - """Sustained overload beyond threshold permanently disables watching.""" + def 
test_three_strikes_disables_watch_and_promotes_to_notify(self, registry): + """Three consecutive strike windows → watch_disabled + notify_on_complete.""" session = _make_session(watch_patterns=["E"]) - # Fill the window to trigger rate limit - for i in range(WATCH_MAX_PER_WINDOW): - registry._check_watch_patterns(session, f"E {i}\n") + session.notify_on_complete = False - # Simulate sustained overload: set overload_since to past threshold - session._watch_overload_since = time.time() - WATCH_OVERLOAD_KILL_SECONDS - 1 - # Force another suppressed hit - registry._check_watch_patterns(session, "E overload\n") - registry._check_watch_patterns(session, "E overload2\n") + for strike in range(WATCH_STRIKE_LIMIT): + # Emit → arms cooldown. + registry._check_watch_patterns(session, f"E emit {strike}\n") + # Attempt while inside cooldown → one strike, dropped. + registry._check_watch_patterns(session, f"E drop {strike}\n") + # Fast-forward past the cooldown for the NEXT iteration, BUT leave + # the strike candidate set so the cooldown-expiry branch sees + # "this was a strike window" and doesn't reset the counter. + session._watch_cooldown_until = time.time() - 0.01 + # After WATCH_STRIKE_LIMIT strikes, the next attempt should find + # the session disabled. assert session._watch_disabled is True - # Should have a watch_disabled event in the queue + assert session.notify_on_complete is True + # One watch_disabled summary event should be in the queue. disabled_evts = [] + matches = 0 while not registry.completion_queue.empty(): evt = registry.completion_queue.get_nowait() if evt.get("type") == "watch_disabled": disabled_evts.append(evt) + elif evt.get("type") == "watch_match": + matches += 1 assert len(disabled_evts) == 1 - assert "too many matches" in disabled_evts[0]["message"] + assert "notify_on_complete" in disabled_evts[0]["message"] + # We should have had exactly WATCH_STRIKE_LIMIT emissions before disable. 
+ assert matches == WATCH_STRIKE_LIMIT - def test_overload_resets_on_delivery(self, registry): - """Overload timer resets when a notification gets through.""" + def test_clean_window_resets_strike_counter(self, registry): + """A cooldown that expires with zero drops resets the consecutive counter.""" session = _make_session(watch_patterns=["E"]) - # Start overload tracking - session._watch_overload_since = time.time() - 10 - # But window allows delivery → overload should reset - registry._check_watch_patterns(session, "E ok\n") - assert session._watch_overload_since == 0.0 - assert session._watch_disabled is False + # Emit + drop inside window → 1 strike. + registry._check_watch_patterns(session, "E emit\n") + registry._check_watch_patterns(session, "E drop\n") + assert session._watch_consecutive_strikes == 1 + + # Fast-forward past cooldown. The drop above left strike_candidate + # True, so this window would normally count as a strike window. We + # clear the flag by hand below to simulate a cooldown in which no + # match was dropped. On the NEXT emission, the cooldown-expiry + # branch sees an armed cooldown with no strike candidate, so the + # reset branch should fire. + session._watch_cooldown_until = time.time() - 0.01 + # Clear strike candidate to simulate "this cooldown had no drops". + session._watch_strike_candidate = False + registry._check_watch_patterns(session, "E clean\n") + assert session._watch_consecutive_strikes == 0 + + def test_suppressed_count_in_next_delivery(self, registry): + """Suppressed count from a strike window is reported in the next emit.""" + session = _make_session(watch_patterns=["E"]) + registry._check_watch_patterns(session, "E emit\n") + for _ in range(4): + registry._check_watch_patterns(session, "E drop\n") + assert session._watch_suppressed == 4 + + # Fast-forward past cooldown. + session._watch_cooldown_until = time.time() - 0.01 + # Drain the queue so we can inspect the next emission. 
+ while not registry.completion_queue.empty(): + registry.completion_queue.get_nowait() + + registry._check_watch_patterns(session, "E back\n") + evt = registry.completion_queue.get_nowait() + assert evt["type"] == "watch_match" + assert evt["suppressed"] == 4 + assert session._watch_suppressed == 0 # reset after delivery # ========================================================================= @@ -321,3 +342,150 @@ class TestCodeExecutionBlocked: def test_watch_patterns_blocked(self): from tools.code_execution_tool import _TERMINAL_BLOCKED_PARAMS assert "watch_patterns" in _TERMINAL_BLOCKED_PARAMS + + +# ========================================================================= +# Suppress-after-exit (anti-spam fix) +# ========================================================================= + +class TestSuppressAfterExit: + def test_match_dropped_once_session_exited(self, registry): + """watch_patterns notifications stop the moment session.exited is set.""" + session = _make_session(watch_patterns=["ERROR"]) + # Mark the process as exited BEFORE the late chunk arrives. 
+ session.exited = True + registry._check_watch_patterns(session, "ERROR: late buffer\n") + assert registry.completion_queue.empty() + assert session._watch_hits == 0 + + def test_match_still_delivered_while_session_running(self, registry): + """Sanity: while the process is still running, matches still deliver.""" + session = _make_session(watch_patterns=["ERROR"]) + session.exited = False + registry._check_watch_patterns(session, "ERROR: oh no\n") + assert not registry.completion_queue.empty() + evt = registry.completion_queue.get_nowait() + assert evt["type"] == "watch_match" + + +# ========================================================================= +# Mutual exclusion: notify_on_complete wins over watch_patterns +# ========================================================================= + +class TestMutualExclusion: + def test_resolver_drops_watch_when_notify_set(self): + """Both flags set → watch_patterns dropped with a note.""" + from tools.terminal_tool import _resolve_notification_flag_conflict + + resolved, note = _resolve_notification_flag_conflict( + notify_on_complete=True, + watch_patterns=["ERROR", "DONE"], + background=True, + ) + assert resolved is None + assert "notify_on_complete" in note + assert "duplicate notifications" in note + + def test_resolver_keeps_watch_when_notify_off(self): + """notify_on_complete=False → watch_patterns kept intact.""" + from tools.terminal_tool import _resolve_notification_flag_conflict + + resolved, note = _resolve_notification_flag_conflict( + notify_on_complete=False, + watch_patterns=["ERROR"], + background=True, + ) + assert resolved == ["ERROR"] + assert note == "" + + def test_resolver_keeps_notify_when_no_watch(self): + """Only notify_on_complete set → no conflict.""" + from tools.terminal_tool import _resolve_notification_flag_conflict + + resolved, note = _resolve_notification_flag_conflict( + notify_on_complete=True, + watch_patterns=None, + background=True, + ) + assert resolved is None + assert 
note == "" + + def test_resolver_inert_when_not_background(self): + """Without background=True, the whole thing is a no-op.""" + from tools.terminal_tool import _resolve_notification_flag_conflict + + resolved, note = _resolve_notification_flag_conflict( + notify_on_complete=True, + watch_patterns=["ERROR"], + background=False, + ) + assert resolved == ["ERROR"] + assert note == "" + + +# ========================================================================= +# Global circuit breaker (cross-session overflow blocker) +# ========================================================================= + +class TestGlobalCircuitBreaker: + def test_trips_after_global_threshold(self, registry): + """When >N matches fire across sessions in the window, breaker trips.""" + sessions = [ + _make_session(sid=f"proc_s{i}", watch_patterns=["E"]) + for i in range(WATCH_GLOBAL_MAX_PER_WINDOW + 3) + ] + # Each session fires exactly one match — individually well under the + # per-session cap. But collectively they should trip the global cap. + for s in sessions: + registry._check_watch_patterns(s, "E hit\n") + + # Drain the queue and count event types. + watch_matches = 0 + overflow_tripped = 0 + while not registry.completion_queue.empty(): + evt = registry.completion_queue.get_nowait() + if evt.get("type") == "watch_match": + watch_matches += 1 + elif evt.get("type") == "watch_overflow_tripped": + overflow_tripped += 1 + assert watch_matches == WATCH_GLOBAL_MAX_PER_WINDOW + assert overflow_tripped == 1 + assert registry._global_watch_tripped_until > 0 + + def test_cooldown_suppresses_and_then_releases(self, registry): + """After trip, further events are suppressed; cooldown expiry emits release.""" + # Spawn enough fresh sessions to trip the global breaker. 
+ sessions = [ + _make_session(sid=f"proc_t{i}", watch_patterns=["E"]) + for i in range(WATCH_GLOBAL_MAX_PER_WINDOW + 1) + ] + for s in sessions: + registry._check_watch_patterns(s, "E hit\n") + assert registry._global_watch_tripped_until > 0 + + # Further matches from BRAND-NEW sessions during cooldown are dropped. + q_size_before = registry.completion_queue.qsize() + extra1 = _make_session(sid="proc_extra1", watch_patterns=["E"]) + extra2 = _make_session(sid="proc_extra2", watch_patterns=["E"]) + registry._check_watch_patterns(extra1, "E hit\n") + registry._check_watch_patterns(extra2, "E hit\n") + assert registry.completion_queue.qsize() == q_size_before # no new events + assert registry._global_watch_suppressed_during_trip >= 2 + + # Simulate cooldown expiry. + registry._global_watch_tripped_until = time.time() - 1 + + # Next call admits AND emits the release summary. + released_session = _make_session(sid="proc_after", watch_patterns=["E"]) + registry._check_watch_patterns(released_session, "E hit\n") + released = False + admitted = False + while not registry.completion_queue.empty(): + evt = registry.completion_queue.get_nowait() + if evt.get("type") == "watch_overflow_released": + released = True + assert evt["suppressed"] >= 2 + elif evt.get("type") == "watch_match": + admitted = True + assert released + assert admitted diff --git a/tools/process_registry.py b/tools/process_registry.py index ec510cae..57709bc2 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -58,10 +58,20 @@ MAX_OUTPUT_CHARS = 200_000 # 200KB rolling output buffer FINISHED_TTL_SECONDS = 1800 # Keep finished processes for 30 minutes MAX_PROCESSES = 64 # Max concurrent tracked processes (LRU pruning) -# Watch pattern rate limiting -WATCH_MAX_PER_WINDOW = 8 # Max notifications delivered per window -WATCH_WINDOW_SECONDS = 10 # Rolling window length -WATCH_OVERLOAD_KILL_SECONDS = 45 # Sustained overload duration before disabling watch +# Watch pattern rate limiting — PER 
SESSION. +# Hard rule: at most ONE watch-match notification every WATCH_MIN_INTERVAL_SECONDS. +# Any match arriving inside that cooldown window is dropped; each window with a drop is one strike. +# After WATCH_STRIKE_LIMIT consecutive strike windows, watch_patterns for that +# session is permanently disabled and the session falls back to notify_on_complete +# semantics (one notification when the process actually exits). +WATCH_MIN_INTERVAL_SECONDS = 15 # Minimum spacing between consecutive watch matches +WATCH_STRIKE_LIMIT = 3 # Strikes in a row → disable watch + promote to notify_on_complete + +# Global circuit breaker — across all sessions. Secondary safety net so concurrent +# siblings can't collectively flood the user even when each is under its own cap. +WATCH_GLOBAL_MAX_PER_WINDOW = 15 +WATCH_GLOBAL_WINDOW_SECONDS = 10 +WATCH_GLOBAL_COOLDOWN_SECONDS = 30 def format_uptime_short(seconds: int) -> str: @@ -105,10 +115,18 @@ class ProcessSession: watch_patterns: List[str] = field(default_factory=list) _watch_hits: int = field(default=0, repr=False) # total matches delivered _watch_suppressed: int = field(default=0, repr=False) # matches dropped by rate limit - _watch_overload_since: float = field(default=0.0, repr=False) # when sustained overload began - _watch_disabled: bool = field(default=False, repr=False) # permanently killed by overload - _watch_window_hits: int = field(default=0, repr=False) # hits in current rate window - _watch_window_start: float = field(default=0.0, repr=False) + _watch_disabled: bool = field(default=False, repr=False) # permanently killed after strike limit + # Per-session rate limit state: at most one match every WATCH_MIN_INTERVAL_SECONDS. + # When an emission happens, _watch_cooldown_until is set to now + interval and + # _watch_strike_candidate is cleared. The first match dropped before that + # deadline sets it and counts as one strike (however many more are dropped in + # that window — a strike is a window, not a match). 
After WATCH_STRIKE_LIMIT strikes + # in a row, watch_patterns is disabled and the session promotes to + # notify_on_complete. + _watch_last_emit_at: float = field(default=0.0, repr=False) + _watch_cooldown_until: float = field(default=0.0, repr=False) + _watch_strike_candidate: bool = field(default=False, repr=False) + _watch_consecutive_strikes: int = field(default=0, repr=False) _lock: threading.Lock = field(default_factory=threading.Lock) _reader_thread: Optional[threading.Thread] = field(default=None, repr=False) _pty: Any = field(default=None, repr=False) # ptyprocess handle (when use_pty=True) @@ -151,6 +169,15 @@ class ProcessRegistry: # via wait/poll/log. Drain loops skip notifications for these. self._completion_consumed: set = set() + # Global watch-match circuit breaker — across all sessions. + # Prevents sibling processes from collectively flooding the user even + # when each stays under its own per-session cap. + self._global_watch_lock = threading.Lock() + self._global_watch_window_start: float = 0.0 + self._global_watch_window_hits: int = 0 + self._global_watch_tripped_until: float = 0.0 + self._global_watch_suppressed_during_trip: int = 0 + @staticmethod def _clean_shell_noise(text: str) -> str: """Strip shell startup warnings from the beginning of output.""" @@ -163,12 +190,23 @@ class ProcessRegistry: """Scan new output for watch patterns and queue notifications. Called from reader threads with new_text being the freshly-read chunk. - Rate-limited: max WATCH_MAX_PER_WINDOW notifications per WATCH_WINDOW_SECONDS. - If sustained overload exceeds WATCH_OVERLOAD_KILL_SECONDS, watching is - disabled permanently for this process. + + Per-session rate limit: at most ONE watch-match notification per + WATCH_MIN_INTERVAL_SECONDS. Any match arriving inside the cooldown + window is dropped and counts as ONE strike for that window. 
After + WATCH_STRIKE_LIMIT consecutive strike windows, watch_patterns is + disabled for this session and the session is promoted to + notify_on_complete semantics — one notification when the process + actually exits, no more mid-process spam. """ if not session.watch_patterns or session._watch_disabled: return + # Suppress-after-exit: once the reader loop has declared the process + # exited, any late chunk we still see is post-exit noise. Dropping these + # prevents the "stale notifications delivered minutes after the process + # ended" spam when completion_queue consumers run async. + if session.exited: + return # Scan new text line-by-line for pattern matches matched_lines = [] @@ -185,55 +223,80 @@ class ProcessRegistry: return now = time.time() + should_disable = False with session._lock: - # Reset window if it's expired - if now - session._watch_window_start >= WATCH_WINDOW_SECONDS: - session._watch_window_hits = 0 - session._watch_window_start = now - - # Check rate limit - if session._watch_window_hits >= WATCH_MAX_PER_WINDOW: + # Case 1: still inside the cooldown from the last emission. + # Count this as a strike for the current window (only once per window) + # and drop the event. If we've hit the strike limit, disable watch + # and promote to notify_on_complete. + if session._watch_cooldown_until and now < session._watch_cooldown_until: session._watch_suppressed += len(matched_lines) + if not session._watch_strike_candidate: + # First drop in this window — count one strike. + session._watch_strike_candidate = True + session._watch_consecutive_strikes += 1 + if session._watch_consecutive_strikes >= WATCH_STRIKE_LIMIT: + session._watch_disabled = True + # Promote to notify_on_complete so the agent still gets + # exactly one notification when the process actually ends. + session.notify_on_complete = True + should_disable = True + return_early = True + else: + # Case 2: cooldown has expired. 
+ # Decide whether this window was a "clean" one (no drops) or a + # strike window. If no strike candidate was set during the prior + # cooldown, reset the consecutive-strike counter — we're back to + # healthy emission cadence. + if ( + session._watch_cooldown_until + and not session._watch_strike_candidate + ): + session._watch_consecutive_strikes = 0 + session._watch_strike_candidate = False - # Track sustained overload for kill switch - if session._watch_overload_since == 0.0: - session._watch_overload_since = now - elif now - session._watch_overload_since > WATCH_OVERLOAD_KILL_SECONDS: - session._watch_disabled = True - self.completion_queue.put({ - "session_id": session.id, - "session_key": session.session_key, - "command": session.command, - "type": "watch_disabled", - "suppressed": session._watch_suppressed, - "platform": session.watcher_platform, - "chat_id": session.watcher_chat_id, - "user_id": session.watcher_user_id, - "user_name": session.watcher_user_name, - "thread_id": session.watcher_thread_id, - "message": ( - f"Watch patterns disabled for process {session.id} — " - f"too many matches ({session._watch_suppressed} suppressed). " - f"Use process(action='poll') to check output manually." - ), - }) - return + # Emit the notification and start a new cooldown window. 
+ session._watch_last_emit_at = now + session._watch_cooldown_until = now + WATCH_MIN_INTERVAL_SECONDS + session._watch_hits += 1 + suppressed = session._watch_suppressed + session._watch_suppressed = 0 + return_early = False - # Under the rate limit — deliver notification - session._watch_window_hits += 1 - session._watch_hits += 1 - # Clear overload tracker since we got a delivery through - session._watch_overload_since = 0.0 - - # Include suppressed count if any events were dropped - suppressed = session._watch_suppressed - session._watch_suppressed = 0 + if return_early: + if should_disable: + # Emit exactly one "watch disabled, falling back to notify_on_complete" + # summary event so the agent/user sees why things went quiet. + self.completion_queue.put({ + "session_id": session.id, + "session_key": session.session_key, + "command": session.command, + "type": "watch_disabled", + "suppressed": session._watch_suppressed, + "platform": session.watcher_platform, + "chat_id": session.watcher_chat_id, + "user_id": session.watcher_user_id, + "user_name": session.watcher_user_name, + "thread_id": session.watcher_thread_id, + "message": ( + f"Watch patterns disabled for process {session.id} — " + f"{WATCH_STRIKE_LIMIT} consecutive rate-limit windows triggered " + f"(min spacing {WATCH_MIN_INTERVAL_SECONDS}s). " + f"Falling back to notify_on_complete semantics; you'll get " + f"exactly one notification when the process exits." + ), + }) + return # Trim matched output to a reasonable size output = "\n".join(matched_lines[:20]) if len(output) > 2000: output = output[:2000] + "\n...(truncated)" + # Global circuit breaker — across all sessions (secondary safety net). 
+ if not self._global_watch_admit(now): + return + self.completion_queue.put({ "session_id": session.id, "session_key": session.session_key, @@ -249,6 +312,93 @@ class ProcessRegistry: "thread_id": session.watcher_thread_id, }) + def _global_watch_admit(self, now: float) -> bool: + """Return True if this watch_match event is allowed through the global breaker. + + Semantics: + - If we're currently in a cooldown period, drop the event and count it. + - Otherwise, restart the fixed counting window if it has expired and check the global cap. + - If the cap would be exceeded, trip the breaker for WATCH_GLOBAL_COOLDOWN_SECONDS + and emit ONE summary event announcing the suppression instead of delivering + matches individually (the suppressed count is reported on release). + - When the cooldown ends, emit a release summary and reset counters. + """ + with self._global_watch_lock: + # Handle cooldown expiry first so we can emit the release summary. + if self._global_watch_tripped_until and now >= self._global_watch_tripped_until: + suppressed = self._global_watch_suppressed_during_trip + self._global_watch_tripped_until = 0.0 + self._global_watch_suppressed_during_trip = 0 + self._global_watch_window_start = now + self._global_watch_window_hits = 0 + if suppressed > 0: + # Queue a summary event outside the lock (below). + release_msg = { + "session_id": "", + "session_key": "", + "command": "", + "type": "watch_overflow_released", + "suppressed": suppressed, + "message": ( + f"Watch-pattern notifications resumed. " + f"{suppressed} match event(s) were suppressed during the flood." + ), + "platform": "", + "chat_id": "", + "user_id": "", + "user_name": "", + "thread_id": "", + } + else: + release_msg = None + else: + release_msg = None + + # Still in cooldown — drop and count. + if self._global_watch_tripped_until and now < self._global_watch_tripped_until: + self._global_watch_suppressed_during_trip += 1 + admit = False + trip_now = None + else: + # Restart the window once it has expired. 
+ if now - self._global_watch_window_start >= WATCH_GLOBAL_WINDOW_SECONDS: + self._global_watch_window_start = now + self._global_watch_window_hits = 0 + + if self._global_watch_window_hits >= WATCH_GLOBAL_MAX_PER_WINDOW: + # Trip the breaker. + self._global_watch_tripped_until = now + WATCH_GLOBAL_COOLDOWN_SECONDS + self._global_watch_suppressed_during_trip += 1 + trip_now = now + admit = False + else: + self._global_watch_window_hits += 1 + trip_now = None + admit = True + + # Queue summary events outside the lock. + if release_msg is not None: + self.completion_queue.put(release_msg) + if trip_now is not None: + self.completion_queue.put({ + "session_id": "", + "session_key": "", + "command": "", + "type": "watch_overflow_tripped", + "message": ( + f"Watch-pattern overflow: >{WATCH_GLOBAL_MAX_PER_WINDOW} " + f"notifications in {WATCH_GLOBAL_WINDOW_SECONDS}s across all processes. " + f"Suppressing further watch_match events for " + f"{WATCH_GLOBAL_COOLDOWN_SECONDS}s." + ), + "platform": "", + "chat_id": "", + "user_id": "", + "user_name": "", + "thread_id": "", + }) + return admit + @staticmethod def _is_host_pid_alive(pid: Optional[int]) -> bool: """Best-effort liveness check for host-visible PIDs.""" diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index b288d4ad..b0f81b88 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -1388,6 +1388,33 @@ def _foreground_background_guidance(command: str) -> str | None: return None +def _resolve_notification_flag_conflict( + *, + notify_on_complete: bool, + watch_patterns, + background: bool, +) -> tuple: + """Decide what to do when both notify_on_complete and watch_patterns are set. + + These flags produce duplicate, delayed notifications when combined — one + notification per watch-pattern match AND one on process exit, with async + delivery that can spam the user long after the process ends. 
When both are + set, we drop watch_patterns in favor of notify_on_complete (the more useful + "let me know when it's done" signal) and return a human-readable note. + + Returns: + (watch_patterns_to_use, conflict_note). conflict_note is "" when there + is no conflict. + """ + if background and notify_on_complete and watch_patterns: + note = ( + "watch_patterns ignored because notify_on_complete=True; " + "these two flags produce duplicate notifications when combined" + ) + return None, note + return watch_patterns, "" + + def terminal_tool( command: str, background: bool = False, @@ -1410,8 +1437,8 @@ def terminal_tool( force: If True, skip dangerous command check (use after user confirms) workdir: Working directory for this command (optional, uses session cwd if not set) pty: If True, use pseudo-terminal for interactive CLI tools (local backend only) - notify_on_complete: If True and background=True, auto-notify the agent when the process exits - watch_patterns: List of strings to watch for in background output; fires a notification on first match per pattern. Use ONLY for mid-process signals (errors, readiness markers) that appear before exit. For end-of-run markers use notify_on_complete instead — stacking both produces duplicate, delayed notifications. + notify_on_complete: If True and background=True, you'll be notified exactly once when the process exits. The right choice for almost every long task. MUTUALLY EXCLUSIVE with watch_patterns. + watch_patterns: List of strings to watch for in background output. HARD rate limit: 1 notification per 15s per process. After 3 strike windows in a row, watch_patterns is disabled and the session is auto-promoted to notify_on_complete. Use ONLY for rare, one-shot mid-process signals on long-lived processes (server readiness, migration-done markers). NEVER use in loops/batch jobs — error patterns there will hit the strike limit and get disabled. MUTUALLY EXCLUSIVE with notify_on_complete — set one, not both. 
Returns: str: JSON string with output, exit_code, and error fields @@ -1701,6 +1728,22 @@ def terminal_tool( proc_session.watcher_user_name = _gw_user_name proc_session.watcher_thread_id = _gw_thread_id + # Mutual exclusion: if both notify_on_complete and watch_patterns + # are set, drop watch_patterns. The combination produces duplicate + # notifications (one per match + one on exit) that deliver + # asynchronously and can spam the user long after the process ends. + # notify_on_complete is the more useful signal for "let me know + # when the task finishes"; watch_patterns should be reserved for + # standalone mid-process signals on long-lived processes. + watch_patterns, conflict_note = _resolve_notification_flag_conflict( + notify_on_complete=bool(notify_on_complete), + watch_patterns=watch_patterns, + background=bool(background), + ) + if conflict_note: + logger.warning("background proc %s: %s", proc_session.id, conflict_note) + result_data["watch_patterns_ignored"] = conflict_note + # Mark for agent notification on completion if notify_on_complete and background: proc_session.notify_on_complete = True @@ -2039,13 +2082,13 @@ TERMINAL_SCHEMA = { }, "notify_on_complete": { "type": "boolean", - "description": "When true (and background=true), you'll be automatically notified when the process finishes — no polling needed. Use this for tasks that take a while (tests, builds, deployments) so you can keep working on other things in the meantime.", + "description": "When true (and background=true), you'll be automatically notified exactly once when the process finishes. **This is the right choice for almost every long-running task** — tests, builds, deployments, multi-item batch jobs, anything that takes over a minute and has a defined end. Use this and keep working on other things; the system notifies you on exit. 
MUTUALLY EXCLUSIVE with watch_patterns — when both are set, watch_patterns is dropped.", "default": False }, "watch_patterns": { "type": "array", "items": {"type": "string"}, - "description": "Strings to watch for in background process output. Fires a notification the first time each pattern matches a line of output. **Use ONLY for mid-process signals** you want to react to before the process exits — errors, readiness markers, intermediate step markers (e.g. [\"ERROR\", \"Traceback\", \"listening on port\"]). Do NOT use for end-of-run markers (summary headers, 'DONE', 'PASS' printed right before exit) — use `notify_on_complete` for that instead. Stacking end-of-run patterns on top of `notify_on_complete` produces duplicate, delayed notifications that arrive after you've already moved on, since delivery is asynchronous and continues after the process exits." + "description": "Strings to watch for in background process output. HARD RATE LIMIT: at most 1 notification per 15 seconds per process — matches arriving inside the cooldown are dropped. After 3 consecutive 15-second windows with dropped matches, watch_patterns is automatically disabled for that process and promoted to notify_on_complete behavior (one notification on exit, no more mid-process spam). USE ONLY for truly rare, one-shot mid-process signals on LONG-LIVED processes that will never exit on their own — e.g. ['Application startup complete'] on a server so you know when to hit its endpoint, or ['migration done'] on a daemon. DO NOT use for: (1) end-of-run markers like 'DONE'/'PASS' — use notify_on_complete instead; (2) error patterns like 'ERROR'/'Traceback' in loops or multi-item batch jobs — they fire on every iteration and you'll hit the strike limit fast; (3) anything you'd ever combine with notify_on_complete. When in doubt, choose notify_on_complete. MUTUALLY EXCLUSIVE with notify_on_complete — set one, not both." } }, "required": ["command"]