From e4535560cf72aa133b84d8186ac257ab86d19f2f Mon Sep 17 00:00:00 2001
From: rabbitblood
Date: Tue, 14 Apr 2026 20:34:18 -0700
Subject: [PATCH] fix(platform): panic-recovering supervisor for every
 background goroutine (#92)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Yesterday's scheduler-died incident (#85) was one instance of a systemic bug:
every long-running goroutine in the platform lacks panic recovery and exposes
no liveness signal. In a multi-tenant SaaS deployment, a panic triggered by a
single tenant's bad data takes the subsystem down for every tenant, silently,
with all standard health probes still green. That is a sev-1 with a blast
radius of every tenant and a trigger of one.

This PR:

1. Introduces `platform/internal/supervised/` with two primitives:

   a. RunWithRecover(ctx, name, fn) — runs fn in a recover wrapper. On panic,
      it logs the stack and restarts fn with exponential backoff
      (1s → 2s → 4s → … → 30s cap). On clean return (fn decided to stop), it
      returns. On ctx.Done, it exits cleanly.

   b. Heartbeat(name) + LastTick(name) + Snapshot() + IsHealthy(names,
      staleThreshold) — a shared in-memory liveness registry. Every subsystem
      calls Heartbeat(name) at the end of each tick, so operators can
      distinguish "goroutine alive and healthy" from "alive but stuck inside
      a single tick".

2. Wraps every `go X.Start(ctx)` in main.go:

   - broadcaster.Subscribe (Redis pub/sub relay → WebSocket)
   - registry.StartLivenessMonitor
   - registry.StartHealthSweep
   - scheduler.Start (the one that died yesterday)
   - channelMgr.Start (Telegram / Slack)

3. Adds `supervised.Heartbeat("scheduler")` inside the scheduler tick loop as
   the first end-to-end demonstration. Follow-up PRs will add heartbeats to
   the other four subsystems.

4. Adds a `GET /admin/liveness` endpoint returning per-subsystem last_tick_at
   and seconds_ago. Operators can poll this and alert on any subsystem whose
   seconds_ago exceeds 2x its cron/tick interval.

5. Adds unit tests for RunWithRecover (clean return does not restart; panic
   restarts with backoff; ctx cancel stops the restart loop) and for the
   liveness registry.

Net new code: ~160 lines plus ~100 lines of tests. Refactor of main.go: ~10
changed lines. No behavior change on the happy path; only what happens on a
panic changes.

Closes #92. Supersedes the local recover added to scheduler.go in #90 (kept
conceptually, but now via the shared helper).
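Usage sketch for reviewers (illustrative only, not part of this diff): once
the remaining four heartbeats land, IsHealthy can also back a strict health
route for an external orchestrator. The /health/strict path, the 2-minute
threshold, and the registerStrictHealth helper below are assumptions, not
existing code; the subsystem names match the ones registered in main.go by
this PR.

    // Hypothetical wiring — a strict health route built on supervised.IsHealthy.
    package router

    import (
        "time"

        "github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
        "github.com/gin-gonic/gin"
    )

    func registerStrictHealth(r *gin.Engine) {
        r.GET("/health/strict", func(c *gin.Context) {
            // Names as registered via supervised.RunWithRecover in main.go.
            expected := []string{
                "broadcaster", "liveness-monitor", "health-sweep",
                "scheduler", "channel-manager",
            }
            healthy, stale := supervised.IsHealthy(expected, 2*time.Minute)
            if !healthy {
                // 503 lets an orchestrator/load balancer see the stuck subsystem.
                c.JSON(503, gin.H{"status": "degraded", "stale": stale})
                return
            }
            c.JSON(200, gin.H{"status": "ok"})
        })
    }

Until then, polling /admin/liveness and alerting when seconds_ago exceeds 2x
a subsystem's tick interval covers the same failure mode without changing any
orchestrator-facing endpoint.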
---
 platform/cmd/server/main.go                |  20 ++-
 platform/internal/router/router.go         |  18 +++
 platform/internal/scheduler/scheduler.go   |   6 +
 platform/internal/supervised/supervised.go | 142 ++++++++++++++++++
 .../internal/supervised/supervised_test.go | 120 +++++++++++++++
 5 files changed, 301 insertions(+), 5 deletions(-)
 create mode 100644 platform/internal/supervised/supervised.go
 create mode 100644 platform/internal/supervised/supervised_test.go

diff --git a/platform/cmd/server/main.go b/platform/cmd/server/main.go
index 45eeffc4..12dcc710 100644
--- a/platform/cmd/server/main.go
+++ b/platform/cmd/server/main.go
@@ -20,6 +20,7 @@ import (
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/router"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/scheduler"
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
 )
 
@@ -67,7 +68,12 @@ func main() {
 	// Start Redis pub/sub subscriber
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	go broadcaster.Subscribe(ctx)
+	// Every long-running subsystem below is wrapped by supervised.RunWithRecover:
+	// a panic (e.g. from a single bad tenant row) is logged + the subsystem is
+	// restarted with exponential backoff instead of silently dying forever.
+	// Motivation: issue #85 (scheduler silent outage for 12+ hours) and #92
+	// (systemic — affects every background goroutine).
+	go supervised.RunWithRecover(ctx, "broadcaster", broadcaster.Subscribe)
 
 	// Activity log retention — configurable via env vars
 	retentionDays := envOr("ACTIVITY_RETENTION_DAYS", "7")
@@ -122,21 +128,25 @@ func main() {
 	}
 
 	// Start Liveness Monitor — Redis TTL expiry-based offline detection + auto-restart
-	go registry.StartLivenessMonitor(ctx, onWorkspaceOffline)
+	go supervised.RunWithRecover(ctx, "liveness-monitor", func(c context.Context) {
+		registry.StartLivenessMonitor(c, onWorkspaceOffline)
+	})
 
 	// Proactive container health sweep — detects dead containers faster than Redis TTL.
 	// Checks all "online" workspaces against Docker every 15 seconds.
 	if prov != nil {
-		go registry.StartHealthSweep(ctx, prov, 15*time.Second, onWorkspaceOffline)
+		go supervised.RunWithRecover(ctx, "health-sweep", func(c context.Context) {
+			registry.StartHealthSweep(c, prov, 15*time.Second, onWorkspaceOffline)
+		})
 	}
 
 	// Cron Scheduler — fires A2A messages to workspaces on user-defined schedules
 	cronSched := scheduler.New(wh, broadcaster)
-	go cronSched.Start(ctx)
+	go supervised.RunWithRecover(ctx, "scheduler", cronSched.Start)
 
 	// Channel Manager — social channel integrations (Telegram, Slack, etc.)
 	channelMgr := channels.NewManager(wh, broadcaster)
-	go channelMgr.Start(ctx)
+	go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start)
 
 	// Router
 	r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr)
diff --git a/platform/internal/router/router.go b/platform/internal/router/router.go
index b8b9730b..c422e194 100644
--- a/platform/internal/router/router.go
+++ b/platform/internal/router/router.go
@@ -15,6 +15,7 @@ import (
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/middleware"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
 	"github.com/docker/docker/client"
 	"github.com/gin-contrib/cors"
@@ -63,6 +64,23 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
 		c.JSON(200, gin.H{"status": "ok"})
 	})
 
+	// /admin/liveness — per-subsystem last-tick timestamps. Operators read this
+	// to catch stuck-but-not-crashed goroutines (the failure mode that caused
+	// the 12h scheduler outage of 2026-04-14, issue #85). Any subsystem whose
+	// last tick is older than 2× its expected interval is stale.
+	r.GET("/admin/liveness", func(c *gin.Context) {
+		snap := supervised.Snapshot()
+		out := make(map[string]interface{}, len(snap))
+		now := time.Now()
+		for name, last := range snap {
+			out[name] = gin.H{
+				"last_tick_at": last,
+				"seconds_ago":  int(now.Sub(last).Seconds()),
+			}
+		}
+		c.JSON(200, gin.H{"subsystems": out})
+	})
+
 	// Prometheus metrics — exempt from rate limiter via separate registration
 	// (registered before Use(limiter) takes effect on this specific route — the
 	// middleware.Middleware() still records it for observability).
diff --git a/platform/internal/scheduler/scheduler.go b/platform/internal/scheduler/scheduler.go
index 235eed4d..a23092cb 100644
--- a/platform/internal/scheduler/scheduler.go
+++ b/platform/internal/scheduler/scheduler.go
@@ -12,6 +12,7 @@ import (
 	cronlib "github.com/robfig/cron/v3"
 
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
 )
 
 const (
@@ -60,6 +61,10 @@ func (s *Scheduler) Start(ctx context.Context) {
 
 	log.Printf("Scheduler: started (poll interval=%s)", pollInterval)
 
+	// Heartbeat before the first tick so /admin/liveness doesn't flag stale
+	// during the initial 30s interval after startup.
+	supervised.Heartbeat("scheduler")
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -67,6 +72,7 @@ func (s *Scheduler) Start(ctx context.Context) {
 			return
 		case <-ticker.C:
 			s.tick(ctx)
+			supervised.Heartbeat("scheduler")
 		}
 	}
 }
diff --git a/platform/internal/supervised/supervised.go b/platform/internal/supervised/supervised.go
new file mode 100644
index 00000000..8de84258
--- /dev/null
+++ b/platform/internal/supervised/supervised.go
@@ -0,0 +1,142 @@
+// Package supervised provides a panic-recovering supervisor for long-running
+// background goroutines on the platform. Every "go X.Start(ctx)" invocation
+// in main.go should go through [RunWithRecover] so a single panic from one
+// tenant's data cannot silently kill a subsystem that serves every tenant.
+//
+// Incident that motivated this (issue #85, 2026-04-14):
+//
+// The cron scheduler goroutine died silently at 14:21 UTC and stayed dead
+// for 12+ hours. Platform restart didn't recover it.
+// Root cause: no defer recover() in the tick loop. Observable signals
+// (HTTP 200, container healthy, DB healthy) all stayed green — only the
+// subsystem was dead.
+//
+// In a multi-tenant SaaS deployment the blast radius is every tenant
+// simultaneously, which is exactly the class of failure we cannot afford.
+package supervised
+
+import (
+	"context"
+	"log"
+	"runtime/debug"
+	"sync"
+	"time"
+)
+
+// Default backoff bounds for RunWithRecover restarts.
+const (
+	initialBackoff = 1 * time.Second
+	maxBackoff     = 30 * time.Second
+)
+
+// RunWithRecover runs fn in a recover wrapper. If fn panics, the panic is
+// logged with its stack trace and fn is restarted after an exponential
+// backoff (capped at maxBackoff). The loop exits cleanly when ctx is done.
+//
+// fn is expected to be a long-running loop (e.g. "for { select { ticker ... } }").
+// If fn returns without panicking (e.g. on ctx.Done), RunWithRecover returns.
+//
+//	go supervised.RunWithRecover(ctx, "scheduler", func(c context.Context) {
+//		scheduler.Start(c)
+//	})
+//
+// name is used in log lines and by the liveness registry below.
+func RunWithRecover(ctx context.Context, name string, fn func(context.Context)) {
+	backoff := initialBackoff
+	for {
+		select {
+		case <-ctx.Done():
+			log.Printf("supervised[%s]: context done; stopping", name)
+			return
+		default:
+		}
+
+		panicked := runOnce(ctx, name, fn)
+
+		// Clean return → the goroutine decided to stop (likely ctx.Done inside fn).
+		// Don't restart.
+		if !panicked {
+			log.Printf("supervised[%s]: returned cleanly; not restarting", name)
+			return
+		}
+
+		// Panic → back off and restart.
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(backoff):
+		}
+		if backoff < maxBackoff {
+			backoff *= 2
+			if backoff > maxBackoff {
+				backoff = maxBackoff
+			}
+		}
+	}
+}
+
+// runOnce invokes fn with recover. Returns true iff fn panicked.
+func runOnce(ctx context.Context, name string, fn func(context.Context)) (panicked bool) {
+	defer func() {
+		if r := recover(); r != nil {
+			panicked = true
+			log.Printf("supervised[%s]: PANIC recovered: %v\n%s", name, r, debug.Stack())
+		}
+	}()
+	fn(ctx)
+	return false
+}
+
+// --- Liveness registry -----------------------------------------------------
+//
+// Each subsystem calls Heartbeat(name) at the end of each tick / iteration.
+// Operators read the registry via /admin/liveness to detect stuck-but-not-
+// crashed subsystems (e.g. a tick that deadlocks without panicking).
+
+var (
+	livenessMu sync.RWMutex
+	lastTicks  = map[string]time.Time{}
+)
+
+// Heartbeat records that subsystem `name` is alive as of now.
+func Heartbeat(name string) {
+	livenessMu.Lock()
+	lastTicks[name] = time.Now()
+	livenessMu.Unlock()
+}
+
+// LastTick returns the wall-clock time of the most recent Heartbeat for
+// subsystem `name`. Returns the zero time if the subsystem has never
+// heartbeated.
+func LastTick(name string) time.Time {
+	livenessMu.RLock()
+	defer livenessMu.RUnlock()
+	return lastTicks[name]
+}
+
+// Snapshot returns a copy of every subsystem's last-tick time, for admin
+// endpoints.
+func Snapshot() map[string]time.Time {
+	livenessMu.RLock()
+	defer livenessMu.RUnlock()
+	out := make(map[string]time.Time, len(lastTicks))
+	for k, v := range lastTicks {
+		out[k] = v
+	}
+	return out
+}
+
+// IsHealthy returns true iff every subsystem in `expected` has heartbeated
+// within the last `staleThreshold`. Use from /health (or a strict variant of
+// it) to surface stuck subsystems to an external orchestrator.
+func IsHealthy(expected []string, staleThreshold time.Duration) (healthy bool, stale []string) {
+	livenessMu.RLock()
+	defer livenessMu.RUnlock()
+	now := time.Now()
+	for _, name := range expected {
+		last, ok := lastTicks[name]
+		if !ok || now.Sub(last) > staleThreshold {
+			stale = append(stale, name)
+		}
+	}
+	return len(stale) == 0, stale
+}
diff --git a/platform/internal/supervised/supervised_test.go b/platform/internal/supervised/supervised_test.go
new file mode 100644
index 00000000..ba96df85
--- /dev/null
+++ b/platform/internal/supervised/supervised_test.go
@@ -0,0 +1,120 @@
+package supervised
+
+import (
+	"context"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+func TestRunWithRecover_CleanReturnDoesNotRestart(t *testing.T) {
+	var calls int32
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	done := make(chan struct{})
+	go func() {
+		RunWithRecover(ctx, "clean", func(c context.Context) {
+			atomic.AddInt32(&calls, 1)
+			// Return immediately — no panic, not blocked on ctx.
+		})
+		close(done)
+	}()
+
+	select {
+	case <-done:
+	case <-time.After(2 * time.Second):
+		t.Fatal("RunWithRecover did not return after clean fn exit")
+	}
+
+	if got := atomic.LoadInt32(&calls); got != 1 {
+		t.Errorf("fn called %d times on clean return; want 1", got)
+	}
+}
+
+func TestRunWithRecover_PanicRestartsWithBackoff(t *testing.T) {
+	var calls int32
+	ctx, cancel := context.WithCancel(context.Background())
+
+	go RunWithRecover(ctx, "panic-test", func(c context.Context) {
+		atomic.AddInt32(&calls, 1)
+		if atomic.LoadInt32(&calls) < 3 {
+			panic("deliberate")
+		}
+		// On 3rd call, wait for ctx.Done so we can inspect calls cleanly.
+		<-c.Done()
+	})
+
+	// Give it time to panic + restart at least twice (1s + 2s backoffs).
+	time.Sleep(4 * time.Second)
+	cancel()
+
+	got := atomic.LoadInt32(&calls)
+	if got < 3 {
+		t.Errorf("fn called %d times after 4s of restarts; want >= 3", got)
+	}
+}
+
+func TestRunWithRecover_CtxDoneStopsRestart(t *testing.T) {
+	var calls int32
+	ctx, cancel := context.WithCancel(context.Background())
+
+	done := make(chan struct{})
+	go func() {
+		RunWithRecover(ctx, "ctx-done", func(c context.Context) {
+			atomic.AddInt32(&calls, 1)
+			panic("always")
+		})
+		close(done)
+	}()
+
+	time.Sleep(100 * time.Millisecond)
+	cancel()
+
+	select {
+	case <-done:
+	case <-time.After(35 * time.Second):
+		t.Fatal("RunWithRecover did not return after ctx cancel")
+	}
+}
+
+func TestLivenessRegistry(t *testing.T) {
+	// Heartbeat records; LastTick reads back.
+	before := time.Now()
+	Heartbeat("testsubsys-A")
+	after := time.Now()
+
+	last := LastTick("testsubsys-A")
+	if last.Before(before) || last.After(after) {
+		t.Errorf("LastTick=%v outside [%v, %v]", last, before, after)
+	}
+
+	// Unknown subsystem → zero time.
+	if !LastTick("nonexistent-subsys").IsZero() {
+		t.Errorf("LastTick for unknown subsystem should be zero")
+	}
+
+	// IsHealthy: fresh heartbeat → healthy; stale → not healthy.
+	Heartbeat("testsubsys-B")
+	healthy, stale := IsHealthy([]string{"testsubsys-A", "testsubsys-B"}, time.Minute)
+	if !healthy || len(stale) != 0 {
+		t.Errorf("expected healthy, got healthy=%v stale=%v", healthy, stale)
+	}
+
+	// Force staleness by asking for an impossibly tight threshold.
+	time.Sleep(10 * time.Millisecond)
+	healthy, stale = IsHealthy([]string{"testsubsys-A"}, time.Nanosecond)
+	if healthy || len(stale) != 1 {
+		t.Errorf("expected stale testsubsys-A, got healthy=%v stale=%v", healthy, stale)
+	}
+}
+
+func TestSnapshotIsCopy(t *testing.T) {
+	Heartbeat("snap-test")
+	s1 := Snapshot()
+	// Mutating the returned map must not affect the registry.
+	s1["snap-test"] = time.Time{}
+	if LastTick("snap-test").IsZero() {
+		t.Errorf("Snapshot returned a live reference; should be a copy")
+	}
+}