forked from molecule-ai/molecule-core
Merge pull request #2884 from Molecule-AI/feat/phantom-busy-reset-metric-2865-1777976000
feat(metrics): add molecule_phantom_busy_resets_total counter (#2865)
This commit is contained in:
commit
529c3f3922
@ -76,6 +76,21 @@ func TrackWSConnect() { atomic.AddInt64(&activeWSConns, 1) }
|
|||||||
// Call from the WebSocket disconnect / cleanup path.
|
// Call from the WebSocket disconnect / cleanup path.
|
||||||
func TrackWSDisconnect() { atomic.AddInt64(&activeWSConns, -1) }
|
func TrackWSDisconnect() { atomic.AddInt64(&activeWSConns, -1) }
|
||||||
|
|
||||||
|
// phantomBusyResets is the cumulative count of workspace rows the
|
||||||
|
// phantom-busy sweep reset (active_tasks=0 → active_tasks=0+counter
|
||||||
|
// cleared). Surfaced as molecule_phantom_busy_resets_total — a high
|
||||||
|
// reset rate signals a regression in task-lifecycle accounting (most
|
||||||
|
// often: missing env vars cause claude --print to time out, the
|
||||||
|
// agent loop never decrements active_tasks, and the sweep cleans up
|
||||||
|
// the counter ~10 min later). Issue #2865.
|
||||||
|
var phantomBusyResets int64
|
||||||
|
|
||||||
|
// TrackPhantomBusyReset increments the phantom-busy reset counter.
|
||||||
|
// Called from sweepPhantomBusy in workspace-server/internal/scheduler/
|
||||||
|
// after each row whose active_tasks was reset to 0. Idempotent +
|
||||||
|
// goroutine-safe; called once per row per sweep tick.
|
||||||
|
func TrackPhantomBusyReset() { atomic.AddInt64(&phantomBusyResets, 1) }
|
||||||
|
|
||||||
// Handler returns a Gin handler that serialises all collected metrics in
|
// Handler returns a Gin handler that serialises all collected metrics in
|
||||||
// Prometheus text exposition format (v0.0.4). Mount this at GET /metrics.
|
// Prometheus text exposition format (v0.0.4). Mount this at GET /metrics.
|
||||||
func Handler() gin.HandlerFunc {
|
func Handler() gin.HandlerFunc {
|
||||||
@ -144,6 +159,11 @@ func Handler() gin.HandlerFunc {
|
|||||||
writeln(w, "# HELP molecule_websocket_connections_active Number of active WebSocket connections.")
|
writeln(w, "# HELP molecule_websocket_connections_active Number of active WebSocket connections.")
|
||||||
writeln(w, "# TYPE molecule_websocket_connections_active gauge")
|
writeln(w, "# TYPE molecule_websocket_connections_active gauge")
|
||||||
fmt.Fprintf(w, "molecule_websocket_connections_active %d\n", atomic.LoadInt64(&activeWSConns))
|
fmt.Fprintf(w, "molecule_websocket_connections_active %d\n", atomic.LoadInt64(&activeWSConns))
|
||||||
|
|
||||||
|
// ── Molecule AI scheduler ──────────────────────────────────────────────
|
||||||
|
writeln(w, "# HELP molecule_phantom_busy_resets_total Cumulative count of workspace rows reset by the phantom-busy sweep (active_tasks cleared after >10 min of activity_log silence). High reset rate signals task-lifecycle accounting regressions — see issue #2865.")
|
||||||
|
writeln(w, "# TYPE molecule_phantom_busy_resets_total counter")
|
||||||
|
fmt.Fprintf(w, "molecule_phantom_busy_resets_total %d\n", atomic.LoadInt64(&phantomBusyResets))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
104
workspace-server/internal/metrics/metrics_test.go
Normal file
104
workspace-server/internal/metrics/metrics_test.go
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
package metrics
|
||||||
|
|
||||||
|
// Tests for the phantom-busy reset counter wired up by issue #2865.
|
||||||
|
// The counter is exposed at /metrics as
|
||||||
|
// molecule_phantom_busy_resets_total. A high steady-state value
|
||||||
|
// signals task-lifecycle accounting regressions in the agent loop —
|
||||||
|
// see scheduler.sweepPhantomBusy for the writer.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
)
|
||||||
|
|
||||||
|
// resetForTest zeroes the counter so a single test's TrackPhantomBusyReset
|
||||||
|
// calls don't compound onto a previous test's run. metrics.go's package-
|
||||||
|
// level state means every test that touches the counter must reset.
|
||||||
|
func resetForTest() {
|
||||||
|
atomic.StoreInt64(&phantomBusyResets, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTrackPhantomBusyReset_IncrementsCounter(t *testing.T) {
|
||||||
|
resetForTest()
|
||||||
|
for i := 0; i < 7; i++ {
|
||||||
|
TrackPhantomBusyReset()
|
||||||
|
}
|
||||||
|
got := atomic.LoadInt64(&phantomBusyResets)
|
||||||
|
if got != 7 {
|
||||||
|
t.Errorf("counter after 7 calls = %d, want 7", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTrackPhantomBusyReset_RaceFreeUnderConcurrentWrites(t *testing.T) {
|
||||||
|
resetForTest()
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
const goroutines = 50
|
||||||
|
const callsPerGoroutine = 200
|
||||||
|
wg.Add(goroutines)
|
||||||
|
for i := 0; i < goroutines; i++ {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for j := 0; j < callsPerGoroutine; j++ {
|
||||||
|
TrackPhantomBusyReset()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
want := int64(goroutines * callsPerGoroutine)
|
||||||
|
got := atomic.LoadInt64(&phantomBusyResets)
|
||||||
|
if got != want {
|
||||||
|
t.Errorf("counter under concurrent writes = %d, want %d (lost increments → atomic broken)",
|
||||||
|
got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHandler_ExposesPhantomBusyResetsCounter(t *testing.T) {
|
||||||
|
resetForTest()
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
TrackPhantomBusyReset()
|
||||||
|
}
|
||||||
|
|
||||||
|
gin.SetMode(gin.TestMode)
|
||||||
|
r := gin.New()
|
||||||
|
r.GET("/metrics", Handler())
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
req := httptest.NewRequest("GET", "/metrics", nil)
|
||||||
|
r.ServeHTTP(w, req)
|
||||||
|
|
||||||
|
body := w.Body.String()
|
||||||
|
// HELP + TYPE lines must precede the metric (Prometheus text exposition format).
|
||||||
|
if !strings.Contains(body, "# HELP molecule_phantom_busy_resets_total") {
|
||||||
|
t.Errorf("metrics output missing HELP line for molecule_phantom_busy_resets_total:\n%s", body)
|
||||||
|
}
|
||||||
|
if !strings.Contains(body, "# TYPE molecule_phantom_busy_resets_total counter") {
|
||||||
|
t.Errorf("metrics output missing TYPE line for molecule_phantom_busy_resets_total:\n%s", body)
|
||||||
|
}
|
||||||
|
if !strings.Contains(body, "molecule_phantom_busy_resets_total 3\n") {
|
||||||
|
t.Errorf("metrics output missing counter value 3:\n%s", body)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHandler_PhantomBusyResetsZeroByDefault(t *testing.T) {
|
||||||
|
// Fresh process should report 0 — pin the contract so a future
|
||||||
|
// refactor that lazy-inits the counter to nil doesn't silently
|
||||||
|
// drop the metric from /metrics.
|
||||||
|
resetForTest()
|
||||||
|
|
||||||
|
gin.SetMode(gin.TestMode)
|
||||||
|
r := gin.New()
|
||||||
|
r.GET("/metrics", Handler())
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
req := httptest.NewRequest("GET", "/metrics", nil)
|
||||||
|
r.ServeHTTP(w, req)
|
||||||
|
|
||||||
|
if !strings.Contains(w.Body.String(), "molecule_phantom_busy_resets_total 0\n") {
|
||||||
|
t.Errorf("metric must report 0 by default:\n%s", w.Body.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -14,6 +14,7 @@ import (
|
|||||||
cronlib "github.com/robfig/cron/v3"
|
cronlib "github.com/robfig/cron/v3"
|
||||||
|
|
||||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||||
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics"
|
||||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -741,6 +742,11 @@ func (s *Scheduler) sweepPhantomBusy(ctx context.Context) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.Printf("Scheduler: phantom-busy sweep — reset %s (no activity in %d min)", name, int(phantomStaleThreshold.Minutes()))
|
log.Printf("Scheduler: phantom-busy sweep — reset %s (no activity in %d min)", name, int(phantomStaleThreshold.Minutes()))
|
||||||
|
// #2865: surface as molecule_phantom_busy_resets_total. High
|
||||||
|
// reset rate signals task-lifecycle accounting regressions
|
||||||
|
// (e.g. missing env vars causing claude --print timeouts that
|
||||||
|
// leave active_tasks elevated until this sweep fires).
|
||||||
|
metrics.TrackPhantomBusyReset()
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
if err := rows.Err(); err != nil {
|
if err := rows.Err(); err != nil {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user