fix(platform): panic-recovering supervisor for every background goroutine (#92)

Yesterday's scheduler-died incident (#85) was one instance of a systemic
bug: every long-running goroutine in the platform lacks panic recovery
and exposes no liveness signal. In a multi-tenant SaaS deployment, one
tenant's bad data panicking any subsystem takes that subsystem down for
every tenant, silently, while every standard health probe stays green.
One bad row from one tenant becomes a platform-wide sev-1.

This PR:

1. Introduces `platform/internal/supervised/` with two primitives:

   a. RunWithRecover(ctx, name, fn) — runs fn inside a recover wrapper.
      On panic, it logs the stack trace and restarts fn with exponential
      backoff (1s → 2s → 4s → … capped at 30s). On clean return (fn
      decided to stop) it returns without restarting; on ctx.Done it
      exits cleanly.

   b. Heartbeat(name) + LastTick(name) + Snapshot() + IsHealthy(names,
      staleThreshold) — a shared in-memory liveness registry. Every
      subsystem calls Heartbeat(name) at the end of each tick so
      operators can distinguish "goroutine alive and healthy" from
      "alive but stuck inside a single tick" (both primitives are wired
      together in the sketch after this list).

2. Wraps every `go X.Start(ctx)` in main.go:
   - broadcaster.Subscribe   (Redis pub/sub relay → WebSocket)
   - registry.StartLivenessMonitor
   - registry.StartHealthSweep
   - scheduler.Start         (the one that died yesterday)
   - channelMgr.Start        (Telegram / Slack)

3. Adds `supervised.Heartbeat("scheduler")` inside the scheduler tick
   loop as the first end-to-end demonstration. Follow-up PRs will add
   heartbeats to the other four subsystems.

4. Adds a `GET /admin/liveness` endpoint returning per-subsystem
   last_tick_at + seconds_ago. Operators can poll it and alert on any
   subsystem whose seconds_ago exceeds 2× its cron/tick interval (a
   sample consumer sketch follows the router diff below).

5. Unit tests for RunWithRecover (clean return no restart; panic
   restarts with backoff; ctx cancel stops restart loop) and for the
   liveness registry.
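
For reviewers, a minimal sketch of how a new subsystem wires both
primitives together. The "cleaner" subsystem and cleanExpiredRows are
hypothetical, invented only for illustration; the real call sites are in
the main.go diff below.

package main

import (
    "context"
    "time"

    "github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
)

// cleanExpiredRows stands in for real per-tick work; it exists only so the
// sketch compiles, and a panic here models one tenant's bad row.
func cleanExpiredRows(ctx context.Context) {}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // RunWithRecover: a panic inside any tick is logged and the loop is
    // restarted with backoff instead of dying silently.
    go supervised.RunWithRecover(ctx, "cleaner", func(c context.Context) {
        ticker := time.NewTicker(time.Minute)
        defer ticker.Stop()
        for {
            select {
            case <-c.Done():
                return // clean return: RunWithRecover does not restart
            case <-ticker.C:
                cleanExpiredRows(c)
                supervised.Heartbeat("cleaner") // end-of-tick liveness signal
            }
        }
    })

    <-ctx.Done() // a real main would start the HTTP server here instead
}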

Net new code: ~160 lines + ~100 lines of tests. Refactor of main.go:
~10 lines changed. No behavior change on the happy path; only what
happens after a panic changes.

Closes #92. Supersedes the local recover added to scheduler.go in
#90 (kept conceptually, but now via the shared helper).
Author: rabbitblood
Date:   2026-04-14 20:34:18 -07:00
Parent: e7275531d8
Commit: e4535560cf
5 changed files with 301 additions and 5 deletions


@@ -20,6 +20,7 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/router"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/scheduler"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
)
@@ -67,7 +68,12 @@ func main() {
// Start Redis pub/sub subscriber
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go broadcaster.Subscribe(ctx)
// Every long-running subsystem below is wrapped by supervised.RunWithRecover:
// a panic (e.g. from a single bad tenant row) is logged + the subsystem is
// restarted with exponential backoff instead of silently dying forever.
// Motivation: issue #85 (scheduler silent outage for 12+ hours) and #92
// (systemic — affects every background goroutine).
go supervised.RunWithRecover(ctx, "broadcaster", broadcaster.Subscribe)
// Activity log retention — configurable via env vars
retentionDays := envOr("ACTIVITY_RETENTION_DAYS", "7")
@@ -122,21 +128,25 @@
}
// Start Liveness Monitor — Redis TTL expiry-based offline detection + auto-restart
go registry.StartLivenessMonitor(ctx, onWorkspaceOffline)
go supervised.RunWithRecover(ctx, "liveness-monitor", func(c context.Context) {
registry.StartLivenessMonitor(c, onWorkspaceOffline)
})
// Proactive container health sweep — detects dead containers faster than Redis TTL.
// Checks all "online" workspaces against Docker every 15 seconds.
if prov != nil {
go registry.StartHealthSweep(ctx, prov, 15*time.Second, onWorkspaceOffline)
go supervised.RunWithRecover(ctx, "health-sweep", func(c context.Context) {
registry.StartHealthSweep(c, prov, 15*time.Second, onWorkspaceOffline)
})
}
// Cron Scheduler — fires A2A messages to workspaces on user-defined schedules
cronSched := scheduler.New(wh, broadcaster)
go cronSched.Start(ctx)
go supervised.RunWithRecover(ctx, "scheduler", cronSched.Start)
// Channel Manager — social channel integrations (Telegram, Slack, etc.)
channelMgr := channels.NewManager(wh, broadcaster)
go channelMgr.Start(ctx)
go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start)
// Router
r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr)


@@ -15,6 +15,7 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/middleware"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
"github.com/docker/docker/client"
"github.com/gin-contrib/cors"
@@ -63,6 +64,23 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
c.JSON(200, gin.H{"status": "ok"})
})
// /admin/liveness — per-subsystem last-tick timestamps. Operators read this
// to catch stuck-but-not-crashed goroutines (the failure mode that caused
// the 12h scheduler outage of 2026-04-14, issue #85). Any subsystem whose
// last tick is older than 2× its expected interval is stale.
r.GET("/admin/liveness", func(c *gin.Context) {
snap := supervised.Snapshot()
out := make(map[string]interface{}, len(snap))
now := time.Now()
for name, last := range snap {
out[name] = gin.H{
"last_tick_at": last,
"seconds_ago": int(now.Sub(last).Seconds()),
}
}
c.JSON(200, gin.H{"subsystems": out})
})
// Prometheus metrics — exempt from rate limiter via separate registration
// (registered before Use(limiter) takes effect on this specific route — the
// middleware.Middleware() still records it for observability).
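
To make the alerting rule from item 4 concrete, here is a hedged sketch of
the operator-side consumer: poll the endpoint above and flag any subsystem
whose seconds_ago exceeds 2× its expected interval. Only the response shape
comes from the handler above; the URL, the interval table, and the alert
action are assumptions an operator would tune.

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

// liveness mirrors the response shape of GET /admin/liveness above.
type liveness struct {
    Subsystems map[string]struct {
        LastTickAt time.Time `json:"last_tick_at"`
        SecondsAgo int       `json:"seconds_ago"`
    } `json:"subsystems"`
}

func main() {
    // Expected tick intervals are assumptions; tune per deployment.
    expected := map[string]time.Duration{
        "scheduler":    30 * time.Second,
        "health-sweep": 15 * time.Second,
    }
    resp, err := http.Get("http://localhost:8080/admin/liveness") // URL assumed
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    var l liveness
    if err := json.NewDecoder(resp.Body).Decode(&l); err != nil {
        panic(err)
    }
    for name, interval := range expected {
        s, ok := l.Subsystems[name]
        // Alert when a subsystem has never ticked or its last tick is older
        // than 2x its expected interval (the rule from the PR description).
        if !ok || time.Duration(s.SecondsAgo)*time.Second > 2*interval {
            fmt.Printf("ALERT: %s stale (last tick %s)\n", name, s.LastTickAt)
        }
    }
}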


@@ -12,6 +12,7 @@ import (
cronlib "github.com/robfig/cron/v3"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
)
const (
@@ -60,6 +61,10 @@ func (s *Scheduler) Start(ctx context.Context) {
log.Printf("Scheduler: started (poll interval=%s)", pollInterval)
// Heartbeat before the first tick so /admin/liveness doesn't flag stale
// during the initial 30s interval after startup.
supervised.Heartbeat("scheduler")
for {
select {
case <-ctx.Done():
@@ -67,6 +72,7 @@ func (s *Scheduler) Start(ctx context.Context) {
return
case <-ticker.C:
s.tick(ctx)
supervised.Heartbeat("scheduler")
}
}
}
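
Item 3 promises heartbeats for the remaining four subsystems in follow-up
PRs. Purely as an illustration, this is roughly what the channel-manager
follow-up could look like. The Manager stub, its poll method, and the 30s
interval are assumptions, since the real loop in internal/channels may be
shaped differently; only the subsystem name matches the RunWithRecover
registration in main.go.

package channels

import (
    "context"
    "time"

    "github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
)

type Manager struct{} // stub standing in for the real channels.Manager

func (m *Manager) poll(ctx context.Context) {} // stand-in for per-tick work

func (m *Manager) Start(ctx context.Context) {
    ticker := time.NewTicker(30 * time.Second) // interval is an assumption
    defer ticker.Stop()
    // Heartbeat before the first tick, mirroring the scheduler change above,
    // so /admin/liveness is populated during the first interval.
    supervised.Heartbeat("channel-manager")
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            m.poll(ctx)
            supervised.Heartbeat("channel-manager") // end-of-tick liveness
        }
    }
}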


@@ -0,0 +1,142 @@
// Package supervised provides a panic-recovering supervisor for long-running
// background goroutines on the platform. Every "go X.Start(ctx)" invocation
// in main.go should go through [RunWithRecover] so a single panic from one
// tenant's data cannot silently kill a subsystem that serves every tenant.
//
// Incident that motivated this (issue #85, 2026-04-14):
//
// The cron scheduler goroutine died silently at 14:21 UTC and stayed dead
// for 12+ hours. Platform restart didn't recover it. Root cause: no
// defer recover() in the tick loop. Observable signals (HTTP 200, container
// healthy, DB healthy) all stayed green — only the subsystem was dead.
//
// In a multi-tenant SaaS deployment the blast radius is every tenant
// simultaneously, which is exactly the class of failure we cannot afford.
package supervised
import (
"context"
"log"
"runtime/debug"
"sync"
"time"
)
// Default backoff bounds for RunWithRecover restarts.
const (
initialBackoff = 1 * time.Second
maxBackoff = 30 * time.Second
)
// RunWithRecover runs fn in a recover wrapper. If fn panics, the panic is
// logged with its stack trace and fn is restarted after an exponential
// backoff (capped at maxBackoff). The loop exits cleanly when ctx is done.
//
// fn is expected to be a long-running loop (e.g. "for { select { ticker ... } }").
// If fn returns without panicking (e.g. ctx.Done), RunWithRecover returns.
//
// go supervised.RunWithRecover(ctx, "scheduler", func(c context.Context) {
// scheduler.Start(c)
// })
//
// name is used in log lines and by the liveness registry below.
func RunWithRecover(ctx context.Context, name string, fn func(context.Context)) {
backoff := initialBackoff
for {
select {
case <-ctx.Done():
log.Printf("supervised[%s]: context done; stopping", name)
return
default:
}
panicked := runOnce(ctx, name, fn)
// Clean return → the goroutine decided to stop (likely ctx.Done inside fn).
// Don't restart.
if !panicked {
log.Printf("supervised[%s]: returned cleanly; not restarting", name)
return
}
// Panic → back off and restart.
select {
case <-ctx.Done():
return
case <-time.After(backoff):
}
if backoff < maxBackoff {
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}
}
// runOnce invokes fn with recover. Returns true iff fn panicked.
func runOnce(ctx context.Context, name string, fn func(context.Context)) (panicked bool) {
defer func() {
if r := recover(); r != nil {
panicked = true
log.Printf("supervised[%s]: PANIC recovered: %v\n%s", name, r, debug.Stack())
}
}()
fn(ctx)
return false
}
// --- Liveness registry -----------------------------------------------------
//
// Each subsystem calls Heartbeat(name) at the end of each tick / iteration.
// Operators read the registry via /admin/liveness to detect stuck-but-not-
// crashed subsystems (e.g. a tick that deadlocks without panicking).
var (
livenessMu sync.RWMutex
lastTicks = map[string]time.Time{}
)
// Heartbeat records that subsystem `name` is alive as of now.
func Heartbeat(name string) {
livenessMu.Lock()
lastTicks[name] = time.Now()
livenessMu.Unlock()
}
// LastTick returns the wall-clock time of the most recent Heartbeat for
// subsystem `name`. Returns the zero time if the subsystem has never
// heartbeated.
func LastTick(name string) time.Time {
livenessMu.RLock()
defer livenessMu.RUnlock()
return lastTicks[name]
}
// Snapshot returns a copy of every subsystem's last-tick time, for admin
// endpoints.
func Snapshot() map[string]time.Time {
livenessMu.RLock()
defer livenessMu.RUnlock()
out := make(map[string]time.Time, len(lastTicks))
for k, v := range lastTicks {
out[k] = v
}
return out
}
// IsHealthy returns true iff every subsystem in `expected` has heartbeated
// within the last `staleThreshold`. Use it from /health (or a strict variant
// of it) to surface stuck subsystems to an external orchestrator.
func IsHealthy(expected []string, staleThreshold time.Duration) (healthy bool, stale []string) {
livenessMu.RLock()
defer livenessMu.RUnlock()
now := time.Now()
for _, name := range expected {
last, ok := lastTicks[name]
if !ok || now.Sub(last) > staleThreshold {
stale = append(stale, name)
}
}
return len(stale) == 0, stale
}
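
Nothing calls IsHealthy in this PR. Below is a sketch of the "strict
variant of /health" its doc comment suggests, assuming a gin route in the
router package; the route path, the threshold, and the helper name
registerStrictHealth are invented here. Note that until the follow-up
heartbeat PRs land, only "scheduler" heartbeats, so checking all five names
would report the other four as stale.

package router

import (
    "time"

    "github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
    "github.com/gin-gonic/gin"
)

// registerStrictHealth is hypothetical; Setup would call it if adopted.
func registerStrictHealth(r *gin.Engine) {
    r.GET("/health/strict", func(c *gin.Context) {
        // Names match the RunWithRecover registrations in main.go.
        expected := []string{
            "broadcaster", "liveness-monitor", "health-sweep",
            "scheduler", "channel-manager",
        }
        healthy, stale := supervised.IsHealthy(expected, 2*time.Minute)
        status := 200
        if !healthy {
            status = 503 // non-200 lets an external orchestrator page or restart
        }
        c.JSON(status, gin.H{"healthy": healthy, "stale": stale})
    })
}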


@@ -0,0 +1,120 @@
package supervised
import (
"context"
"sync/atomic"
"testing"
"time"
)
func TestRunWithRecover_CleanReturnDoesNotRestart(t *testing.T) {
var calls int32
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
done := make(chan struct{})
go func() {
RunWithRecover(ctx, "clean", func(c context.Context) {
atomic.AddInt32(&calls, 1)
// Return immediately — no panic, not blocked on ctx.
})
close(done)
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("RunWithRecover did not return after clean fn exit")
}
if got := atomic.LoadInt32(&calls); got != 1 {
t.Errorf("fn called %d times on clean return; want 1", got)
}
}
func TestRunWithRecover_PanicRestartsWithBackoff(t *testing.T) {
var calls int32
ctx, cancel := context.WithCancel(context.Background())
go RunWithRecover(ctx, "panic-test", func(c context.Context) {
atomic.AddInt32(&calls, 1)
if atomic.LoadInt32(&calls) < 3 {
panic("deliberate")
}
// On 3rd call, wait for ctx.Done so we can inspect calls cleanly.
<-c.Done()
})
// Give it time to panic + restart at least twice (1s + 2s backoffs).
time.Sleep(4 * time.Second)
cancel()
got := atomic.LoadInt32(&calls)
if got < 3 {
t.Errorf("fn called %d times after 4s of restarts; want >= 3", got)
}
}
func TestRunWithRecover_CtxDoneStopsRestart(t *testing.T) {
var calls int32
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
RunWithRecover(ctx, "ctx-done", func(c context.Context) {
atomic.AddInt32(&calls, 1)
panic("always")
})
close(done)
}()
time.Sleep(100 * time.Millisecond)
cancel()
select {
case <-done:
case <-time.After(35 * time.Second):
t.Fatal("RunWithRecover did not return after ctx cancel")
}
}
func TestLivenessRegistry(t *testing.T) {
// Heartbeat records; LastTick reads back.
before := time.Now()
Heartbeat("testsubsys-A")
after := time.Now()
last := LastTick("testsubsys-A")
if last.Before(before) || last.After(after) {
t.Errorf("LastTick=%v outside [%v, %v]", last, before, after)
}
// Unknown subsystem → zero time.
if !LastTick("nonexistent-subsys").IsZero() {
t.Errorf("LastTick for unknown subsystem should be zero")
}
// IsHealthy: fresh heartbeat → healthy; stale → not healthy.
Heartbeat("testsubsys-B")
healthy, stale := IsHealthy([]string{"testsubsys-A", "testsubsys-B"}, time.Minute)
if !healthy || len(stale) != 0 {
t.Errorf("expected healthy, got healthy=%v stale=%v", healthy, stale)
}
// Force staleness by asking for an impossibly tight threshold.
time.Sleep(10 * time.Millisecond)
healthy, stale = IsHealthy([]string{"testsubsys-A"}, time.Nanosecond)
if healthy || len(stale) != 1 {
t.Errorf("expected stale testsubsys-A, got healthy=%v stale=%v", healthy, stale)
}
}
func TestSnapshotIsCopy(t *testing.T) {
Heartbeat("snap-test")
s1 := Snapshot()
// Mutating the returned map must not affect the registry.
s1["snap-test"] = time.Time{}
if LastTick("snap-test").IsZero() {
t.Errorf("Snapshot returned a live reference; should be a copy")
}
}