molecule-core/workspace-server/internal/metrics/metrics.go
Hongming Wang a327d207da feat(rfc): poll-mode chat upload — phase 3 GC sweep + observability
Phase 3 of the poll-mode chat upload rollout. Stack atop Phase 2.

The platform's pending_uploads table grows once-per-uploaded-file with
no built-in cleanup. Phase 1's hard TTL (expires_at default 24h) makes
expired rows un-fetchable but doesn't actually delete them; Phase 1's
ack stamps acked_at but leaves the row indefinitely. Without a sweep
the table grows unbounded across normal traffic.

This PR adds:

- `Storage.Sweep(ctx, ackRetention)` — a single round-trip CTE that
  deletes acked rows past their retention window plus unacked rows
  past expires_at. Returns `(acked, expired)` deletion counts so
  Phase 3 dashboards can spot the stuck-fetch pattern (high expired,
  low acked) vs healthy churn.
- `pendinguploads.StartSweeper(ctx, storage, ackRetention)` —
  background goroutine that calls Sweep every 5 minutes (default).
  Runs once immediately on startup so a platform restart cleans up
  any rows that became eligible while we were down.
- Prometheus counters `molecule_pending_uploads_swept_total` with
  `outcome={acked,expired,error}` labels. Wired into the existing
  `/metrics` endpoint.
- Wired from cmd/server/main.go via supervised.RunWithRecover —
  one transient panic doesn't take the platform down with it.

Defaults:
  - SweepInterval = 5m (matches the dashboard refresh cadence)
  - DefaultAckRetention = 1h (gives the workspace at-least-once retry
    headroom in case it processed but failed to write the file before
    crashing)

Test coverage: 100% on storage_test.go (extended with sweepSQL pin +
six Sweep test cases including negative-retention clamp + zero-retention
immediate-delete + DB error wrapping) and sweeper_test.go (ticker-driven
+ ctx-cancel + nil-storage + transient-error-doesn't-crash + metric
counter assertions).

Closes the third of four phases tracked on the parent RFC; phase 4 is
the staging E2E test.
2026-05-05 05:00:13 -07:00

219 lines
9.4 KiB
Go

