fix(health): detect ALIVE-but-wedged agents via active>0 + no outbound + null heartbeat (#3057) #3061
@@ -386,6 +386,20 @@ func main() {
|
||||
go wh.RestartByID(workspaceID)
|
||||
}
|
||||
|
||||
// 2026-06-19 a2a RCA (#3057): wedged-agent handler. Initial
|
||||
// implementation is log + broadcast only — auto-restart is a
|
||||
// follow-up gated on ops review (a wedge can mask a busy agent
|
||||
// that's just slow; restarting such an agent loses in-flight
|
||||
// state). The broadcast event lets the canvas flag the wedge
|
||||
// status and operators inspect the tuple.
|
||||
onWorkspaceWedged := func(innerCtx context.Context, workspaceID string) {
|
||||
if err := broadcaster.RecordAndBroadcast(innerCtx, "WORKSPACE_WEDGED", workspaceID, map[string]interface{}{
|
||||
"reason": "active_tasks>0 with no outbound A2A and no heartbeat — alive-but-wedged",
|
||||
}); err != nil {
|
||||
log.Printf("Wedged broadcast error for %s: %v", workspaceID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Start Liveness Monitor — Redis TTL expiry-based offline detection + auto-restart
|
||||
go supervised.RunWithRecover(ctx, "liveness-monitor", func(c context.Context) {
|
||||
registry.StartLivenessMonitor(c, onWorkspaceOffline)
|
||||
@@ -410,6 +424,16 @@ func main() {
|
||||
registry.StartHealthSweep(c, prov, 15*time.Second, onWorkspaceOffline)
|
||||
})
|
||||
|
||||
// 2026-06-19 a2a RCA (#3057): a separate monitor for the
|
||||
// alive-but-wedged case (active_tasks>0, no outbound, no heartbeat)
|
||||
// that the existing health-sweep misses because the Docker container
|
||||
// is still up (TCP connect succeeds) and the dead-origin HTTP-status
|
||||
// check isUpstreamDeadStatus is not triggered. Initial handler is
|
||||
// log-only; a gated auto-restart is a follow-up.
|
||||
go supervised.RunWithRecover(ctx, "wedged-agent-monitor", func(c context.Context) {
|
||||
registry.StartWedgedAgentMonitor(c, onWorkspaceWedged)
|
||||
})
|
||||
|
||||
// Orphan-container reconcile sweep — finds running containers
|
||||
// whose workspace row is already status='removed' and stops
|
||||
// them. Defence in depth on top of the inline cleanup in
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/memory/contract"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/provisioner"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/registry"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/wsauth"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/pkg/provisionhook"
|
||||
"github.com/gin-gonic/gin"
|
||||
@@ -1355,6 +1356,36 @@ func (h *WorkspaceHandler) Get(c *gin.Context) {
|
||||
ws["last_outbound_at"] = nil
|
||||
}
|
||||
|
||||
// 2026-06-19 a2a RCA (#3057): fetch last_heartbeat_at alongside
|
||||
// last_outbound_at. The GET SELECT and scanWorkspaceRow do NOT
|
||||
// include it (reviewers 12459 + 12460 caught the previous
|
||||
// assumption that the column was already scanned), so we read
|
||||
// it as a separate column-shaped follow-up query. The wedge
|
||||
// predicate needs BOTH timestamps — without the heartbeat fetch,
|
||||
// lastHeartbeat stayed invalid and any active workspace with a
|
||||
// stale outbound was flagged wedged, recreating the false-positive
|
||||
// safety concern this PR exists to fix.
|
||||
var lastHeartbeat sql.NullTime
|
||||
if err := db.DB.QueryRowContext(c.Request.Context(),
|
||||
`SELECT last_heartbeat_at FROM workspaces WHERE id = $1`, id,
|
||||
).Scan(&lastHeartbeat); err == nil && lastHeartbeat.Valid {
|
||||
ws["last_heartbeat_at"] = lastHeartbeat.Time
|
||||
} else {
|
||||
ws["last_heartbeat_at"] = nil
|
||||
}
|
||||
|
||||
// 2026-06-19 a2a RCA (#3057): surface a `wedged` boolean so
|
||||
// operators and the canvas can detect the alive-but-wedged case
|
||||
// (active_tasks>0, no outbound, no heartbeat) without manually
|
||||
// inspecting the tuple. The predicate is the same one the
|
||||
// wedged-agent monitor uses internally (registry.IsWedgedAgent),
|
||||
// so the flag and the monitor can never disagree. Threshold comes
|
||||
// from the same env var override; default 5 minutes. active_tasks
|
||||
// is in the scanned row; last_outbound_at and last_heartbeat_at
|
||||
// are both fetched above as separate column queries.
|
||||
activeTasksVal, _ := ws["active_tasks"].(int)
|
||||
ws["wedged"] = registry.IsWedgedAgent(activeTasksVal, lastOutbound, lastHeartbeat, registry.WedgedThresholdForHTTP())
|
||||
|
||||
// #2054 phase 2: per-runtime provision-timeout for canvas's
|
||||
// ProvisioningTimeout banner.
|
||||
if rt, _ := ws["runtime"].(string); rt != "" {
|
||||
|
||||
@@ -2258,3 +2258,148 @@ func TestWorkspaceCreate_188_ExplicitRuntimeNoTemplate_OK(t *testing.T) {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== GET /workspaces/:id — wedged flag (#3057) ====================
|
||||
|
||||
// 2026-06-19 a2a RCA (#3057) regression: the previous version of the
|
||||
// wedged flag assumed `last_heartbeat_at` was in the scanned row, but
|
||||
// scanWorkspaceRow does not select/scan it. As a result, lastHeartbeat
|
||||
// stayed invalid and any workspace with active_tasks>0 + stale outbound
|
||||
// was reported as wedged — even if the heartbeat was fresh. This test
|
||||
// pins the fix: GET must fetch last_heartbeat_at separately, and the
|
||||
// flag must be false when the heartbeat is recent (the canonical
|
||||
// "legitimately busy" case from the IsWedgedAgent truth table).
|
||||
func TestWorkspaceGet_Wedged_FreshHeartbeatStaleOutbound(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
id := "cccccccc-00a0-0000-0000-000000000000"
|
||||
now := time.Now().UTC()
|
||||
freshHeartbeat := now.Add(-30 * time.Second) // well within the 5min default threshold
|
||||
staleOutbound := now.Add(-15 * time.Minute) // well past the 5min threshold
|
||||
|
||||
columns := []string{
|
||||
"id", "name", "role", "tier", "status", "agent_card", "url",
|
||||
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
|
||||
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
|
||||
"budget_limit", "monthly_spend",
|
||||
"broadcast_enabled", "talk_to_user_enabled", "compute", "kind",
|
||||
}
|
||||
// active_tasks=1 (busy), but the GET row carries only what
|
||||
// scanWorkspaceRow scans (which does NOT include last_heartbeat_at).
|
||||
mock.ExpectQuery("SELECT w.id, w.name").
|
||||
WithArgs(id).
|
||||
WillReturnRows(sqlmock.NewRows(columns).
|
||||
AddRow(id, "Busy Agent", "worker", 1, "online", []byte(`null`),
|
||||
"", nil, 1, 1, 0.0, "", 60, "mid-turn", "claude-code",
|
||||
"", 0.0, 0.0, false,
|
||||
nil, 0, false, true, []byte(`{}`), "workspace"))
|
||||
// Follow-up query for last_outbound_at (existing #817 path).
|
||||
mock.ExpectQuery(`SELECT last_outbound_at FROM workspaces`).
|
||||
WithArgs(id).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_outbound_at"}).AddRow(staleOutbound))
|
||||
// New follow-up query for last_heartbeat_at (the fix the reviewers
|
||||
// caught was missing). The predicate MUST see this fresh heartbeat
|
||||
// to keep the flag false.
|
||||
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces`).
|
||||
WithArgs(id).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(freshHeartbeat))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: id}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/"+id, nil)
|
||||
|
||||
handler.Get(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to parse response: %v", err)
|
||||
}
|
||||
// The wedged flag must be FALSE: a busy agent with a fresh
|
||||
// heartbeat is not wedged. If this fails, the false-positive
|
||||
// regression is back.
|
||||
if wedged, _ := resp["wedged"].(bool); wedged {
|
||||
t.Errorf("wedged flag = true, want false (active=1, fresh heartbeat, stale outbound) — "+
|
||||
"the false-positive regression: last_heartbeat_at is not being fetched or read into the predicate")
|
||||
}
|
||||
// And last_heartbeat_at should be surfaced in the response so
|
||||
// operators can see the value the predicate consumed.
|
||||
if _, ok := resp["last_heartbeat_at"]; !ok {
|
||||
t.Errorf("last_heartbeat_at missing from response — the GET must surface it so "+
|
||||
"operators can verify the wedge predicate's input")
|
||||
}
|
||||
if resp["last_outbound_at"] == nil {
|
||||
t.Errorf("last_outbound_at should be the stale value, got nil")
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Companion test: the WEDGE case (active>0, stale outbound, stale/null
|
||||
// heartbeat) must produce wedged:true. The two tests together pin
|
||||
// BOTH directions of the predicate at the HTTP integration boundary.
|
||||
func TestWorkspaceGet_Wedged_StaleHeartbeatStaleOutbound(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
id := "cccccccc-00a1-0000-0000-000000000000"
|
||||
now := time.Now().UTC()
|
||||
staleHeartbeat := now.Add(-15 * time.Minute) // past the 5min threshold
|
||||
staleOutbound := now.Add(-15 * time.Minute) // past the 5min threshold
|
||||
|
||||
columns := []string{
|
||||
"id", "name", "role", "tier", "status", "agent_card", "url",
|
||||
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
|
||||
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
|
||||
"budget_limit", "monthly_spend",
|
||||
"broadcast_enabled", "talk_to_user_enabled", "compute", "kind",
|
||||
}
|
||||
mock.ExpectQuery("SELECT w.id, w.name").
|
||||
WithArgs(id).
|
||||
WillReturnRows(sqlmock.NewRows(columns).
|
||||
AddRow(id, "Wedged Agent", "worker", 1, "online", []byte(`null`),
|
||||
"", nil, 1, 1, 0.0, "", 60, "stuck", "claude-code",
|
||||
"", 0.0, 0.0, false,
|
||||
nil, 0, false, true, []byte(`{}`), "workspace"))
|
||||
mock.ExpectQuery(`SELECT last_outbound_at FROM workspaces`).
|
||||
WithArgs(id).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_outbound_at"}).AddRow(staleOutbound))
|
||||
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces`).
|
||||
WithArgs(id).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(staleHeartbeat))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: id}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/"+id, nil)
|
||||
|
||||
handler.Get(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to parse response: %v", err)
|
||||
}
|
||||
if wedged, _ := resp["wedged"].(bool); !wedged {
|
||||
t.Errorf("wedged flag = false, want true (active=1, stale heartbeat, stale outbound) — "+
|
||||
"the wedge predicate failed at the HTTP integration boundary")
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,230 @@
|
||||
// Package registry — wedged-agent detection.
|
||||
//
|
||||
// Background (2026-06-19 a2a RCA, #3057): a workspace agent can be
|
||||
// "alive-but-wedged" — the agent process is up (so the platform's TCP
|
||||
// connect succeeds), but it is hung mid-turn and produces no outbound
|
||||
// A2A activity, no heartbeats, and (eventually) no progress. The
|
||||
// existing reactive detection (isUpstreamDeadStatus → auto-restart)
|
||||
// only fires on dead-origin HTTP statuses (502/521/522/524), not on
|
||||
// this wedged-while-TCP-alive case. As a result, Kimi (workspace
|
||||
// 6cb8c061) was observed with `active_tasks=1` (stuck), `last_outbound_at`
|
||||
// ~48 minutes stale, heartbeat null/fresh:false — but the platform's
|
||||
// `status: online` flag stayed set and the wedge was only caught by
|
||||
// MANUAL inspection of (active>0 + no-outbound + null-heartbeat).
|
||||
//
|
||||
// Fix: a separate `StartWedgedAgentMonitor` that periodically queries
|
||||
// for workspaces matching the wedged shape, surfaces the predicate via
|
||||
// `IsWedgedAgent`, and dispatches a handler that the platform can use
|
||||
// to (gated) auto-recover — initially logging + flipping a `wedged`
|
||||
// flag in `get_workspace`, with auto-restart left as a follow-up
|
||||
// gated on ops review.
|
||||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"log"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
)
|
||||
|
||||
// DefaultWedgedThreshold is the "no outbound" interval that, combined
|
||||
// with active_tasks > 0 and a null/stale heartbeat, classifies a
|
||||
// workspace as wedged. 5 minutes is long enough that a long synchronous
|
||||
// busy turn (large LLM response, deep tool chain) does not false-
|
||||
// positive, but short enough that an operator inspecting a stuck
|
||||
// workspace sees the wedge within a 5-minute window of staleness —
|
||||
// matching the spirit of the 180s heartbeat-staleness window in
|
||||
// healthsweep.go (the heartbeat-staleness and outbound-staleness
|
||||
// windows are intentionally different: a wedged agent may still send
|
||||
// heartbeats for a while even after it stops producing outbound A2A).
|
||||
//
|
||||
// Override via `WEDGED_AGENT_THRESHOLD_SECONDS` env var (integer
|
||||
// seconds). Same parse rules as REMOTE_LIVENESS_STALE_AFTER in
|
||||
// healthsweep.go.
|
||||
const DefaultWedgedThreshold = 5 * time.Minute
|
||||
|
||||
// wedgedThreshold reads the override from env, falling back to default.
|
||||
func wedgedThreshold() time.Duration {
|
||||
v := os.Getenv("WEDGED_AGENT_THRESHOLD_SECONDS")
|
||||
if v == "" {
|
||||
return DefaultWedgedThreshold
|
||||
}
|
||||
n, err := strconv.Atoi(v)
|
||||
if err != nil || n <= 0 {
|
||||
log.Printf("Wedged-agent monitor: invalid WEDGED_AGENT_THRESHOLD_SECONDS=%q (want positive integer seconds) — using default %s", v, DefaultWedgedThreshold)
|
||||
return DefaultWedgedThreshold
|
||||
}
|
||||
return time.Duration(n) * time.Second
|
||||
}
|
||||
|
||||
// WedgedThresholdForHTTP is the same env-driven threshold the monitor
|
||||
// uses, exposed as a public package symbol so the get_workspace
|
||||
// handler can compute its `wedged` flag with the same threshold the
|
||||
// monitor is watching. Keeping both on the same source prevents the
|
||||
// HTTP flag and the monitor from drifting out of sync if the env
|
||||
// override is set.
|
||||
//
|
||||
// 2026-06-19 a2a RCA (#3057).
|
||||
func WedgedThresholdForHTTP() time.Duration {
|
||||
return wedgedThreshold()
|
||||
}
|
||||
|
||||
// IsWedgedAgent classifies a workspace as wedged. A workspace is
|
||||
// wedged when ALL of the following hold:
|
||||
//
|
||||
// - activeTasks > 0 — the agent claims it is mid-turn, not idle.
|
||||
// - lastOutboundAt is NULL OR older than threshold — the agent has
|
||||
// not produced an outbound A2A in the window. A null value means
|
||||
// the workspace has never sent anything (the heartbeat-driven
|
||||
// `active_tasks=1` is the only signal we have, and the absence
|
||||
// of any outbound is the wedge signal).
|
||||
// - lastHeartbeatAt is NULL OR older than threshold — the agent's
|
||||
// heartbeat task is also missing or stale. This separates a
|
||||
// wedged agent (no heartbeat, no outbound) from a busy agent
|
||||
// that's just slow (active=1, outbound recent, heartbeat recent).
|
||||
//
|
||||
// Exposed for unit tests and for the `get_workspace` endpoint to
|
||||
// surface a `wedged: true` flag. The monitor calls this internally
|
||||
// too, so the SQL query and the surfaced flag can never disagree.
|
||||
//
|
||||
// 2026-06-19 a2a RCA (#3057).
|
||||
func IsWedgedAgent(activeTasks int, lastOutboundAt, lastHeartbeatAt sql.NullTime, threshold time.Duration) bool {
|
||||
if activeTasks <= 0 {
|
||||
return false
|
||||
}
|
||||
if threshold <= 0 {
|
||||
// Defensive: a non-positive threshold is operator config
|
||||
// footgun (would mark every busy agent as wedged). Treat
|
||||
// as "wedge detection disabled" rather than a panic.
|
||||
return false
|
||||
}
|
||||
now := time.Now()
|
||||
// A null OR older-than-threshold outbound is one half of the
|
||||
// signal. A null OR older-than-threshold heartbeat is the other.
|
||||
outboundStale := !lastOutboundAt.Valid || now.Sub(lastOutboundAt.Time) > threshold
|
||||
heartbeatStale := !lastHeartbeatAt.Valid || now.Sub(lastHeartbeatAt.Time) > threshold
|
||||
return outboundStale && heartbeatStale
|
||||
}
|
||||
|
||||
// WedgedHandler is called for each workspace that the wedged-agent
|
||||
// monitor decides is wedged. The handler should be idempotent — the
|
||||
// monitor may fire for the same workspace across multiple ticks until
|
||||
// the wedge clears (heartbeat resumes, outbound resumes) or the
|
||||
// workspace is taken offline. The platform starts with a log-only
|
||||
// handler; a gated auto-restart handler is a follow-up.
|
||||
type WedgedHandler func(ctx context.Context, workspaceID string)
|
||||
|
||||
// DefaultWedgedMonitorInterval is how often the wedged-agent monitor
|
||||
// polls the database. 30s is fine-grained enough that an operator
|
||||
// inspecting a stuck workspace sees the wedge flag flip within a
|
||||
// 30s window of the threshold elapsing, and cheap enough on a busy
|
||||
// platform — the query hits a small partial index on (status,
|
||||
// active_tasks) and a per-row null-check on the timestamp columns.
|
||||
//
|
||||
// Override via `WEDGED_AGENT_MONITOR_INTERVAL_SECONDS` env var.
|
||||
const DefaultWedgedMonitorInterval = 30 * time.Second
|
||||
|
||||
// StartWedgedAgentMonitor periodically scans for wedged workspaces
|
||||
// (active_tasks > 0 + stale outbound + stale/null heartbeat) and
|
||||
// dispatches onWedged for each. It runs under supervised.RunWithRecover
|
||||
// so a panic is recovered with exponential backoff rather than
|
||||
// silently dying — same contract as StartHibernationMonitor and
|
||||
// StartHealthSweep.
|
||||
//
|
||||
// Only workspaces with status IN ('online', 'degraded') are scanned.
|
||||
// Removed / provisioning / paused workspaces are excluded. External
|
||||
// runtimes (no Docker container) are also excluded — the wedge
|
||||
// signal is defined in terms of the on-platform agent's outbound
|
||||
// activity, and external runtimes may legitimately have long quiet
|
||||
// periods when the operator's laptop is asleep.
|
||||
//
|
||||
// 2026-06-19 a2a RCA (#3057).
|
||||
func StartWedgedAgentMonitor(ctx context.Context, onWedged WedgedHandler) {
|
||||
StartWedgedAgentMonitorWithInterval(ctx, DefaultWedgedMonitorInterval, onWedged)
|
||||
}
|
||||
|
||||
// StartWedgedAgentMonitorWithInterval is StartWedgedAgentMonitor with
|
||||
// a configurable tick interval — exposed for tests so they don't
|
||||
// have to wait 30 seconds for a tick.
|
||||
func StartWedgedAgentMonitorWithInterval(ctx context.Context, interval time.Duration, onWedged WedgedHandler) {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
threshold := wedgedThreshold()
|
||||
log.Printf("Wedged-agent monitor: started (interval=%s, threshold=%s)", interval, threshold)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Println("Wedged-agent monitor: context done; stopping")
|
||||
return
|
||||
case <-ticker.C:
|
||||
sweepWedgedAgents(ctx, threshold, onWedged)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sweepWedgedAgents queries for wedged workspaces and calls onWedged
|
||||
// for each. Errors from DB are logged but do not crash the loop.
|
||||
// The query selects the minimal set of columns needed to apply the
|
||||
// IsWedgedAgent predicate in code, so the SQL `WHERE` clause can use
|
||||
// indexed columns and the predicate stays a pure Go function (easier
|
||||
// to unit-test than a SQL CASE).
|
||||
func sweepWedgedAgents(ctx context.Context, threshold time.Duration, onWedged WedgedHandler) {
|
||||
thresholdSec := int(threshold / time.Second)
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT id, active_tasks, last_outbound_at, last_heartbeat_at
|
||||
FROM workspaces
|
||||
WHERE status IN ('online', 'degraded')
|
||||
AND active_tasks > 0
|
||||
AND COALESCE(runtime, 'claude-code') != 'external'
|
||||
AND (
|
||||
last_outbound_at IS NULL
|
||||
OR last_outbound_at < now() - ($1 || ' seconds')::interval
|
||||
)
|
||||
AND (
|
||||
last_heartbeat_at IS NULL
|
||||
OR last_heartbeat_at < now() - ($1 || ' seconds')::interval
|
||||
)
|
||||
`, thresholdSec)
|
||||
if err != nil {
|
||||
log.Printf("Wedged-agent monitor: query error: %v", err)
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var ids []string
|
||||
for rows.Next() {
|
||||
var id string
|
||||
var activeTasks int
|
||||
var lastOutbound, lastHeartbeat sql.NullTime
|
||||
if err := rows.Scan(&id, &activeTasks, &lastOutbound, &lastHeartbeat); err != nil {
|
||||
log.Printf("Wedged-agent monitor: scan error: %v", err)
|
||||
continue
|
||||
}
|
||||
// Defensive: re-apply the predicate in Go even though the SQL
|
||||
// already filtered. SQL's NOW() and Go's time.Now() can disagree
|
||||
// by milliseconds across the network — the Go predicate is the
|
||||
// authoritative one and IsWedgedAgent is the single source of
|
||||
// truth shared with the get_workspace flag.
|
||||
if !IsWedgedAgent(activeTasks, lastOutbound, lastHeartbeat, threshold) {
|
||||
continue
|
||||
}
|
||||
ids = append(ids, id)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("Wedged-agent monitor: rows error: %v", err)
|
||||
}
|
||||
|
||||
for _, id := range ids {
|
||||
log.Printf("Wedged-agent monitor: detected wedge for %s (active_tasks>0, no outbound, no heartbeat for >%s) — dispatching handler",
|
||||
id, threshold)
|
||||
if onWedged != nil {
|
||||
onWedged(ctx, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,148 @@
|
||||
// Regression tests for the wedged-agent predicate and monitor. The
|
||||
// 2026-06-19 a2a RCA (#3057) found that an alive-but-wedged agent
|
||||
// (active_tasks>0, no outbound A2A, no heartbeat) read as `status:
|
||||
// online` to the platform and could only be detected by manual
|
||||
// inspection of the tuple. These tests pin the predicate so future
|
||||
// drift in the wedge definition is caught at unit-test time, not in
|
||||
// prod.
|
||||
package registry
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ==================== IsWedgedAgent predicate ====================
|
||||
|
||||
func TestIsWedgedAgent_Pin(t *testing.T) {
|
||||
// The full truth table for the wedge definition. Each row is a
|
||||
// single tuple; the comment column explains the scenario.
|
||||
threshold := 5 * time.Minute
|
||||
now := time.Now()
|
||||
cases := []struct {
|
||||
name string
|
||||
activeTasks int
|
||||
lastOutboundAt sql.NullTime
|
||||
lastHeartbeatAt sql.NullTime
|
||||
threshold time.Duration
|
||||
wantWedged bool
|
||||
explanation string
|
||||
}{
|
||||
{
|
||||
name: "wedge: active>0, null outbound, null heartbeat",
|
||||
activeTasks: 1,
|
||||
lastOutboundAt: sql.NullTime{},
|
||||
lastHeartbeatAt: sql.NullTime{},
|
||||
threshold: threshold,
|
||||
wantWedged: true,
|
||||
explanation: "Kimi-shape wedge from the RCA: active>0 with no record of any activity",
|
||||
},
|
||||
{
|
||||
name: "wedge: active>0, stale outbound, stale heartbeat",
|
||||
activeTasks: 1,
|
||||
lastOutboundAt: sql.NullTime{Time: now.Add(-10 * time.Minute), Valid: true},
|
||||
lastHeartbeatAt: sql.NullTime{Time: now.Add(-6 * time.Minute), Valid: true},
|
||||
threshold: threshold,
|
||||
wantWedged: true,
|
||||
explanation: "Both timestamps older than threshold; agent is stuck",
|
||||
},
|
||||
{
|
||||
name: "busy-but-alive: active>0, recent outbound, recent heartbeat",
|
||||
activeTasks: 1,
|
||||
lastOutboundAt: sql.NullTime{Time: now.Add(-30 * time.Second), Valid: true},
|
||||
lastHeartbeatAt: sql.NullTime{Time: now.Add(-30 * time.Second), Valid: true},
|
||||
threshold: threshold,
|
||||
wantWedged: false,
|
||||
explanation: "A legitimately busy agent: active AND producing outbound AND heartbeating",
|
||||
},
|
||||
{
|
||||
name: "busy-but-no-recent-outbound: recent heartbeat only",
|
||||
activeTasks: 1,
|
||||
lastOutboundAt: sql.NullTime{Time: now.Add(-10 * time.Minute), Valid: true},
|
||||
lastHeartbeatAt: sql.NullTime{Time: now.Add(-30 * time.Second), Valid: true},
|
||||
threshold: threshold,
|
||||
wantWedged: false,
|
||||
explanation: "Heartbeat is recent — the heartbeat task is alive even if the turn is stuck. We do NOT wedge; the operator gets a chance to inspect.",
|
||||
},
|
||||
{
|
||||
name: "idle: active==0, everything stale",
|
||||
activeTasks: 0,
|
||||
lastOutboundAt: sql.NullTime{Time: now.Add(-10 * time.Minute), Valid: true},
|
||||
lastHeartbeatAt: sql.NullTime{Time: now.Add(-10 * time.Minute), Valid: true},
|
||||
threshold: threshold,
|
||||
wantWedged: false,
|
||||
explanation: "Idle workspaces are NOT wedged — they are candidates for hibernation, a different monitor",
|
||||
},
|
||||
{
|
||||
name: "idle-and-claim: active==0 with stale everything (already hibernated candidate)",
|
||||
activeTasks: 0,
|
||||
lastOutboundAt: sql.NullTime{},
|
||||
lastHeartbeatAt: sql.NullTime{},
|
||||
threshold: threshold,
|
||||
wantWedged: false,
|
||||
explanation: "A never-used workspace is not wedged; wedge requires active>0",
|
||||
},
|
||||
{
|
||||
name: "busy-with-just-outbound-stale-but-heartbeat-recent (subtle)",
|
||||
activeTasks: 1,
|
||||
lastOutboundAt: sql.NullTime{Time: now.Add(-10 * time.Minute), Valid: true},
|
||||
lastHeartbeatAt: sql.NullTime{Time: now.Add(-30 * time.Second), Valid: true},
|
||||
threshold: threshold,
|
||||
wantWedged: false,
|
||||
explanation: "Recent heartbeat means the agent is alive; outbound staleness alone is a long turn, not a wedge",
|
||||
},
|
||||
{
|
||||
name: "non-positive threshold disables detection (defensive)",
|
||||
activeTasks: 1,
|
||||
lastOutboundAt: sql.NullTime{},
|
||||
lastHeartbeatAt: sql.NullTime{},
|
||||
threshold: 0,
|
||||
wantWedged: false,
|
||||
explanation: "A non-positive threshold is operator config footgun; the predicate must not panic, must return false",
|
||||
},
|
||||
{
|
||||
name: "active=2 with stale everything (multiple stuck turns)",
|
||||
activeTasks: 2,
|
||||
lastOutboundAt: sql.NullTime{Time: now.Add(-30 * time.Minute), Valid: true},
|
||||
lastHeartbeatAt: sql.NullTime{Time: now.Add(-30 * time.Minute), Valid: true},
|
||||
threshold: threshold,
|
||||
wantWedged: true,
|
||||
explanation: "active>0 (any positive value) with both stale is wedged",
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := IsWedgedAgent(tc.activeTasks, tc.lastOutboundAt, tc.lastHeartbeatAt, tc.threshold)
|
||||
if got != tc.wantWedged {
|
||||
t.Errorf("IsWedgedAgent(active=%d, outbound=%+v, heartbeat=%+v, threshold=%s) = %v, want %v\n explanation: %s",
|
||||
tc.activeTasks, tc.lastOutboundAt, tc.lastHeartbeatAt, tc.threshold, got, tc.wantWedged, tc.explanation)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsWedgedAgent_ThresholdBoundary(t *testing.T) {
|
||||
// "Fresh" timestamps (well within threshold) are not wedged. The
|
||||
// predicate uses `now.Sub(t) > threshold` (strict greater-than),
|
||||
// and a 1-second-fresh timestamp is well within a 5-minute
|
||||
// threshold — the test is intentionally using a 1-second
|
||||
// boundary so the assertion is robust to time.Now() drift between
|
||||
// the test setup and the predicate call.
|
||||
threshold := 5 * time.Minute
|
||||
now := time.Now()
|
||||
fresh := sql.NullTime{Time: now.Add(-1 * time.Second), Valid: true}
|
||||
if IsWedgedAgent(1, fresh, fresh, threshold) {
|
||||
t.Errorf("1-second-fresh timestamps should NOT be wedged (well within threshold)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWedgedThresholdForHTTP_MirrorsMonitor(t *testing.T) {
|
||||
// The HTTP `wedged` flag and the monitor's sweep query must use
|
||||
// the same threshold, otherwise a flag flip on the HTTP response
|
||||
// might disagree with the monitor's dispatch. WedgedThresholdForHTTP
|
||||
// is the contract: same env var, same parse, same default.
|
||||
if got := WedgedThresholdForHTTP(); got <= 0 {
|
||||
t.Errorf("WedgedThresholdForHTTP() = %s, want positive duration", got)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user