diff --git a/workspace-server/internal/metrics/metrics.go b/workspace-server/internal/metrics/metrics.go index 7f0852a8..77f72572 100644 --- a/workspace-server/internal/metrics/metrics.go +++ b/workspace-server/internal/metrics/metrics.go @@ -76,6 +76,21 @@ func TrackWSConnect() { atomic.AddInt64(&activeWSConns, 1) } // Call from the WebSocket disconnect / cleanup path. func TrackWSDisconnect() { atomic.AddInt64(&activeWSConns, -1) } +// phantomBusyResets is the cumulative count of workspace rows the +// phantom-busy sweep reset (active_tasks>0 → active_tasks=0, counter +// cleared). Surfaced as molecule_phantom_busy_resets_total — a high +// reset rate signals a regression in task-lifecycle accounting (most +// often: missing env vars cause claude --print to time out, the +// agent loop never decrements active_tasks, and the sweep cleans up +// the counter ~10 min later). Issue #2865. +var phantomBusyResets int64 + +// TrackPhantomBusyReset increments the phantom-busy reset counter. +// Called from sweepPhantomBusy in workspace-server/internal/scheduler/ +// after each row whose active_tasks was reset to 0. Goroutine-safe but +// not idempotent (each call adds 1); call once per row per sweep tick. +func TrackPhantomBusyReset() { atomic.AddInt64(&phantomBusyResets, 1) } + // Handler returns a Gin handler that serialises all collected metrics in // Prometheus text exposition format (v0.0.4). Mount this at GET /metrics. 
func Handler() gin.HandlerFunc { @@ -144,6 +159,11 @@ func Handler() gin.HandlerFunc { writeln(w, "# HELP molecule_websocket_connections_active Number of active WebSocket connections.") writeln(w, "# TYPE molecule_websocket_connections_active gauge") fmt.Fprintf(w, "molecule_websocket_connections_active %d\n", atomic.LoadInt64(&activeWSConns)) + + // ── Molecule AI scheduler ────────────────────────────────────────────── + writeln(w, "# HELP molecule_phantom_busy_resets_total Cumulative count of workspace rows reset by the phantom-busy sweep (active_tasks cleared after >10 min of activity_log silence). High reset rate signals task-lifecycle accounting regressions — see issue #2865.") + writeln(w, "# TYPE molecule_phantom_busy_resets_total counter") + fmt.Fprintf(w, "molecule_phantom_busy_resets_total %d\n", atomic.LoadInt64(&phantomBusyResets)) } } diff --git a/workspace-server/internal/metrics/metrics_test.go b/workspace-server/internal/metrics/metrics_test.go new file mode 100644 index 00000000..d722a1bd --- /dev/null +++ b/workspace-server/internal/metrics/metrics_test.go @@ -0,0 +1,104 @@ +package metrics + +// Tests for the phantom-busy reset counter wired up by issue #2865. +// The counter is exposed at /metrics as +// molecule_phantom_busy_resets_total. A high steady-state value +// signals task-lifecycle accounting regressions in the agent loop — +// see scheduler.sweepPhantomBusy for the writer. + +import ( + "net/http/httptest" + "strings" + "sync" + "sync/atomic" + "testing" + + "github.com/gin-gonic/gin" +) + +// resetForTest zeroes the counter so a single test's TrackPhantomBusyReset +// calls don't compound onto a previous test's run. metrics.go's package- +// level state means every test that touches the counter must reset. 
+func resetForTest() { + atomic.StoreInt64(&phantomBusyResets, 0) +} + +func TestTrackPhantomBusyReset_IncrementsCounter(t *testing.T) { + resetForTest() + for i := 0; i < 7; i++ { + TrackPhantomBusyReset() + } + got := atomic.LoadInt64(&phantomBusyResets) + if got != 7 { + t.Errorf("counter after 7 calls = %d, want 7", got) + } +} + +func TestTrackPhantomBusyReset_RaceFreeUnderConcurrentWrites(t *testing.T) { + resetForTest() + var wg sync.WaitGroup + const goroutines = 50 + const callsPerGoroutine = 200 + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + for j := 0; j < callsPerGoroutine; j++ { + TrackPhantomBusyReset() + } + }() + } + wg.Wait() + want := int64(goroutines * callsPerGoroutine) + got := atomic.LoadInt64(&phantomBusyResets) + if got != want { + t.Errorf("counter under concurrent writes = %d, want %d (lost increments → atomic broken)", + got, want) + } +} + +func TestHandler_ExposesPhantomBusyResetsCounter(t *testing.T) { + resetForTest() + for i := 0; i < 3; i++ { + TrackPhantomBusyReset() + } + + gin.SetMode(gin.TestMode) + r := gin.New() + r.GET("/metrics", Handler()) + + w := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/metrics", nil) + r.ServeHTTP(w, req) + + body := w.Body.String() + // HELP + TYPE lines must accompany the metric sample (Prometheus text exposition format); the checks below assert presence only, not ordering. 
+ if !strings.Contains(body, "# HELP molecule_phantom_busy_resets_total") { + t.Errorf("metrics output missing HELP line for molecule_phantom_busy_resets_total:\n%s", body) + } + if !strings.Contains(body, "# TYPE molecule_phantom_busy_resets_total counter") { + t.Errorf("metrics output missing TYPE line for molecule_phantom_busy_resets_total:\n%s", body) + } + if !strings.Contains(body, "molecule_phantom_busy_resets_total 3\n") { + t.Errorf("metrics output missing counter value 3:\n%s", body) + } +} + +func TestHandler_PhantomBusyResetsZeroByDefault(t *testing.T) { + // Fresh process should report 0 — pin the contract so a future + // refactor that lazy-inits the counter to nil doesn't silently + // drop the metric from /metrics. + resetForTest() + + gin.SetMode(gin.TestMode) + r := gin.New() + r.GET("/metrics", Handler()) + + w := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/metrics", nil) + r.ServeHTTP(w, req) + + if !strings.Contains(w.Body.String(), "molecule_phantom_busy_resets_total 0\n") { + t.Errorf("metric must report 0 by default:\n%s", w.Body.String()) + } +} diff --git a/workspace-server/internal/scheduler/scheduler.go b/workspace-server/internal/scheduler/scheduler.go index 0c6eb84f..e098586d 100644 --- a/workspace-server/internal/scheduler/scheduler.go +++ b/workspace-server/internal/scheduler/scheduler.go @@ -14,6 +14,7 @@ import ( cronlib "github.com/robfig/cron/v3" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics" "github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised" ) @@ -741,6 +742,11 @@ func (s *Scheduler) sweepPhantomBusy(ctx context.Context) { continue } log.Printf("Scheduler: phantom-busy sweep — reset %s (no activity in %d min)", name, int(phantomStaleThreshold.Minutes())) + // #2865: surface as molecule_phantom_busy_resets_total. High + // reset rate signals task-lifecycle accounting regressions + // (e.g. 
missing env vars causing claude --print timeouts that + // leave active_tasks elevated until this sweep fires). + metrics.TrackPhantomBusyReset() count++ } if err := rows.Err(); err != nil {