diff --git a/scripts/build_runtime_package.py b/scripts/build_runtime_package.py index e95c5195..e8e793c0 100755 --- a/scripts/build_runtime_package.py +++ b/scripts/build_runtime_package.py @@ -62,6 +62,7 @@ TOP_LEVEL_MODULES = { "configs_dir", "consolidation", "coordinator", + "event_log", "events", "executor_helpers", "heartbeat", diff --git a/workspace/config.py b/workspace/config.py index 6a256579..dce5e8e9 100644 --- a/workspace/config.py +++ b/workspace/config.py @@ -176,26 +176,68 @@ class SecurityScanConfig: operators who require a CVE gate know the gate is absent. Closes #268.""" +@dataclass +class EventLogConfig: + """Settings for the workspace event log (workspace/event_log.py). + + The event log is an append-and-query buffer for runtime events + (turn started, tool invoked, peer message delivered, …) that the + canvas Activity tab and platform-side `/activity` endpoint read. + Defaults are tuned for a long-running workspace: 1-hour TTL and a + 10k-entry cap together hold ~1 MB of events in memory at the + documented per-event size budget (~100 bytes payload). + + Example config.yaml snippet:: + + observability: + event_log: + backend: memory # or "disabled" to opt out + ttl_seconds: 3600 + max_entries: 10000 + """ + + backend: str = "memory" + """``memory`` (default) buffers events in process RAM with the + bounds below; ``disabled`` returns a no-op log so the canvas + Activity tab is silent. Unknown values fall back to ``memory`` — + a typo should not crash boot or silently drop telemetry.""" + + ttl_seconds: int = 3600 + """How long an event survives before TTL eviction. 1 hour covers + a long agentic loop comfortably without leaking; operators + debugging a slow drift may temporarily widen this, but be aware + the bound is RAM, not disk.""" + + max_entries: int = 10_000 + """Hard cap on resident events. 
Together with ``ttl_seconds`` this + bounds memory: the FIFO eviction drops oldest first, so a query + cursor that falls behind sees a contiguous tail rather than a + gappy log.""" + + @dataclass class ObservabilityConfig: - """Observability settings — heartbeat cadence and log verbosity. + """Observability settings — heartbeat cadence, log verbosity, event log. Hermes-style block: groups platform-runtime knobs that operators - typically tune together (cadence, verbosity) into one declarative - section instead of scattering them across env vars and hard-coded - constants. Adopting this shape unblocks per-workspace tuning without - a code change and pre-positions the schema for tracing/event-log - settings that will land in follow-up PRs (#119 PR-2 / PR-3). + typically tune together (cadence, verbosity, event-log retention) + into one declarative section instead of scattering them across env + vars and hard-coded constants. Adopting this shape unblocks + per-workspace tuning without a code change. - Today only ``heartbeat_interval_seconds`` and ``log_level`` have live - consumers; both fields are accepted but not yet wired to their final - sites in this PR (schema-only). Wiring lands in PR-3 of the series. + The ``event_log`` sub-block is schema-only in this PR (#119 PR-2); + consumer wiring (the canvas Activity tab + `/activity` endpoint + reading from the configured backend) lands in PR-3. Example config.yaml snippet:: observability: heartbeat_interval_seconds: 60 log_level: DEBUG + event_log: + backend: memory + ttl_seconds: 3600 + max_entries: 10000 """ heartbeat_interval_seconds: int = 30 @@ -212,6 +254,9 @@ class ObservabilityConfig: runtime reads ``LOG_LEVEL`` env; PR-3 of the #119 stack switches to this field with env still honored as an override for ops debugging.""" + event_log: EventLogConfig = field(default_factory=EventLogConfig) + """Event-log backend + retention bounds. 
See ``EventLogConfig``.""" + @dataclass class ComplianceConfig: @@ -337,6 +382,42 @@ def _derive_provider_from_model(model: str) -> str: return "" +_EVENT_LOG_VALID_BACKENDS = {"memory", "disabled"} + + +def _parse_event_log(raw: object) -> "EventLogConfig": + """Coerce the ``observability.event_log`` YAML block into EventLogConfig. + + Lenient like the rest of this parser: a missing block, a non-dict + value, or a bad backend name resolves to defaults rather than + raising at boot. The event_log is observability infra — a typo in + one field should not crash the workspace before any event can fire. + Bounds (ttl_seconds, max_entries) clamp to positives so a 0/-1 + misconfig doesn't disable the log silently; that's what + ``backend: disabled`` is for. + """ + if not isinstance(raw, dict): + return EventLogConfig() + backend = str(raw.get("backend", "memory")).strip().lower() + if backend not in _EVENT_LOG_VALID_BACKENDS: + backend = "memory" + try: + ttl_seconds = int(raw.get("ttl_seconds", 3600)) + except (TypeError, ValueError): + ttl_seconds = 3600 + if ttl_seconds <= 0: + ttl_seconds = 3600 + try: + max_entries = int(raw.get("max_entries", 10_000)) + except (TypeError, ValueError): + max_entries = 10_000 + if max_entries <= 0: + max_entries = 10_000 + return EventLogConfig( + backend=backend, ttl_seconds=ttl_seconds, max_entries=max_entries + ) + + def _clamp_heartbeat(value: object) -> int: """Coerce raw YAML/env input into the [5, 300]-second heartbeat band. 
"""Workspace event log — append-and-query buffer for runtime events.

Adapter and platform code emit semantic events (turn started, tool
invoked, peer message delivered) and external readers — the canvas
Activity tab, A2A peers, and the platform's
`/workspaces/:id/activity` endpoint — query them with a cursor.

This PR ships the in-memory backend only; a Redis backend lands with
the platform-side fan-out follow-up (#119 PR-3 follow-up).  The
Protocol shape lets a future backend swap in without touching the
emitting sites.

Eviction is the load-bearing invariant: the workspace runtime is
long-lived, so an unbounded list would leak memory.  Every append
prunes by both TTL and max_entries; readers that fall behind past
the eviction frontier see a contiguous tail without an error — the
cursor protocol only guarantees "events with id > since that are
still resident", not "every event ever appended".  A reader that
needs at-least-once delivery must poll faster than the eviction TTL.
"""

import threading
import time
from collections import deque
from dataclasses import asdict, dataclass, field
from typing import Any, Deque, Iterable, Optional, Protocol


@dataclass(frozen=True)
class Event:
    """One immutable entry in the event log.

    ``id`` is a monotonic integer assigned at append time and is never
    reused or reset when older events age out of the buffer, so a
    reader's cursor stays valid even after the event it points to has
    been evicted — the next query simply returns the resident tail.
    This is the contract that lets a slow reader reconnect without
    resetting to id=0.
    """

    id: int
    timestamp: float
    """Seconds since the Unix epoch — same shape as ``time.time()``,
    so callers can format with ``datetime.fromtimestamp`` directly.
    Float rather than int because event bursts within one second need
    a stable order for downstream merging."""

    kind: str
    """Short dotted snake_case tag (``turn.started``, ``tool.invoked``,
    ``peer.message.delivered`` …) so the canvas can group by prefix
    without a parser."""

    payload: dict = field(default_factory=dict)
    """Arbitrary JSON-serialisable dict.  Keep it small — the in-memory
    backend holds every event in process RAM; large blobs belong in the
    platform blob store with only a reference here."""

    def to_dict(self) -> dict:
        """Return the plain-dict wire shape used by the API layer.

        Owning serialisation here (instead of every reader calling
        ``dataclasses.asdict`` itself) means a field rename flips in
        exactly one place.
        """
        return asdict(self)


class EventLogBackend(Protocol):
    """Backend Protocol — the swap point for memory ↔ redis ↔ disabled.

    Implementations must be safe to call from multiple threads: the
    runtime appends from the heartbeat thread, the agent main loop,
    and A2A executors concurrently, while readers run on the HTTP
    server thread.  A backend that needs locking owns it.
    """

    def append(self, kind: str, payload: Optional[dict] = None) -> Event:
        """Add an event and return the persisted record (with id assigned)."""
        ...

    def query(self, since: Optional[int] = None, limit: Optional[int] = None) -> list[Event]:
        """Return events with ``id > since`` (all resident when None),
        ascending by id; ``limit`` caps the returned slice."""
        ...

    def clear(self) -> None:
        """Drop all entries — provided for test isolation only."""
        ...


class InMemoryEventLog:
    """Bounded in-memory ring buffer with TTL eviction.

    Eviction runs on every ``append`` and also on ``query``, so a
    stale tail cannot survive forever once writes pause:

    - **TTL:** entries older than ``ttl_seconds`` are dropped.
    - **max_entries:** when the buffer exceeds ``max_entries``, the
      oldest entries drop until the cap is met.

    Non-positive bounds at construction fall back to the permissive
    defaults instead of silently disabling eviction — the explicit
    opt-out is ``DisabledEventLog``.  The id counter is monotonic for
    the whole process lifetime; neither eviction nor ``clear`` resets
    it, so ``query(since=last_seen_id)`` always returns the resident
    tail past the cursor (possibly empty for a far-behind reader).
    """

    _DEFAULT_TTL_SECONDS = 3600  # 1 hour — covers a long agentic loop without leaking
    _DEFAULT_MAX_ENTRIES = 10_000  # ~1 MB at 100 bytes/event, safely under workspace RAM budget

    def __init__(
        self,
        ttl_seconds: int = _DEFAULT_TTL_SECONDS,
        max_entries: int = _DEFAULT_MAX_ENTRIES,
        now: Optional[Any] = None,
    ) -> None:
        # Clamp both bounds to positives; see the class docstring.
        if ttl_seconds > 0:
            self._ttl_seconds: int = ttl_seconds
        else:
            self._ttl_seconds = self._DEFAULT_TTL_SECONDS
        if max_entries > 0:
            self._max_entries: int = max_entries
        else:
            self._max_entries = self._DEFAULT_MAX_ENTRIES
        # Injectable clock for deterministic TTL tests; production
        # leaves ``now`` unset and gets ``time.time``.
        if callable(now):
            self._now = now
        else:
            self._now = time.time
        self._lock = threading.Lock()
        self._seq: int = 1
        self._entries: Deque[Event] = deque()

    def append(self, kind: str, payload: Optional[dict] = None) -> Event:
        """Record one event and return it with its id assigned."""
        with self._lock:
            record = Event(
                id=self._seq,
                timestamp=self._now(),
                kind=kind,
                # Copy the payload so later caller-side mutation
                # cannot rewrite history.
                payload=dict(payload) if payload else {},
            )
            self._seq += 1
            self._entries.append(record)
            self._prune_locked()
        return record

    def query(self, since: Optional[int] = None, limit: Optional[int] = None) -> list[Event]:
        """Return resident events with ``id > since``, ascending by id.

        ``limit=0`` is an explicit empty-slice probe — pagination UIs
        use it to ask "anything past my cursor?" without paying for
        the data.  ``limit=None`` (or a negative limit) means uncapped.
        """
        with self._lock:
            # Read-side TTL sweep: covers the case where appends pause
            # but a reader keeps polling — without it a stale tail
            # would survive forever once writes stop.
            self._prune_locked()
            floor = 0 if since is None else since
            matched = [event for event in self._entries if event.id > floor]
        if limit is None or limit < 0:
            return matched
        return matched[:limit]

    def clear(self) -> None:
        """Drop every resident event (test isolation only).

        The id counter is deliberately NOT reset — the cursor contract
        is monotonic ids across the process lifetime, even across
        clears.
        """
        with self._lock:
            self._entries.clear()

    def _prune_locked(self) -> None:
        """Evict by TTL, then by max_entries.  Caller MUST hold self._lock."""
        if not self._entries:
            return
        expiry = self._now() - self._ttl_seconds
        while self._entries and self._entries[0].timestamp < expiry:
            self._entries.popleft()
        # Cap check after TTL — a buffer that fits the time window can
        # still exceed the cap if the burst rate outran the design.
        while len(self._entries) > self._max_entries:
            self._entries.popleft()


class DisabledEventLog:
    """No-op backend for ``backend: disabled``.

    ``append`` still hands back a synthetic Event with a monotonic id
    so callers that propagate ids (e.g. into a debug log) don't crash,
    but nothing is ever retained: ``query`` is always empty.  Choosing
    this backend opts the workspace out of the canvas Activity tab and
    the `/activity` endpoint.
    """

    def __init__(self) -> None:
        self._next_id: int = 1
        self._lock = threading.Lock()

    def append(self, kind: str, payload: Optional[dict] = None) -> Event:
        # Only the counter needs the lock; the synthetic event itself
        # is never stored anywhere.
        with self._lock:
            assigned = self._next_id
            self._next_id = assigned + 1
        return Event(
            id=assigned,
            timestamp=time.time(),
            kind=kind,
            payload=dict(payload) if payload else {},
        )

    def query(self, since: Optional[int] = None, limit: Optional[int] = None) -> list[Event]:
        return []

    def clear(self) -> None:
        return None


def create_event_log(
    backend: str = "memory",
    ttl_seconds: int = InMemoryEventLog._DEFAULT_TTL_SECONDS,
    max_entries: int = InMemoryEventLog._DEFAULT_MAX_ENTRIES,
) -> EventLogBackend:
    """Factory — pick a backend by name from EventLogConfig.

    ``disabled`` / ``off`` / ``none`` opt out; everything else —
    including ``redis``, which is not wired yet, and plain typos —
    degrades to the in-memory default rather than crashing boot
    before any event can be recorded.
    """
    name = (backend or "memory").strip().lower()
    if name in {"disabled", "off", "none"}:
        return DisabledEventLog()
    # memory is the default; redis falls through here until it's wired.
    return InMemoryEventLog(ttl_seconds=ttl_seconds, max_entries=max_entries)
+# --------------------------------------------------------------------------- + + +def test_event_log_dataclass_default(): + """EventLogConfig() — no args — yields the documented defaults.""" + cfg = EventLogConfig() + assert cfg.backend == "memory" + assert cfg.ttl_seconds == 3600 + assert cfg.max_entries == 10_000 + + +def test_event_log_default_when_yaml_omits_block(tmp_path): + """No ``observability.event_log`` key → dataclass defaults.""" + config_yaml = tmp_path / "config.yaml" + config_yaml.write_text(yaml.dump({})) + + cfg = load_config(str(tmp_path)) + assert cfg.observability.event_log.backend == "memory" + assert cfg.observability.event_log.ttl_seconds == 3600 + assert cfg.observability.event_log.max_entries == 10_000 + + +def test_event_log_explicit_yaml_override(tmp_path): + """Explicit YAML values flow through load_config to EventLogConfig.""" + config_yaml = tmp_path / "config.yaml" + config_yaml.write_text( + yaml.dump( + { + "observability": { + "event_log": { + "backend": "disabled", + "ttl_seconds": 60, + "max_entries": 50, + } + } + } + ) + ) + + cfg = load_config(str(tmp_path)) + assert cfg.observability.event_log.backend == "disabled" + assert cfg.observability.event_log.ttl_seconds == 60 + assert cfg.observability.event_log.max_entries == 50 + + +def test_event_log_partial_override_keeps_other_defaults(tmp_path): + """Setting only backend preserves ttl + max_entries defaults.""" + config_yaml = tmp_path / "config.yaml" + config_yaml.write_text( + yaml.dump( + {"observability": {"event_log": {"backend": "disabled"}}} + ) + ) + + cfg = load_config(str(tmp_path)) + assert cfg.observability.event_log.backend == "disabled" + assert cfg.observability.event_log.ttl_seconds == 3600 + assert cfg.observability.event_log.max_entries == 10_000 + + +def test_event_log_unknown_backend_falls_back_to_memory(tmp_path): + """A typo ``backend: redis`` (not yet wired) resolves to the + safe default rather than crashing boot. 
Same lenient-default + contract as the rest of this parser.""" + config_yaml = tmp_path / "config.yaml" + config_yaml.write_text( + yaml.dump({"observability": {"event_log": {"backend": "redis"}}}) + ) + + cfg = load_config(str(tmp_path)) + assert cfg.observability.event_log.backend == "memory" + + +@pytest.mark.parametrize( + "raw_block, expected_ttl, expected_max", + [ + # In-band positives pass through. + ({"ttl_seconds": 1800, "max_entries": 500}, 1800, 500), + # Zero / negative / non-numeric coerce to documented defaults + # (3600 / 10000) — disabling the bound is what + # ``backend: disabled`` is for. + ({"ttl_seconds": 0}, 3600, 10_000), + ({"ttl_seconds": -1}, 3600, 10_000), + ({"ttl_seconds": "not-a-number"}, 3600, 10_000), + ({"max_entries": 0}, 3600, 10_000), + ({"max_entries": -5}, 3600, 10_000), + ({"max_entries": "huge"}, 3600, 10_000), + ], + ids=[ + "in_band_positives", + "zero_ttl_falls_back", + "negative_ttl_falls_back", + "non_numeric_ttl_falls_back", + "zero_max_entries_falls_back", + "negative_max_entries_falls_back", + "non_numeric_max_entries_falls_back", + ], +) +def test_event_log_bounds_clamp(tmp_path, raw_block, expected_ttl, expected_max): + """Out-of-band ttl_seconds / max_entries fall back to defaults + rather than disabling the log silently. ``backend: disabled`` is + the explicit opt-out path.""" + config_yaml = tmp_path / "config.yaml" + config_yaml.write_text( + yaml.dump({"observability": {"event_log": raw_block}}) + ) + + cfg = load_config(str(tmp_path)) + assert cfg.observability.event_log.ttl_seconds == expected_ttl + assert cfg.observability.event_log.max_entries == expected_max + + +def test_event_log_non_dict_block_falls_back_to_default(tmp_path): + """``event_log: "memory"`` (string instead of dict) → defaults. 
+ A scalar value at this key is malformed YAML; coerce to default + instead of raising.""" + config_yaml = tmp_path / "config.yaml" + config_yaml.write_text( + yaml.dump({"observability": {"event_log": "memory"}}) + ) + + cfg = load_config(str(tmp_path)) + assert cfg.observability.event_log.backend == "memory" + assert cfg.observability.event_log.ttl_seconds == 3600 + assert cfg.observability.event_log.max_entries == 10_000 diff --git a/workspace/tests/test_event_log.py b/workspace/tests/test_event_log.py new file mode 100644 index 00000000..481c4292 --- /dev/null +++ b/workspace/tests/test_event_log.py @@ -0,0 +1,345 @@ +"""Tests for workspace/event_log.py — append/query/eviction/disabled backend.""" + +import threading +import time + +import pytest + +from event_log import ( + DisabledEventLog, + Event, + InMemoryEventLog, + create_event_log, +) + + +# --------------------------------------------------------------------------- +# InMemoryEventLog — append + query basics +# --------------------------------------------------------------------------- + + +def test_append_returns_event_with_assigned_id(): + """append() returns the persisted Event with a monotonic id starting at 1.""" + log = InMemoryEventLog() + + e1 = log.append("turn.started", {"task_id": "t1"}) + e2 = log.append("turn.completed", {"task_id": "t1"}) + + assert e1.id == 1 + assert e2.id == 2 + assert e1.kind == "turn.started" + assert e2.kind == "turn.completed" + assert e1.payload == {"task_id": "t1"} + + +def test_append_with_no_payload_yields_empty_dict(): + """payload omitted → empty dict, not None — so JSON serialisers don't choke.""" + log = InMemoryEventLog() + e = log.append("ping") + assert e.payload == {} + assert isinstance(e.payload, dict) + + +def test_append_copies_payload_so_caller_mutations_dont_leak(): + """The persisted payload must NOT alias the caller's dict — otherwise + a downstream mutation of the original silently rewrites history.""" + log = InMemoryEventLog() + payload = 
{"k": "v"} + e = log.append("evt", payload) + payload["k"] = "MUTATED" + assert e.payload == {"k": "v"} + assert log.query()[0].payload == {"k": "v"} + + +def test_query_no_args_returns_all_resident_events_in_order(): + """query() with no cursor returns every resident event, ascending by id.""" + log = InMemoryEventLog() + log.append("a") + log.append("b") + log.append("c") + + out = log.query() + assert [e.kind for e in out] == ["a", "b", "c"] + assert [e.id for e in out] == [1, 2, 3] + + +def test_query_since_cursor_returns_only_newer_events(): + """query(since=N) returns only events with id > N — strict greater-than.""" + log = InMemoryEventLog() + log.append("a") + log.append("b") + log.append("c") + + out = log.query(since=2) + assert [e.kind for e in out] == ["c"] + assert out[0].id == 3 + + +def test_query_since_at_or_past_tip_returns_empty(): + """A cursor at the current tip (or past it) yields no events.""" + log = InMemoryEventLog() + log.append("a") + log.append("b") + + assert log.query(since=2) == [] + assert log.query(since=999) == [] + + +def test_query_limit_caps_returned_slice(): + """limit caps the slice; unspecified means unlimited.""" + log = InMemoryEventLog() + for i in range(5): + log.append(f"e{i}") + + capped = log.query(limit=2) + assert [e.kind for e in capped] == ["e0", "e1"] + + unlimited = log.query() + assert len(unlimited) == 5 + + +def test_query_limit_zero_returns_empty_list(): + """limit=0 is a valid request for the empty slice (some pagination + UIs probe for "any new events?" 
with limit=0 + since=cursor).""" + log = InMemoryEventLog() + log.append("a") + assert log.query(limit=0) == [] + + +def test_query_combined_since_and_limit(): + """since + limit compose: skip past cursor, then cap.""" + log = InMemoryEventLog() + for i in range(10): + log.append(f"e{i}") + + out = log.query(since=3, limit=2) + assert [e.id for e in out] == [4, 5] + + +# --------------------------------------------------------------------------- +# Eviction — TTL + max_entries +# --------------------------------------------------------------------------- + + +def test_max_entries_evicts_oldest_first_fifo(): + """Exceeding max_entries evicts in FIFO order — newest survive.""" + log = InMemoryEventLog(max_entries=3) + for i in range(5): + log.append(f"e{i}") + + out = log.query() + assert [e.kind for e in out] == ["e2", "e3", "e4"] + assert [e.id for e in out] == [3, 4, 5] + + +def test_max_entries_evicted_ids_never_resurface_via_cursor(): + """A cursor pointing past evicted ids returns the resident tail. + Important: the reader does NOT see an error — they see "everything + after my cursor that's still here". This is the documented + at-most-once-while-resident contract.""" + log = InMemoryEventLog(max_entries=2) + for i in range(5): + log.append(f"e{i}") + + # Reader's last seen cursor was id=1, but events 1+2 have aged out. + # They should still get the resident tail (4, 5) without a crash. + out = log.query(since=1) + assert [e.id for e in out] == [4, 5] + + +def test_ttl_evicts_entries_older_than_ttl_seconds(): + """TTL eviction triggers on append when the oldest entry has aged + past ttl_seconds. 
Uses an injected clock so the test is hermetic.""" + clock = [1000.0] + log = InMemoryEventLog(ttl_seconds=10, now=lambda: clock[0]) + + log.append("old") # timestamp 1000 + clock[0] = 1005.0 + log.append("mid") # timestamp 1005 + clock[0] = 1015.0 # past TTL of "old" (1000+10=1010 < 1015) + log.append("new") # this triggers eviction sweep + + out = log.query() + assert [e.kind for e in out] == ["mid", "new"] + + +def test_ttl_evicts_on_query_when_appends_pause(): + """Read-side TTL sweep — covers the case where appends stop but + a reader keeps polling. Without this, a stale tail would survive + forever once writes pause.""" + clock = [1000.0] + log = InMemoryEventLog(ttl_seconds=10, now=lambda: clock[0]) + + log.append("only") + # No more appends. Advance well past TTL. + clock[0] = 2000.0 + + assert log.query() == [] + + +def test_clear_drops_all_but_preserves_id_counter(): + """clear() drops every resident event but does NOT reset the id + counter — the cursor contract is monotonic ids across the + process lifetime, even across clears (which are test-only).""" + log = InMemoryEventLog() + log.append("a") + log.append("b") + + log.clear() + assert log.query() == [] + + e = log.append("c") + assert e.id == 3 # counter resumes, not reset + + +def test_non_positive_ttl_falls_back_to_default(): + """Defensive: a 0 or negative ttl_seconds at construction falls + back to the documented 3600s default. 
Disabling eviction silently + would leak memory; that's what backend=disabled is for.""" + log = InMemoryEventLog(ttl_seconds=0) + assert log._ttl_seconds == InMemoryEventLog._DEFAULT_TTL_SECONDS + + log2 = InMemoryEventLog(ttl_seconds=-5) + assert log2._ttl_seconds == InMemoryEventLog._DEFAULT_TTL_SECONDS + + +def test_non_positive_max_entries_falls_back_to_default(): + """Same defensive shape for max_entries.""" + log = InMemoryEventLog(max_entries=0) + assert log._max_entries == InMemoryEventLog._DEFAULT_MAX_ENTRIES + + log2 = InMemoryEventLog(max_entries=-1) + assert log2._max_entries == InMemoryEventLog._DEFAULT_MAX_ENTRIES + + +# --------------------------------------------------------------------------- +# Event.to_dict — wire-format ownership pinning +# --------------------------------------------------------------------------- + + +def test_event_to_dict_contains_all_fields(): + """to_dict() returns the JSON-serialisable shape API consumers expect. + Pinning the wire format here means a future rename of ``kind`` flips + in event_log.py rather than in every reader.""" + e = Event(id=42, timestamp=1700.5, kind="turn.started", payload={"x": 1}) + d = e.to_dict() + assert d == {"id": 42, "timestamp": 1700.5, "kind": "turn.started", "payload": {"x": 1}} + + +def test_event_timestamp_is_set_at_append(): + """timestamp on a logged event is the value of the injected clock at + append time, not query time — so the wire timestamp reflects when + the event happened, not when it was read.""" + clock = [1234.5] + # Wide ttl so the read-side TTL sweep doesn't evict the event we + # just wrote when we advance the clock to read it back. 
+ log = InMemoryEventLog(ttl_seconds=100_000, now=lambda: clock[0]) + log.append("evt") + clock[0] = 9999.0 + [e] = log.query() + assert e.timestamp == 1234.5 + + +# --------------------------------------------------------------------------- +# DisabledEventLog — no-op contract +# --------------------------------------------------------------------------- + + +def test_disabled_query_always_empty(): + """Disabled backend never retains anything — query is always [].""" + log = DisabledEventLog() + log.append("a") + log.append("b") + assert log.query() == [] + assert log.query(since=0) == [] + + +def test_disabled_append_returns_event_with_monotonic_ids(): + """Even when nothing is persisted, append returns an Event with a + monotonic id so callers that propagate the id (e.g. for a debug + log) don't crash.""" + log = DisabledEventLog() + e1 = log.append("a") + e2 = log.append("b") + assert e1.id == 1 + assert e2.id == 2 + assert e1.kind == "a" + + +def test_disabled_clear_is_a_no_op(): + """clear() on disabled returns None and changes nothing.""" + log = DisabledEventLog() + log.append("a") + log.clear() + assert log.query() == [] + + +# --------------------------------------------------------------------------- +# create_event_log factory +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "name", ["memory", "MEMORY", " memory ", "", "redis", "unknown"] +) +def test_create_event_log_memory_default(name): + """Default + unknown + redis-not-yet-wired all resolve to in-memory. 
+ A typo or future-backend name should NOT silently disable telemetry.""" + log = create_event_log(backend=name) + assert isinstance(log, InMemoryEventLog) + + +@pytest.mark.parametrize("name", ["disabled", "DISABLED", " off ", "none"]) +def test_create_event_log_disabled_aliases(name): + """``disabled``, ``off``, ``none`` all opt the workspace out.""" + log = create_event_log(backend=name) + assert isinstance(log, DisabledEventLog) + + +def test_create_event_log_passes_bounds_through(): + """ttl_seconds and max_entries flow into the InMemoryEventLog instance.""" + log = create_event_log(backend="memory", ttl_seconds=42, max_entries=99) + assert isinstance(log, InMemoryEventLog) + assert log._ttl_seconds == 42 + assert log._max_entries == 99 + + +# --------------------------------------------------------------------------- +# Concurrency — append from multiple threads under contention +# --------------------------------------------------------------------------- + + +def test_concurrent_appends_assign_unique_monotonic_ids(): + """Multiple writer threads must not collide on the id counter. 
+ Heartbeat thread + main loop + A2A executor all append concurrently + in production; a duplicated id would break cursor-based readers.""" + log = InMemoryEventLog(max_entries=10_000) + n_threads = 8 + n_per_thread = 200 + + def worker(): + for _ in range(n_per_thread): + log.append("e") + + threads = [threading.Thread(target=worker) for _ in range(n_threads)] + for t in threads: + t.start() + for t in threads: + t.join() + + out = log.query() + ids = [e.id for e in out] + assert len(ids) == n_threads * n_per_thread + assert len(set(ids)) == len(ids) # all unique + assert ids == sorted(ids) # ascending order preserved + + +def test_real_clock_default_uses_time_time(): + """When ``now`` is not passed, the log uses ``time.time`` — sanity + check that the production path is wired and that an event's + timestamp matches the wall clock within a small epsilon.""" + log = InMemoryEventLog() + before = time.time() + e = log.append("evt") + after = time.time() + assert before <= e.timestamp <= after