Merge pull request #95 from Molecule-AI/fix/supervised-goroutines
fix(platform): panic-recovering supervisor for every background goroutine (#92)
This commit is contained in:
commit
0c7d84d6ce
@ -20,6 +20,7 @@ import (
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/router"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/scheduler"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
|
||||
)
|
||||
|
||||
@ -67,7 +68,12 @@ func main() {
|
||||
// Start Redis pub/sub subscriber
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go broadcaster.Subscribe(ctx)
|
||||
// Every long-running subsystem below is wrapped by supervised.RunWithRecover:
|
||||
// a panic (e.g. from a single bad tenant row) is logged + the subsystem is
|
||||
// restarted with exponential backoff instead of silently dying forever.
|
||||
// Motivation: issue #85 (scheduler silent outage for 12+ hours) and #92
|
||||
// (systemic — affects every background goroutine).
|
||||
go supervised.RunWithRecover(ctx, "broadcaster", broadcaster.Subscribe)
|
||||
|
||||
// Activity log retention — configurable via env vars
|
||||
retentionDays := envOr("ACTIVITY_RETENTION_DAYS", "7")
|
||||
@ -122,21 +128,25 @@ func main() {
|
||||
}
|
||||
|
||||
// Start Liveness Monitor — Redis TTL expiry-based offline detection + auto-restart
|
||||
go registry.StartLivenessMonitor(ctx, onWorkspaceOffline)
|
||||
go supervised.RunWithRecover(ctx, "liveness-monitor", func(c context.Context) {
|
||||
registry.StartLivenessMonitor(c, onWorkspaceOffline)
|
||||
})
|
||||
|
||||
// Proactive container health sweep — detects dead containers faster than Redis TTL.
|
||||
// Checks all "online" workspaces against Docker every 15 seconds.
|
||||
if prov != nil {
|
||||
go registry.StartHealthSweep(ctx, prov, 15*time.Second, onWorkspaceOffline)
|
||||
go supervised.RunWithRecover(ctx, "health-sweep", func(c context.Context) {
|
||||
registry.StartHealthSweep(c, prov, 15*time.Second, onWorkspaceOffline)
|
||||
})
|
||||
}
|
||||
|
||||
// Cron Scheduler — fires A2A messages to workspaces on user-defined schedules
|
||||
cronSched := scheduler.New(wh, broadcaster)
|
||||
go cronSched.Start(ctx)
|
||||
go supervised.RunWithRecover(ctx, "scheduler", cronSched.Start)
|
||||
|
||||
// Channel Manager — social channel integrations (Telegram, Slack, etc.)
|
||||
channelMgr := channels.NewManager(wh, broadcaster)
|
||||
go channelMgr.Start(ctx)
|
||||
go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start)
|
||||
|
||||
// Router
|
||||
r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr)
|
||||
|
||||
@ -15,6 +15,7 @@ import (
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/middleware"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
|
||||
"github.com/docker/docker/client"
|
||||
"github.com/gin-contrib/cors"
|
||||
@ -63,6 +64,23 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
c.JSON(200, gin.H{"status": "ok"})
|
||||
})
|
||||
|
||||
// /admin/liveness — per-subsystem last-tick timestamps. Operators read this
|
||||
// to catch stuck-but-not-crashed goroutines (the failure mode that caused
|
||||
// the 12h scheduler outage of 2026-04-14, issue #85). Any subsystem whose
|
||||
// last tick is older than 2× its expected interval is stale.
|
||||
r.GET("/admin/liveness", func(c *gin.Context) {
|
||||
snap := supervised.Snapshot()
|
||||
out := make(map[string]interface{}, len(snap))
|
||||
now := time.Now()
|
||||
for name, last := range snap {
|
||||
out[name] = gin.H{
|
||||
"last_tick_at": last,
|
||||
"seconds_ago": int(now.Sub(last).Seconds()),
|
||||
}
|
||||
}
|
||||
c.JSON(200, gin.H{"subsystems": out})
|
||||
})
|
||||
|
||||
// Prometheus metrics — exempt from rate limiter via separate registration
|
||||
// (registered before Use(limiter) takes effect on this specific route — the
|
||||
// middleware.Middleware() still records it for observability).
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
cronlib "github.com/robfig/cron/v3"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -105,8 +106,9 @@ func (s *Scheduler) Start(ctx context.Context) {
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// Mark a tick immediately on startup so Healthy() returns true before
|
||||
// the first ticker fires (avoids spurious unhealthy on fresh start).
|
||||
// Heartbeat + initial lastTickAt so /admin/liveness and Healthy() both
|
||||
// pass during the first 30s interval after startup.
|
||||
supervised.Heartbeat("scheduler")
|
||||
s.mu.Lock()
|
||||
s.lastTickAt = time.Now()
|
||||
s.mu.Unlock()
|
||||
@ -118,6 +120,7 @@ func (s *Scheduler) Start(ctx context.Context) {
|
||||
return
|
||||
case <-ticker.C:
|
||||
tickWithRecover()
|
||||
supervised.Heartbeat("scheduler")
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -125,7 +128,16 @@ func (s *Scheduler) Start(ctx context.Context) {
|
||||
// tick queries all due schedules and fires each in a goroutine.
|
||||
// Waits for all goroutines to finish before returning so the next tick
|
||||
// doesn't re-fire schedules whose next_run_at hasn't been updated yet.
|
||||
//
|
||||
// Heartbeat is called at three points to keep /admin/liveness fresh during
|
||||
// long-running fires (some prompts take minutes — without these heartbeats
|
||||
// the scheduler looks "stale" the whole time it's working):
|
||||
// - immediately on entering tick (proves we're past the ticker.C wait)
|
||||
// - inside each per-fire goroutine (every fire bumps the heartbeat)
|
||||
// - implicitly via the post-tick heartbeat in Start()
|
||||
func (s *Scheduler) tick(ctx context.Context) {
|
||||
supervised.Heartbeat("scheduler")
|
||||
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT id, workspace_id, name, cron_expr, timezone, prompt
|
||||
FROM workspace_schedules
|
||||
@ -158,7 +170,9 @@ func (s *Scheduler) tick(ctx context.Context) {
|
||||
s2.Name, s2.WorkspaceID, r)
|
||||
}
|
||||
}()
|
||||
supervised.Heartbeat("scheduler")
|
||||
s.fireSchedule(ctx, s2)
|
||||
supervised.Heartbeat("scheduler")
|
||||
}(sched)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
|
||||
142
platform/internal/supervised/supervised.go
Normal file
142
platform/internal/supervised/supervised.go
Normal file
@ -0,0 +1,142 @@
|
||||
// Package supervised provides a panic-recovering supervisor for long-running
|
||||
// background goroutines on the platform. Every "go X.Start(ctx)" invocation
|
||||
// in main.go should go through [RunWithRecover] so a single panic from one
|
||||
// tenant's data cannot silently kill a subsystem that serves every tenant.
|
||||
//
|
||||
// Incident that motivated this (issue #85, 2026-04-14):
|
||||
//
|
||||
// The cron scheduler goroutine died silently at 14:21 UTC and stayed dead
|
||||
// for 12+ hours. Platform restart didn't recover it. Root cause: no
|
||||
// defer recover() in the tick loop. Observable signals (HTTP 200, container
|
||||
// healthy, DB healthy) all stayed green — only the subsystem was dead.
|
||||
//
|
||||
// In a multi-tenant SaaS deployment the blast radius is every tenant
|
||||
// simultaneously, which is exactly the class of failure we cannot afford.
|
||||
package supervised
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Backoff bounds for RunWithRecover restarts.
const (
	// initialBackoff is the delay before the first restart after a panic,
	// and the value the delay resets to after a sustained healthy run.
	initialBackoff = 1 * time.Second
	// maxBackoff caps the delay between restarts. It doubles as the
	// "healthy run" threshold: a run that lasted longer than this before
	// panicking resets the delay to initialBackoff.
	maxBackoff = 30 * time.Second
)

// RunWithRecover runs fn under a recover wrapper. If fn panics, the panic is
// logged with its stack trace and fn is restarted after an exponential
// backoff (doubling from initialBackoff, capped at maxBackoff). The backoff
// resets to initialBackoff whenever a run lasted longer than maxBackoff
// before panicking, so an isolated panic after hours of healthy operation
// is not penalised by panics from long ago.
//
// fn is expected to be a long-running loop (e.g. "for { select { ticker ... } }").
// If fn returns without panicking (e.g. because it observed ctx.Done),
// RunWithRecover returns and does not restart it. RunWithRecover also returns
// without (re)starting fn once ctx is done, including while it is waiting
// out a backoff delay.
//
//	go supervised.RunWithRecover(ctx, "scheduler", func(c context.Context) {
//		scheduler.Start(c)
//	})
//
// name is used only in log lines here; it is not registered anywhere by this
// function.
func RunWithRecover(ctx context.Context, name string, fn func(context.Context)) {
	backoff := initialBackoff
	for {
		// Never (re)start fn once shutdown has been requested.
		if ctx.Err() != nil {
			log.Printf("supervised[%s]: context done; stopping", name)
			return
		}

		startedAt := time.Now()
		panicked := runOnce(ctx, name, fn)

		// Clean return → fn decided to stop (likely ctx.Done inside fn).
		// Don't restart.
		if !panicked {
			log.Printf("supervised[%s]: returned cleanly; not restarting", name)
			return
		}

		// A run that outlived maxBackoff counts as healthy: forget the
		// escalation accumulated by earlier panics.
		if time.Since(startedAt) > maxBackoff {
			backoff = initialBackoff
		}

		// Panic → back off and restart. An explicit timer (rather than
		// time.After) lets us release it immediately on cancellation.
		log.Printf("supervised[%s]: restarting in %s", name, backoff)
		timer := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			timer.Stop()
			log.Printf("supervised[%s]: context done during backoff; stopping", name)
			return
		case <-timer.C:
		}

		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}

// runOnce invokes fn(ctx) once with a deferred recover. It returns true iff
// fn panicked with a value that recover reports as non-nil; the recovered
// value and the stack at the point of recovery are logged under name.
func runOnce(ctx context.Context, name string, fn func(context.Context)) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			// The named result lets the deferred func report the panic
			// to the caller after the stack has unwound.
			panicked = true
			log.Printf("supervised[%s]: PANIC recovered: %v\n%s", name, r, debug.Stack())
		}
	}()
	fn(ctx)
	return false
}
|
||||
|
||||
// --- Liveness registry -----------------------------------------------------
|
||||
//
|
||||
// Each subsystem calls Heartbeat(name) at the end of each tick / iteration.
|
||||
// Operators read the registry via /admin/liveness to detect stuck-but-not-
|
||||
// crashed subsystems (e.g. a tick that deadlocks without panicking).
|
||||
|
||||
// Package-level liveness registry state.
var (
	// lastTicks maps a subsystem name to the time of its latest Heartbeat.
	lastTicks = make(map[string]time.Time)
	// livenessMu guards every access to lastTicks.
	livenessMu sync.RWMutex
)

