Merge pull request #2884 from Molecule-AI/feat/phantom-busy-reset-metric-2865-1777976000

feat(metrics): add molecule_phantom_busy_resets_total counter (#2865)
This commit is contained in:
Hongming Wang 2026-05-05 11:50:28 +00:00 committed by GitHub
commit 529c3f3922
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 130 additions and 0 deletions

View File

@ -76,6 +76,21 @@ func TrackWSConnect() { atomic.AddInt64(&activeWSConns, 1) }
// Call from the WebSocket disconnect / cleanup path.
func TrackWSDisconnect() { atomic.AddInt64(&activeWSConns, -1) }
// phantomBusyResets is the cumulative count of workspace rows the
// phantom-busy sweep reset (active_tasks=0 → active_tasks=0+counter
// cleared). Surfaced as molecule_phantom_busy_resets_total — a high
// reset rate signals a regression in task-lifecycle accounting (most
// often: missing env vars cause claude --print to time out, the
// agent loop never decrements active_tasks, and the sweep cleans up
// the counter ~10 min later). Issue #2865.
var phantomBusyResets int64
// TrackPhantomBusyReset increments the phantom-busy reset counter.
// Called from sweepPhantomBusy in workspace-server/internal/scheduler/
// after each row whose active_tasks was reset to 0. Idempotent +
// goroutine-safe; called once per row per sweep tick.
func TrackPhantomBusyReset() { atomic.AddInt64(&phantomBusyResets, 1) }
// Handler returns a Gin handler that serialises all collected metrics in
// Prometheus text exposition format (v0.0.4). Mount this at GET /metrics.
func Handler() gin.HandlerFunc {
@ -144,6 +159,11 @@ func Handler() gin.HandlerFunc {
writeln(w, "# HELP molecule_websocket_connections_active Number of active WebSocket connections.")
writeln(w, "# TYPE molecule_websocket_connections_active gauge")
fmt.Fprintf(w, "molecule_websocket_connections_active %d\n", atomic.LoadInt64(&activeWSConns))
// ── Molecule AI scheduler ──────────────────────────────────────────────
writeln(w, "# HELP molecule_phantom_busy_resets_total Cumulative count of workspace rows reset by the phantom-busy sweep (active_tasks cleared after >10 min of activity_log silence). High reset rate signals task-lifecycle accounting regressions — see issue #2865.")
writeln(w, "# TYPE molecule_phantom_busy_resets_total counter")
fmt.Fprintf(w, "molecule_phantom_busy_resets_total %d\n", atomic.LoadInt64(&phantomBusyResets))
}
}

View File

@ -0,0 +1,104 @@
package metrics
// Tests for the phantom-busy reset counter wired up by issue #2865.
// The counter is exposed at /metrics as
// molecule_phantom_busy_resets_total. A high steady-state value
// signals task-lifecycle accounting regressions in the agent loop —
// see scheduler.sweepPhantomBusy for the writer.
import (
"net/http/httptest"
"strings"
"sync"
"sync/atomic"
"testing"
"github.com/gin-gonic/gin"
)
// resetForTest zeroes the counter so a single test's TrackPhantomBusyReset
// calls don't compound onto a previous test's run. metrics.go's package-
// level state means every test that touches the counter must reset.
func resetForTest() {
atomic.StoreInt64(&phantomBusyResets, 0)
}
func TestTrackPhantomBusyReset_IncrementsCounter(t *testing.T) {
resetForTest()
for i := 0; i < 7; i++ {
TrackPhantomBusyReset()
}
got := atomic.LoadInt64(&phantomBusyResets)
if got != 7 {
t.Errorf("counter after 7 calls = %d, want 7", got)
}
}
func TestTrackPhantomBusyReset_RaceFreeUnderConcurrentWrites(t *testing.T) {
resetForTest()
var wg sync.WaitGroup
const goroutines = 50
const callsPerGoroutine = 200
wg.Add(goroutines)
for i := 0; i < goroutines; i++ {
go func() {
defer wg.Done()
for j := 0; j < callsPerGoroutine; j++ {
TrackPhantomBusyReset()
}
}()
}
wg.Wait()
want := int64(goroutines * callsPerGoroutine)
got := atomic.LoadInt64(&phantomBusyResets)
if got != want {
t.Errorf("counter under concurrent writes = %d, want %d (lost increments → atomic broken)",
got, want)
}
}
func TestHandler_ExposesPhantomBusyResetsCounter(t *testing.T) {
resetForTest()
for i := 0; i < 3; i++ {
TrackPhantomBusyReset()
}
gin.SetMode(gin.TestMode)
r := gin.New()
r.GET("/metrics", Handler())
w := httptest.NewRecorder()
req := httptest.NewRequest("GET", "/metrics", nil)
r.ServeHTTP(w, req)
body := w.Body.String()
// HELP + TYPE lines must precede the metric (Prometheus text exposition format).
if !strings.Contains(body, "# HELP molecule_phantom_busy_resets_total") {
t.Errorf("metrics output missing HELP line for molecule_phantom_busy_resets_total:\n%s", body)
}
if !strings.Contains(body, "# TYPE molecule_phantom_busy_resets_total counter") {
t.Errorf("metrics output missing TYPE line for molecule_phantom_busy_resets_total:\n%s", body)
}
if !strings.Contains(body, "molecule_phantom_busy_resets_total 3\n") {
t.Errorf("metrics output missing counter value 3:\n%s", body)
}
}
func TestHandler_PhantomBusyResetsZeroByDefault(t *testing.T) {
// Fresh process should report 0 — pin the contract so a future
// refactor that lazy-inits the counter to nil doesn't silently
// drop the metric from /metrics.
resetForTest()
gin.SetMode(gin.TestMode)
r := gin.New()
r.GET("/metrics", Handler())
w := httptest.NewRecorder()
req := httptest.NewRequest("GET", "/metrics", nil)
r.ServeHTTP(w, req)
if !strings.Contains(w.Body.String(), "molecule_phantom_busy_resets_total 0\n") {
t.Errorf("metric must report 0 by default:\n%s", w.Body.String())
}
}

View File

@ -14,6 +14,7 @@ import (
cronlib "github.com/robfig/cron/v3"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
)
@ -741,6 +742,11 @@ func (s *Scheduler) sweepPhantomBusy(ctx context.Context) {
continue
}
log.Printf("Scheduler: phantom-busy sweep — reset %s (no activity in %d min)", name, int(phantomStaleThreshold.Minutes()))
// #2865: surface as molecule_phantom_busy_resets_total. High
// reset rate signals task-lifecycle accounting regressions
// (e.g. missing env vars causing claude --print timeouts that
// leave active_tasks elevated until this sweep fires).
metrics.TrackPhantomBusyReset()
count++
}
if err := rows.Err(); err != nil {