Merge pull request #2551 from Molecule-AI/feat/wire-observability-config
feat(workspace): wire observability heartbeat + log_level into consumers (#119 PR-3a)
This commit is contained in:
commit
e2b58f0fbc
@ -131,7 +131,7 @@ def _persist_inbound_secret_from_heartbeat(resp) -> None:
|
||||
)
|
||||
|
||||
|
||||
HEARTBEAT_INTERVAL = 30 # seconds
|
||||
HEARTBEAT_INTERVAL = 30 # seconds — fallback default when no per-instance value is passed
|
||||
MAX_CONSECUTIVE_FAILURES = 10
|
||||
MAX_SEEN_DELEGATION_IDS = 200
|
||||
SELF_MESSAGE_COOLDOWN = 60 # seconds — minimum between self-messages to prevent loops
|
||||
@ -142,9 +142,22 @@ DELEGATION_RESULTS_FILE = os.environ.get("DELEGATION_RESULTS_FILE", "/tmp/delega
|
||||
|
||||
|
||||
class HeartbeatLoop:
|
||||
def __init__(self, platform_url: str, workspace_id: str):
|
||||
def __init__(
|
||||
self,
|
||||
platform_url: str,
|
||||
workspace_id: str,
|
||||
interval_seconds: int = HEARTBEAT_INTERVAL,
|
||||
):
|
||||
self.platform_url = platform_url
|
||||
self.workspace_id = workspace_id
|
||||
# Per-instance interval — main.py threads ObservabilityConfig.
|
||||
# heartbeat_interval_seconds (clamped to [5, 300] at parse time)
|
||||
# in here so operators can tune cadence per-workspace via the
|
||||
# `observability:` block in config.yaml. Defaults to the
|
||||
# legacy module constant so callers that haven't been updated
|
||||
# yet (and tests that construct HeartbeatLoop directly with the
|
||||
# 2-arg signature) keep their existing 30s behavior.
|
||||
self._interval_seconds = interval_seconds
|
||||
self.start_time = time.time()
|
||||
self.error_count = 0
|
||||
self.request_count = 0
|
||||
@ -280,13 +293,15 @@ class HeartbeatLoop:
|
||||
except Exception as e:
|
||||
logger.debug("Delegation check failed: %s", e)
|
||||
|
||||
await asyncio.sleep(HEARTBEAT_INTERVAL)
|
||||
await asyncio.sleep(self._interval_seconds)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error("Heartbeat loop error: %s — retrying in 30s", e)
|
||||
await asyncio.sleep(HEARTBEAT_INTERVAL)
|
||||
logger.error(
|
||||
"Heartbeat loop error: %s — retrying in %ds", e, self._interval_seconds
|
||||
)
|
||||
await asyncio.sleep(self._interval_seconds)
|
||||
finally:
|
||||
if client:
|
||||
try:
|
||||
|
||||
@ -107,8 +107,16 @@ async def main(): # pragma: no cover
|
||||
else:
|
||||
print("Governance: disabled (set governance.enabled: true in config.yaml to activate)")
|
||||
|
||||
# 2. Create heartbeat (passed to adapter for task tracking)
|
||||
heartbeat = HeartbeatLoop(platform_url, workspace_id)
|
||||
# 2. Create heartbeat (passed to adapter for task tracking).
|
||||
# interval is sourced from observability.heartbeat_interval_seconds
|
||||
# in config.yaml — clamped to [5, 300] at parse time. Operators
|
||||
# who want a faster crash-detection signal lower it; ones who want
|
||||
# to reduce platform write load raise it.
|
||||
heartbeat = HeartbeatLoop(
|
||||
platform_url,
|
||||
workspace_id,
|
||||
interval_seconds=config.observability.heartbeat_interval_seconds,
|
||||
)
|
||||
|
||||
# 3. Get adapter for this runtime
|
||||
runtime = config.runtime or "langgraph"
|
||||
@ -458,11 +466,20 @@ async def main(): # pragma: no cover
|
||||
|
||||
built_app = make_trace_middleware(starlette_app)
|
||||
|
||||
# uvicorn expects the level name in lowercase ("debug" / "info" /
|
||||
# "warning" / "error" / "critical"). config.observability.log_level
|
||||
# is uppercased at parse time (config.py.load_config) for the
|
||||
# Python ``logging`` module's convention; lower it here so both
|
||||
# consumers get the form they expect from one source of truth.
|
||||
# An ``LOG_LEVEL`` env var still wins as an ops-side debugging
|
||||
# override — set it on the workspace process to bypass YAML
|
||||
# without a config edit + restart cycle.
|
||||
uvicorn_log_level = os.environ.get("LOG_LEVEL", config.observability.log_level).lower()
|
||||
server_config = uvicorn.Config(
|
||||
built_app,
|
||||
host="0.0.0.0",
|
||||
port=port,
|
||||
log_level="info",
|
||||
log_level=uvicorn_log_level,
|
||||
)
|
||||
server = uvicorn.Server(server_config)
|
||||
|
||||
|
||||
@ -501,3 +501,43 @@ async def test_heartbeat_loop_persists_secret_from_response(monkeypatch):
|
||||
assert saved == ["from-heartbeat"], (
|
||||
"in-container heartbeat must persist platform_inbound_secret from 200 response"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# observability.heartbeat_interval_seconds wiring (#119 PR-3) — pin that the
|
||||
# per-instance interval flows from ObservabilityConfig through the
|
||||
# constructor to the asyncio.sleep call. Tests below use the public
|
||||
# attribute, but the attribute IS the wire because it's read directly by
|
||||
# the loop body.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_init_default_interval_matches_legacy_constant():
|
||||
"""When the 2-arg constructor is used (legacy callers, existing tests),
|
||||
the per-instance interval falls back to the module-level
|
||||
HEARTBEAT_INTERVAL constant — preserves backward compat without a
|
||||
behavior change for code that hasn't been updated to pass the
|
||||
observability-driven value."""
|
||||
from heartbeat import HEARTBEAT_INTERVAL
|
||||
|
||||
hb = HeartbeatLoop("http://localhost:8080", "ws-1")
|
||||
assert hb._interval_seconds == HEARTBEAT_INTERVAL
|
||||
|
||||
|
||||
def test_init_accepts_explicit_interval():
|
||||
"""Passing interval_seconds threads ObservabilityConfig.heartbeat_interval_seconds
|
||||
through to the loop. The integration site (workspace/main.py) does
|
||||
this with the value from config.observability.heartbeat_interval_seconds."""
|
||||
hb = HeartbeatLoop("http://localhost:8080", "ws-1", interval_seconds=60)
|
||||
assert hb._interval_seconds == 60
|
||||
|
||||
|
||||
def test_init_accepts_floor_of_5():
|
||||
"""The config parser clamps to [5, 300]; the constructor itself accepts
|
||||
any positive int — clamping is the parser's job, not the loop's. This
|
||||
test pins that no defensive re-clamp happens here (which would
|
||||
silently break operators who deliberately want 5s in dev)."""
|
||||
hb = HeartbeatLoop("http://localhost:8080", "ws-1", interval_seconds=5)
|
||||
assert hb._interval_seconds == 5
|
||||
hb2 = HeartbeatLoop("http://localhost:8080", "ws-1", interval_seconds=300)
|
||||
assert hb2._interval_seconds == 300
|
||||
|
||||
Loading…
Reference in New Issue
Block a user