fix(registry): surface degraded status when register persistently 401s (core#2530) #2585

Merged
agent-reviewer merged 10 commits from fix/core-2530-register-failure-degraded into main 2026-06-11 14:21:37 +00:00
7 changed files with 358 additions and 68 deletions
@@ -384,9 +384,9 @@ func TestHeartbeat_ExactThreshold_Degraded(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// error_rate == 0.5 should trigger degraded (>= 0.5)
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-edge").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
mock.ExpectExec("UPDATE workspaces SET status =").
WithArgs(models.StatusDegraded, "ws-edge").
WillReturnResult(sqlmock.NewResult(0, 1))
@@ -425,9 +425,9 @@ func TestHeartbeat_DegradedRecovery(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// Currently degraded, error_rate < 0.1 → should recover to online
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-rec").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("degraded"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("degraded", nil))
mock.ExpectExec("UPDATE workspaces SET status =").
WithArgs(models.StatusOnline, "ws-rec").
WillReturnResult(sqlmock.NewResult(0, 1))
@@ -467,9 +467,9 @@ func TestHeartbeat_ErrorRateDegrade_Guarded(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// Stale read: heartbeat started before CascadeDelete set status='removed'
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-degrade-guard").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
// Guarded UPDATE returns 0 rows because row is actually 'removed'
mock.ExpectExec("UPDATE workspaces SET status =.*AND status = 'online'").
@@ -513,9 +513,9 @@ func TestHeartbeat_DegradedRecovery_Guarded(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// Stale read: heartbeat started before CascadeDelete set status='removed'
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-recover-guard").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("degraded"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("degraded", nil))
// Guarded UPDATE returns 0 rows because row is actually 'removed'
mock.ExpectExec("UPDATE workspaces SET status =.*AND status = 'degraded'").
@@ -695,6 +695,8 @@ func TestDiscover_TargetOffline(t *testing.T) {
WillReturnRows(sqlmock.NewRows([]string{"name", "runtime"}).AddRow("Offline Agent", "claude-code"))
// No cached internal URL → falls to DB status check → offline
// NOTE: Discover (discovery.go) selects ONLY status — distinct from the
// heartbeat evaluateStatus 2-col SELECT in registry.go.
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
WithArgs("ws-off").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("offline"))
@@ -237,9 +237,9 @@ func TestHeartbeatHandler_Normal(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// Expect evaluateStatus SELECT
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-123").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -276,9 +276,9 @@ func TestHeartbeatHandler_Degraded(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// Expect evaluateStatus SELECT — currently online
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-123").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
// Expect status transition to degraded
mock.ExpectExec("UPDATE workspaces SET status =").
@@ -324,9 +324,9 @@ func TestHeartbeatHandler_Recovery(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// Expect evaluateStatus SELECT — currently degraded
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-123").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("degraded"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("degraded", nil))
// Expect status transition back to online
mock.ExpectExec("UPDATE workspaces SET status =").
@@ -720,9 +720,9 @@ func TestHeartbeatHandler_TaskChanged(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// Expect evaluateStatus SELECT
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-123").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -906,9 +906,9 @@ func TestHeartbeatHandler_TaskUnchanged(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// Expect evaluateStatus SELECT
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-123").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
// NO TASK_UPDATED broadcast expected — task didn't change
@@ -949,9 +949,9 @@ func TestHeartbeatHandler_TaskCleared(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// Expect evaluateStatus SELECT
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-123").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
// TASK_UPDATED broadcast expected — changed from "old task" to ""
// (BroadcastOnly doesn't hit sqlmock, so no expectation needed)
@@ -1008,9 +1008,9 @@ func TestHeartbeatHandler_AlwaysBroadcastsHeartbeat(t *testing.T) {
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-123", 0.0, "", 1, 500, "doing work").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-123").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -48,9 +48,9 @@ func TestHeartbeat_NativeStatusMgmt_SkipsDegradeInference(t *testing.T) {
// MUST NOT. We deliberately don't ExpectExec the degrade UPDATE
// — sqlmock fails the test if any UPDATE happens that wasn't
// expected, which is the regression cover.
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-native-status").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -99,9 +99,9 @@ func TestHeartbeat_NativeStatusMgmt_SkipsRecovery(t *testing.T) {
// evaluateStatus SELECT — currently degraded; recovery branch
// would normally fire UPDATE → online + WORKSPACE_ONLINE broadcast.
// Under native_status_mgmt, neither should run.
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-native-recovery").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("degraded"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("degraded", nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -144,9 +144,9 @@ func TestHeartbeat_NativeStatusMgmt_WedgedStillRespected(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// evaluateStatus SELECT — currently online, wedged branch SHOULD fire
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-wedged").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
// Wedged degrade UPDATE — must still happen even with native_status_mgmt
mock.ExpectExec("UPDATE workspaces SET status =").
+41 -3
View File
@@ -336,9 +336,21 @@ func (h *RegistryHandler) Register(c *gin.Context) {
// 403 (platform kind guard), 5xx (DB/internal error), or success from
// client timeout / unreachable $PLATFORM_URL.
registerStart := time.Now()
authOK := false
defer func(wsID string) {
if status := c.Writer.Status(); status != http.StatusOK {
log.Printf("Registry register: workspace=%s boot_register_failed status=%d duration=%s", wsID, status, time.Since(registerStart))
// #2530: record register failure so heartbeat can surface degraded status.
// #2585 hardening: only stamp after the caller has authenticated
// (requireWorkspaceToken succeeded). Unauthenticated 401s must NOT
// mutate workspace state — otherwise anyone can POST /registry/register
// without a bearer and force a false-degraded status via the heartbeat
// 5-minute failure window.
if authOK {
if _, err := db.DB.ExecContext(context.Background(), `UPDATE workspaces SET last_register_failure_at = now() WHERE id = $1`, wsID); err != nil {
log.Printf("Registry register: failed to record failure timestamp for %s: %v", wsID, err)
}
}
}
}(payload.ID)
@@ -374,6 +386,7 @@ func (h *RegistryHandler) Register(c *gin.Context) {
if err := h.requireWorkspaceToken(ctx, c, payload.ID); err != nil {
return // 401 response already written by requireWorkspaceToken
}
authOK = true
// SECURITY (privilege-escalation fix): the public register path must never
// CREATE or PROMOTE a row to kind='platform'. The org root is minted only by
@@ -631,6 +644,11 @@ func (h *RegistryHandler) Register(c *gin.Context) {
// whichever sub-step failed (read or mint). If the secret never lands,
// chat upload surfaces the issue loudly with the RFC-#2312 hint.
// #2530: clear register failure on success — the workspace is healthy.
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET last_register_failure_at = NULL WHERE id = $1`, payload.ID); err != nil {
log.Printf("Registry register: failed to clear failure timestamp for %s: %v", payload.ID, err)
}
c.JSON(http.StatusOK, response)
}
@@ -852,11 +870,13 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea
ctx := c.Request.Context()
var currentStatus string
err := db.DB.QueryRowContext(ctx, `SELECT status FROM workspaces WHERE id = $1`, payload.WorkspaceID).
Scan(&currentStatus)
var lastRegisterFailure sql.NullTime
err := db.DB.QueryRowContext(ctx, `SELECT status, last_register_failure_at FROM workspaces WHERE id = $1`, payload.WorkspaceID).
Scan(&currentStatus, &lastRegisterFailure)
if err != nil {
return
}
hasRecentRegisterFailure := lastRegisterFailure.Valid && time.Since(lastRegisterFailure.Time) < 5*time.Minute
// Self-reported runtime wedge: takes precedence over the error_rate
// path. The heartbeat task lives in its own asyncio task and keeps
@@ -903,6 +923,21 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea
})
}
// #2530: degrade when register has persistently failed within the last
// 5 minutes. A workspace whose auth token was lost after container re-create
// will 401 on every boot register; heartbeats keep it looking online while
// canvas chat delivery silently starves. Surfacing degraded gives the user
// a visible restart/credential-repair hint.
if currentStatus == "online" && hasRecentRegisterFailure {
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2 AND status = 'online'`, models.StatusDegraded, payload.WorkspaceID); err != nil {
log.Printf("Heartbeat: failed to mark %s degraded (register failure): %v", payload.WorkspaceID, err)
}
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceDegraded), payload.WorkspaceID, map[string]interface{}{
"register_failure": true,
"sample_error": "Register failed — workspace auth token may be stale. Restart or reprovision to recover.",
})
}
// Recovery from degraded → online when BOTH the error rate has
// fallen back AND the workspace is no longer reporting a wedge.
// The wedge condition is sticky for the process lifetime
@@ -912,7 +947,10 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea
//
// Skipped under native_status_mgmt for the same reason as the
// degrade branch above: the adapter owns the transition.
if !nativeStatus && currentStatus == "degraded" && payload.ErrorRate < 0.1 && payload.RuntimeState == "" {
//
// #2530: also require no recent register failure — the workspace stays
// degraded until a successful register clears the failure timestamp.
if !nativeStatus && currentStatus == "degraded" && payload.ErrorRate < 0.1 && payload.RuntimeState == "" && !hasRecentRegisterFailure {
// #73 guard: heartbeat recovery must not resurrect a removed workspace.
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2 AND status = 'degraded'`, models.StatusOnline, payload.WorkspaceID); err != nil {
log.Printf("Heartbeat: failed to recover %s to online: %v", payload.WorkspaceID, err)
@@ -8,6 +8,7 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
"github.com/DATA-DOG/go-sqlmock"
@@ -112,9 +113,9 @@ func TestHeartbeatHandler_OfflineToOnline(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// Expect evaluateStatus SELECT — currently offline
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-offline").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("offline"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("offline", nil))
// Expect status transition back to online
mock.ExpectExec("UPDATE workspaces SET status =").
@@ -162,9 +163,9 @@ func TestHeartbeatHandler_ProvisioningToOnline(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// Expect evaluateStatus SELECT — currently provisioning
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-provisioning").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("provisioning"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("provisioning", nil))
// Expect status transition to online (#1784)
mock.ExpectExec("UPDATE workspaces SET status =").
@@ -213,9 +214,9 @@ func TestHeartbeatHandler_FailedToOnline(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// evaluateStatus SELECT — currently failed (provision-timeout sweeper flip)
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-failed").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("failed"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("failed", nil))
// the new failed → online recovery transition
mock.ExpectExec("UPDATE workspaces SET status =").
@@ -262,9 +263,9 @@ func TestHeartbeatHandler_AwaitingAgentToOnline(t *testing.T) {
WithArgs("ws-external", 0.0, "", 0, 60, "").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-external").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("awaiting_agent"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("awaiting_agent", nil))
// The new branch — UPDATE ... WHERE status = 'awaiting_agent'
mock.ExpectExec("UPDATE workspaces SET status =").
@@ -385,9 +386,9 @@ func TestHeartbeatHandler_OnlineStaysOnline(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// evaluateStatus: online with error_rate 0.2 — below 0.5 threshold, stays online
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-stable").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -434,9 +435,9 @@ func TestHeartbeatHandler_RuntimeWedged_FlipsOnlineToDegraded(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// evaluateStatus: currentStatus = online
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-wedged").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
// The wedge-handling branch fires the degraded UPDATE with the
// `AND status = 'online'` guard (race-safe against concurrent
@@ -487,9 +488,9 @@ func TestHeartbeatHandler_DegradedRecoversOnlyAfterWedgeClears(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// currentStatus = degraded
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-still-wedged").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("degraded"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("degraded", nil))
// No additional UPDATE expected — the recovery branch's
// `runtime_state == ""` guard blocks the flip back to online.
@@ -532,9 +533,9 @@ func TestHeartbeatHandler_DegradedToOnline_AfterWedgeClears(t *testing.T) {
WithArgs("ws-recovered", 0.0, "", 0, 30, "").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-recovered").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("degraded"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("degraded", nil))
// Recovery UPDATE fires (degraded → online).
mock.ExpectExec("UPDATE workspaces SET status =").
@@ -735,7 +736,7 @@ func TestHeartbeat_SkipsRemovedRows(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 0))
// evaluateStatus SELECT
mock.ExpectQuery("SELECT status FROM workspaces WHERE id").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id").
WithArgs("ws-zombie").
WillReturnError(sql.ErrNoRows) // row effectively removed from view
@@ -776,9 +777,9 @@ func TestHeartbeatHandler_BackfillsAgentCard_WhenNull(t *testing.T) {
WithArgs("ws-nocard", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-nocard").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow(models.StatusOnline))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow(models.StatusOnline, nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -816,9 +817,9 @@ func TestHeartbeatHandler_SkipsAgentCardBackfill_WhenAlreadySet(t *testing.T) {
WithArgs("ws-hascard", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-hascard").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow(models.StatusOnline))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow(models.StatusOnline, nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1454,9 +1455,9 @@ func TestHeartbeat_MonthlySpend_WithinBounds(t *testing.T) {
WithArgs("ws-spend-ok", 0.0, "", 0, 0, "", int64(15000)). // $150.00
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT status FROM workspaces WHERE id").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id").
WithArgs("ws-spend-ok").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1490,9 +1491,9 @@ func TestHeartbeat_MonthlySpend_NegativeClamped(t *testing.T) {
WithArgs("ws-spend-neg", 0.0, "", 0, 0, "").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT status FROM workspaces WHERE id").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id").
WithArgs("ws-spend-neg").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1526,9 +1527,9 @@ func TestHeartbeat_MonthlySpend_OverflowClamped(t *testing.T) {
WithArgs("ws-spend-overflow", 0.0, "", 0, 0, "", int64(1_000_000_000_000)).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT status FROM workspaces WHERE id").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id").
WithArgs("ws-spend-overflow").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1561,9 +1562,9 @@ func TestHeartbeat_MonthlySpend_ExactCap(t *testing.T) {
WithArgs("ws-spend-cap", 0.0, "", 0, 0, "", int64(1_000_000_000_000)).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT status FROM workspaces WHERE id").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id").
WithArgs("ws-spend-cap").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1597,9 +1598,9 @@ func TestHeartbeat_MonthlySpend_Zero_NoUpdate(t *testing.T) {
WithArgs("ws-spend-zero", 0.0, "", 0, 0, "").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT status FROM workspaces WHERE id").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id").
WithArgs("ws-spend-zero").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -2202,9 +2203,9 @@ func TestHeartbeatHandler_DeliversPlatformInboundSecret(t *testing.T) {
WithArgs("ws-with-secret", 0.0, "", 0, 100, "").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-with-secret").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
// readOrLazyHealInboundSecret — short-circuit: secret already on file.
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
@@ -2257,9 +2258,9 @@ func TestHeartbeatHandler_LazyHealsPlatformInboundSecret(t *testing.T) {
WithArgs("ws-needs-heal", 0.0, "", 0, 100, "").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-needs-heal").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
// readOrLazyHealInboundSecret — NULL column triggers mint.
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
@@ -2313,9 +2314,9 @@ func TestHeartbeatHandler_OmitsSecretOnHealFailure(t *testing.T) {
WithArgs("ws-heal-fails", 0.0, "", 0, 100, "").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-heal-fails").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
// Read returns NULL → mint is attempted...
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
@@ -2349,3 +2350,239 @@ func TestHeartbeatHandler_OmitsSecretOnHealFailure(t *testing.T) {
t.Errorf("unmet expectations: %v", err)
}
}
// TestRegister_FailureRecordsLastRegisterFailure (#2530 / #2585): an
// AUTHENTICATED non-200 register must stamp last_register_failure_at so
// heartbeat can surface degraded status. Unauthenticated 401s must NOT stamp.
func TestRegister_FailureRecordsLastRegisterFailure(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
// Bootstrap-allowed: no live tokens → requireWorkspaceToken returns nil,
// so authOK becomes true.
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs("ws-reg-fail").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// Authenticated post-auth failure: push-mode with no URL 400s at the
// url-required check (AFTER requireWorkspaceToken sets authOK=true), so the
// deferred handler stamps last_register_failure_at. (An invalid delivery_mode
// would 400 BEFORE auth — authOK=false — and must NOT stamp.)
mock.ExpectExec("UPDATE workspaces SET last_register_failure_at = now").
WithArgs("ws-reg-fail").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"id":"ws-reg-fail","agent_card":{"name":"test"},"delivery_mode":"push"}`
c.Request = httptest.NewRequest("POST", "/registry/register", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Register(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestRegister_Unauthenticated401DoesNotStamp (#2585): a 401 from missing or
// invalid bearer must NOT stamp last_register_failure_at, otherwise an
// unauthenticated caller could force any workspace into degraded status.
func TestRegister_Unauthenticated401DoesNotStamp(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
// HasAnyLiveToken returns 1 — workspace already has an active token.
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs("ws-reg-unauth").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"id":"ws-reg-unauth","url":"http://example.com","agent_card":{"name":"test"}}`
c.Request = httptest.NewRequest("POST", "/registry/register", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Register(c)
if w.Code != http.StatusUnauthorized {
t.Fatalf("expected 401, got %d: %s", w.Code, w.Body.String())
}
// No UPDATE expectation — unauthenticated 401 must not mutate workspace state.
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestRegister_SuccessClearsLastRegisterFailure (#2530): a successful register
// must clear last_register_failure_at so heartbeat can recover to online.
func TestRegister_SuccessClearsLastRegisterFailure(t *testing.T) {
// SaaS mode so the platform-tunnel hostname (ws-reg-ok.moleculesai.app) is
// allowed while its DNS settles, instead of failing the SSRF DNS lookup in CI.
t.Setenv("MOLECULE_DEPLOY_MODE", "saas")
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
// Mock sequence mirrors the actual Register success path in registry.go.
// 1. requireWorkspaceToken → HasAnyLiveToken: no live tokens → bootstrap-allowed.
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs("ws-reg-ok").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// 2. resolveDeliveryMode (production selects delivery_mode AND runtime).
mock.ExpectQuery("SELECT delivery_mode, runtime FROM workspaces").
WithArgs("ws-reg-ok").
WillReturnError(sql.ErrNoRows)
// 3. agent_card identity reconcile (best-effort; no row yet).
mock.ExpectQuery("SELECT name, role FROM workspaces").
WithArgs("ws-reg-ok").
WillReturnError(sql.ErrNoRows)
// 4. Upsert workspace row.
mock.ExpectExec("INSERT INTO workspaces").
WillReturnResult(sqlmock.NewResult(0, 1))
// 5. Read-back URL for the WORKSPACE_ONLINE broadcast (best-effort).
mock.ExpectQuery("SELECT url FROM workspaces").
WithArgs("ws-reg-ok").
WillReturnError(sql.ErrNoRows)
// 6-7. Issue token (no live tokens → mint).
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs("ws-reg-ok").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
mock.ExpectExec("INSERT INTO workspace_auth_tokens").
WillReturnResult(sqlmock.NewResult(0, 1))
// 8-9. Lazy-heal platform_inbound_secret: read misses (ErrNoRows →
// ErrNoInboundSecret) so IssuePlatformInboundSecret mints inline.
mock.ExpectQuery("SELECT platform_inbound_secret FROM workspaces").
WithArgs("ws-reg-ok").
WillReturnError(sql.ErrNoRows)
mock.ExpectExec("UPDATE workspaces SET platform_inbound_secret").
WithArgs(sqlmock.AnyArg(), "ws-reg-ok").
WillReturnResult(sqlmock.NewResult(0, 1))
// 10. Clear last_register_failure_at on success.
mock.ExpectExec("UPDATE workspaces SET last_register_failure_at = NULL").
WithArgs("ws-reg-ok").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"id":"ws-reg-ok","url":"http://ws-reg-ok.moleculesai.app/a2a","agent_card":{"name":"test"}}`
c.Request = httptest.NewRequest("POST", "/registry/register", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Register(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestHeartbeat_RecentRegisterFailure_DegradesWorkspace (#2530): when
// last_register_failure_at is within the 5-minute window, heartbeat must
// flip the workspace from online to degraded.
func TestHeartbeat_RecentRegisterFailure_DegradesWorkspace(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
// prevTask SELECT
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-degrade-reg").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
// heartbeat UPDATE
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-degrade-reg", 0.0, "", 0, 100, "").
WillReturnResult(sqlmock.NewResult(0, 1))
// evaluateStatus SELECT — online with recent register failure
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-degrade-reg").
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).
AddRow("online", time.Now().Add(-2*time.Minute)))
// Degrade UPDATE
mock.ExpectExec("UPDATE workspaces SET status =").
WithArgs(models.StatusDegraded, "ws-degrade-reg").
WillReturnResult(sqlmock.NewResult(0, 1))
// Broadcast degraded event
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"workspace_id":"ws-degrade-reg","error_rate":0.0,"sample_error":"","active_tasks":0,"uptime_seconds":100}`
c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Heartbeat(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestHeartbeat_RecentRegisterFailure_BlocksRecovery (#2530): a degraded
// workspace with a recent register failure must NOT be flipped back to online
// by heartbeat, even when error_rate is low and runtime_state is empty.
func TestHeartbeat_RecentRegisterFailure_BlocksRecovery(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
// prevTask SELECT
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-no-recover").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
// heartbeat UPDATE
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-no-recover", 0.0, "", 0, 100, "").
WillReturnResult(sqlmock.NewResult(0, 1))
// evaluateStatus SELECT — degraded with recent register failure
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-no-recover").
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).
AddRow("degraded", time.Now().Add(-2*time.Minute)))
// NO recovery UPDATE expected — register failure blocks recovery.
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"workspace_id":"ws-no-recover","error_rate":0.0,"sample_error":"","active_tasks":0,"uptime_seconds":100}`
c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Heartbeat(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
@@ -0,0 +1,5 @@
BEGIN;
ALTER TABLE workspaces DROP COLUMN IF EXISTS last_register_failure_at;
COMMIT;
@@ -0,0 +1,8 @@
-- #2530: track persistent register failures so heartbeat can surface
-- degraded status when a workspace cannot re-register (e.g. lost auth token
-- after container re-create).
BEGIN;
ALTER TABLE workspaces ADD COLUMN IF NOT EXISTS last_register_failure_at TIMESTAMPTZ;
COMMIT;