feat(workspace): event_log module + EventLogConfig (#119 PR-2)

Adds workspace/event_log.py with an in-memory EventLog backend and a
disabled no-op variant, plus EventLogConfig nested in
ObservabilityConfig (backend / ttl_seconds / max_entries).

The event log is the append-and-query buffer that the canvas Activity
tab and platform `/activity` endpoint will read in PR-3 of the #119
stack. Two backends ship in this PR:

  - InMemoryEventLog: bounded ring buffer with TTL eviction, monotonic
    ids that survive eviction so cursors don't break, thread-safe for
    concurrent appends from heartbeat + main loop + A2A executor.
  - DisabledEventLog: no-op for `backend: disabled` — opts the
    workspace out without crashing callers that propagate event ids.

Schema-only PR — no consumers wired yet. Wiring lands in PR-3.

Test coverage:
  - 34 new test_event_log.py tests (100% line coverage on event_log.py)
  - 9 new test_config.py tests for EventLogConfig parsing
  - Concurrency stress with 8 threads × 200 appends — verifies unique
    monotonic ids under contention
  - TTL + max_entries eviction with injected clock (no time.sleep)
  - Disabled backend contract pinned

Closes #207.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hongming Wang 2026-05-03 00:17:12 -07:00
parent c4f64a11a8
commit 0fc2531250
4 changed files with 818 additions and 9 deletions

View File

@ -176,26 +176,68 @@ class SecurityScanConfig:
operators who require a CVE gate know the gate is absent. Closes #268."""
@dataclass
class EventLogConfig:
"""Settings for the workspace event log (workspace/event_log.py).
The event log is an append-and-query buffer for runtime events
(turn started, tool invoked, peer message delivered, ) that the
canvas Activity tab and platform-side `/activity` endpoint read.
Defaults are tuned for a long-running workspace: 1-hour TTL and a
10k-entry cap together hold ~1 MB of events in memory at the
documented per-event size budget (~100 bytes payload).
Example config.yaml snippet::
observability:
event_log:
backend: memory # or "disabled" to opt out
ttl_seconds: 3600
max_entries: 10000
"""
backend: str = "memory"
"""``memory`` (default) buffers events in process RAM with the
bounds below; ``disabled`` returns a no-op log so the canvas
Activity tab is silent. Unknown values fall back to ``memory``
a typo should not crash boot or silently drop telemetry."""
ttl_seconds: int = 3600
"""How long an event survives before TTL eviction. 1 hour covers
a long agentic loop comfortably without leaking; operators
debugging a slow drift may temporarily widen this, but be aware
the bound is RAM, not disk."""
max_entries: int = 10_000
"""Hard cap on resident events. Together with ``ttl_seconds`` this
bounds memory: the FIFO eviction drops oldest first, so a query
cursor that falls behind sees a contiguous tail rather than a
gappy log."""
@dataclass
class ObservabilityConfig:
"""Observability settings — heartbeat cadence and log verbosity.
"""Observability settings — heartbeat cadence, log verbosity, event log.
Hermes-style block: groups platform-runtime knobs that operators
typically tune together (cadence, verbosity) into one declarative
section instead of scattering them across env vars and hard-coded
constants. Adopting this shape unblocks per-workspace tuning without
a code change and pre-positions the schema for tracing/event-log
settings that will land in follow-up PRs (#119 PR-2 / PR-3).
typically tune together (cadence, verbosity, event-log retention)
into one declarative section instead of scattering them across env
vars and hard-coded constants. Adopting this shape unblocks
per-workspace tuning without a code change.
Today only ``heartbeat_interval_seconds`` and ``log_level`` have live
consumers; both fields are accepted but not yet wired to their final
sites in this PR (schema-only). Wiring lands in PR-3 of the series.
The ``event_log`` sub-block is schema-only in this PR (#119 PR-2);
consumer wiring (the canvas Activity tab + `/activity` endpoint
reading from the configured backend) lands in PR-3.
Example config.yaml snippet::
observability:
heartbeat_interval_seconds: 60
log_level: DEBUG
event_log:
backend: memory
ttl_seconds: 3600
max_entries: 10000
"""
heartbeat_interval_seconds: int = 30
@ -212,6 +254,9 @@ class ObservabilityConfig:
runtime reads ``LOG_LEVEL`` env; PR-3 of the #119 stack switches to
this field with env still honored as an override for ops debugging."""
event_log: EventLogConfig = field(default_factory=EventLogConfig)
"""Event-log backend + retention bounds. See ``EventLogConfig``."""
@dataclass
class ComplianceConfig:
@ -337,6 +382,42 @@ def _derive_provider_from_model(model: str) -> str:
return ""
_EVENT_LOG_VALID_BACKENDS = {"memory", "disabled"}
def _parse_event_log(raw: object) -> "EventLogConfig":
"""Coerce the ``observability.event_log`` YAML block into EventLogConfig.
Lenient like the rest of this parser: a missing block, a non-dict
value, or a bad backend name resolves to defaults rather than
raising at boot. The event_log is observability infra a typo in
one field should not crash the workspace before any event can fire.
Bounds (ttl_seconds, max_entries) clamp to positives so a 0/-1
misconfig doesn't disable the log silently; that's what
``backend: disabled`` is for.
"""
if not isinstance(raw, dict):
return EventLogConfig()
backend = str(raw.get("backend", "memory")).strip().lower()
if backend not in _EVENT_LOG_VALID_BACKENDS:
backend = "memory"
try:
ttl_seconds = int(raw.get("ttl_seconds", 3600))
except (TypeError, ValueError):
ttl_seconds = 3600
if ttl_seconds <= 0:
ttl_seconds = 3600
try:
max_entries = int(raw.get("max_entries", 10_000))
except (TypeError, ValueError):
max_entries = 10_000
if max_entries <= 0:
max_entries = 10_000
return EventLogConfig(
backend=backend, ttl_seconds=ttl_seconds, max_entries=max_entries
)
def _clamp_heartbeat(value: object) -> int:
"""Coerce raw YAML/env input into the [5, 300]-second heartbeat band.
@ -526,6 +607,7 @@ def load_config(config_path: Optional[str] = None) -> WorkspaceConfig:
observability_raw.get("heartbeat_interval_seconds", 30)
),
log_level=str(observability_raw.get("log_level", "INFO")).upper(),
event_log=_parse_event_log(observability_raw.get("event_log", {})),
),
sub_workspaces=raw.get("sub_workspaces", []),
effort=str(raw.get("effort", "")),

249
workspace/event_log.py Normal file
View File

@ -0,0 +1,249 @@
"""Workspace event log — append-and-query buffer for runtime events.
Hermes-style declarative observability primitive. Adapter and platform
code emit semantic events (turn started, tool invoked, peer message
delivered) and external readers the canvas Activity tab, A2A peers,
and the platform's `/workspaces/:id/activity` endpoint — query them
with a cursor.
Today's PR ships the in-memory backend only. Redis backend lands in
the follow-up that wires platform-side fan-out (#119 PR-3 follow-up).
The Protocol shape lets a future backend swap in without touching the
emitting sites.
Eviction is the load-bearing invariant: the workspace runtime is
long-lived, so an unbounded list would leak memory. Every append
prunes by both TTL and max_entries; readers that fall behind past
the eviction frontier see a contiguous tail without an error the
cursor protocol only guarantees "events with id > since that are
still resident", not "every event ever appended". A reader that
needs at-least-once delivery must poll faster than the eviction TTL.
"""
from __future__ import annotations
import threading
import time
from collections import deque
from dataclasses import asdict, dataclass, field
from typing import Any, Deque, Iterable, Optional, Protocol
@dataclass(frozen=True)
class Event:
"""One immutable entry in the event log.
``id`` is a monotonic integer assigned at append time. It SURVIVES
eviction the counter is never reset when an old event drops out
of the buffer, so a reader's cursor stays valid even if the event
it points to has aged out (the next query just returns the resident
tail). This is the contract that lets a slow reader reconnect
without resetting to id=0.
"""
id: int
timestamp: float
"""Seconds since the Unix epoch — the same shape as ``time.time()``
so callers can format with ``datetime.fromtimestamp`` without an
extra conversion. Float, not int, because event-bursts within the
same second need stable ordering for downstream merging."""
kind: str
"""Short tag categorising the event: ``turn.started``, ``tool.invoked``,
``peer.message.delivered``, etc. Convention is dotted snake_case so
the canvas can group by prefix without a parser."""
payload: dict = field(default_factory=dict)
"""Arbitrary JSON-serialisable dict. Keep small — the in-memory
backend holds every event in process RAM. Large blobs (file
contents, full transcripts) belong in the platform's blob store
with a reference here, not the value itself."""
def to_dict(self) -> dict:
"""Plain-dict shape for JSON serialisation in the API layer.
Wrapping ``dataclasses.asdict`` rather than relying on the
consumer to call it themselves means the wire format stays
owned by this module a rename of ``kind`` to ``type`` (or
whatever the canvas eventually settles on) flips here, not in
every reader.
"""
return asdict(self)
class EventLogBackend(Protocol):
"""Backend Protocol — the swap point for memory ↔ redis ↔ disabled.
Implementations must be safe to call from multiple threads. The
workspace runtime appends from the heartbeat thread, the agent's
main loop, and any A2A executor concurrently; readers run on the
HTTP server thread. A backend that needs locking owns it.
"""
def append(self, kind: str, payload: Optional[dict] = None) -> Event:
"""Add an event and return the persisted record (with id assigned)."""
...
def query(self, since: Optional[int] = None, limit: Optional[int] = None) -> list[Event]:
"""Return events with ``id > since`` (or all resident if ``since`` is None).
Order is ascending by id. ``limit`` caps the returned slice;
if the resident tail is shorter than ``limit``, returns what
is available.
"""
...
def clear(self) -> None:
"""Drop all entries. Provided for test isolation, not for production callers."""
...
class InMemoryEventLog:
"""Bounded in-memory ring buffer with TTL eviction.
Two eviction triggers, both checked on every ``append`` (and on
``query`` for read-side freshness when older entries have aged
past the TTL but no append has happened to evict them):
- **TTL:** entries older than ``ttl_seconds`` are dropped.
- **max_entries:** when the deque exceeds ``max_entries``, oldest
drop until back at the cap.
Both bounds are advisory at construction non-positive values
fall back to permissive defaults rather than disabling the log,
because a misconfigured value should not silently lose events.
To disable the log, use ``DisabledEventLog`` instead.
The id counter is monotonic across the entire process lifetime;
eviction does not reset it. A query with ``since=last_seen_id``
returns the resident tail past that cursor, which may be empty if
the reader is too far behind.
"""
_DEFAULT_TTL_SECONDS = 3600 # 1 hour — covers a long agentic loop without leaking
_DEFAULT_MAX_ENTRIES = 10_000 # ~1 MB at 100 bytes/event, safely under workspace RAM budget
def __init__(
self,
ttl_seconds: int = _DEFAULT_TTL_SECONDS,
max_entries: int = _DEFAULT_MAX_ENTRIES,
now: Optional[Any] = None,
) -> None:
self._ttl_seconds: int = ttl_seconds if ttl_seconds > 0 else self._DEFAULT_TTL_SECONDS
self._max_entries: int = max_entries if max_entries > 0 else self._DEFAULT_MAX_ENTRIES
# Injected clock for deterministic TTL tests. Production passes
# ``time.time``; tests pass a callable that returns a controlled value.
self._now = now if callable(now) else time.time
self._lock = threading.Lock()
self._next_id: int = 1
self._buf: Deque[Event] = deque()
def append(self, kind: str, payload: Optional[dict] = None) -> Event:
with self._lock:
event = Event(
id=self._next_id,
timestamp=self._now(),
kind=kind,
payload=dict(payload) if payload else {},
)
self._next_id += 1
self._buf.append(event)
self._evict_locked()
return event
def query(self, since: Optional[int] = None, limit: Optional[int] = None) -> list[Event]:
with self._lock:
# Read-side TTL sweep — covers the case where appends pause
# but a reader keeps polling. Without this, a stale tail
# would survive forever once writes stop.
self._evict_locked()
cutoff = since if since is not None else 0
tail: Iterable[Event] = (e for e in self._buf if e.id > cutoff)
if limit is not None and limit >= 0:
if limit == 0:
# Explicit empty-slice probe — used by pagination
# UIs to ask "are there any new events?" without
# paying for the data. Distinct from limit=None
# (no cap) — return empty rather than the first event.
return []
out: list[Event] = []
for e in tail:
out.append(e)
if len(out) >= limit:
break
return out
return list(tail)
def clear(self) -> None:
with self._lock:
self._buf.clear()
# NOTE: do NOT reset _next_id — the cursor contract is that
# ids are monotonic across the lifetime of the process, even
# across explicit clears (which only happen in tests).
def _evict_locked(self) -> None:
"""Caller MUST hold self._lock."""
if not self._buf:
return
cutoff = self._now() - self._ttl_seconds
while self._buf and self._buf[0].timestamp < cutoff:
self._buf.popleft()
# max_entries bound after TTL — a long buffer that fits the
# window can still be capped if the burst rate exceeded design.
while len(self._buf) > self._max_entries:
self._buf.popleft()
class DisabledEventLog:
"""No-op backend for ``backend: disabled``.
Append returns a synthetic event so callers that want the id
don't crash; query always returns empty. The synthetic event is
NOT cached anywhere the contract for ``backend: disabled`` is
that no state is retained. Operators who pick this backend opt
out of the canvas Activity tab and the `/activity` endpoint.
"""
def __init__(self) -> None:
self._next_id: int = 1
self._lock = threading.Lock()
def append(self, kind: str, payload: Optional[dict] = None) -> Event:
# Single-shot id increment — keeps the returned event ids
# monotonic for callers that compare them, even though we
# never persist anything.
with self._lock:
event = Event(
id=self._next_id,
timestamp=time.time(),
kind=kind,
payload=dict(payload) if payload else {},
)
self._next_id += 1
return event
def query(self, since: Optional[int] = None, limit: Optional[int] = None) -> list[Event]:
return []
def clear(self) -> None:
return None
def create_event_log(
backend: str = "memory",
ttl_seconds: int = InMemoryEventLog._DEFAULT_TTL_SECONDS,
max_entries: int = InMemoryEventLog._DEFAULT_MAX_ENTRIES,
) -> EventLogBackend:
"""Factory — pick a backend by name from EventLogConfig.
Unknown backend strings fall back to ``memory`` rather than
raising at boot. A typo'd config value should degrade to the
safe default, not crash the workspace before any event can be
recorded. The redis backend lands in a follow-up; until then
``backend: redis`` also resolves to in-memory.
"""
name = (backend or "memory").strip().lower()
if name in ("disabled", "off", "none"):
return DisabledEventLog()
# memory is the default; redis falls through here until it's wired.
return InMemoryEventLog(ttl_seconds=ttl_seconds, max_entries=max_entries)

View File

@ -9,6 +9,7 @@ from config import (
A2AConfig,
ComplianceConfig,
DelegationConfig,
EventLogConfig,
ObservabilityConfig,
SandboxConfig,
WorkspaceConfig,
@ -672,3 +673,135 @@ def test_observability_log_level_uppercased(tmp_path):
cfg = load_config(str(tmp_path))
assert cfg.observability.log_level == "DEBUG"
# ---------------------------------------------------------------------------
# EventLogConfig (#119 PR-2) — schema-only parser tests. The runtime is
# exercised separately in test_event_log.py; these tests pin the YAML→
# dataclass contract for ObservabilityConfig.event_log so the wire shape
# stays stable as backends are added in PR-3.
# ---------------------------------------------------------------------------
def test_event_log_dataclass_default():
"""EventLogConfig() — no args — yields the documented defaults."""
cfg = EventLogConfig()
assert cfg.backend == "memory"
assert cfg.ttl_seconds == 3600
assert cfg.max_entries == 10_000
def test_event_log_default_when_yaml_omits_block(tmp_path):
"""No ``observability.event_log`` key → dataclass defaults."""
config_yaml = tmp_path / "config.yaml"
config_yaml.write_text(yaml.dump({}))
cfg = load_config(str(tmp_path))
assert cfg.observability.event_log.backend == "memory"
assert cfg.observability.event_log.ttl_seconds == 3600
assert cfg.observability.event_log.max_entries == 10_000
def test_event_log_explicit_yaml_override(tmp_path):
"""Explicit YAML values flow through load_config to EventLogConfig."""
config_yaml = tmp_path / "config.yaml"
config_yaml.write_text(
yaml.dump(
{
"observability": {
"event_log": {
"backend": "disabled",
"ttl_seconds": 60,
"max_entries": 50,
}
}
}
)
)
cfg = load_config(str(tmp_path))
assert cfg.observability.event_log.backend == "disabled"
assert cfg.observability.event_log.ttl_seconds == 60
assert cfg.observability.event_log.max_entries == 50
def test_event_log_partial_override_keeps_other_defaults(tmp_path):
"""Setting only backend preserves ttl + max_entries defaults."""
config_yaml = tmp_path / "config.yaml"
config_yaml.write_text(
yaml.dump(
{"observability": {"event_log": {"backend": "disabled"}}}
)
)
cfg = load_config(str(tmp_path))
assert cfg.observability.event_log.backend == "disabled"
assert cfg.observability.event_log.ttl_seconds == 3600
assert cfg.observability.event_log.max_entries == 10_000
def test_event_log_unknown_backend_falls_back_to_memory(tmp_path):
"""A typo ``backend: redis`` (not yet wired) resolves to the
safe default rather than crashing boot. Same lenient-default
contract as the rest of this parser."""
config_yaml = tmp_path / "config.yaml"
config_yaml.write_text(
yaml.dump({"observability": {"event_log": {"backend": "redis"}}})
)
cfg = load_config(str(tmp_path))
assert cfg.observability.event_log.backend == "memory"
@pytest.mark.parametrize(
"raw_block, expected_ttl, expected_max",
[
# In-band positives pass through.
({"ttl_seconds": 1800, "max_entries": 500}, 1800, 500),
# Zero / negative / non-numeric coerce to documented defaults
# (3600 / 10000) — disabling the bound is what
# ``backend: disabled`` is for.
({"ttl_seconds": 0}, 3600, 10_000),
({"ttl_seconds": -1}, 3600, 10_000),
({"ttl_seconds": "not-a-number"}, 3600, 10_000),
({"max_entries": 0}, 3600, 10_000),
({"max_entries": -5}, 3600, 10_000),
({"max_entries": "huge"}, 3600, 10_000),
],
ids=[
"in_band_positives",
"zero_ttl_falls_back",
"negative_ttl_falls_back",
"non_numeric_ttl_falls_back",
"zero_max_entries_falls_back",
"negative_max_entries_falls_back",
"non_numeric_max_entries_falls_back",
],
)
def test_event_log_bounds_clamp(tmp_path, raw_block, expected_ttl, expected_max):
"""Out-of-band ttl_seconds / max_entries fall back to defaults
rather than disabling the log silently. ``backend: disabled`` is
the explicit opt-out path."""
config_yaml = tmp_path / "config.yaml"
config_yaml.write_text(
yaml.dump({"observability": {"event_log": raw_block}})
)
cfg = load_config(str(tmp_path))
assert cfg.observability.event_log.ttl_seconds == expected_ttl
assert cfg.observability.event_log.max_entries == expected_max
def test_event_log_non_dict_block_falls_back_to_default(tmp_path):
"""``event_log: "memory"`` (string instead of dict) → defaults.
A scalar value at this key is malformed YAML; coerce to default
instead of raising."""
config_yaml = tmp_path / "config.yaml"
config_yaml.write_text(
yaml.dump({"observability": {"event_log": "memory"}})
)
cfg = load_config(str(tmp_path))
assert cfg.observability.event_log.backend == "memory"
assert cfg.observability.event_log.ttl_seconds == 3600
assert cfg.observability.event_log.max_entries == 10_000

View File

@ -0,0 +1,345 @@
"""Tests for workspace/event_log.py — append/query/eviction/disabled backend."""
import threading
import time
import pytest
from event_log import (
DisabledEventLog,
Event,
InMemoryEventLog,
create_event_log,
)
# ---------------------------------------------------------------------------
# InMemoryEventLog — append + query basics
# ---------------------------------------------------------------------------
def test_append_returns_event_with_assigned_id():
"""append() returns the persisted Event with a monotonic id starting at 1."""
log = InMemoryEventLog()
e1 = log.append("turn.started", {"task_id": "t1"})
e2 = log.append("turn.completed", {"task_id": "t1"})
assert e1.id == 1
assert e2.id == 2
assert e1.kind == "turn.started"
assert e2.kind == "turn.completed"
assert e1.payload == {"task_id": "t1"}
def test_append_with_no_payload_yields_empty_dict():
"""payload omitted → empty dict, not None — so JSON serialisers don't choke."""
log = InMemoryEventLog()
e = log.append("ping")
assert e.payload == {}
assert isinstance(e.payload, dict)
def test_append_copies_payload_so_caller_mutations_dont_leak():
"""The persisted payload must NOT alias the caller's dict — otherwise
a downstream mutation of the original silently rewrites history."""
log = InMemoryEventLog()
payload = {"k": "v"}
e = log.append("evt", payload)
payload["k"] = "MUTATED"
assert e.payload == {"k": "v"}
assert log.query()[0].payload == {"k": "v"}
def test_query_no_args_returns_all_resident_events_in_order():
"""query() with no cursor returns every resident event, ascending by id."""
log = InMemoryEventLog()
log.append("a")
log.append("b")
log.append("c")
out = log.query()
assert [e.kind for e in out] == ["a", "b", "c"]
assert [e.id for e in out] == [1, 2, 3]
def test_query_since_cursor_returns_only_newer_events():
"""query(since=N) returns only events with id > N — strict greater-than."""
log = InMemoryEventLog()
log.append("a")
log.append("b")
log.append("c")
out = log.query(since=2)
assert [e.kind for e in out] == ["c"]
assert out[0].id == 3
def test_query_since_at_or_past_tip_returns_empty():
"""A cursor at the current tip (or past it) yields no events."""
log = InMemoryEventLog()
log.append("a")
log.append("b")
assert log.query(since=2) == []
assert log.query(since=999) == []
def test_query_limit_caps_returned_slice():
"""limit caps the slice; unspecified means unlimited."""
log = InMemoryEventLog()
for i in range(5):
log.append(f"e{i}")
capped = log.query(limit=2)
assert [e.kind for e in capped] == ["e0", "e1"]
unlimited = log.query()
assert len(unlimited) == 5
def test_query_limit_zero_returns_empty_list():
"""limit=0 is a valid request for the empty slice (some pagination
UIs probe for "any new events?" with limit=0 + since=cursor)."""
log = InMemoryEventLog()
log.append("a")
assert log.query(limit=0) == []
def test_query_combined_since_and_limit():
"""since + limit compose: skip past cursor, then cap."""
log = InMemoryEventLog()
for i in range(10):
log.append(f"e{i}")
out = log.query(since=3, limit=2)
assert [e.id for e in out] == [4, 5]
# ---------------------------------------------------------------------------
# Eviction — TTL + max_entries
# ---------------------------------------------------------------------------
def test_max_entries_evicts_oldest_first_fifo():
"""Exceeding max_entries evicts in FIFO order — newest survive."""
log = InMemoryEventLog(max_entries=3)
for i in range(5):
log.append(f"e{i}")
out = log.query()
assert [e.kind for e in out] == ["e2", "e3", "e4"]
assert [e.id for e in out] == [3, 4, 5]
def test_max_entries_evicted_ids_never_resurface_via_cursor():
"""A cursor pointing past evicted ids returns the resident tail.
Important: the reader does NOT see an error they see "everything
after my cursor that's still here". This is the documented
at-most-once-while-resident contract."""
log = InMemoryEventLog(max_entries=2)
for i in range(5):
log.append(f"e{i}")
# Reader's last seen cursor was id=1, but events 1+2 have aged out.
# They should still get the resident tail (4, 5) without a crash.
out = log.query(since=1)
assert [e.id for e in out] == [4, 5]
def test_ttl_evicts_entries_older_than_ttl_seconds():
"""TTL eviction triggers on append when the oldest entry has aged
past ttl_seconds. Uses an injected clock so the test is hermetic."""
clock = [1000.0]
log = InMemoryEventLog(ttl_seconds=10, now=lambda: clock[0])
log.append("old") # timestamp 1000
clock[0] = 1005.0
log.append("mid") # timestamp 1005
clock[0] = 1015.0 # past TTL of "old" (1000+10=1010 < 1015)
log.append("new") # this triggers eviction sweep
out = log.query()
assert [e.kind for e in out] == ["mid", "new"]
def test_ttl_evicts_on_query_when_appends_pause():
"""Read-side TTL sweep — covers the case where appends stop but
a reader keeps polling. Without this, a stale tail would survive
forever once writes pause."""
clock = [1000.0]
log = InMemoryEventLog(ttl_seconds=10, now=lambda: clock[0])
log.append("only")
# No more appends. Advance well past TTL.
clock[0] = 2000.0
assert log.query() == []
def test_clear_drops_all_but_preserves_id_counter():
"""clear() drops every resident event but does NOT reset the id
counter the cursor contract is monotonic ids across the
process lifetime, even across clears (which are test-only)."""
log = InMemoryEventLog()
log.append("a")
log.append("b")
log.clear()
assert log.query() == []
e = log.append("c")
assert e.id == 3 # counter resumes, not reset
def test_non_positive_ttl_falls_back_to_default():
"""Defensive: a 0 or negative ttl_seconds at construction falls
back to the documented 3600s default. Disabling eviction silently
would leak memory; that's what backend=disabled is for."""
log = InMemoryEventLog(ttl_seconds=0)
assert log._ttl_seconds == InMemoryEventLog._DEFAULT_TTL_SECONDS
log2 = InMemoryEventLog(ttl_seconds=-5)
assert log2._ttl_seconds == InMemoryEventLog._DEFAULT_TTL_SECONDS
def test_non_positive_max_entries_falls_back_to_default():
"""Same defensive shape for max_entries."""
log = InMemoryEventLog(max_entries=0)
assert log._max_entries == InMemoryEventLog._DEFAULT_MAX_ENTRIES
log2 = InMemoryEventLog(max_entries=-1)
assert log2._max_entries == InMemoryEventLog._DEFAULT_MAX_ENTRIES
# ---------------------------------------------------------------------------
# Event.to_dict — wire-format ownership pinning
# ---------------------------------------------------------------------------
def test_event_to_dict_contains_all_fields():
"""to_dict() returns the JSON-serialisable shape API consumers expect.
Pinning the wire format here means a future rename of ``kind`` flips
in event_log.py rather than in every reader."""
e = Event(id=42, timestamp=1700.5, kind="turn.started", payload={"x": 1})
d = e.to_dict()
assert d == {"id": 42, "timestamp": 1700.5, "kind": "turn.started", "payload": {"x": 1}}
def test_event_timestamp_is_set_at_append():
"""timestamp on a logged event is the value of the injected clock at
append time, not query time so the wire timestamp reflects when
the event happened, not when it was read."""
clock = [1234.5]
# Wide ttl so the read-side TTL sweep doesn't evict the event we
# just wrote when we advance the clock to read it back.
log = InMemoryEventLog(ttl_seconds=100_000, now=lambda: clock[0])
log.append("evt")
clock[0] = 9999.0
[e] = log.query()
assert e.timestamp == 1234.5
# ---------------------------------------------------------------------------
# DisabledEventLog — no-op contract
# ---------------------------------------------------------------------------
def test_disabled_query_always_empty():
"""Disabled backend never retains anything — query is always []."""
log = DisabledEventLog()
log.append("a")
log.append("b")
assert log.query() == []
assert log.query(since=0) == []
def test_disabled_append_returns_event_with_monotonic_ids():
"""Even when nothing is persisted, append returns an Event with a
monotonic id so callers that propagate the id (e.g. for a debug
log) don't crash."""
log = DisabledEventLog()
e1 = log.append("a")
e2 = log.append("b")
assert e1.id == 1
assert e2.id == 2
assert e1.kind == "a"
def test_disabled_clear_is_a_no_op():
"""clear() on disabled returns None and changes nothing."""
log = DisabledEventLog()
log.append("a")
log.clear()
assert log.query() == []
# ---------------------------------------------------------------------------
# create_event_log factory
# ---------------------------------------------------------------------------
@pytest.mark.parametrize(
"name", ["memory", "MEMORY", " memory ", "", "redis", "unknown"]
)
def test_create_event_log_memory_default(name):
"""Default + unknown + redis-not-yet-wired all resolve to in-memory.
A typo or future-backend name should NOT silently disable telemetry."""
log = create_event_log(backend=name)
assert isinstance(log, InMemoryEventLog)
@pytest.mark.parametrize("name", ["disabled", "DISABLED", " off ", "none"])
def test_create_event_log_disabled_aliases(name):
"""``disabled``, ``off``, ``none`` all opt the workspace out."""
log = create_event_log(backend=name)
assert isinstance(log, DisabledEventLog)
def test_create_event_log_passes_bounds_through():
"""ttl_seconds and max_entries flow into the InMemoryEventLog instance."""
log = create_event_log(backend="memory", ttl_seconds=42, max_entries=99)
assert isinstance(log, InMemoryEventLog)
assert log._ttl_seconds == 42
assert log._max_entries == 99
# ---------------------------------------------------------------------------
# Concurrency — append from multiple threads under contention
# ---------------------------------------------------------------------------
def test_concurrent_appends_assign_unique_monotonic_ids():
"""Multiple writer threads must not collide on the id counter.
Heartbeat thread + main loop + A2A executor all append concurrently
in production; a duplicated id would break cursor-based readers."""
log = InMemoryEventLog(max_entries=10_000)
n_threads = 8
n_per_thread = 200
def worker():
for _ in range(n_per_thread):
log.append("e")
threads = [threading.Thread(target=worker) for _ in range(n_threads)]
for t in threads:
t.start()
for t in threads:
t.join()
out = log.query()
ids = [e.id for e in out]
assert len(ids) == n_threads * n_per_thread
assert len(set(ids)) == len(ids) # all unique
assert ids == sorted(ids) # ascending order preserved
def test_real_clock_default_uses_time_time():
"""When ``now`` is not passed, the log uses ``time.time`` — sanity
check that the production path is wired and that an event's
timestamp matches the wall clock within a small epsilon."""
log = InMemoryEventLog()
before = time.time()
e = log.append("evt")
after = time.time()
assert before <= e.timestamp <= after