// Package metrics provides a lightweight Prometheus-format metrics endpoint
// for the Molecule AI platform. It requires no external dependencies — all
// serialization is done against the Prometheus text exposition format (v0.0.4)
// using the Go standard library.
//
// Exposed metrics:
//
// molecule_http_requests_total{method,path,status} - counter
// molecule_http_request_duration_seconds{method,path} - counter (sum, for avg rate)
// molecule_websocket_connections_active - gauge
// molecule_pending_uploads_swept_total{outcome} - counter (acked|expired|error)
// go_goroutines - gauge
// go_memstats_alloc_bytes - gauge
// go_memstats_sys_bytes - gauge
// go_memstats_heap_inuse_bytes - gauge
// go_gc_duration_seconds_total - counter
package metrics
import (
"fmt"
"net/http"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/gin-gonic/gin"
)
// reqKey indexes per-route request counts and latency sums.
type reqKey struct {
method string
path string
status int
}
var (
mu sync.RWMutex
reqCounts = map[reqKey]int64{} // molecule_http_requests_total
reqDurSums = map[reqKey]float64{} // sum of durations (seconds)
activeWSConns int64 // molecule_websocket_connections_active
// pendinguploads sweeper counters — atomic so the sweeper goroutine
// doesn't contend with the /metrics handler.
pendingUploadsSweptAcked int64 // molecule_pending_uploads_swept_total{outcome="acked"}
pendingUploadsSweptExpired int64 // molecule_pending_uploads_swept_total{outcome="expired"}
pendingUploadsSweepErrors int64 // molecule_pending_uploads_swept_total{outcome="error"}
)
// Middleware records per-request counts and latency.
// Register this before route handlers in the Gin engine.
func Middleware() gin.HandlerFunc {
return func(c *gin.Context) {
start := time.Now()
c.Next()
duration := time.Since(start).Seconds()
// Use the matched route pattern (e.g. "/workspaces/:id") so high-cardinality
// workspace UUIDs don't explode the label space.
path := c.FullPath()
if path == "" {
path = "unmatched"
}
k := reqKey{
method: c.Request.Method,
path: path,
status: c.Writer.Status(),
}
mu.Lock()
reqCounts[k]++
reqDurSums[k] += duration
mu.Unlock()
}
}
// TrackWSConnect increments the active WebSocket connections gauge.
// Call from the WebSocket upgrade handler after a successful upgrade.
func TrackWSConnect() { atomic.AddInt64(&activeWSConns, 1) }
// TrackWSDisconnect decrements the active WebSocket connections gauge.
// Call from the WebSocket disconnect / cleanup path.
func TrackWSDisconnect() { atomic.AddInt64(&activeWSConns, -1) }
// phantomBusyResets is the cumulative count of workspace rows the
// phantom-busy sweep reset (active_tasks=0 → active_tasks=0+counter
// cleared). Surfaced as molecule_phantom_busy_resets_total — a high
// reset rate signals a regression in task-lifecycle accounting (most
// often: missing env vars cause claude --print to time out, the
// agent loop never decrements active_tasks, and the sweep cleans up
// the counter ~10 min later). Issue #2865.
var phantomBusyResets int64
// TrackPhantomBusyReset increments the phantom-busy reset counter.
// Called from sweepPhantomBusy in workspace-server/internal/scheduler/
// after each row whose active_tasks was reset to 0. Idempotent +
// goroutine-safe; called once per row per sweep tick.
func TrackPhantomBusyReset() { atomic.AddInt64(&phantomBusyResets, 1) }
// PendingUploadsSwept records a successful sweep cycle. acked/expired
// are added to the per-outcome counters so dashboards can spot the
// stuck-fetch pattern (high expired, low acked) vs healthy churn.
func PendingUploadsSwept(acked, expired int) {
if acked > 0 {
atomic.AddInt64(&pendingUploadsSweptAcked, int64(acked))
}
if expired > 0 {
atomic.AddInt64(&pendingUploadsSweptExpired, int64(expired))
}
}
// PendingUploadsSweepError records a sweeper-cycle failure (transient
// DB error etc). Counted separately so the rate of errored sweeps is
// observable independent of how many rows the successful sweeps deleted.
func PendingUploadsSweepError() {
atomic.AddInt64(&pendingUploadsSweepErrors, 1)
}
// PendingUploadsSweepCounts returns the current (acked, expired, error)
// totals. Exposed for tests that need a deterministic delta probe of
// the sweeper's metric writes — the /metrics endpoint is the production
// observability surface; this is a unit-test escape hatch.
func PendingUploadsSweepCounts() (acked, expired, errored int64) {
return atomic.LoadInt64(&pendingUploadsSweptAcked),
atomic.LoadInt64(&pendingUploadsSweptExpired),
atomic.LoadInt64(&pendingUploadsSweepErrors)
}
// Handler returns a Gin handler that serialises all collected metrics in
// Prometheus text exposition format (v0.0.4). Mount this at GET /metrics.
func Handler() gin.HandlerFunc {
return func(c *gin.Context) {
var ms runtime.MemStats
runtime.ReadMemStats(&ms)
c.Header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
w := c.Writer
w.WriteHeader(http.StatusOK)
// ── Go runtime ─────────────────────────────────────────────────────
writeln(w, "# HELP go_goroutines Number of goroutines currently running.")
writeln(w, "# TYPE go_goroutines gauge")
fmt.Fprintf(w, "go_goroutines %d\n", runtime.NumGoroutine())
writeln(w, "# HELP go_memstats_alloc_bytes Bytes of allocated heap objects.")
writeln(w, "# TYPE go_memstats_alloc_bytes gauge")
fmt.Fprintf(w, "go_memstats_alloc_bytes %d\n", ms.Alloc)
writeln(w, "# HELP go_memstats_sys_bytes Total bytes of memory obtained from the OS.")
writeln(w, "# TYPE go_memstats_sys_bytes gauge")
fmt.Fprintf(w, "go_memstats_sys_bytes %d\n", ms.Sys)
writeln(w, "# HELP go_memstats_heap_inuse_bytes Bytes in in-use heap spans.")
writeln(w, "# TYPE go_memstats_heap_inuse_bytes gauge")
fmt.Fprintf(w, "go_memstats_heap_inuse_bytes %d\n", ms.HeapInuse)
writeln(w, "# HELP go_gc_duration_seconds_total Cumulative GC pause time.")
writeln(w, "# TYPE go_gc_duration_seconds_total counter")
fmt.Fprintf(w, "go_gc_duration_seconds_total %g\n", float64(ms.PauseTotalNs)/1e9)
// ── Molecule AI HTTP ───────────────────────────────────────────────────
writeln(w, "# HELP molecule_http_requests_total Total HTTP requests served, by method, path, and status.")
writeln(w, "# TYPE molecule_http_requests_total counter")
writeln(w, "# HELP molecule_http_request_duration_seconds_total Cumulative HTTP request duration in seconds.")
writeln(w, "# TYPE molecule_http_request_duration_seconds_total counter")
// Snapshot under lock, then write unlocked (avoids holding lock during slow HTTP writes)
mu.RLock()
countsCopy := make(map[reqKey]int64, len(reqCounts))
for k, v := range reqCounts {
countsCopy[k] = v
}
durCopy := make(map[reqKey]float64, len(reqDurSums))
for k, v := range reqDurSums {
durCopy[k] = v
}
mu.RUnlock()
for k, count := range countsCopy {
fmt.Fprintf(w,
"molecule_http_requests_total{method=%q,path=%q,status=\"%d\"} %d\n",
k.method, k.path, k.status, count,
)
}
for k, sum := range durCopy {
fmt.Fprintf(w,
"molecule_http_request_duration_seconds_total{method=%q,path=%q,status=\"%d\"} %g\n",
k.method, k.path, k.status, sum,
)
}
// ── Molecule AI WebSocket ──────────────────────────────────────────────
writeln(w, "# HELP molecule_websocket_connections_active Number of active WebSocket connections.")
writeln(w, "# TYPE molecule_websocket_connections_active gauge")
fmt.Fprintf(w, "molecule_websocket_connections_active %d\n", atomic.LoadInt64(&activeWSConns))
// ── Molecule AI scheduler ──────────────────────────────────────────────
writeln(w, "# HELP molecule_phantom_busy_resets_total Cumulative count of workspace rows reset by the phantom-busy sweep (active_tasks cleared after >10 min of activity_log silence). High reset rate signals task-lifecycle accounting regressions — see issue #2865.")
writeln(w, "# TYPE molecule_phantom_busy_resets_total counter")
fmt.Fprintf(w, "molecule_phantom_busy_resets_total %d\n", atomic.LoadInt64(&phantomBusyResets))
// ── Pending-uploads sweeper ────────────────────────────────────────────
writeln(w, "# HELP molecule_pending_uploads_swept_total Pending-uploads rows deleted by the GC sweeper, by outcome.")
writeln(w, "# TYPE molecule_pending_uploads_swept_total counter")
fmt.Fprintf(w, "molecule_pending_uploads_swept_total{outcome=\"acked\"} %d\n",
atomic.LoadInt64(&pendingUploadsSweptAcked))
fmt.Fprintf(w, "molecule_pending_uploads_swept_total{outcome=\"expired\"} %d\n",
atomic.LoadInt64(&pendingUploadsSweptExpired))
fmt.Fprintf(w, "molecule_pending_uploads_swept_total{outcome=\"error\"} %d\n",
atomic.LoadInt64(&pendingUploadsSweepErrors))
}
}
func writeln(w http.ResponseWriter, s string) {
fmt.Fprintln(w, s)
}