// Heartbeat stamps subsystem `name` with the current time, creating the
// registry entry on first use and overwriting it afterwards.
func Heartbeat(name string) {
	livenessMu.Lock()
	defer livenessMu.Unlock()

	lastTicks[name] = time.Now()
}
|
||||
|
||||
// LastTick returns the wall-clock time of the most recent Heartbeat for
|
||||
// subsystem `name`. Returns the zero time if the subsystem has never
|
||||
// heartbeated.
|
||||
func LastTick(name string) time.Time {
|
||||
livenessMu.RLock()
|
||||
defer livenessMu.RUnlock()
|
||||
return lastTicks[name]
|
||||
}
|
||||
|
||||
// Snapshot returns a copy of every subsystem's last-tick time, for admin
|
||||
// endpoints.
|
||||
func Snapshot() map[string]time.Time {
|
||||
livenessMu.RLock()
|
||||
defer livenessMu.RUnlock()
|
||||
out := make(map[string]time.Time, len(lastTicks))
|
||||
for k, v := range lastTicks {
|
||||
out[k] = v
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// IsHealthy returns true iff every subsystem in `expected` has tickled
|
||||
// within `staleThreshold` ago. Use from /health (or a strict variant of it)
|
||||
// to surface stuck subsystems to an external orchestrator.
|
||||
func IsHealthy(expected []string, staleThreshold time.Duration) (healthy bool, stale []string) {
|
||||
livenessMu.RLock()
|
||||
defer livenessMu.RUnlock()
|
||||
now := time.Now()
|
||||
for _, name := range expected {
|
||||
last, ok := lastTicks[name]
|
||||
if !ok || now.Sub(last) > staleThreshold {
|
||||
stale = append(stale, name)
|
||||
}
|
||||
}
|
||||
return len(stale) == 0, stale
|
||||
}
|
||||
120
platform/internal/supervised/supervised_test.go
Normal file
120
platform/internal/supervised/supervised_test.go
Normal file
@ -0,0 +1,120 @@
|
||||
package supervised
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestRunWithRecover_CleanReturnDoesNotRestart(t *testing.T) {
|
||||
var calls int32
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
RunWithRecover(ctx, "clean", func(c context.Context) {
|
||||
atomic.AddInt32(&calls, 1)
|
||||
// Return immediately — no panic, not blocked on ctx.
|
||||
})
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("RunWithRecover did not return after clean fn exit")
|
||||
}
|
||||
|
||||
if got := atomic.LoadInt32(&calls); got != 1 {
|
||||
t.Errorf("fn called %d times on clean return; want 1", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunWithRecover_PanicRestartsWithBackoff(t *testing.T) {
|
||||
var calls int32
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
go RunWithRecover(ctx, "panic-test", func(c context.Context) {
|
||||
atomic.AddInt32(&calls, 1)
|
||||
if atomic.LoadInt32(&calls) < 3 {
|
||||
panic("deliberate")
|
||||
}
|
||||
// On 3rd call, wait for ctx.Done so we can inspect calls cleanly.
|
||||
<-c.Done()
|
||||
})
|
||||
|
||||
// Give it time to panic + restart at least twice (1s + 2s backoffs).
|
||||
time.Sleep(4 * time.Second)
|
||||
cancel()
|
||||
|
||||
got := atomic.LoadInt32(&calls)
|
||||
if got < 3 {
|
||||
t.Errorf("fn called %d times after 4s of restarts; want >= 3", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunWithRecover_CtxDoneStopsRestart(t *testing.T) {
|
||||
var calls int32
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
RunWithRecover(ctx, "ctx-done", func(c context.Context) {
|
||||
atomic.AddInt32(&calls, 1)
|
||||
panic("always")
|
||||
})
|
||||
close(done)
|
||||
}()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
cancel()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(35 * time.Second):
|
||||
t.Fatal("RunWithRecover did not return after ctx cancel")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLivenessRegistry(t *testing.T) {
|
||||
// Heartbeat records; LastTick reads back.
|
||||
before := time.Now()
|
||||
Heartbeat("testsubsys-A")
|
||||
after := time.Now()
|
||||
|
||||
last := LastTick("testsubsys-A")
|
||||
if last.Before(before) || last.After(after) {
|
||||
t.Errorf("LastTick=%v outside [%v, %v]", last, before, after)
|
||||
}
|
||||
|
||||
// Unknown subsystem → zero time.
|
||||
if !LastTick("nonexistent-subsys").IsZero() {
|
||||
t.Errorf("LastTick for unknown subsystem should be zero")
|
||||
}
|
||||
|
||||
// IsHealthy: fresh heartbeat → healthy; stale → not healthy.
|
||||
Heartbeat("testsubsys-B")
|
||||
healthy, stale := IsHealthy([]string{"testsubsys-A", "testsubsys-B"}, time.Minute)
|
||||
if !healthy || len(stale) != 0 {
|
||||
t.Errorf("expected healthy, got healthy=%v stale=%v", healthy, stale)
|
||||
}
|
||||
|
||||
// Force staleness by asking for an impossibly tight threshold.
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
healthy, stale = IsHealthy([]string{"testsubsys-A"}, time.Nanosecond)
|
||||
if healthy || len(stale) != 1 {
|
||||
t.Errorf("expected stale testsubsys-A, got healthy=%v stale=%v", healthy, stale)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotIsCopy(t *testing.T) {
|
||||
Heartbeat("snap-test")
|
||||
s1 := Snapshot()
|
||||
// Mutating the returned map must not affect the registry.
|
||||
s1["snap-test"] = time.Time{}
|
||||
if LastTick("snap-test").IsZero() {
|
||||
t.Errorf("Snapshot returned a live reference; should be a copy")
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user