molecule-core/platform/internal/supervised/supervised_test.go
rabbitblood 76a36e8062 fix(platform): panic-recovering supervisor for every background goroutine (#92)
Yesterday's scheduler-died incident (#85) was one instance of a systemic
bug: every long-running goroutine in the platform lacks panic recovery
and exposes no liveness signal. In a multi-tenant SaaS deployment, a
single tenant's bad data panicking any subsystem takes that subsystem
down for every tenant, silently, with all standard health probes
still green. That is a sev-1 that any one tenant can trigger.

This PR:

1. Introduces `platform/internal/supervised/` with two primitives:

   a. RunWithRecover(ctx, name, fn) — runs fn inside a recover wrapper.
      On panic, it logs the stack and restarts fn with exponential
      backoff (1s → 2s → 4s → … → 30s cap). On clean return (fn decided
      to stop) it returns without restarting. On ctx.Done it stops
      cleanly. (A sketch of the restart loop follows this list.)

   b. Heartbeat(name) + LastTick(name) + Snapshot() + IsHealthy(names,
      staleThreshold) — a shared in-memory liveness registry. Every
      subsystem calls Heartbeat(name) at the end of each tick so
      operators can distinguish "goroutine alive and healthy" from
      "alive but stuck inside a single tick". (Sketched after the list.)

2. Wraps every `go X.Start(ctx)` in main.go:
   - broadcaster.Subscribe   (Redis pub/sub relay → WebSocket)
   - registry.StartLivenessMonitor
   - registry.StartHealthSweep
   - scheduler.Start         (the one that died yesterday)
   - channelMgr.Start        (Telegram / Slack)

3. Adds `supervised.Heartbeat("scheduler")` inside the scheduler tick
   loop as the first end-to-end demonstration. Follow-up PRs will add
   heartbeats to the other four subsystems.

4. Adds a `GET /admin/liveness` endpoint returning per-subsystem
   last_tick_at + seconds_ago. Operators can poll it and alert on any
   subsystem whose seconds_ago exceeds 2x its cron/tick interval. (See
   the handler sketch after the list.)

5. Unit tests for RunWithRecover (clean return no restart; panic
   restarts with backoff; ctx cancel stops restart loop) and for the
   liveness registry.
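
The supervisor implementation itself is not part of this file. A
minimal sketch of the restart loop, consistent with the tests below
(the `runOnce` helper and the log format are illustrative, not the
actual code), might look like:

    package supervised

    import (
        "context"
        "log"
        "runtime/debug"
        "time"
    )

    // RunWithRecover runs fn, restarting it with exponential backoff
    // (1s doubling up to a 30s cap) whenever it panics. It returns when
    // fn returns cleanly or when ctx is cancelled.
    func RunWithRecover(ctx context.Context, name string, fn func(context.Context)) {
        backoff := time.Second
        for {
            if !runOnce(ctx, name, fn) {
                return // clean return: fn decided to stop, don't restart
            }
            select {
            case <-ctx.Done():
                return // cancelled while backing off
            case <-time.After(backoff):
            }
            backoff *= 2
            if backoff > 30*time.Second {
                backoff = 30 * time.Second
            }
        }
    }

    // runOnce invokes fn once and reports whether it panicked.
    func runOnce(ctx context.Context, name string, fn func(context.Context)) (panicked bool) {
        defer func() {
            if r := recover(); r != nil {
                panicked = true
                log.Printf("supervised: %q panicked: %v\n%s", name, r, debug.Stack())
            }
        }()
        fn(ctx)
        return false
    }

With that shape, each `go X.Start(ctx)` in main.go becomes a one-line
change along the lines of `go supervised.RunWithRecover(ctx,
"scheduler", scheduler.Start)` (assuming Start blocks until done).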
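
The registry can be as small as a mutex-guarded map. A sketch
consistent with the tests below (variable names are illustrative):

    package supervised

    import (
        "sync"
        "time"
    )

    var (
        mu    sync.RWMutex
        ticks = map[string]time.Time{}
    )

    // Heartbeat records that name completed a tick just now.
    func Heartbeat(name string) {
        mu.Lock()
        ticks[name] = time.Now()
        mu.Unlock()
    }

    // LastTick returns the last heartbeat for name, or the zero time
    // if name has never ticked.
    func LastTick(name string) time.Time {
        mu.RLock()
        defer mu.RUnlock()
        return ticks[name]
    }

    // Snapshot returns a copy of the registry that callers may mutate
    // freely without racing the subsystems.
    func Snapshot() map[string]time.Time {
        mu.RLock()
        defer mu.RUnlock()
        out := make(map[string]time.Time, len(ticks))
        for k, v := range ticks {
            out[k] = v
        }
        return out
    }

    // IsHealthy reports whether every named subsystem ticked within
    // staleThreshold, plus the names of those that did not.
    func IsHealthy(names []string, staleThreshold time.Duration) (bool, []string) {
        var stale []string
        for _, n := range names {
            if last := LastTick(n); last.IsZero() || time.Since(last) > staleThreshold {
                stale = append(stale, n)
            }
        }
        return len(stale) == 0, stale
    }

The per-subsystem integration is then a single line at the end of each
tick, e.g. `supervised.Heartbeat("scheduler")` in the scheduler loop.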
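
A possible shape for the /admin/liveness handler (the import path,
handler name, and route registration are assumptions; main.go is not
shown here):

    package main

    import (
        "encoding/json"
        "net/http"
        "time"

        "molecule-core/platform/internal/supervised" // assumed module path
    )

    // livenessHandler serves per-subsystem last_tick_at + seconds_ago.
    func livenessHandler(w http.ResponseWriter, _ *http.Request) {
        type entry struct {
            LastTickAt time.Time `json:"last_tick_at"`
            SecondsAgo float64   `json:"seconds_ago"`
        }
        out := make(map[string]entry)
        for name, last := range supervised.Snapshot() {
            out[name] = entry{
                LastTickAt: last,
                SecondsAgo: time.Since(last).Seconds(),
            }
        }
        w.Header().Set("Content-Type", "application/json")
        _ = json.NewEncoder(w).Encode(out)
    }

Wired up wherever the admin mux lives, e.g.
`mux.HandleFunc("/admin/liveness", livenessHandler)` with a method
check, or the Go 1.22+ pattern `"GET /admin/liveness"`.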

Net new code: ~160 lines + ~100 lines of tests. Refactor of main.go:
~10 lines changed. No behavior change on the happy path; the only
change is what happens on a panic.

Closes #92. Supersedes the local recover added to scheduler.go in
#90 (kept conceptually, but now via the shared helper).
2026-04-14 20:34:18 -07:00

121 lines
3.0 KiB
Go

package supervised

import (
	"context"
	"sync/atomic"
	"testing"
	"time"
)

func TestRunWithRecover_CleanReturnDoesNotRestart(t *testing.T) {
	var calls int32
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	done := make(chan struct{})
	go func() {
		RunWithRecover(ctx, "clean", func(c context.Context) {
			atomic.AddInt32(&calls, 1)
			// Return immediately — no panic, not blocked on ctx.
		})
		close(done)
	}()
	select {
	case <-done:
	case <-time.After(2 * time.Second):
		t.Fatal("RunWithRecover did not return after clean fn exit")
	}
	if got := atomic.LoadInt32(&calls); got != 1 {
		t.Errorf("fn called %d times on clean return; want 1", got)
	}
}

func TestRunWithRecover_PanicRestartsWithBackoff(t *testing.T) {
	var calls int32
	ctx, cancel := context.WithCancel(context.Background())
	go RunWithRecover(ctx, "panic-test", func(c context.Context) {
		atomic.AddInt32(&calls, 1)
		if atomic.LoadInt32(&calls) < 3 {
			panic("deliberate")
		}
		// On 3rd call, wait for ctx.Done so we can inspect calls cleanly.
		<-c.Done()
	})
	// Give it time to panic + restart at least twice (1s + 2s backoffs).
	time.Sleep(4 * time.Second)
	cancel()
	got := atomic.LoadInt32(&calls)
	if got < 3 {
		t.Errorf("fn called %d times after 4s of restarts; want >= 3", got)
	}
}

func TestRunWithRecover_CtxDoneStopsRestart(t *testing.T) {
	var calls int32
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		RunWithRecover(ctx, "ctx-done", func(c context.Context) {
			atomic.AddInt32(&calls, 1)
			panic("always")
		})
		close(done)
	}()
	time.Sleep(100 * time.Millisecond)
	cancel()
	select {
	case <-done:
	case <-time.After(35 * time.Second):
		t.Fatal("RunWithRecover did not return after ctx cancel")
	}
}

func TestLivenessRegistry(t *testing.T) {
	// Heartbeat records; LastTick reads back.
	before := time.Now()
	Heartbeat("testsubsys-A")
	after := time.Now()
	last := LastTick("testsubsys-A")
	if last.Before(before) || last.After(after) {
		t.Errorf("LastTick=%v outside [%v, %v]", last, before, after)
	}
	// Unknown subsystem → zero time.
	if !LastTick("nonexistent-subsys").IsZero() {
		t.Errorf("LastTick for unknown subsystem should be zero")
	}
	// IsHealthy: fresh heartbeat → healthy; stale → not healthy.
	Heartbeat("testsubsys-B")
	healthy, stale := IsHealthy([]string{"testsubsys-A", "testsubsys-B"}, time.Minute)
	if !healthy || len(stale) != 0 {
		t.Errorf("expected healthy, got healthy=%v stale=%v", healthy, stale)
	}
	// Force staleness by asking for an impossibly tight threshold.
	time.Sleep(10 * time.Millisecond)
	healthy, stale = IsHealthy([]string{"testsubsys-A"}, time.Nanosecond)
	if healthy || len(stale) != 1 {
		t.Errorf("expected stale testsubsys-A, got healthy=%v stale=%v", healthy, stale)
	}
}

func TestSnapshotIsCopy(t *testing.T) {
	Heartbeat("snap-test")
	s1 := Snapshot()
	// Mutating the returned map must not affect the registry.
	s1["snap-test"] = time.Time{}
	if LastTick("snap-test").IsZero() {
		t.Errorf("Snapshot returned a live reference; should be a copy")
	}
}