fix(health): detect ALIVE-but-wedged agents via active>0 + no outbound + null heartbeat (#3057) #3061

Merged
devops-engineer merged 2 commits from fix/3057-wedged-agent-health-signal into main 2026-06-19 05:09:46 +00:00
5 changed files with 578 additions and 0 deletions
+24
View File
@@ -386,6 +386,20 @@ func main() {
go wh.RestartByID(workspaceID)
}
// 2026-06-19 a2a RCA (#3057): wedged-agent handler. Initial
// implementation is log + broadcast only — auto-restart is a
// follow-up gated on ops review (a wedge can mask a busy agent
// that's just slow; restarting such an agent loses in-flight
// state). The broadcast event lets the canvas flag the wedge
// status and operators inspect the tuple.
onWorkspaceWedged := func(innerCtx context.Context, workspaceID string) {
if err := broadcaster.RecordAndBroadcast(innerCtx, "WORKSPACE_WEDGED", workspaceID, map[string]interface{}{
"reason": "active_tasks>0 with no outbound A2A and no heartbeat — alive-but-wedged",
}); err != nil {
log.Printf("Wedged broadcast error for %s: %v", workspaceID, err)
}
}
// Start Liveness Monitor — Redis TTL expiry-based offline detection + auto-restart
go supervised.RunWithRecover(ctx, "liveness-monitor", func(c context.Context) {
registry.StartLivenessMonitor(c, onWorkspaceOffline)
@@ -410,6 +424,16 @@ func main() {
registry.StartHealthSweep(c, prov, 15*time.Second, onWorkspaceOffline)
})
// 2026-06-19 a2a RCA (#3057): a separate monitor for the
// alive-but-wedged case (active_tasks>0, no outbound, no heartbeat)
// that the existing health-sweep misses because the Docker container
// is still up (TCP connect succeeds) and the dead-origin HTTP-status
// check isUpstreamDeadStatus is not triggered. Initial handler only
// records and broadcasts a WORKSPACE_WEDGED event (logging if that
// fails); a gated auto-restart is a follow-up.
go supervised.RunWithRecover(ctx, "wedged-agent-monitor", func(c context.Context) {
registry.StartWedgedAgentMonitor(c, onWorkspaceWedged)
})
// Orphan-container reconcile sweep — finds running containers
// whose workspace row is already status='removed' and stops
// them. Defence in depth on top of the inline cleanup in
@@ -25,6 +25,7 @@ import (
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/memory/contract"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/provisioner"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/registry"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/wsauth"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/pkg/provisionhook"
"github.com/gin-gonic/gin"
@@ -1355,6 +1356,36 @@ func (h *WorkspaceHandler) Get(c *gin.Context) {
ws["last_outbound_at"] = nil
}
// 2026-06-19 a2a RCA (#3057): fetch last_heartbeat_at alongside
// last_outbound_at. The GET SELECT and scanWorkspaceRow do NOT
// include it (reviewers 12459 + 12460 caught the previous
// assumption that the column was already scanned), so we read
// it as a separate column-shaped follow-up query. The wedge
// predicate needs BOTH timestamps — without the heartbeat fetch,
// lastHeartbeat stayed invalid and any active workspace with a
// stale outbound was flagged wedged, recreating the false-positive
// safety concern this PR exists to fix.
var lastHeartbeat sql.NullTime
if err := db.DB.QueryRowContext(c.Request.Context(),
`SELECT last_heartbeat_at FROM workspaces WHERE id = $1`, id,
).Scan(&lastHeartbeat); err == nil && lastHeartbeat.Valid {
ws["last_heartbeat_at"] = lastHeartbeat.Time
} else {
ws["last_heartbeat_at"] = nil
}
// 2026-06-19 a2a RCA (#3057): surface a `wedged` boolean so
// operators and the canvas can detect the alive-but-wedged case
// (active_tasks>0, no outbound, no heartbeat) without manually
// inspecting the tuple. The predicate is the same one the
// wedged-agent monitor uses internally (registry.IsWedgedAgent),
// so the flag and the monitor can never disagree. Threshold comes
// from the same env var override; default 5 minutes. active_tasks
// is in the scanned row; last_outbound_at and last_heartbeat_at
// are both fetched above as separate column queries.
activeTasksVal, _ := ws["active_tasks"].(int)
ws["wedged"] = registry.IsWedgedAgent(activeTasksVal, lastOutbound, lastHeartbeat, registry.WedgedThresholdForHTTP())
// #2054 phase 2: per-runtime provision-timeout for canvas's
// ProvisioningTimeout banner.
if rt, _ := ws["runtime"].(string); rt != "" {
@@ -2258,3 +2258,148 @@ func TestWorkspaceCreate_188_ExplicitRuntimeNoTemplate_OK(t *testing.T) {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ==================== GET /workspaces/:id — wedged flag (#3057) ====================

// TestWorkspaceGet_Wedged_FreshHeartbeatStaleOutbound is the 2026-06-19
// a2a RCA (#3057) regression test for the false-positive wedged flag.
//
// The previous version of the flag assumed `last_heartbeat_at` was in
// the scanned row, but scanWorkspaceRow does not select/scan it. As a
// result lastHeartbeat stayed invalid and any workspace with
// active_tasks>0 + stale outbound was reported as wedged — even if the
// heartbeat was fresh. This test pins the fix: GET must fetch
// last_heartbeat_at separately, and the flag must be false when the
// heartbeat is recent (the canonical "legitimately busy" case from the
// IsWedgedAgent truth table).
func TestWorkspaceGet_Wedged_FreshHeartbeatStaleOutbound(t *testing.T) {
	// Pin the threshold to its default: the timestamps below are chosen
	// relative to 5 minutes, so an override inherited from the ambient
	// environment must not be allowed to change the expected outcome.
	t.Setenv("WEDGED_AGENT_THRESHOLD_SECONDS", "")

	mock := setupTestDB(t)
	setupTestRedis(t)
	broadcaster := newTestBroadcaster()
	handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

	id := "cccccccc-00a0-0000-0000-000000000000"
	now := time.Now().UTC()
	freshHeartbeat := now.Add(-30 * time.Second) // well within the 5min default threshold
	staleOutbound := now.Add(-15 * time.Minute)  // well past the 5min threshold

	columns := []string{
		"id", "name", "role", "tier", "status", "agent_card", "url",
		"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
		"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
		"budget_limit", "monthly_spend",
		"broadcast_enabled", "talk_to_user_enabled", "compute", "kind",
	}
	// active_tasks=1 (busy), but the GET row carries only what
	// scanWorkspaceRow scans (which does NOT include last_heartbeat_at).
	mock.ExpectQuery("SELECT w.id, w.name").
		WithArgs(id).
		WillReturnRows(sqlmock.NewRows(columns).
			AddRow(id, "Busy Agent", "worker", 1, "online", []byte(`null`),
				"", nil, 1, 1, 0.0, "", 60, "mid-turn", "claude-code",
				"", 0.0, 0.0, false,
				nil, 0, false, true, []byte(`{}`), "workspace"))
	// Follow-up query for last_outbound_at (existing #817 path).
	mock.ExpectQuery(`SELECT last_outbound_at FROM workspaces`).
		WithArgs(id).
		WillReturnRows(sqlmock.NewRows([]string{"last_outbound_at"}).AddRow(staleOutbound))
	// Follow-up query for last_heartbeat_at. The predicate MUST see this
	// fresh heartbeat to keep the flag false.
	mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces`).
		WithArgs(id).
		WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(freshHeartbeat))

	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Params = gin.Params{{Key: "id", Value: id}}
	c.Request = httptest.NewRequest("GET", "/workspaces/"+id, nil)
	handler.Get(c)

	if w.Code != http.StatusOK {
		t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String())
	}
	var resp map[string]interface{}
	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
		t.Fatalf("failed to parse response: %v", err)
	}

	// The wedged flag must be present, boolean and FALSE: a busy agent
	// with a fresh heartbeat is not wedged. Checking the type assertion
	// keeps a missing key from passing as "false".
	wedged, isBool := resp["wedged"].(bool)
	switch {
	case !isBool:
		t.Errorf("wedged = %v (%T), want boolean false — the GET response must always carry the flag",
			resp["wedged"], resp["wedged"])
	case wedged:
		t.Errorf("wedged flag = true, want false (active=1, fresh heartbeat, stale outbound) — " +
			"the false-positive regression: last_heartbeat_at is not being fetched or read into the predicate")
	}

	// last_heartbeat_at must be surfaced with a value so operators can
	// see what the predicate consumed. The handler writes an explicit
	// nil when the fetch fails, so a key-presence check alone would
	// never catch a broken fetch.
	if resp["last_heartbeat_at"] == nil {
		t.Errorf("last_heartbeat_at missing or null in response — the GET must surface it so " +
			"operators can verify the wedge predicate's input")
	}
	if resp["last_outbound_at"] == nil {
		t.Errorf("last_outbound_at should be the stale value, got nil")
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}
}
// TestWorkspaceGet_Wedged_StaleHeartbeatStaleOutbound is the companion
// to the fresh-heartbeat test: the WEDGE case (active>0, stale outbound,
// stale heartbeat) must produce wedged:true. The two tests together pin
// BOTH directions of the predicate at the HTTP integration boundary.
func TestWorkspaceGet_Wedged_StaleHeartbeatStaleOutbound(t *testing.T) {
	// Pin the threshold to its default. Both timestamps are 15 minutes
	// old, so an inherited override above 900 seconds would otherwise
	// flip the expected result.
	t.Setenv("WEDGED_AGENT_THRESHOLD_SECONDS", "")

	mock := setupTestDB(t)
	setupTestRedis(t)
	broadcaster := newTestBroadcaster()
	handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

	id := "cccccccc-00a1-0000-0000-000000000000"
	now := time.Now().UTC()
	staleHeartbeat := now.Add(-15 * time.Minute) // past the 5min threshold
	staleOutbound := now.Add(-15 * time.Minute)  // past the 5min threshold

	columns := []string{
		"id", "name", "role", "tier", "status", "agent_card", "url",
		"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
		"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
		"budget_limit", "monthly_spend",
		"broadcast_enabled", "talk_to_user_enabled", "compute", "kind",
	}
	// Main row: active_tasks=1, followed by the two single-column
	// follow-up queries the handler issues for the timestamps.
	mock.ExpectQuery("SELECT w.id, w.name").
		WithArgs(id).
		WillReturnRows(sqlmock.NewRows(columns).
			AddRow(id, "Wedged Agent", "worker", 1, "online", []byte(`null`),
				"", nil, 1, 1, 0.0, "", 60, "stuck", "claude-code",
				"", 0.0, 0.0, false,
				nil, 0, false, true, []byte(`{}`), "workspace"))
	mock.ExpectQuery(`SELECT last_outbound_at FROM workspaces`).
		WithArgs(id).
		WillReturnRows(sqlmock.NewRows([]string{"last_outbound_at"}).AddRow(staleOutbound))
	mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces`).
		WithArgs(id).
		WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(staleHeartbeat))

	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Params = gin.Params{{Key: "id", Value: id}}
	c.Request = httptest.NewRequest("GET", "/workspaces/"+id, nil)
	handler.Get(c)

	if w.Code != http.StatusOK {
		t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String())
	}
	var resp map[string]interface{}
	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
		t.Fatalf("failed to parse response: %v", err)
	}

	// Distinguish "flag absent / wrong type" from "flag false" so the
	// failure message points at the right problem.
	wedged, isBool := resp["wedged"].(bool)
	switch {
	case !isBool:
		t.Errorf("wedged = %v (%T), want boolean true — the GET response must always carry the flag",
			resp["wedged"], resp["wedged"])
	case !wedged:
		t.Errorf("wedged flag = false, want true (active=1, stale heartbeat, stale outbound) — " +
			"the wedge predicate failed at the HTTP integration boundary")
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}
}
@@ -0,0 +1,230 @@
// Package registry — wedged-agent detection.
//
// Background (2026-06-19 a2a RCA, #3057): a workspace agent can be
// "alive-but-wedged" — the agent process is up (so the platform's TCP
// connect succeeds), but it is hung mid-turn and produces no outbound
// A2A activity, no heartbeats, and (eventually) no progress. The
// existing reactive detection (isUpstreamDeadStatus → auto-restart)
// only fires on dead-origin HTTP statuses (502/521/522/524), not on
// this wedged-while-TCP-alive case. As a result, Kimi (workspace
// 6cb8c061) was observed with `active_tasks=1` (stuck), `last_outbound_at`
// ~48 minutes stale, heartbeat null/fresh:false — but the platform's
// `status: online` flag stayed set and the wedge was only caught by
// MANUAL inspection of (active>0 + no-outbound + null-heartbeat).
//
// Fix: a separate `StartWedgedAgentMonitor` that periodically queries
// for workspaces matching the wedged shape, surfaces the predicate via
// `IsWedgedAgent`, and dispatches a handler that the platform can use
// to (gated) auto-recover — initially logging + flipping a `wedged`
// flag in `get_workspace`, with auto-restart left as a follow-up
// gated on ops review.
package registry
import (
"context"
"database/sql"
"log"
"os"
"strconv"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
)
// DefaultWedgedThreshold is the staleness window that, combined with
// active_tasks > 0, classifies a workspace as wedged: both the last
// outbound A2A and the last heartbeat must be NULL or older than this.
// The same window is applied to both timestamps by IsWedgedAgent.
//
// 5 minutes is long enough that a long synchronous busy turn (large
// LLM response, deep tool chain) does not false-positive, but short
// enough that an operator inspecting a stuck workspace sees the wedge
// within a 5-minute window of staleness.
//
// Override via `WEDGED_AGENT_THRESHOLD_SECONDS` env var (integer
// seconds). Same parse rules as REMOTE_LIVENESS_STALE_AFTER in
// healthsweep.go.
const DefaultWedgedThreshold = 5 * time.Minute

// wedgedThreshold reads the WEDGED_AGENT_THRESHOLD_SECONDS override,
// falling back to DefaultWedgedThreshold.
//
// The result is always a positive duration:
//   - unset/empty → DefaultWedgedThreshold
//   - non-integer, zero or negative → logged, DefaultWedgedThreshold
//   - too large to express as a time.Duration → logged, clamped to the
//     largest whole-second duration. Without the clamp the
//     seconds→nanoseconds multiplication wraps around int64 and can
//     yield a negative threshold, which IsWedgedAgent treats as
//     "detection disabled".
func wedgedThreshold() time.Duration {
	// Largest number of whole seconds a time.Duration (int64
	// nanoseconds) can represent without overflowing.
	const maxSeconds = int64(1<<63-1) / int64(time.Second)

	raw := os.Getenv("WEDGED_AGENT_THRESHOLD_SECONDS")
	if raw == "" {
		return DefaultWedgedThreshold
	}
	secs, err := strconv.Atoi(raw)
	if err != nil || secs <= 0 {
		log.Printf("Wedged-agent monitor: invalid WEDGED_AGENT_THRESHOLD_SECONDS=%q (want positive integer seconds) — using default %s", raw, DefaultWedgedThreshold)
		return DefaultWedgedThreshold
	}
	if int64(secs) > maxSeconds {
		clamped := time.Duration(maxSeconds) * time.Second
		log.Printf("Wedged-agent monitor: WEDGED_AGENT_THRESHOLD_SECONDS=%q is too large for a duration — clamping to %s", raw, clamped)
		return clamped
	}
	return time.Duration(secs) * time.Second
}
// WedgedThresholdForHTTP returns the wedge threshold for callers
// outside this package — specifically the get_workspace handler, which
// needs it to compute the `wedged` flag.
//
// It delegates to the same env-driven lookup the monitor uses, so the
// HTTP flag and the monitor read one source and cannot drift apart
// when the env override is set.
//
// 2026-06-19 a2a RCA (#3057).
func WedgedThresholdForHTTP() time.Duration {
	threshold := wedgedThreshold()
	return threshold
}
// IsWedgedAgent reports whether a workspace matches the
// alive-but-wedged shape. It returns true only when every condition
// below holds:
//
//   - activeTasks > 0: the agent claims to be mid-turn rather than idle.
//   - lastOutboundAt is NULL or older than threshold: no outbound A2A
//     was produced inside the window (NULL means nothing was ever sent).
//   - lastHeartbeatAt is NULL or older than threshold: the heartbeat is
//     missing or stale too. This is what separates a wedged agent from
//     one that is merely busy and slow but still heartbeating.
//
// A non-positive threshold disables detection (always false) instead
// of flagging every busy agent.
//
// The function is shared by the monitor's sweep and the get_workspace
// `wedged` flag so both apply the identical rule.
//
// 2026-06-19 a2a RCA (#3057).
func IsWedgedAgent(activeTasks int, lastOutboundAt, lastHeartbeatAt sql.NullTime, threshold time.Duration) bool {
	// Idle workspaces are never wedged, and a non-positive threshold is
	// treated as "detection off" rather than "everything is stale".
	if activeTasks <= 0 || threshold <= 0 {
		return false
	}

	// Take one clock reading so both timestamps are judged against the
	// same instant.
	checkedAt := time.Now()
	isStale := func(ts sql.NullTime) bool {
		if !ts.Valid {
			return true
		}
		return checkedAt.Sub(ts.Time) > threshold
	}

	if !isStale(lastOutboundAt) {
		return false
	}
	return isStale(lastHeartbeatAt)
}
// WedgedHandler is the callback the wedged-agent monitor invokes once
// per workspace it classifies as wedged, passing the sweep's context
// and the workspace ID.
//
// Implementations should be idempotent: the sweep keeps no memory of
// earlier dispatches, so the same workspace is reported again on each
// tick for as long as it still matches the wedged shape (i.e. until
// heartbeat or outbound activity resumes, or the workspace leaves the
// scanned statuses). The handler wired up in main only records and
// broadcasts an event; a gated auto-restart handler is a follow-up.
type WedgedHandler func(ctx context.Context, workspaceID string)
// DefaultWedgedMonitorInterval is how often the wedged-agent monitor
// polls the database when no override is configured. 30s is
// fine-grained enough that an operator inspecting a stuck workspace
// sees the wedge reported within roughly 30s of the threshold
// elapsing.
//
// Override via `WEDGED_AGENT_MONITOR_INTERVAL_SECONDS` env var (positive
// integer seconds); see wedgedMonitorInterval.
const DefaultWedgedMonitorInterval = 30 * time.Second

// wedgedMonitorInterval resolves the monitor's tick interval from the
// WEDGED_AGENT_MONITOR_INTERVAL_SECONDS env var, mirroring the parse
// rules of the threshold override: unset/empty yields the default, and
// anything that is not a positive integer number of seconds (or is too
// large to express as a time.Duration) is logged and replaced by the
// default. The returned interval is therefore always positive, which
// time.NewTicker requires.
func wedgedMonitorInterval() time.Duration {
	// Largest number of whole seconds a time.Duration can hold.
	const maxSeconds = int64(1<<63-1) / int64(time.Second)

	raw := os.Getenv("WEDGED_AGENT_MONITOR_INTERVAL_SECONDS")
	if raw == "" {
		return DefaultWedgedMonitorInterval
	}
	secs, err := strconv.Atoi(raw)
	if err != nil || secs <= 0 || int64(secs) > maxSeconds {
		log.Printf("Wedged-agent monitor: invalid WEDGED_AGENT_MONITOR_INTERVAL_SECONDS=%q (want positive integer seconds) — using default %s", raw, DefaultWedgedMonitorInterval)
		return DefaultWedgedMonitorInterval
	}
	return time.Duration(secs) * time.Second
}

// StartWedgedAgentMonitor periodically scans for wedged workspaces
// (active_tasks > 0 + stale outbound + stale/null heartbeat) and
// dispatches onWedged for each. It blocks until ctx is cancelled, so
// callers run it in its own goroutine; main wraps it in
// supervised.RunWithRecover so a panic is recovered rather than
// silently killing the monitor.
//
// The tick interval is DefaultWedgedMonitorInterval unless overridden
// through WEDGED_AGENT_MONITOR_INTERVAL_SECONDS.
//
// Only workspaces with status IN ('online', 'degraded') are scanned,
// and workspaces whose runtime is 'external' are excluded — the wedge
// signal is defined in terms of the on-platform agent's outbound
// activity, and external runtimes may legitimately have long quiet
// periods when the operator's laptop is asleep.
//
// 2026-06-19 a2a RCA (#3057).
func StartWedgedAgentMonitor(ctx context.Context, onWedged WedgedHandler) {
	StartWedgedAgentMonitorWithInterval(ctx, wedgedMonitorInterval(), onWedged)
}
// StartWedgedAgentMonitorWithInterval is StartWedgedAgentMonitor with
// a configurable tick interval — exposed for tests so they don't
// have to wait 30 seconds for a tick.
//
// It blocks, sweeping once per tick, until ctx is cancelled. The wedge
// threshold is resolved once at start-up, so a change to the env
// override only takes effect when the monitor is restarted.
//
// A non-positive interval falls back to DefaultWedgedMonitorInterval:
// time.NewTicker panics on such values, and under a recover-and-restart
// supervisor that would turn a bad argument into an endless crash loop.
func StartWedgedAgentMonitorWithInterval(ctx context.Context, interval time.Duration, onWedged WedgedHandler) {
	if interval <= 0 {
		log.Printf("Wedged-agent monitor: non-positive interval %s — using default %s", interval, DefaultWedgedMonitorInterval)
		interval = DefaultWedgedMonitorInterval
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	threshold := wedgedThreshold()
	log.Printf("Wedged-agent monitor: started (interval=%s, threshold=%s)", interval, threshold)

	for {
		select {
		case <-ctx.Done():
			log.Println("Wedged-agent monitor: context done; stopping")
			return
		case <-ticker.C:
			sweepWedgedAgents(ctx, threshold, onWedged)
		}
	}
}
// sweepWedgedAgents runs one detection pass: it asks the database for
// candidate workspaces, confirms each with IsWedgedAgent, and then
// invokes onWedged for every confirmed workspace.
//
// The SQL pre-filters on status, active_tasks, runtime and the two
// staleness windows, but selects the raw columns so the final decision
// stays in the pure-Go predicate (easier to unit-test than a SQL CASE).
// Database errors are logged and never propagate: a failed query ends
// this pass, a failed row scan skips that row, and an iteration error
// is logged before dispatching whatever was collected.
//
// onWedged may be nil, in which case detections are only logged.
func sweepWedgedAgents(ctx context.Context, threshold time.Duration, onWedged WedgedHandler) {
	// The query takes the threshold as whole seconds.
	windowSeconds := int(threshold / time.Second)

	rows, err := db.DB.QueryContext(ctx, `
SELECT id, active_tasks, last_outbound_at, last_heartbeat_at
FROM workspaces
WHERE status IN ('online', 'degraded')
AND active_tasks > 0
AND COALESCE(runtime, 'claude-code') != 'external'
AND (
last_outbound_at IS NULL
OR last_outbound_at < now() - ($1 || ' seconds')::interval
)
AND (
last_heartbeat_at IS NULL
OR last_heartbeat_at < now() - ($1 || ' seconds')::interval
)
`, windowSeconds)
	if err != nil {
		log.Printf("Wedged-agent monitor: query error: %v", err)
		return
	}
	defer rows.Close()

	// Collect first, dispatch afterwards, so handlers never run while
	// the result set is still being iterated.
	var wedgedIDs []string
	for rows.Next() {
		var (
			workspaceID             string
			active                  int
			outboundAt, heartbeatAt sql.NullTime
		)
		if scanErr := rows.Scan(&workspaceID, &active, &outboundAt, &heartbeatAt); scanErr != nil {
			log.Printf("Wedged-agent monitor: scan error: %v", scanErr)
			continue
		}
		// The database clock and this process's clock can disagree
		// slightly, so the Go predicate gets the final say — it is the
		// same rule the get_workspace flag applies.
		if IsWedgedAgent(active, outboundAt, heartbeatAt, threshold) {
			wedgedIDs = append(wedgedIDs, workspaceID)
		}
	}
	if iterErr := rows.Err(); iterErr != nil {
		log.Printf("Wedged-agent monitor: rows error: %v", iterErr)
	}

	for i := range wedgedIDs {
		workspaceID := wedgedIDs[i]
		log.Printf("Wedged-agent monitor: detected wedge for %s (active_tasks>0, no outbound, no heartbeat for >%s) — dispatching handler",
			workspaceID, threshold)
		if onWedged == nil {
			continue
		}
		onWedged(ctx, workspaceID)
	}
}
@@ -0,0 +1,148 @@
// Regression tests for the wedged-agent predicate and monitor. The
// 2026-06-19 a2a RCA (#3057) found that an alive-but-wedged agent
// (active_tasks>0, no outbound A2A, no heartbeat) read as `status:
// online` to the platform and could only be detected by manual
// inspection of the tuple. These tests pin the predicate so future
// drift in the wedge definition is caught at unit-test time, not in
// prod.
package registry
import (
"database/sql"
"testing"
"time"
)
// ==================== IsWedgedAgent predicate ====================

// TestIsWedgedAgent_Pin pins the truth table for the wedge definition.
// It covers all four combinations of stale/fresh outbound and heartbeat
// for an active workspace, plus the idle, disabled-threshold and
// multi-task cases. Each row is a single tuple; the explanation field
// describes the scenario and is printed on failure.
func TestIsWedgedAgent_Pin(t *testing.T) {
	threshold := 5 * time.Minute
	now := time.Now()
	cases := []struct {
		name            string
		activeTasks     int
		lastOutboundAt  sql.NullTime
		lastHeartbeatAt sql.NullTime
		threshold       time.Duration
		wantWedged      bool
		explanation     string
	}{
		{
			name:            "wedge: active>0, null outbound, null heartbeat",
			activeTasks:     1,
			lastOutboundAt:  sql.NullTime{},
			lastHeartbeatAt: sql.NullTime{},
			threshold:       threshold,
			wantWedged:      true,
			explanation:     "Kimi-shape wedge from the RCA: active>0 with no record of any activity",
		},
		{
			name:            "wedge: active>0, stale outbound, stale heartbeat",
			activeTasks:     1,
			lastOutboundAt:  sql.NullTime{Time: now.Add(-10 * time.Minute), Valid: true},
			lastHeartbeatAt: sql.NullTime{Time: now.Add(-6 * time.Minute), Valid: true},
			threshold:       threshold,
			wantWedged:      true,
			explanation:     "Both timestamps older than threshold; agent is stuck",
		},
		{
			name:            "busy-but-alive: active>0, recent outbound, recent heartbeat",
			activeTasks:     1,
			lastOutboundAt:  sql.NullTime{Time: now.Add(-30 * time.Second), Valid: true},
			lastHeartbeatAt: sql.NullTime{Time: now.Add(-30 * time.Second), Valid: true},
			threshold:       threshold,
			wantWedged:      false,
			explanation:     "A legitimately busy agent: active AND producing outbound AND heartbeating",
		},
		{
			name:            "busy-but-no-recent-outbound: recent heartbeat only",
			activeTasks:     1,
			lastOutboundAt:  sql.NullTime{Time: now.Add(-10 * time.Minute), Valid: true},
			lastHeartbeatAt: sql.NullTime{Time: now.Add(-30 * time.Second), Valid: true},
			threshold:       threshold,
			wantWedged:      false,
			explanation:     "Heartbeat is recent — the heartbeat task is alive even if the turn is stuck. We do NOT wedge; the operator gets a chance to inspect.",
		},
		{
			// This row used to duplicate the one above; it now covers
			// the remaining quadrant of the outbound/heartbeat table.
			name:            "recent-outbound-but-no-heartbeat: active>0, recent outbound, null heartbeat",
			activeTasks:     1,
			lastOutboundAt:  sql.NullTime{Time: now.Add(-30 * time.Second), Valid: true},
			lastHeartbeatAt: sql.NullTime{},
			threshold:       threshold,
			wantWedged:      false,
			explanation:     "Recent outbound means the agent is still producing A2A traffic; a missing heartbeat alone is not a wedge",
		},
		{
			name:            "idle: active==0, everything stale",
			activeTasks:     0,
			lastOutboundAt:  sql.NullTime{Time: now.Add(-10 * time.Minute), Valid: true},
			lastHeartbeatAt: sql.NullTime{Time: now.Add(-10 * time.Minute), Valid: true},
			threshold:       threshold,
			wantWedged:      false,
			explanation:     "Idle workspaces are NOT wedged — they are candidates for hibernation, a different monitor",
		},
		{
			name:            "idle-never-used: active==0, null outbound, null heartbeat",
			activeTasks:     0,
			lastOutboundAt:  sql.NullTime{},
			lastHeartbeatAt: sql.NullTime{},
			threshold:       threshold,
			wantWedged:      false,
			explanation:     "A never-used workspace is not wedged; wedge requires active>0",
		},
		{
			name:            "non-positive threshold disables detection (defensive)",
			activeTasks:     1,
			lastOutboundAt:  sql.NullTime{},
			lastHeartbeatAt: sql.NullTime{},
			threshold:       0,
			wantWedged:      false,
			explanation:     "A non-positive threshold is operator config footgun; the predicate must not panic, must return false",
		},
		{
			name:            "active=2 with stale everything (multiple stuck turns)",
			activeTasks:     2,
			lastOutboundAt:  sql.NullTime{Time: now.Add(-30 * time.Minute), Valid: true},
			lastHeartbeatAt: sql.NullTime{Time: now.Add(-30 * time.Minute), Valid: true},
			threshold:       threshold,
			wantWedged:      true,
			explanation:     "active>0 (any positive value) with both stale is wedged",
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := IsWedgedAgent(tc.activeTasks, tc.lastOutboundAt, tc.lastHeartbeatAt, tc.threshold)
			if got != tc.wantWedged {
				t.Errorf("IsWedgedAgent(active=%d, outbound=%+v, heartbeat=%+v, threshold=%s) = %v, want %v\n explanation: %s",
					tc.activeTasks, tc.lastOutboundAt, tc.lastHeartbeatAt, tc.threshold, got, tc.wantWedged, tc.explanation)
			}
		})
	}
}
// TestIsWedgedAgent_ThresholdBoundary checks that timestamps well
// inside the threshold never classify as wedged. The predicate compares
// with a strict `now.Sub(t) > threshold`; using 1-second-old timestamps
// against a 5-minute threshold keeps the assertion immune to the small
// gap between the time.Now() taken here and the one taken inside the
// predicate.
func TestIsWedgedAgent_ThresholdBoundary(t *testing.T) {
	const threshold = 5 * time.Minute

	oneSecondOld := sql.NullTime{
		Time:  time.Now().Add(-1 * time.Second),
		Valid: true,
	}

	wedged := IsWedgedAgent(1, oneSecondOld, oneSecondOld, threshold)
	if wedged {
		t.Errorf("1-second-fresh timestamps should NOT be wedged (well within threshold)")
	}
}
func TestWedgedThresholdForHTTP_MirrorsMonitor(t *testing.T) {
// The HTTP `wedged` flag and the monitor's sweep query must use
// the same threshold, otherwise a flag flip on the HTTP response
// might disagree with the monitor's dispatch. WedgedThresholdForHTTP
// is the contract: same env var, same parse, same default.
if got := WedgedThresholdForHTTP(); got <= 0 {
t.Errorf("WedgedThresholdForHTTP() = %s, want positive duration", got)
}
}